This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a2deffc1eb [GLUTEN-6920][VL] Following #8036, append some code 
cleanups (#8058)
a2deffc1eb is described below

commit a2deffc1eb9ca1678076ae7e3807d85f63b81f40
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Nov 27 16:54:37 2024 +0800

    [GLUTEN-6920][VL] Following #8036, append some code cleanups (#8058)
---
 .../api/python/ColumnarArrowEvalPythonExec.scala   |  6 ++---
 .../gluten/execution/ColumnarToColumnarExec.scala  |  5 ++--
 .../org/apache/gluten/execution/GlutenPlan.scala   | 27 +++++++++++++++++++---
 .../enumerated/planner/plan/GlutenPlanModel.scala  |  2 +-
 .../extension/columnar/transition/Convention.scala |  6 ++---
 .../columnar/transition/Transitions.scala          | 10 ++------
 .../gluten/execution/ColumnarToRowExecBase.scala   |  2 --
 .../ColumnarCollapseTransformStages.scala          |  2 +-
 8 files changed, 36 insertions(+), 24 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 548dec13be..4a5f43b7fd 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -21,7 +21,6 @@ import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
-import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.utils.PullOutProjectHelper
@@ -212,15 +211,14 @@ case class ColumnarArrowEvalPythonExec(
     child: SparkPlan,
     evalType: Int)
   extends EvalPythonExec
-  with GlutenPlan
-  with KnownChildConvention {
+  with GlutenPlan {
 
   override def batchType(): Convention.BatchType = ArrowJavaBatch
 
   override def rowType0(): Convention.RowType = Convention.RowType.None
 
   override def requiredChildConvention(): Seq[ConventionReq] = List(
-    ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(ArrowJavaBatch)))
+    ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatch)))
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index e27b6a4e2f..ac1f8d6835 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -50,8 +50,9 @@ abstract class ColumnarToColumnarExec(from: 
Convention.BatchType, to: Convention
     Convention.RowType.None
   }
 
-  override def requiredChildConvention(): Seq[ConventionReq] = List(
-    ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(from)))
+  override def requiredChildConvention(): Seq[ConventionReq] = {
+    List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(from)))
+  }
 
   override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala 
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
index 460326d8d9..2cd408f67c 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
@@ -21,10 +21,31 @@ import 
org.apache.gluten.extension.columnar.transition.{Convention, ConventionRe
 
 import org.apache.spark.sql.execution.SparkPlan
 
+/**
+ * Base interface for query plans that are defined by backends.
+ *
+ * The following Spark APIs are marked final and therefore cannot be overridden:
+ *   - supportsColumnar
+ *   - supportsRowBased (Spark version >= 3.3)
+ *
+ * Instead, subclasses are expected to implement the following APIs:
+ *   - batchType
+ *   - rowType0
+ *   - requiredChildConvention (optional)
+ *
+ * With implementations of these APIs provided, the Gluten query planner will be 
able to find and insert
+ * proper transitions between different plan nodes.
+ *
+ * Implementing `requiredChildConvention` is optional: the default 
implementation is a sequence
+ * of convention reqs that are exactly the same as the output convention. If 
that is not the case for
+ * some plan types, then the API should be overridden. For example, a typical 
row-to-columnar
+ * transition is at the same time a query plan node that requires row 
input but produces
+ * columnar output.
+ */
 trait GlutenPlan
   extends SparkPlan
   with Convention.KnownBatchType
-  with Convention.KnownRowTypeForSpark33AndLater
+  with Convention.KnownRowTypeForSpark33OrLater
   with GlutenPlan.SupportsRowBasedCompatible
   with ConventionReq.KnownChildConvention {
 
@@ -32,12 +53,12 @@ trait GlutenPlan
     batchType() != Convention.BatchType.None
   }
 
-  override def batchType(): Convention.BatchType
-
   final override val supportsRowBased: Boolean = {
     rowType() != Convention.RowType.None
   }
 
+  override def batchType(): Convention.BatchType
+
   override def rowType0(): Convention.RowType
 
   override def requiredChildConvention(): Seq[ConventionReq] = {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index 20a238a47d..94059020ef 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -45,7 +45,7 @@ object GlutenPlanModel {
       constraintSet: PropertySet[SparkPlan])
     extends LeafExecNode
     with Convention.KnownBatchType
-    with Convention.KnownRowTypeForSpark33AndLater
+    with Convention.KnownRowTypeForSpark33OrLater
     with GlutenPlan.SupportsRowBasedCompatible {
     private val req: Conv.Req = 
constraintSet.get(ConvDef).asInstanceOf[Conv.Req]
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index b9fbe023b2..0e53875596 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -157,9 +157,9 @@ object Convention {
     def rowType(): RowType
   }
 
-  trait KnownRowTypeForSpark33AndLater extends KnownRowType {
+  trait KnownRowTypeForSpark33OrLater extends KnownRowType {
     this: SparkPlan =>
-    import KnownRowTypeForSpark33AndLater._
+    import KnownRowTypeForSpark33OrLater._
 
     final override def rowType(): RowType = {
       if (lteSpark32) {
@@ -180,7 +180,7 @@ object Convention {
     def rowType0(): RowType
   }
 
-  object KnownRowTypeForSpark33AndLater {
+  object KnownRowTypeForSpark33OrLater {
     private val lteSpark32: Boolean = {
       val v = SparkVersionUtil.majorMinorVersion()
       SparkVersionUtil.compareMajorMinorVersion(v, (3, 2)) <= 0
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index cec09ee7a1..297485d844 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -90,17 +90,11 @@ object Transitions {
   }
 
   def toRowPlan(plan: SparkPlan): SparkPlan = {
-    enforceReq(
-      plan,
-      ConventionReq.of(
-        ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
-        ConventionReq.BatchType.Any))
+    enforceReq(plan, ConventionReq.row)
   }
 
   def toBatchPlan(plan: SparkPlan, toBatchType: Convention.BatchType): 
SparkPlan = {
-    enforceReq(
-      plan,
-      ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(toBatchType)))
+    enforceReq(plan, 
ConventionReq.ofBatch(ConventionReq.BatchType.Is(toBatchType)))
   }
 
   def enforceReq(plan: SparkPlan, req: ConventionReq): SparkPlan = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
index 8bf1fbe447..5beaf49572 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
-import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -29,7 +28,6 @@ import 
org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}
 
 abstract class ColumnarToRowExecBase(child: SparkPlan)
   extends ColumnarToRowTransition
-  with KnownChildConvention
   with ValidatablePlan {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index 67895b439c..2bdc1cf4b3 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -177,7 +177,7 @@ case class ColumnarCollapseTransformStages(
 case class ColumnarInputAdapter(child: SparkPlan)
   extends InputAdapterGenerateTreeStringShim
   with Convention.KnownBatchType
-  with Convention.KnownRowTypeForSpark33AndLater
+  with Convention.KnownRowTypeForSpark33OrLater
   with GlutenPlan.SupportsRowBasedCompatible
   with ConventionReq.KnownChildConvention {
   override def output: Seq[Attribute] = child.output


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to