This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a2deffc1eb [GLUTEN-6920][VL] Following #8036, append some code
cleanups (#8058)
a2deffc1eb is described below
commit a2deffc1eb9ca1678076ae7e3807d85f63b81f40
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Nov 27 16:54:37 2024 +0800
[GLUTEN-6920][VL] Following #8036, append some code cleanups (#8058)
---
.../api/python/ColumnarArrowEvalPythonExec.scala | 6 ++---
.../gluten/execution/ColumnarToColumnarExec.scala | 5 ++--
.../org/apache/gluten/execution/GlutenPlan.scala | 27 +++++++++++++++++++---
.../enumerated/planner/plan/GlutenPlanModel.scala | 2 +-
.../extension/columnar/transition/Convention.scala | 6 ++---
.../columnar/transition/Transitions.scala | 10 ++------
.../gluten/execution/ColumnarToRowExecBase.scala | 2 --
.../ColumnarCollapseTransformStages.scala | 2 +-
8 files changed, 36 insertions(+), 24 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 548dec13be..4a5f43b7fd 100644
---
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -21,7 +21,6 @@ import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
-import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.utils.PullOutProjectHelper
@@ -212,15 +211,14 @@ case class ColumnarArrowEvalPythonExec(
child: SparkPlan,
evalType: Int)
extends EvalPythonExec
- with GlutenPlan
- with KnownChildConvention {
+ with GlutenPlan {
override def batchType(): Convention.BatchType = ArrowJavaBatch
override def rowType0(): Convention.RowType = Convention.RowType.None
override def requiredChildConvention(): Seq[ConventionReq] = List(
- ConventionReq.of(ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(ArrowJavaBatch)))
+ ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatch)))
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index e27b6a4e2f..ac1f8d6835 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -50,8 +50,9 @@ abstract class ColumnarToColumnarExec(from:
Convention.BatchType, to: Convention
Convention.RowType.None
}
- override def requiredChildConvention(): Seq[ConventionReq] = List(
- ConventionReq.of(ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(from)))
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(from)))
+ }
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
index 460326d8d9..2cd408f67c 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
@@ -21,10 +21,31 @@ import
org.apache.gluten.extension.columnar.transition.{Convention, ConventionRe
import org.apache.spark.sql.execution.SparkPlan
+/**
+ * Base interface for query plans that are defined by backends.
+ *
+ * The following Spark APIs are marked final and therefore cannot be overridden:
+ * - supportsColumnar
+ * - supportsRowBased (Spark version >= 3.3)
+ *
+ * Instead, subclasses are expected to implement the following APIs:
+ * - batchType
+ * - rowType0
+ * - requiredChildConvention (optional)
+ *
+ * With implementations of these APIs provided, the Gluten query planner will be
able to find and insert
+ * proper transitions between different plan nodes.
+ *
+ * Implementing `requiredChildConvention` is optional: the default
implementation is a sequence
+ * of convention reqs that are exactly the same as the output convention. If
that is not the case for
+ * some plan types, then the API should be overridden. For example, a typical
row-to-columnar
+ * transition is at the same time a query plan node that requires row
input but produces
+ * columnar output.
+ */
trait GlutenPlan
extends SparkPlan
with Convention.KnownBatchType
- with Convention.KnownRowTypeForSpark33AndLater
+ with Convention.KnownRowTypeForSpark33OrLater
with GlutenPlan.SupportsRowBasedCompatible
with ConventionReq.KnownChildConvention {
@@ -32,12 +53,12 @@ trait GlutenPlan
batchType() != Convention.BatchType.None
}
- override def batchType(): Convention.BatchType
-
final override val supportsRowBased: Boolean = {
rowType() != Convention.RowType.None
}
+ override def batchType(): Convention.BatchType
+
override def rowType0(): Convention.RowType
override def requiredChildConvention(): Seq[ConventionReq] = {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index 20a238a47d..94059020ef 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -45,7 +45,7 @@ object GlutenPlanModel {
constraintSet: PropertySet[SparkPlan])
extends LeafExecNode
with Convention.KnownBatchType
- with Convention.KnownRowTypeForSpark33AndLater
+ with Convention.KnownRowTypeForSpark33OrLater
with GlutenPlan.SupportsRowBasedCompatible {
private val req: Conv.Req =
constraintSet.get(ConvDef).asInstanceOf[Conv.Req]
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index b9fbe023b2..0e53875596 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -157,9 +157,9 @@ object Convention {
def rowType(): RowType
}
- trait KnownRowTypeForSpark33AndLater extends KnownRowType {
+ trait KnownRowTypeForSpark33OrLater extends KnownRowType {
this: SparkPlan =>
- import KnownRowTypeForSpark33AndLater._
+ import KnownRowTypeForSpark33OrLater._
final override def rowType(): RowType = {
if (lteSpark32) {
@@ -180,7 +180,7 @@ object Convention {
def rowType0(): RowType
}
- object KnownRowTypeForSpark33AndLater {
+ object KnownRowTypeForSpark33OrLater {
private val lteSpark32: Boolean = {
val v = SparkVersionUtil.majorMinorVersion()
SparkVersionUtil.compareMajorMinorVersion(v, (3, 2)) <= 0
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index cec09ee7a1..297485d844 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -90,17 +90,11 @@ object Transitions {
}
def toRowPlan(plan: SparkPlan): SparkPlan = {
- enforceReq(
- plan,
- ConventionReq.of(
- ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
- ConventionReq.BatchType.Any))
+ enforceReq(plan, ConventionReq.row)
}
def toBatchPlan(plan: SparkPlan, toBatchType: Convention.BatchType):
SparkPlan = {
- enforceReq(
- plan,
- ConventionReq.of(ConventionReq.RowType.Any,
ConventionReq.BatchType.Is(toBatchType)))
+ enforceReq(plan,
ConventionReq.ofBatch(ConventionReq.BatchType.Is(toBatchType)))
}
def enforceReq(plan: SparkPlan, req: ConventionReq): SparkPlan = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
index 8bf1fbe447..5beaf49572 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
-import
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
@@ -29,7 +28,6 @@ import
org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}
abstract class ColumnarToRowExecBase(child: SparkPlan)
extends ColumnarToRowTransition
- with KnownChildConvention
with ValidatablePlan {
// Note: "metrics" is made transient to avoid sending driver-side metrics to
tasks.
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index 67895b439c..2bdc1cf4b3 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -177,7 +177,7 @@ case class ColumnarCollapseTransformStages(
case class ColumnarInputAdapter(child: SparkPlan)
extends InputAdapterGenerateTreeStringShim
with Convention.KnownBatchType
- with Convention.KnownRowTypeForSpark33AndLater
+ with Convention.KnownRowTypeForSpark33OrLater
with GlutenPlan.SupportsRowBasedCompatible
with ConventionReq.KnownChildConvention {
override def output: Seq[Attribute] = child.output
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]