This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 6004a9e761 [GLUTEN-9666][VL] Add BatchCarrierRow in preparation for
replacement of FakeRow (#9708)
6004a9e761 is described below
commit 6004a9e76103b55b41017eebf27cd2a91dad6f1f
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed May 21 12:04:32 2025 +0100
[GLUTEN-9666][VL] Add BatchCarrierRow in preparation for replacement of
FakeRow (#9708)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 6 +-
.../backendsapi/clickhouse/CHListenerApi.scala | 4 +-
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 4 +-
.../{CHBatch.scala => CHBatchType.scala} | 8 +-
.../gluten/backendsapi/velox/VeloxBackend.scala | 7 +-
.../velox/VeloxBatchType.scala} | 16 ++-
.../backendsapi/velox/VeloxCarrierRowType.scala} | 12 +-
.../backendsapi/velox/VeloxListenerApi.scala | 10 +-
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 9 +-
.../ArrowColumnarToVeloxColumnarExec.scala | 7 +-
.../gluten/execution/ColumnarRangeExec.scala | 4 +-
...c.scala => VeloxColumnarToCarrierRowExec.scala} | 20 +--
.../api/python/ColumnarArrowEvalPythonExec.scala | 6 +-
.../spark/sql/execution/BaseArrowScanExec.scala | 4 +-
.../columnar/transition/VeloxTransitionSuite.scala | 147 ++++++++++++---------
.../arrow/ArrowBatchTypes.scala} | 20 +--
.../gluten/execution/LoadArrowDataExec.scala | 6 +-
.../gluten/execution/OffloadArrowDataExec.scala | 6 +-
.../enumerated/planner/plan/GlutenPlanModel.scala | 2 +-
.../extension/columnar/transition/Convention.scala | 12 +-
.../columnar/transition/ConventionFunc.scala | 10 +-
.../columnar/transition/ConventionReq.scala | 4 +-
.../columnar/transition/Transitions.scala | 2 +-
.../execution/ColumnarToCarrierRowExecBase.scala | 72 ++++++++++
.../gluten/execution/ColumnarToRowExecBase.scala | 2 +-
.../columnar/batchcarrier/BatchCarrierRow.scala | 95 +++++++++++++
.../extension/columnar/cost/LegacyCoster.scala | 4 +-
.../extension/columnar/cost/RoughCoster.scala | 3 +-
.../scala/org/apache/gluten/utils/PlanUtil.scala | 4 +-
.../sql/execution/ColumnarWriteFilesExec.scala | 2 +-
.../datasources/GlutenWriterColumnarRules.scala | 4 +-
.../columnar/MiscColumnarRulesSuite.scala | 6 +-
.../columnar/transition/TransitionSuite.scala | 60 +--------
.../columnar/transition/TransitionSuiteBase.scala | 75 ++++++++++-
.../spark/sql/execution/datasources/FakeRow.scala | 2 +
35 files changed, 426 insertions(+), 229 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index ef83c80339..0c8ee9d4a6 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.backendsapi._
-import org.apache.gluten.columnarbatch.CHBatch
+import org.apache.gluten.columnarbatch.CHBatchType
import org.apache.gluten.component.Component.BuildInfo
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.WriteFilesExecTransformer
@@ -72,13 +72,13 @@ object CHBackend {
private class ConvFunc() extends ConventionFunc.Override {
override def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType]
= {
case a: AdaptiveSparkPlanExec if a.supportsColumnar =>
- CHBatch
+ CHBatchType
}
}
}
object CHBackendSettings extends BackendSettingsApi with Logging {
- override def primaryBatchType: Convention.BatchType = CHBatch
+ override def primaryBatchType: Convention.BatchType = CHBatchType
private val GLUTEN_CLICKHOUSE_SEP_SCAN_RDD =
"spark.gluten.sql.columnar.separate.scan.rdd.for.ch"
private val GLUTEN_CLICKHOUSE_SEP_SCAN_RDD_DEFAULT = "false"
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 5e62126ff8..da0f037cee 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.backendsapi.ListenerApi
-import org.apache.gluten.columnarbatch.CHBatch
+import org.apache.gluten.columnarbatch.CHBatchType
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.CHBroadcastBuildSideCache
import org.apache.gluten.execution.datasource.GlutenFormatFactory
@@ -73,7 +73,7 @@ class CHListenerApi extends ListenerApi with Logging {
private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
// Do row / batch type initializations.
Convention.ensureSparkRowAndBatchTypesRegistered()
- CHBatch.ensureRegistered()
+ CHBatchType.ensureRegistered()
SparkDirectoryUtil.init(conf)
val libPath =
conf.get(GlutenConfig.GLUTEN_LIB_PATH.key,
GlutenConfig.GLUTEN_LIB_PATH.defaultValueString)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 5f64238436..58e3036ab9 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -17,7 +17,7 @@
package org.apache.gluten.backendsapi.clickhouse
import org.apache.gluten.backendsapi.RuleApi
-import org.apache.gluten.columnarbatch.CHBatch
+import org.apache.gluten.columnarbatch.CHBatchType
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
@@ -125,7 +125,7 @@ object CHRuleApi {
c.session)))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(_ => CollectTailTransformerRule())
- injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, CHBatch))
+ injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, CHBatchType))
injector.injectPostTransform(c => RemoveDuplicatedColumns(c.session))
injector.injectPostTransform(c => AddPreProjectionForHashJoin(c.session))
injector.injectPostTransform(c => ReplaceSubStringComparison(c.session))
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatch.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatchType.scala
similarity index 85%
rename from
backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatch.scala
rename to
backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatchType.scala
index 0c4b9b048d..68aea9b32a 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatch.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatchType.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.{CHColumnarToRowExec,
RowToCHNativeColumna
* [[org.apache.gluten.extension.columnar.transition.TransitionDef]] instance.
The Scala language allows a
 * compact way to implement a trait using a lambda function.
*
- * Here the detail definition is given in [[CHBatch.fromRow]].
+ * Here the detailed definition is given in [[CHBatchType.fromRow]].
* {{{
* fromRow(new TransitionDef {
* override def create(): Transition = new Transition {
@@ -37,9 +37,9 @@ import org.apache.spark.sql.execution.{CHColumnarToRowExec,
RowToCHNativeColumna
* })
* }}}
*/
-object CHBatch extends Convention.BatchType {
+object CHBatchType extends Convention.BatchType {
override protected def registerTransitions(): Unit = {
- fromRow(Convention.RowType.VanillaRow, RowToCHNativeColumnarExec.apply)
- toRow(Convention.RowType.VanillaRow, CHColumnarToRowExec.apply)
+ fromRow(Convention.RowType.VanillaRowType, RowToCHNativeColumnarExec.apply)
+ toRow(Convention.RowType.VanillaRowType, CHColumnarToRowExec.apply)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 56e5a07239..7957be7697 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.backendsapi.velox
import org.apache.gluten.GlutenBuildInfo._
import org.apache.gluten.backendsapi._
-import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.component.Component.BuildInfo
import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
import org.apache.gluten.exception.GlutenNotSupportException
@@ -79,11 +78,11 @@ object VeloxBackend {
private class ConvFunc() extends ConventionFunc.Override {
override def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType]
= {
case a: AdaptiveSparkPlanExec if a.supportsColumnar =>
- VeloxBatch
+ VeloxBatchType
case i: InMemoryTableScanExec
if i.supportsColumnar && i.relation.cacheBuilder.serializer
.isInstanceOf[ColumnarCachedBatchSerializer] =>
- VeloxBatch
+ VeloxBatchType
}
}
}
@@ -96,7 +95,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX +
".udfAllowTypeConversion"
/** The columnar-batch type this backend is by default using. */
- override def primaryBatchType: Convention.BatchType = VeloxBatch
+ override def primaryBatchType: Convention.BatchType = VeloxBatchType
override def validateScanExec(
format: ReadFileFormat,
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBatchType.scala
similarity index 61%
rename from
backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
rename to
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBatchType.scala
index 0761a1a7a3..b580afa83d 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBatchType.scala
@@ -14,16 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.columnarbatch
+package org.apache.gluten.backendsapi.velox
-import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec,
RowToVeloxColumnarExec, VeloxColumnarToRowExec}
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes
+import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec,
RowToVeloxColumnarExec, VeloxColumnarToCarrierRowExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.{Convention, Transition}
-object VeloxBatch extends Convention.BatchType {
+object VeloxBatchType extends Convention.BatchType {
override protected def registerTransitions(): Unit = {
- fromRow(Convention.RowType.VanillaRow, RowToVeloxColumnarExec.apply)
- toRow(Convention.RowType.VanillaRow, VeloxColumnarToRowExec.apply)
- fromBatch(ArrowBatches.ArrowNativeBatch,
ArrowColumnarToVeloxColumnarExec.apply)
- toBatch(ArrowBatches.ArrowNativeBatch, Transition.empty)
+ fromRow(Convention.RowType.VanillaRowType, RowToVeloxColumnarExec.apply)
+ toRow(Convention.RowType.VanillaRowType, VeloxColumnarToRowExec.apply)
+ fromBatch(ArrowBatchTypes.ArrowNativeBatchType,
ArrowColumnarToVeloxColumnarExec.apply)
+ toBatch(ArrowBatchTypes.ArrowNativeBatchType, Transition.empty)
+ toRow(VeloxCarrierRowType, VeloxColumnarToCarrierRowExec.apply)
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxCarrierRowType.scala
similarity index 70%
copy from
backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
copy to
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxCarrierRowType.scala
index 1aacc1b954..19b7bf17fe 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxCarrierRowType.scala
@@ -14,16 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.spark.sql.execution
+package org.apache.gluten.backendsapi.velox
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention
-trait BaseArrowScanExec extends GlutenPlan {
- final override def batchType(): Convention.BatchType = {
- ArrowBatches.ArrowJavaBatch
- }
-
- final override def rowType0(): Convention.RowType = Convention.RowType.None
+object VeloxCarrierRowType extends Convention.RowType {
+ override protected[this] def registerTransitions(): Unit = {}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index af5e1533b2..6752a92baa 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -17,8 +17,7 @@
package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.ListenerApi
-import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch,
ArrowNativeBatch}
-import org.apache.gluten.columnarbatch.VeloxBatch
+import
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType,
ArrowNativeBatchType}
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.config.VeloxConfig._
import org.apache.gluten.execution.datasource.GlutenFormatFactory
@@ -169,9 +168,10 @@ class VeloxListenerApi extends ListenerApi with Logging {
// Do row / batch type initializations.
Convention.ensureSparkRowAndBatchTypesRegistered()
- ArrowJavaBatch.ensureRegistered()
- ArrowNativeBatch.ensureRegistered()
- VeloxBatch.ensureRegistered()
+ VeloxCarrierRowType.ensureRegistered()
+ ArrowJavaBatchType.ensureRegistered()
+ ArrowNativeBatchType.ensureRegistered()
+ VeloxBatchType.ensureRegistered()
// Register columnar shuffle so can be considered when
// `org.apache.spark.shuffle.GlutenShuffleManager` is set as Spark shuffle
manager.
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 443ad147de..f7246605b8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.backendsapi.velox
import org.apache.gluten.backendsapi.RuleApi
-import org.apache.gluten.columnarbatch.VeloxBatch
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.extension._
import org.apache.gluten.extension.columnar._
@@ -103,7 +102,7 @@ object VeloxRuleApi {
injector.injectPostTransform(c =>
HashAggregateIgnoreNullKeysRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(_ => CollectTailTransformerRule())
- injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
+ injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatchType))
// Gluten columnar: Fallback policies.
injector.injectFallbackPolicy(c => p =>
ExpandFallbackPolicy(c.caller.isAqe(), p))
@@ -119,7 +118,7 @@ object VeloxRuleApi {
// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectFinal(
- c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(),
Set(VeloxBatch)))
+ c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(),
Set(VeloxBatchType)))
injector.injectFinal(c =>
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
injector.injectFinal(_ => RemoveFallbackTagRule())
@@ -193,7 +192,7 @@ object VeloxRuleApi {
injector.injectPostTransform(c =>
HashAggregateIgnoreNullKeysRule.apply(c.session))
injector.injectPostTransform(_ => CollectLimitTransformerRule())
injector.injectPostTransform(_ => CollectTailTransformerRule())
- injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
+ injector.injectPostTransform(c =>
InsertTransitions.create(c.outputsColumnar, VeloxBatchType))
injector.injectPostTransform(c => RemoveTopmostColumnarToRow(c.session,
c.caller.isAqe()))
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
@@ -202,7 +201,7 @@ object VeloxRuleApi {
injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
injector.injectPostTransform(c =>
RemoveGlutenTableCacheColumnarToRow(c.session))
injector.injectPostTransform(
- c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(),
Set(VeloxBatch)))
+ c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(),
Set(VeloxBatchType)))
injector.injectPostTransform(c =>
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf,
c.session))
injector.injectPostTransform(_ => RemoveFallbackTagRule())
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
index 0ab51772e1..49d5659996 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
@@ -16,14 +16,15 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.columnarbatch.{VeloxBatch, VeloxColumnarBatches}
-import org.apache.gluten.columnarbatch.ArrowBatches.ArrowNativeBatch
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowNativeBatchType
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.columnarbatch.VeloxColumnarBatches
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch
case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
- extends ColumnarToColumnarExec(ArrowNativeBatch, VeloxBatch) {
+ extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) {
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
in.map {
b =>
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
index 3fb86a8aae..58a64d891f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowJavaBatchType
import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.vectorized.ArrowWritableColumnVector
@@ -57,7 +57,7 @@ case class ColumnarRangeExec(
) extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements,
outputAttributes, child) {
override def batchType(): Convention.BatchType = {
- ArrowJavaBatch
+ ArrowJavaBatchType
}
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToCarrierRowExec.scala
similarity index 62%
copy from
backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
copy to
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToCarrierRowExec.scala
index 0ab51772e1..e4a2c7554a 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToCarrierRowExec.scala
@@ -16,21 +16,15 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.columnarbatch.{VeloxBatch, VeloxColumnarBatches}
-import org.apache.gluten.columnarbatch.ArrowBatches.ArrowNativeBatch
+import org.apache.gluten.backendsapi.velox.{VeloxBatchType,
VeloxCarrierRowType}
+import org.apache.gluten.extension.columnar.transition.Convention
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.vectorized.ColumnarBatch
-case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
- extends ColumnarToColumnarExec(ArrowNativeBatch, VeloxBatch) {
- override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
- in.map {
- b =>
- val out = VeloxColumnarBatches.toVeloxBatch(b)
- out
- }
- }
+case class VeloxColumnarToCarrierRowExec(override val child: SparkPlan)
+ extends ColumnarToCarrierRowExecBase {
+ override protected def fromBatchType(): Convention.BatchType = VeloxBatchType
+ override def rowType0(): Convention.RowType = VeloxCarrierRowType
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
- ArrowColumnarToVeloxColumnarExec(child = newChild)
+ copy(child = newChild)
}
diff --git
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 7564500aae..5e9dd2f815 100644
---
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.api.python
-import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowJavaBatchType
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.execution.ValidatablePlan
import org.apache.gluten.extension.ValidationResult
@@ -213,7 +213,7 @@ case class ColumnarArrowEvalPythonExec(
extends EvalPythonExec
with ValidatablePlan {
- override def batchType(): Convention.BatchType = ArrowJavaBatch
+ override def batchType(): Convention.BatchType = ArrowJavaBatchType
override def rowType0(): Convention.RowType = Convention.RowType.None
@@ -234,7 +234,7 @@ case class ColumnarArrowEvalPythonExec(
}
override def requiredChildConvention(): Seq[ConventionReq] = List(
- ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatch)))
+ ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatchType)))
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output
rows"),
diff --git
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
index 1aacc1b954..ebccd6f5bb 100644
---
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
@@ -16,13 +16,13 @@
*/
package org.apache.spark.sql.execution
-import org.apache.gluten.columnarbatch.ArrowBatches
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes
import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention
trait BaseArrowScanExec extends GlutenPlan {
final override def batchType(): Convention.BatchType = {
- ArrowBatches.ArrowJavaBatch
+ ArrowBatchTypes.ArrowJavaBatchType
}
final override def rowType0(): Convention.RowType = Convention.RowType.None
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
index 37b075e862..79f4b918e1 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
@@ -16,201 +16,222 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.backendsapi.velox.VeloxListenerApi
-import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch,
ArrowNativeBatch}
-import org.apache.gluten.columnarbatch.VeloxBatch
-import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec,
LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec,
VeloxColumnarToRowExec}
-import
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
+import
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType,
ArrowNativeBatchType}
+import org.apache.gluten.backendsapi.velox.{VeloxBatchType,
VeloxCarrierRowType, VeloxListenerApi}
+import org.apache.gluten.execution._
+import
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatchType
import org.apache.gluten.test.MockVeloxBackend
import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec}
import org.apache.spark.sql.test.SharedSparkSession
-class VeloxTransitionSuite extends SharedSparkSession {
- import VeloxTransitionSuite._
+class VeloxTransitionSuite extends SharedSparkSession with TransitionSuiteBase
{
+ import TransitionSuiteBase._
private val api = new VeloxListenerApi()
test("Vanilla C2R - outputs row") {
- val in = BatchLeaf(VanillaBatch)
+ val in = BatchLeaf(VanillaBatchType)
val out = BackendTransitions.insert(in, outputsColumnar = false)
- assert(out == ColumnarToRowExec(BatchLeaf(VanillaBatch)))
+ assert(out == ColumnarToRowExec(BatchLeaf(VanillaBatchType)))
}
test("Vanilla C2R - requires row input") {
- val in = RowUnary(Convention.RowType.VanillaRow, BatchLeaf(VanillaBatch))
+ val in = RowUnary(Convention.RowType.VanillaRowType,
BatchLeaf(VanillaBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
- out == RowUnary(Convention.RowType.VanillaRow,
ColumnarToRowExec(BatchLeaf(VanillaBatch))))
+ out == RowUnary(
+ Convention.RowType.VanillaRowType,
+ ColumnarToRowExec(BatchLeaf(VanillaBatchType))))
}
test("Vanilla R2C - requires vanilla input") {
- val in = BatchUnary(VanillaBatch, RowLeaf(Convention.RowType.VanillaRow))
+ val in = BatchUnary(VanillaBatchType,
RowLeaf(Convention.RowType.VanillaRowType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
- assert(
- out == ColumnarToRowExec(
- BatchUnary(VanillaBatch,
RowToColumnarExec(RowLeaf(Convention.RowType.VanillaRow)))))
+ assert(out == ColumnarToRowExec(
+ BatchUnary(VanillaBatchType,
RowToColumnarExec(RowLeaf(Convention.RowType.VanillaRowType)))))
}
test("ArrowNative C2R - outputs row") {
- val in = BatchLeaf(ArrowNativeBatch)
+ val in = BatchLeaf(ArrowNativeBatchType)
val out = BackendTransitions.insert(in, outputsColumnar = false)
- assert(out ==
ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch))))
+ assert(out ==
ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatchType))))
}
test("ArrowNative C2R - requires row input") {
- val in = RowUnary(Convention.RowType.VanillaRow,
BatchLeaf(ArrowNativeBatch))
+ val in = RowUnary(Convention.RowType.VanillaRowType,
BatchLeaf(ArrowNativeBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == RowUnary(
- Convention.RowType.VanillaRow,
- ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
+ Convention.RowType.VanillaRowType,
+ ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatchType)))))
}
test("ArrowNative R2C - requires Arrow input") {
- val in = BatchUnary(ArrowNativeBatch,
RowLeaf(Convention.RowType.VanillaRow))
+ val in = BatchUnary(ArrowNativeBatchType,
RowLeaf(Convention.RowType.VanillaRowType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(
- ArrowNativeBatch,
- RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRow))))))
+ ArrowNativeBatchType,
+
RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRowType))))))
}
test("ArrowNative-to-Velox C2C") {
- val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowNativeBatch))
+ val in = BatchUnary(VeloxBatchType, BatchLeaf(ArrowNativeBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
// No explicit transition needed for ArrowNative-to-Velox.
// FIXME: Add explicit transitions.
// See https://github.com/apache/incubator-gluten/issues/7313.
assert(
out == VeloxColumnarToRowExec(
- BatchUnary(VeloxBatch,
ArrowColumnarToVeloxColumnarExec(BatchLeaf(ArrowNativeBatch)))))
+ BatchUnary(
+ VeloxBatchType,
+ ArrowColumnarToVeloxColumnarExec(BatchLeaf(ArrowNativeBatchType)))))
}
test("Velox-to-ArrowNative C2C") {
- val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))
+ val in = BatchUnary(ArrowNativeBatchType, BatchLeaf(VeloxBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
- LoadArrowDataExec(BatchUnary(ArrowNativeBatch,
BatchLeaf(VeloxBatch)))))
+ LoadArrowDataExec(BatchUnary(ArrowNativeBatchType,
BatchLeaf(VeloxBatchType)))))
}
test("Vanilla-to-ArrowNative C2C") {
- val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VanillaBatch))
+ val in = BatchUnary(ArrowNativeBatchType, BatchLeaf(VanillaBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
LoadArrowDataExec(BatchUnary(
- ArrowNativeBatch,
-
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch)))))))
+ ArrowNativeBatchType,
+
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatchType)))))))
}
test("ArrowNative-to-Vanilla C2C") {
- val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowNativeBatch))
+ val in = BatchUnary(VanillaBatchType, BatchLeaf(ArrowNativeBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
- BatchUnary(VanillaBatch,
LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
+ BatchUnary(VanillaBatchType,
LoadArrowDataExec(BatchLeaf(ArrowNativeBatchType)))))
}
test("ArrowJava C2R - outputs row") {
- val in = BatchLeaf(ArrowJavaBatch)
+ val in = BatchLeaf(ArrowJavaBatchType)
val out = BackendTransitions.insert(in, outputsColumnar = false)
- assert(out == ColumnarToRowExec(BatchLeaf(ArrowJavaBatch)))
+ assert(out == ColumnarToRowExec(BatchLeaf(ArrowJavaBatchType)))
}
test("ArrowJava C2R - requires row input") {
- val in = RowUnary(Convention.RowType.VanillaRow, BatchLeaf(ArrowJavaBatch))
+ val in = RowUnary(Convention.RowType.VanillaRowType,
BatchLeaf(ArrowJavaBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
- out == RowUnary(Convention.RowType.VanillaRow,
ColumnarToRowExec(BatchLeaf(ArrowJavaBatch))))
+ out == RowUnary(
+ Convention.RowType.VanillaRowType,
+ ColumnarToRowExec(BatchLeaf(ArrowJavaBatchType))))
}
test("ArrowJava R2C - requires Arrow input") {
- val in = BatchUnary(ArrowJavaBatch, RowLeaf(Convention.RowType.VanillaRow))
+ val in = BatchUnary(ArrowJavaBatchType,
RowLeaf(Convention.RowType.VanillaRowType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
BatchUnary(
- ArrowJavaBatch,
-
LoadArrowDataExec(RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRow))))))
+ ArrowJavaBatchType,
+
LoadArrowDataExec(RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRowType))))))
}
test("ArrowJava-to-Velox C2C") {
- val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowJavaBatch))
+ val in = BatchUnary(VeloxBatchType, BatchLeaf(ArrowJavaBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
BatchUnary(
- VeloxBatch,
-
ArrowColumnarToVeloxColumnarExec(OffloadArrowDataExec(BatchLeaf(ArrowJavaBatch))))))
+ VeloxBatchType,
+
ArrowColumnarToVeloxColumnarExec(OffloadArrowDataExec(BatchLeaf(ArrowJavaBatchType))))))
}
test("Velox-to-ArrowJava C2C") {
- val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VeloxBatch))
+ val in = BatchUnary(ArrowJavaBatchType, BatchLeaf(VeloxBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == ColumnarToRowExec(
- BatchUnary(ArrowJavaBatch, LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
+ BatchUnary(ArrowJavaBatchType,
LoadArrowDataExec(BatchLeaf(VeloxBatchType)))))
}
test("Vanilla-to-ArrowJava C2C") {
- val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VanillaBatch))
+ val in = BatchUnary(ArrowJavaBatchType, BatchLeaf(VanillaBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
- out == ColumnarToRowExec(
- BatchUnary(
- ArrowJavaBatch,
-
LoadArrowDataExec(RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch)))))))
+ out == ColumnarToRowExec(BatchUnary(
+ ArrowJavaBatchType,
+
LoadArrowDataExec(RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatchType)))))))
}
test("ArrowJava-to-Vanilla C2C") {
- val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowJavaBatch))
+ val in = BatchUnary(VanillaBatchType, BatchLeaf(ArrowJavaBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
- assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch,
BatchLeaf(ArrowJavaBatch))))
+ assert(out == ColumnarToRowExec(BatchUnary(VanillaBatchType,
BatchLeaf(ArrowJavaBatchType))))
}
test("Velox C2R - outputs row") {
- val in = BatchLeaf(VeloxBatch)
+ val in = BatchLeaf(VeloxBatchType)
val out = BackendTransitions.insert(in, outputsColumnar = false)
- assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatch)))
+ assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatchType)))
}
test("Velox C2R - requires row input") {
- val in = RowUnary(Convention.RowType.VanillaRow, BatchLeaf(VeloxBatch))
+ val in = RowUnary(Convention.RowType.VanillaRowType,
BatchLeaf(VeloxBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
- out == RowUnary(Convention.RowType.VanillaRow,
VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))
+ out == RowUnary(
+ Convention.RowType.VanillaRowType,
+ VeloxColumnarToRowExec(BatchLeaf(VeloxBatchType))))
}
test("Velox R2C - outputs Velox") {
- val in = RowLeaf(Convention.RowType.VanillaRow)
+ val in = RowLeaf(Convention.RowType.VanillaRowType)
val out = BackendTransitions.insert(in, outputsColumnar = true)
- assert(out ==
RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRow)))
+ assert(out ==
RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRowType)))
}
test("Velox R2C - requires Velox input") {
- val in = BatchUnary(VeloxBatch, RowLeaf(Convention.RowType.VanillaRow))
+ val in = BatchUnary(VeloxBatchType,
RowLeaf(Convention.RowType.VanillaRowType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
- BatchUnary(VeloxBatch,
RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRow)))))
+ BatchUnary(
+ VeloxBatchType,
+ RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRowType)))))
}
test("Vanilla-to-Velox C2C") {
- val in = BatchUnary(VeloxBatch, BatchLeaf(VanillaBatch))
+ val in = BatchUnary(VeloxBatchType, BatchLeaf(VanillaBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
out == VeloxColumnarToRowExec(
- BatchUnary(VeloxBatch,
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
+ BatchUnary(
+ VeloxBatchType,
+
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatchType))))))
}
test("Velox-to-Vanilla C2C") {
- val in = BatchUnary(VanillaBatch, BatchLeaf(VeloxBatch))
+ val in = BatchUnary(VanillaBatchType, BatchLeaf(VeloxBatchType))
+ val out = BackendTransitions.insert(in, outputsColumnar = false)
+ assert(
+ out == ColumnarToRowExec(
+ BatchUnary(VanillaBatchType,
LoadArrowDataExec(BatchLeaf(VeloxBatchType)))))
+ }
+
+ test("Velox-to-CarrierRow C2R") {
+ val in =
+ RowToRow(VeloxCarrierRowType, Convention.RowType.VanillaRowType,
BatchLeaf(VeloxBatchType))
val out = BackendTransitions.insert(in, outputsColumnar = false)
assert(
- out == ColumnarToRowExec(BatchUnary(VanillaBatch,
LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
+ out == RowToRow(
+ VeloxCarrierRowType,
+ Convention.RowType.VanillaRowType,
+ VeloxColumnarToCarrierRowExec(BatchLeaf(VeloxBatchType))))
}
override protected def beforeAll(): Unit = {
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/columnarbatch/ArrowBatches.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/backendsapi/arrow/ArrowBatchTypes.scala
similarity index 78%
rename from
gluten-arrow/src/main/scala/org/apache/gluten/columnarbatch/ArrowBatches.scala
rename to
gluten-arrow/src/main/scala/org/apache/gluten/backendsapi/arrow/ArrowBatchTypes.scala
index c23d6eea79..a0cbb7b912 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/columnarbatch/ArrowBatches.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/backendsapi/arrow/ArrowBatchTypes.scala
@@ -14,27 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.columnarbatch
+package org.apache.gluten.backendsapi.arrow
import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec}
import org.apache.gluten.extension.columnar.transition.{Convention, Transition}
-import
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
+import
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatchType
-object ArrowBatches {
+object ArrowBatchTypes {
/**
* ArrowJavaBatch stands for Gluten's Java Arrow-based columnar batch
implementation.
*
* ArrowJavaBatch should have
[[org.apache.gluten.vectorized.ArrowWritableColumnVector]]s
* populated in it. ArrowJavaBatch can be offloaded to ArrowNativeBatch
through API in
- * [[ColumnarBatches]].
+ * [[org.apache.gluten.columnarbatch.ColumnarBatches]].
*
* ArrowJavaBatch is compatible with vanilla batch since it provides valid
#get<type>(...)
* implementations.
*/
- object ArrowJavaBatch extends Convention.BatchType {
+ object ArrowJavaBatchType extends Convention.BatchType {
override protected def registerTransitions(): Unit = {
- toBatch(VanillaBatch, Transition.empty)
+ toBatch(VanillaBatchType, Transition.empty)
}
}
@@ -43,12 +43,12 @@ object ArrowBatches {
*
* ArrowNativeBatch should have
[[org.apache.gluten.columnarbatch.IndicatorVector]] set as the
* first vector. ArrowNativeBatch can be loaded to ArrowJavaBatch through
API in
- * [[ColumnarBatches]].
+ * [[org.apache.gluten.columnarbatch.ColumnarBatches]].
*/
- object ArrowNativeBatch extends Convention.BatchType {
+ object ArrowNativeBatchType extends Convention.BatchType {
override protected def registerTransitions(): Unit = {
- fromBatch(ArrowJavaBatch, OffloadArrowDataExec.apply)
- toBatch(ArrowJavaBatch, LoadArrowDataExec.apply)
+ fromBatch(ArrowJavaBatchType, OffloadArrowDataExec.apply)
+ toBatch(ArrowJavaBatchType, LoadArrowDataExec.apply)
}
}
}
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
index 36f3c48d4e..1f1750113a 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
@@ -16,16 +16,16 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch,
ArrowNativeBatch}
+import
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType,
ArrowNativeBatchType}
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch
-/** Converts input data with batch type [[ArrowNativeBatch]] to type
[[ArrowJavaBatch]]. */
+/** Converts input data with batch type [[ArrowNativeBatchType]] to type
[[ArrowJavaBatchType]]. */
case class LoadArrowDataExec(override val child: SparkPlan)
- extends ColumnarToColumnarExec(ArrowNativeBatch, ArrowJavaBatch) {
+ extends ColumnarToColumnarExec(ArrowNativeBatchType, ArrowJavaBatchType) {
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
in.map(b => ColumnarBatches.load(ArrowBufferAllocators.contextInstance, b))
}
diff --git
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
index 7b2184a240..6e548adbf6 100644
---
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
+++
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
@@ -16,16 +16,16 @@
*/
package org.apache.gluten.execution
-import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch,
ArrowNativeBatch}
+import
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType,
ArrowNativeBatchType}
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.vectorized.ColumnarBatch
-/** Converts input data with batch type [[ArrowJavaBatch]] to type
[[ArrowNativeBatch]]. */
+/** Converts input data with batch type [[ArrowJavaBatchType]] to type
[[ArrowNativeBatchType]]. */
case class OffloadArrowDataExec(override val child: SparkPlan)
- extends ColumnarToColumnarExec(ArrowJavaBatch, ArrowNativeBatch) {
+ extends ColumnarToColumnarExec(ArrowJavaBatchType, ArrowNativeBatchType) {
override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] = {
in.map(b => ColumnarBatches.offload(ArrowBufferAllocators.contextInstance,
b))
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index efecb48102..a4058e5c7b 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -78,7 +78,7 @@ object GlutenPlanModel {
override val rowType0: Convention.RowType = {
val out = req.req.requiredRowType match {
- case ConventionReq.RowType.Any => Convention.RowType.VanillaRow
+ case ConventionReq.RowType.Any => Convention.RowType.VanillaRowType
case ConventionReq.RowType.Is(r) => r
}
out
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index e8aa8d2ad6..75ca8a775b 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -33,9 +33,9 @@ sealed trait Convention {
object Convention {
def ensureSparkRowAndBatchTypesRegistered(): Unit = {
RowType.None.ensureRegistered()
- RowType.VanillaRow.ensureRegistered()
+ RowType.VanillaRowType.ensureRegistered()
BatchType.None.ensureRegistered()
- BatchType.VanillaBatch.ensureRegistered()
+ BatchType.VanillaBatchType.ensureRegistered()
}
implicit class ConventionOps(val conv: Convention) extends AnyVal {
@@ -130,7 +130,7 @@ object Convention {
final case object None extends RowType {
override protected[this] def registerTransitions(): Unit = {}
}
- final case object VanillaRow extends RowType {
+ final case object VanillaRowType extends RowType {
override protected[this] def registerTransitions(): Unit = {}
}
}
@@ -151,10 +151,10 @@ object Convention {
final case object None extends BatchType {
override protected[this] def registerTransitions(): Unit = {}
}
- final case object VanillaBatch extends BatchType {
+ final case object VanillaBatchType extends BatchType {
override protected[this] def registerTransitions(): Unit = {
- fromRow(RowType.VanillaRow, RowToColumnarExec.apply)
- toRow(RowType.VanillaRow, ColumnarToRowExec.apply)
+ fromRow(RowType.VanillaRowType, RowToColumnarExec.apply)
+ toRow(RowType.VanillaRowType, ColumnarToRowExec.apply)
}
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index f7e837b25d..83711e71f6 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -99,7 +99,7 @@ object ConventionFunc {
case k: Convention.KnownRowType =>
k.rowType()
case _ if SparkShimLoader.getSparkShims.supportsRowBased(plan) =>
- Convention.RowType.VanillaRow
+ Convention.RowType.VanillaRowType
case _ =>
Convention.RowType.None
}
@@ -131,7 +131,7 @@ object ConventionFunc {
case k: Convention.KnownBatchType =>
k.batchType()
case _ if plan.supportsColumnar =>
- Convention.BatchType.VanillaBatch
+ Convention.BatchType.VanillaBatchType
case _ =>
Convention.BatchType.None
}
@@ -166,13 +166,13 @@ object ConventionFunc {
case RowToColumnarLike(_) =>
Seq(
ConventionReq.of(
- ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+ ConventionReq.RowType.Is(Convention.RowType.VanillaRowType),
ConventionReq.BatchType.Any))
case ColumnarToRowExec(_) =>
Seq(
ConventionReq.of(
ConventionReq.RowType.Any,
- ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatch)))
+ ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatchType)))
case write: DataWritingCommandExec if
SparkShimLoader.getSparkShims.isPlannedV1Write(write) =>
// To align with
ApplyColumnarRulesAndInsertTransitions#insertTransitions
Seq(ConventionReq.any)
@@ -183,7 +183,7 @@ object ConventionFunc {
Seq.tabulate(u.children.size)(
_ =>
ConventionReq.of(
- ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+ ConventionReq.RowType.Is(Convention.RowType.VanillaRowType),
ConventionReq.BatchType.Any))
case other =>
// In the normal case, children's convention should follow parent
node's convention.
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
index 0f20656537..741fe04e65 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -52,8 +52,8 @@ object ConventionReq {
) extends ConventionReq
val any: ConventionReq = of(RowType.Any, BatchType.Any)
- val vanillaRow: ConventionReq =
ofRow(RowType.Is(Convention.RowType.VanillaRow))
- val vanillaBatch: ConventionReq =
ofBatch(BatchType.Is(Convention.BatchType.VanillaBatch))
+ val vanillaRow: ConventionReq =
ofRow(RowType.Is(Convention.RowType.VanillaRowType))
+ val vanillaBatch: ConventionReq =
ofBatch(BatchType.Is(Convention.BatchType.VanillaBatchType))
def get(plan: SparkPlan): Seq[ConventionReq] =
ConventionFunc.create().conventionReqOf(plan)
def of(rowType: RowType, batchType: BatchType): ConventionReq =
Impl(rowType, batchType)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index e6bde44704..3e55b0e11a 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -86,7 +86,7 @@ object RemoveTransitions extends Rule[SparkPlan] {
object Transitions {
def insert(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
- InsertTransitions.create(outputsColumnar,
BatchType.VanillaBatch).apply(plan)
+ InsertTransitions.create(outputsColumnar,
BatchType.VanillaBatchType).apply(plan)
}
def toRowPlan(plan: SparkPlan): SparkPlan = {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToCarrierRowExecBase.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToCarrierRowExecBase.scala
new file mode 100644
index 0000000000..979ddd6a71
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToCarrierRowExecBase.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.columnar.batchcarrier.{BatchCarrierRow,
PlaceholderRow, TerminalRow}
+import org.apache.gluten.extension.columnar.transition.{Convention,
ConventionReq}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.ColumnarToRowTransition
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/** The operator that converts columnar batches to [[BatchCarrierRow]]s. */
+abstract class ColumnarToCarrierRowExecBase extends ColumnarToRowTransition
with GlutenPlan {
+
+ override def batchType(): Convention.BatchType = Convention.BatchType.None
+
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(fromBatchType())))
+ }
+
+ protected def fromBatchType(): Convention.BatchType
+
+ override lazy val metrics: Map[String, SQLMetric] =
+ Map(
+ "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of
input batches"),
+ "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of
output rows")
+ )
+
+ override protected def doExecute(): RDD[InternalRow] = {
+ val numInputBatches = longMetric("numInputBatches")
+ val numOutputRows = longMetric("numOutputRows")
+
+ child.executeColumnar().mapPartitions {
+ itr =>
+ itr.flatMap {
+ b: ColumnarBatch =>
+ numInputBatches += 1
+ val numRows = b.numRows()
+ if (numRows == 0) {
+ Nil
+ } else {
+ val carrierRows = new Array[BatchCarrierRow](numRows)
+ for (i <- 0 until numRows - 1) {
+ carrierRows(i) = new PlaceholderRow()
+ }
+ carrierRows(numRows - 1) = new TerminalRow(b)
+ numOutputRows += carrierRows.length
+ carrierRows
+ }
+ }
+ }
+ }
+
+ override def output: Seq[Attribute] = child.output
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
index 5beaf49572..15103bbd84 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
@@ -42,7 +42,7 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
override def batchType(): Convention.BatchType = Convention.BatchType.None
- override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
+ override def rowType0(): Convention.RowType =
Convention.RowType.VanillaRowType
override def requiredChildConvention(): Seq[ConventionReq] = {
List(
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/batchcarrier/BatchCarrierRow.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/batchcarrier/BatchCarrierRow.scala
new file mode 100644
index 0000000000..21ef26405f
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/batchcarrier/BatchCarrierRow.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.batchcarrier
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+/**
+ * An internal-row abstraction that is designed for columnar-based
computations to bypass Spark's
+ * row-based APIs / SPIs with zero copy.
+ *
+ * Two implementations are pre-defined:
+ *
+ * - TerminalRow
+ * - PlaceholderRow
+ *
+ * To bypass Spark's row APIs, one single columnar batch will be converted to
a series of
+ * PlaceholderRows, followed by one TerminalRow that actually wraps that
columnar batch. The total
+ * number of PlaceholderRows + the TerminalRow equates to the size of the
original columnar batch.
+ */
+sealed abstract class BatchCarrierRow extends InternalRow {
+ override def numFields: Int = throw unsupported()
+
+ override def setNullAt(i: Int): Unit = throw unsupported()
+
+ override def update(i: Int, value: Any): Unit = throw unsupported()
+
+ override def copy(): InternalRow = throw unsupported()
+
+ override def isNullAt(ordinal: Int): Boolean = throw unsupported()
+
+ override def getBoolean(ordinal: Int): Boolean = throw unsupported()
+
+ override def getByte(ordinal: Int): Byte = throw unsupported()
+
+ override def getShort(ordinal: Int): Short = throw unsupported()
+
+ override def getInt(ordinal: Int): Int = throw unsupported()
+
+ override def getLong(ordinal: Int): Long = throw unsupported()
+
+ override def getFloat(ordinal: Int): Float = throw unsupported()
+
+ override def getDouble(ordinal: Int): Double = throw unsupported()
+
+ override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal =
throw unsupported()
+
+ override def getUTF8String(ordinal: Int): UTF8String = throw unsupported()
+
+ override def getBinary(ordinal: Int): Array[Byte] = throw unsupported()
+
+ override def getInterval(ordinal: Int): CalendarInterval = throw
unsupported()
+
+ override def getStruct(ordinal: Int, numFields: Int): InternalRow = throw
unsupported()
+
+ override def getArray(ordinal: Int): ArrayData = throw unsupported()
+
+ override def getMap(ordinal: Int): MapData = throw unsupported()
+
+ override def get(ordinal: Int, dataType: DataType): AnyRef = throw
unsupported()
+
+ private def unsupported() = {
+ new UnsupportedOperationException(
+ "Underlying columnar data is inaccessible from BatchCarrierRow")
+ }
+}
+
+/**
+ * A [[BatchCarrierRow]] implementation that is backed by a
+ * [[org.apache.spark.sql.vectorized.ColumnarBatch]].
+ */
+class TerminalRow(val batch: ColumnarBatch) extends BatchCarrierRow
+
+/**
+ * A [[BatchCarrierRow]] implementation with no data. The only function of
this row implementation
+ * is to provide row metadata to the receiver and to support correct
row-counting.
+ */
+class PlaceholderRow extends BatchCarrierRow
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/LegacyCoster.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/LegacyCoster.scala
index a8e1524fc9..8c01c0bde7 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/LegacyCoster.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/LegacyCoster.scala
@@ -16,6 +16,7 @@
*/
package org.apache.gluten.extension.columnar.cost
+import org.apache.gluten.execution.ColumnarToCarrierRowExecBase
import
org.apache.gluten.extension.columnar.transition.{ColumnarToColumnarLike,
ColumnarToRowLike, RowToColumnarLike}
import org.apache.gluten.utils.PlanUtil
@@ -33,7 +34,8 @@ object LegacyCoster extends LongCoster {
// much as possible.
private def selfCostOf0(node: SparkPlan): Long = {
node match {
- case ColumnarWriteFilesExec.OnNoopLeafPath(_) => 0
+ case ColumnarWriteFilesExec.OnNoopLeafPath(_) => 0L
+ case _: ColumnarToCarrierRowExecBase => 0L
case ColumnarToRowLike(_) => 10L
case RowToColumnarLike(_) => 10L
case ColumnarToColumnarLike(_) => 5L
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/RoughCoster.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/RoughCoster.scala
index caee696df6..ba26528499 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/RoughCoster.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/RoughCoster.scala
@@ -16,7 +16,7 @@
*/
package org.apache.gluten.extension.columnar.cost
-import org.apache.gluten.execution.RowToColumnarExecBase
+import org.apache.gluten.execution.{ColumnarToCarrierRowExecBase,
RowToColumnarExecBase}
import
org.apache.gluten.extension.columnar.transition.{ColumnarToColumnarLike,
ColumnarToRowLike, RowToColumnarLike}
import org.apache.gluten.utils.PlanUtil
@@ -42,6 +42,7 @@ object RoughCoster extends LongCoster {
// Avoid moving computation back to native when transition has complex
types in schema.
// Such transitions are observed to be extremely expensive as of now.
Long.MaxValue
+ case _: ColumnarToCarrierRowExecBase => 0L
case ColumnarToRowLike(_) => 10L
case RowToColumnarLike(_) => 10L
case ColumnarToColumnarLike(_) => 5L
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index f2b1bd0900..fd5549f8cf 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -42,7 +42,7 @@ object PlanUtil {
}
def isVanillaColumnarOp(plan: SparkPlan): Boolean = {
- Convention.get(plan).batchType == Convention.BatchType.VanillaBatch
+ Convention.get(plan).batchType == Convention.BatchType.VanillaBatchType
}
def isGlutenColumnarOp(plan: SparkPlan): Boolean = {
@@ -50,6 +50,6 @@ object PlanUtil {
}
private def isGlutenBatchType(batchType: Convention.BatchType) = {
- batchType != Convention.BatchType.None && batchType !=
Convention.BatchType.VanillaBatch
+ batchType != Convention.BatchType.None && batchType !=
Convention.BatchType.VanillaBatchType
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index 87a2f6d9c2..998f4f86b5 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -71,7 +71,7 @@ abstract class ColumnarWriteFilesExec protected (
*/
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
override def rowType0(): RowType = {
- RowType.VanillaRow
+ RowType.VanillaRowType
}
override def output: Seq[Attribute] = Seq.empty
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index d58026b123..21ad159680 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -61,7 +61,7 @@ case class FakeRowAdaptor(child: SparkPlan)
override def batchType(): Convention.BatchType =
BackendsApiManager.getSettings.primaryBatchType
- override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
+ override def rowType0(): Convention.RowType =
Convention.RowType.VanillaRowType
override protected def doExecute(): RDD[InternalRow] = {
doExecuteColumnar().map(cb => new FakeRowEnhancement(cb))
@@ -86,8 +86,6 @@ case class FakeRowAdaptor(child: SparkPlan)
copy(child = newChild)
}
-case class MATERIALIZE_TAG()
-
object GlutenWriterColumnarRules {
// TODO: support ctas in Spark3.4, see
https://github.com/apache/spark/pull/39220
// TODO: support dynamic partition and bucket write
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/MiscColumnarRulesSuite.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/MiscColumnarRulesSuite.scala
index 085f9d551b..81068c994f 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/MiscColumnarRulesSuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/MiscColumnarRulesSuite.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar
import org.apache.gluten.component.WithDummyBackend
import
org.apache.gluten.extension.columnar.MiscColumnarRules.PreventBatchTypeMismatchInTableCache
import org.apache.gluten.extension.columnar.transition.Convention
-import
org.apache.gluten.extension.columnar.transition.TransitionSuite.BatchToRow
+import
org.apache.gluten.extension.columnar.transition.TransitionSuiteBase.BatchToRow
import org.apache.spark.sql.test.SharedSparkSession
@@ -28,8 +28,8 @@ class MiscColumnarRulesSuite extends SharedSparkSession with
WithDummyBackend {
test("Fix ColumnarToRowRemovalGuard not able to be copied") {
val dummyPlan =
BatchToRow(
- Convention.BatchType.VanillaBatch,
- Convention.RowType.VanillaRow,
+ Convention.BatchType.VanillaBatchType,
+ Convention.RowType.VanillaRowType,
spark.range(1).queryExecution.sparkPlan)
val cloned =
PreventBatchTypeMismatchInTableCache(isCalledByTableCachePlaning = true,
Set.empty)
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
index e4e1e88372..716451e2f2 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -18,18 +18,14 @@ package org.apache.gluten.extension.columnar.transition
import org.apache.gluten.component.WithDummyBackend
import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.{ColumnarToColumnarExec, GlutenPlan}
import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution._
import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.vectorized.ColumnarBatch
-class TransitionSuite extends SharedSparkSession with WithDummyBackend {
+class TransitionSuite extends SharedSparkSession with TransitionSuiteBase with
WithDummyBackend {
import TransitionSuite._
+ import TransitionSuiteBase._
override protected def sparkConf: SparkConf =
super.sparkConf
@@ -120,6 +116,8 @@ class TransitionSuite extends SharedSparkSession with
WithDummyBackend {
}
object TransitionSuite extends TransitionSuiteBase {
+ import TransitionSuiteBase._
+
private def insertTransitions(plan: SparkPlan, req: ConventionReq):
SparkPlan = {
InsertTransitions(req).apply(plan)
}
@@ -161,54 +159,4 @@ object TransitionSuite extends TransitionSuiteBase {
toBatch(BatchTypeB, RowToBatch(this, BatchTypeB, _))
}
}
-
- case class RowToBatch(
- fromRowType: Convention.RowType,
- toBatchType: Convention.BatchType,
- override val child: SparkPlan)
- extends RowToColumnarTransition
- with GlutenPlan {
- override def batchType(): Convention.BatchType = toBatchType
- override def rowType0(): Convention.RowType = Convention.RowType.None
- override def requiredChildConvention(): Seq[ConventionReq] = {
- List(ConventionReq.ofRow(ConventionReq.RowType.Is(fromRowType)))
- }
-
- override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
- copy(child = newChild)
- override protected def doExecute(): RDD[InternalRow] =
- throw new UnsupportedOperationException()
- override def output: Seq[Attribute] = child.output
- }
-
- case class BatchToRow(
- fromBatchType: Convention.BatchType,
- toRowType: Convention.RowType,
- override val child: SparkPlan)
- extends ColumnarToRowTransition
- with GlutenPlan {
- override def batchType(): Convention.BatchType = Convention.BatchType.None
- override def rowType0(): Convention.RowType = toRowType
- override def requiredChildConvention(): Seq[ConventionReq] = {
- List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(fromBatchType)))
- }
-
- override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
- copy(child = newChild)
- override protected def doExecute(): RDD[InternalRow] =
- throw new UnsupportedOperationException()
- override def output: Seq[Attribute] = child.output
- }
-
- case class BatchToBatch(
- from: Convention.BatchType,
- to: Convention.BatchType,
- override val child: SparkPlan)
- extends ColumnarToColumnarExec(from, to) {
- override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
- copy(child = newChild)
- override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
- override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] =
- throw new UnsupportedOperationException()
- }
}
diff --git
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
index e67d03d9ba..3724008509 100644
---
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
+++
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
@@ -16,14 +16,17 @@
*/
package org.apache.gluten.extension.columnar.transition
-import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.execution.{ColumnarToColumnarExec, GlutenPlan}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{BinaryExecNode, LeafExecNode,
SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{BinaryExecNode,
ColumnarToRowTransition, LeafExecNode, RowToColumnarTransition, SparkPlan,
UnaryExecNode}
+import org.apache.spark.sql.vectorized.ColumnarBatch
-trait TransitionSuiteBase {
+trait TransitionSuiteBase {}
+
+object TransitionSuiteBase {
case class BatchLeaf(override val batchType: Convention.BatchType)
extends LeafExecNode
with GlutenPlan {
@@ -104,4 +107,70 @@ trait TransitionSuiteBase {
override def output: Seq[Attribute] = left.output ++ right.output
}
+ case class RowToBatch(
+ fromRowType: Convention.RowType,
+ toBatchType: Convention.BatchType,
+ override val child: SparkPlan)
+ extends RowToColumnarTransition
+ with GlutenPlan {
+ override def batchType(): Convention.BatchType = toBatchType
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ List(ConventionReq.ofRow(ConventionReq.RowType.Is(fromRowType)))
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+ override protected def doExecute(): RDD[InternalRow] =
+ throw new UnsupportedOperationException()
+ override def output: Seq[Attribute] = child.output
+ }
+
+ case class BatchToRow(
+ fromBatchType: Convention.BatchType,
+ toRowType: Convention.RowType,
+ override val child: SparkPlan)
+ extends ColumnarToRowTransition
+ with GlutenPlan {
+ override def batchType(): Convention.BatchType = Convention.BatchType.None
+ override def rowType0(): Convention.RowType = toRowType
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(fromBatchType)))
+ }
+
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+ override protected def doExecute(): RDD[InternalRow] =
+ throw new UnsupportedOperationException()
+ override def output: Seq[Attribute] = child.output
+ }
+
+ case class BatchToBatch(
+ from: Convention.BatchType,
+ to: Convention.BatchType,
+ override val child: SparkPlan)
+ extends ColumnarToColumnarExec(from, to) {
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override protected def mapIterator(in: Iterator[ColumnarBatch]):
Iterator[ColumnarBatch] =
+ throw new UnsupportedOperationException()
+ }
+
+ case class RowToRow(
+ from: Convention.RowType,
+ to: Convention.RowType,
+ override val child: SparkPlan)
+ extends UnaryExecNode
+ with GlutenPlan {
+ override def batchType(): Convention.BatchType = Convention.BatchType.None
+ override def rowType0(): Convention.RowType = to
+ override def requiredChildConvention(): Seq[ConventionReq] = {
+ List(ConventionReq.ofRow(ConventionReq.RowType.Is(from)))
+ }
+ override protected def doExecute(): RDD[InternalRow] = throw new
UnsupportedOperationException()
+ override def output: Seq[Attribute] = child.output
+ override protected def withNewChildInternal(newChild: SparkPlan):
SparkPlan =
+ copy(child = newChild)
+ }
}
diff --git
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
index 679a69334a..81b4c01b46 100644
---
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
+++
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
@@ -24,6 +24,8 @@ import org.apache.spark.unsafe.types.{CalendarInterval,
UTF8String}
trait IFakeRowAdaptor
+/** Deprecated: Moving to new API `BatchCarrierRow` in `gluten-substrait`. */
+@deprecated
class FakeRow(@transient var batch: ColumnarBatch) extends InternalRow {
override def numFields: Int = throw new UnsupportedOperationException()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]