This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d39e550258 [VL] Make VeloxResizeBatchesExec not inherit ColumnarToColumnarTransition (#11072)
d39e550258 is described below
commit d39e5502584d7135eb3e7296765b1bc08980a4df
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Nov 13 17:13:18 2025 +0000
[VL] Make VeloxResizeBatchesExec not inherit ColumnarToColumnarTransition (#11072)
---
.../ArrowColumnarToVeloxColumnarExec.scala | 9 ++++++-
.../gluten/execution/VeloxResizeBatchesExec.scala | 7 +++++-
.../gluten/execution/LoadArrowDataExec.scala | 9 ++++++-
.../gluten/execution/OffloadArrowDataExec.scala | 9 ++++++-
.../gluten/execution/ColumnarToColumnarExec.scala | 24 ++++---------------
.../execution/ColumnarToColumnarTransition.scala | 9 ++++---
.../GlutenColumnarToColumnarTransition.scala | 28 +++++++++++++---------
.../extension/columnar/transition/package.scala | 2 +-
.../columnar/transition/TransitionSuiteBase.scala | 9 +++----
9 files changed, 64 insertions(+), 42 deletions(-)
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
index 9209886750..cca381ee84 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
@@ -19,12 +19,19 @@ package org.apache.gluten.execution
import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowNativeBatchType
import org.apache.gluten.backendsapi.velox.VeloxBatchType
import org.apache.gluten.columnarbatch.VeloxColumnarBatches
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch
case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
- extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) {
+ extends ColumnarToColumnarExec(child)
+ with GlutenColumnarToColumnarTransition {
+
+ override protected val from: Convention.BatchType = ArrowNativeBatchType
+
+ override protected val to: Convention.BatchType = VeloxBatchType
+
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
in.map(b => VeloxColumnarBatches.toVeloxBatch(b))
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index a1ec54ffbc..acca0c7b68 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.ClosableIterator
import org.apache.gluten.utils.VeloxBatchResizer
@@ -35,7 +36,7 @@ case class VeloxResizeBatchesExec(
override val child: SparkPlan,
minOutputBatchSize: Int,
maxOutputBatchSize: Int)
- extends ColumnarToColumnarExec(VeloxBatchType, VeloxBatchType) {
+ extends ColumnarToColumnarExec(child) {
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
VeloxBatchResizer.create(minOutputBatchSize, maxOutputBatchSize,
in.asJava).asScala
@@ -54,4 +55,8 @@ case class VeloxResizeBatchesExec(
override def outputOrdering: Seq[SortOrder] = child.outputOrdering
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
copy(child = newChild)
+
+ override def batchType(): Convention.BatchType = VeloxBatchType
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
}
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
index 1f1750113a..133d67b0f1 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution
import
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType,
ArrowNativeBatchType}
import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.spark.sql.execution.SparkPlan
@@ -25,7 +26,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
/** Converts input data with batch type [[ArrowNativeBatchType]] to type
[[ArrowJavaBatchType]]. */
case class LoadArrowDataExec(override val child: SparkPlan)
- extends ColumnarToColumnarExec(ArrowNativeBatchType, ArrowJavaBatchType) {
+ extends ColumnarToColumnarExec(child)
+ with GlutenColumnarToColumnarTransition {
+
+ override protected val from: Convention.BatchType = ArrowNativeBatchType
+
+ override protected val to: Convention.BatchType = ArrowJavaBatchType
+
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
in.map(b => ColumnarBatches.load(ArrowBufferAllocators.contextInstance, b))
}
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
index 6e548adbf6..256c178fdb 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution
import
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType,
ArrowNativeBatchType}
import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.spark.sql.execution.SparkPlan
@@ -25,7 +26,13 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
/** Converts input data with batch type [[ArrowJavaBatchType]] to type
[[ArrowNativeBatchType]]. */
case class OffloadArrowDataExec(override val child: SparkPlan)
- extends ColumnarToColumnarExec(ArrowJavaBatchType, ArrowNativeBatchType) {
+ extends ColumnarToColumnarExec(child)
+ with GlutenColumnarToColumnarTransition {
+
+ override protected val from: Convention.BatchType = ArrowJavaBatchType
+
+ override protected val to: Convention.BatchType = ArrowNativeBatchType
+
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
in.map(b => ColumnarBatches.offload(ArrowBufferAllocators.contextInstance,
b))
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index f19b898983..f825bc1f18 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -16,25 +16,20 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
import org.apache.gluten.iterator.Iterators
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.vectorized.ColumnarBatch
import java.util.concurrent.atomic.AtomicLong
-abstract class ColumnarToColumnarExec(from: Convention.BatchType, to:
Convention.BatchType)
- extends ColumnarToColumnarTransition
- with GlutenPlan {
-
- override def isSameConvention: Boolean = from == to
-
- def child: SparkPlan
+abstract class ColumnarToColumnarExec(override val child: SparkPlan)
+ extends GlutenPlan
+ with UnaryExecNode {
protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch]
@@ -51,17 +46,8 @@ abstract class ColumnarToColumnarExec(from:
Convention.BatchType, to: Convention
"selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to
convert batches")
)
- override def batchType(): Convention.BatchType = to
-
- override def rowType0(): Convention.RowType = {
- Convention.RowType.None
- }
-
- override def requiredChildConvention(): Seq[ConventionReq] = {
- List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(from)))
- }
-
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
val numInputRows = longMetric("numInputRows")
val numInputBatches = longMetric("numInputBatches")
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
index 8c4757b45c..8284db0f62 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarTransition.scala
@@ -18,6 +18,9 @@ package org.apache.gluten.execution
import org.apache.spark.sql.execution.UnaryExecNode
-trait ColumnarToColumnarTransition extends UnaryExecNode {
- def isSameConvention: Boolean
-}
+/**
+ * A columnar-to-columnar transition. By implementing this trait, the class will be seen by
+ * [[org.apache.gluten.extension.columnar.transition.RemoveTransitions]] and removed when that rule
+ * is executed.
+ */
+trait ColumnarToColumnarTransition extends UnaryExecNode
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenColumnarToColumnarTransition.scala
similarity index 51%
copy from
backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
copy to
gluten-core/src/main/scala/org/apache/gluten/execution/GlutenColumnarToColumnarTransition.scala
index 9209886750..d2f62f0bf9 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenColumnarToColumnarTransition.scala
@@ -16,19 +16,25 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowNativeBatchType
-import org.apache.gluten.backendsapi.velox.VeloxBatchType
-import org.apache.gluten.columnarbatch.VeloxColumnarBatches
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.vectorized.ColumnarBatch
+/**
+ * A convenience trait for [[GlutenPlan]] to implement [[ColumnarToColumnarTransition]] at the same
+ * time. Note the implementation class will be seen by
+ * [[org.apache.gluten.extension.columnar.transition.RemoveTransitions]] and removed when that rule
+ * is executed.
+ */
+trait GlutenColumnarToColumnarTransition extends ColumnarToColumnarTransition with GlutenPlan {
+ protected val from: Convention.BatchType
+ protected val to: Convention.BatchType
+
+ override def batchType(): Convention.BatchType = to
-case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
- extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) {
- override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
- in.map(b => VeloxColumnarBatches.toVeloxBatch(b))
+ override def rowType0(): Convention.RowType = {
+ Convention.RowType.None
}
- override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
- copy(child = newChild)
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(from)))
+ }
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
index 9f309b843f..dbec061b15 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -69,7 +69,7 @@ package object transition {
object ColumnarToColumnarLike {
def unapply(plan: SparkPlan): Option[SparkPlan] = {
plan match {
- case c2c: ColumnarToColumnarTransition if !c2c.isSameConvention =>
+ case c2c: ColumnarToColumnarTransition =>
Some(c2c.child)
case _ => None
}
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
index 3724008509..827881ebc9 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.execution.{ColumnarToColumnarExec, GlutenPlan}
+import org.apache.gluten.execution.{ColumnarToColumnarExec,
GlutenColumnarToColumnarTransition, GlutenPlan}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -146,10 +146,11 @@ object TransitionSuiteBase {
}
case class BatchToBatch(
- from: Convention.BatchType,
- to: Convention.BatchType,
+ override val from: Convention.BatchType,
+ override val to: Convention.BatchType,
override val child: SparkPlan)
- extends ColumnarToColumnarExec(from, to) {
+ extends ColumnarToColumnarExec(child)
+ with GlutenColumnarToColumnarTransition {
override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
copy(child = newChild)
override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]