This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d39e550258 [VL] Make VeloxResizeBatchesExec not inherit 
ColumnarToColumnarTransition (#11072)
d39e550258 is described below

commit d39e5502584d7135eb3e7296765b1bc08980a4df
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Nov 13 17:13:18 2025 +0000

    [VL] Make VeloxResizeBatchesExec not inherit ColumnarToColumnarTransition 
(#11072)
---
 .../ArrowColumnarToVeloxColumnarExec.scala         |  9 ++++++-
 .../gluten/execution/VeloxResizeBatchesExec.scala  |  7 +++++-
 .../gluten/execution/LoadArrowDataExec.scala       |  9 ++++++-
 .../gluten/execution/OffloadArrowDataExec.scala    |  9 ++++++-
 .../gluten/execution/ColumnarToColumnarExec.scala  | 24 ++++---------------
 .../execution/ColumnarToColumnarTransition.scala   |  9 ++++---
 .../GlutenColumnarToColumnarTransition.scala       | 28 +++++++++++++---------
 .../extension/columnar/transition/package.scala    |  2 +-
 .../columnar/transition/TransitionSuiteBase.scala  |  9 +++----
 9 files changed, 64 insertions(+), 42 deletions(-)

diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
index 9209886750..cca381ee84 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
@@ -19,12 +19,19 @@ package org.apache.gluten.execution
 import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowNativeBatchType
 import org.apache.gluten.backendsapi.velox.VeloxBatchType
 import org.apache.gluten.columnarbatch.VeloxColumnarBatches
+import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
-  extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) {
+  extends ColumnarToColumnarExec(child)
+  with GlutenColumnarToColumnarTransition {
+
+  override protected val from: Convention.BatchType = ArrowNativeBatchType
+
+  override protected val to: Convention.BatchType = VeloxBatchType
+
   override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
     in.map(b => VeloxColumnarBatches.toVeloxBatch(b))
   }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index a1ec54ffbc..acca0c7b68 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.iterator.ClosableIterator
 import org.apache.gluten.utils.VeloxBatchResizer
 
@@ -35,7 +36,7 @@ case class VeloxResizeBatchesExec(
     override val child: SparkPlan,
     minOutputBatchSize: Int,
     maxOutputBatchSize: Int)
-  extends ColumnarToColumnarExec(VeloxBatchType, VeloxBatchType) {
+  extends ColumnarToColumnarExec(child) {
 
   override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
     VeloxBatchResizer.create(minOutputBatchSize, maxOutputBatchSize, 
in.asJava).asScala
@@ -54,4 +55,8 @@ case class VeloxResizeBatchesExec(
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     copy(child = newChild)
+
+  override def batchType(): Convention.BatchType = VeloxBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
 }
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
 
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
index 1f1750113a..133d67b0f1 100644
--- 
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
+++ 
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution
 
 import 
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType, 
ArrowNativeBatchType}
 import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 
 import org.apache.spark.sql.execution.SparkPlan
@@ -25,7 +26,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /** Converts input data with batch type [[ArrowNativeBatchType]] to type 
[[ArrowJavaBatchType]]. */
 case class LoadArrowDataExec(override val child: SparkPlan)
-  extends ColumnarToColumnarExec(ArrowNativeBatchType, ArrowJavaBatchType) {
+  extends ColumnarToColumnarExec(child)
+  with GlutenColumnarToColumnarTransition {
+
+  override protected val from: Convention.BatchType = ArrowNativeBatchType
+
+  override protected val to: Convention.BatchType = ArrowJavaBatchType
+
   override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
     in.map(b => ColumnarBatches.load(ArrowBufferAllocators.contextInstance, b))
   }
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
 
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
index 6e548adbf6..256c178fdb 100644
--- 
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
+++ 
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution
 
 import 
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType, 
ArrowNativeBatchType}
 import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 
 import org.apache.spark.sql.execution.SparkPlan
@@ -25,7 +26,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /** Converts input data with batch type [[ArrowJavaBatchType]] to type 
[[ArrowNativeBatchType]]. */
 case class OffloadArrowDataExec(override val child: SparkPlan)
-  extends ColumnarToColumnarExec(ArrowJavaBatchType, ArrowNativeBatchType) {
+  extends ColumnarToColumnarExec(child)
+  with GlutenColumnarToColumnarTransition {
+
+  override protected val from: Convention.BatchType = ArrowJavaBatchType
+
+  override protected val to: Convention.BatchType = ArrowNativeBatchType
+
   override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
     in.map(b => ColumnarBatches.offload(ArrowBufferAllocators.contextInstance, 
b))
   }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index f19b898983..f825bc1f18 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -16,25 +16,20 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
 import org.apache.gluten.iterator.Iterators
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import java.util.concurrent.atomic.AtomicLong
 
-abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: 
Convention.BatchType)
-  extends ColumnarToColumnarTransition
-  with GlutenPlan {
-
-  override def isSameConvention: Boolean = from == to
-
-  def child: SparkPlan
+abstract class ColumnarToColumnarExec(override val child: SparkPlan)
+  extends GlutenPlan
+  with UnaryExecNode {
 
   protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch]
 
@@ -51,17 +46,8 @@ abstract class ColumnarToColumnarExec(from: 
Convention.BatchType, to: Convention
       "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
convert batches")
     )
 
-  override def batchType(): Convention.BatchType = to
-
-  override def rowType0(): Convention.RowType = {
-    Convention.RowType.None
-  }
-
-  override def requiredChildConvention(): Seq[ConventionReq] = {
-    List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(from)))
-  }
-
   override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numInputRows = longMetric("numInputRows")
     val numInputBatches = longMetric("numInputBatches")
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
index 8c4757b45c..8284db0f62 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
@@ -18,6 +18,9 @@ package org.apache.gluten.execution
 
 import org.apache.spark.sql.execution.UnaryExecNode
 
-trait ColumnarToColumnarTransition extends UnaryExecNode {
-  def isSameConvention: Boolean
-}
+/**
+ * A columnar-to-columnar transition. By implementing this trait, the class 
will be seen by
+ * [[org.apache.gluten.extension.columnar.transition.RemoveTransitions]] and 
removed when that rule
+ * is executed.
+ */
+trait ColumnarToColumnarTransition extends UnaryExecNode
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenColumnarToColumnarTransition.scala
similarity index 51%
copy from 
backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
copy to 
gluten-core/src/main/scala/org/apache/gluten/execution/GlutenColumnarToColumnarTransition.scala
index 9209886750..d2f62f0bf9 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenColumnarToColumnarTransition.scala
@@ -16,19 +16,25 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowNativeBatchType
-import org.apache.gluten.backendsapi.velox.VeloxBatchType
-import org.apache.gluten.columnarbatch.VeloxColumnarBatches
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
 
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.vectorized.ColumnarBatch
+/**
+ * A convenience trait for [[GlutenPlan]] to implement 
[[ColumnarToColumnarTransition]] at the same
+ * time. Note the implementation class will be seen by
+ * [[org.apache.gluten.extension.columnar.transition.RemoveTransitions]] and 
removed when that rule
+ * is executed.
+ */
+trait GlutenColumnarToColumnarTransition extends ColumnarToColumnarTransition 
with GlutenPlan {
+  protected val from: Convention.BatchType
+  protected val to: Convention.BatchType
+
+  override def batchType(): Convention.BatchType = to
 
-case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
-  extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) {
-  override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
-    in.map(b => VeloxColumnarBatches.toVeloxBatch(b))
+  override def rowType0(): Convention.RowType = {
+    Convention.RowType.None
   }
 
-  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    copy(child = newChild)
+  override def requiredChildConvention(): Seq[ConventionReq] = {
+    List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(from)))
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
index 9f309b843f..dbec061b15 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -69,7 +69,7 @@ package object transition {
   object ColumnarToColumnarLike {
     def unapply(plan: SparkPlan): Option[SparkPlan] = {
       plan match {
-        case c2c: ColumnarToColumnarTransition if !c2c.isSameConvention =>
+        case c2c: ColumnarToColumnarTransition =>
           Some(c2c.child)
         case _ => None
       }
diff --git 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
index 3724008509..827881ebc9 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.execution.{ColumnarToColumnarExec, GlutenPlan}
+import org.apache.gluten.execution.{ColumnarToColumnarExec, 
GlutenColumnarToColumnarTransition, GlutenPlan}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -146,10 +146,11 @@ object TransitionSuiteBase {
   }
 
   case class BatchToBatch(
-      from: Convention.BatchType,
-      to: Convention.BatchType,
+      override val from: Convention.BatchType,
+      override val to: Convention.BatchType,
       override val child: SparkPlan)
-    extends ColumnarToColumnarExec(from, to) {
+    extends ColumnarToColumnarExec(child)
+    with GlutenColumnarToColumnarTransition {
     override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
       copy(child = newChild)
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to