This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 6004a9e761 [GLUTEN-9666][VL] Add BatchCarrierRow in preparation for 
replacement of FakeRow (#9708)
6004a9e761 is described below

commit 6004a9e76103b55b41017eebf27cd2a91dad6f1f
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed May 21 12:04:32 2025 +0100

    [GLUTEN-9666][VL] Add BatchCarrierRow in preparation for replacement of 
FakeRow (#9708)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  |   6 +-
 .../backendsapi/clickhouse/CHListenerApi.scala     |   4 +-
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |   4 +-
 .../{CHBatch.scala => CHBatchType.scala}           |   8 +-
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   7 +-
 .../velox/VeloxBatchType.scala}                    |  16 ++-
 .../backendsapi/velox/VeloxCarrierRowType.scala}   |  12 +-
 .../backendsapi/velox/VeloxListenerApi.scala       |  10 +-
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   9 +-
 .../ArrowColumnarToVeloxColumnarExec.scala         |   7 +-
 .../gluten/execution/ColumnarRangeExec.scala       |   4 +-
 ...c.scala => VeloxColumnarToCarrierRowExec.scala} |  20 +--
 .../api/python/ColumnarArrowEvalPythonExec.scala   |   6 +-
 .../spark/sql/execution/BaseArrowScanExec.scala    |   4 +-
 .../columnar/transition/VeloxTransitionSuite.scala | 147 ++++++++++++---------
 .../arrow/ArrowBatchTypes.scala}                   |  20 +--
 .../gluten/execution/LoadArrowDataExec.scala       |   6 +-
 .../gluten/execution/OffloadArrowDataExec.scala    |   6 +-
 .../enumerated/planner/plan/GlutenPlanModel.scala  |   2 +-
 .../extension/columnar/transition/Convention.scala |  12 +-
 .../columnar/transition/ConventionFunc.scala       |  10 +-
 .../columnar/transition/ConventionReq.scala        |   4 +-
 .../columnar/transition/Transitions.scala          |   2 +-
 .../execution/ColumnarToCarrierRowExecBase.scala   |  72 ++++++++++
 .../gluten/execution/ColumnarToRowExecBase.scala   |   2 +-
 .../columnar/batchcarrier/BatchCarrierRow.scala    |  95 +++++++++++++
 .../extension/columnar/cost/LegacyCoster.scala     |   4 +-
 .../extension/columnar/cost/RoughCoster.scala      |   3 +-
 .../scala/org/apache/gluten/utils/PlanUtil.scala   |   4 +-
 .../sql/execution/ColumnarWriteFilesExec.scala     |   2 +-
 .../datasources/GlutenWriterColumnarRules.scala    |   4 +-
 .../columnar/MiscColumnarRulesSuite.scala          |   6 +-
 .../columnar/transition/TransitionSuite.scala      |  60 +--------
 .../columnar/transition/TransitionSuiteBase.scala  |  75 ++++++++++-
 .../spark/sql/execution/datasources/FakeRow.scala  |   2 +
 35 files changed, 426 insertions(+), 229 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index ef83c80339..0c8ee9d4a6 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.GlutenBuildInfo._
 import org.apache.gluten.backendsapi._
-import org.apache.gluten.columnarbatch.CHBatch
+import org.apache.gluten.columnarbatch.CHBatchType
 import org.apache.gluten.component.Component.BuildInfo
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.WriteFilesExecTransformer
@@ -72,13 +72,13 @@ object CHBackend {
   private class ConvFunc() extends ConventionFunc.Override {
     override def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType] 
= {
       case a: AdaptiveSparkPlanExec if a.supportsColumnar =>
-        CHBatch
+        CHBatchType
     }
   }
 }
 
 object CHBackendSettings extends BackendSettingsApi with Logging {
-  override def primaryBatchType: Convention.BatchType = CHBatch
+  override def primaryBatchType: Convention.BatchType = CHBatchType
 
   private val GLUTEN_CLICKHOUSE_SEP_SCAN_RDD = 
"spark.gluten.sql.columnar.separate.scan.rdd.for.ch"
   private val GLUTEN_CLICKHOUSE_SEP_SCAN_RDD_DEFAULT = "false"
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
index 5e62126ff8..da0f037cee 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.backendsapi.ListenerApi
-import org.apache.gluten.columnarbatch.CHBatch
+import org.apache.gluten.columnarbatch.CHBatchType
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution.CHBroadcastBuildSideCache
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
@@ -73,7 +73,7 @@ class CHListenerApi extends ListenerApi with Logging {
   private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
     // Do row / batch type initializations.
     Convention.ensureSparkRowAndBatchTypesRegistered()
-    CHBatch.ensureRegistered()
+    CHBatchType.ensureRegistered()
     SparkDirectoryUtil.init(conf)
     val libPath =
       conf.get(GlutenConfig.GLUTEN_LIB_PATH.key, 
GlutenConfig.GLUTEN_LIB_PATH.defaultValueString)
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 5f64238436..58e3036ab9 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.backendsapi.RuleApi
-import org.apache.gluten.columnarbatch.CHBatch
+import org.apache.gluten.columnarbatch.CHBatchType
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar._
@@ -125,7 +125,7 @@ object CHRuleApi {
             c.session)))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(_ => CollectTailTransformerRule())
-    injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, CHBatch))
+    injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, CHBatchType))
     injector.injectPostTransform(c => RemoveDuplicatedColumns(c.session))
     injector.injectPostTransform(c => AddPreProjectionForHashJoin(c.session))
     injector.injectPostTransform(c => ReplaceSubStringComparison(c.session))
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatch.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatchType.scala
similarity index 85%
rename from 
backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatch.scala
rename to 
backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatchType.scala
index 0c4b9b048d..68aea9b32a 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatch.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/columnarbatch/CHBatchType.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.{CHColumnarToRowExec, 
RowToCHNativeColumna
  * [[org.apache.gluten.extension.columnar.transition.TransitionDef]] instance. 
The scala allows an
  * compact way to implement trait using a lambda function.
  *
- * Here the detail definition is given in [[CHBatch.fromRow]].
+ * Here the detail definition is given in [[CHBatchType.fromRow]].
  * {{{
  *       fromRow(new TransitionDef {
  *       override def create(): Transition = new Transition {
@@ -37,9 +37,9 @@ import org.apache.spark.sql.execution.{CHColumnarToRowExec, 
RowToCHNativeColumna
  *     })
  * }}}
  */
-object CHBatch extends Convention.BatchType {
+object CHBatchType extends Convention.BatchType {
   override protected def registerTransitions(): Unit = {
-    fromRow(Convention.RowType.VanillaRow, RowToCHNativeColumnarExec.apply)
-    toRow(Convention.RowType.VanillaRow, CHColumnarToRowExec.apply)
+    fromRow(Convention.RowType.VanillaRowType, RowToCHNativeColumnarExec.apply)
+    toRow(Convention.RowType.VanillaRowType, CHColumnarToRowExec.apply)
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 56e5a07239..7957be7697 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.GlutenBuildInfo._
 import org.apache.gluten.backendsapi._
-import org.apache.gluten.columnarbatch.VeloxBatch
 import org.apache.gluten.component.Component.BuildInfo
 import org.apache.gluten.config.{GlutenConfig, VeloxConfig}
 import org.apache.gluten.exception.GlutenNotSupportException
@@ -79,11 +78,11 @@ object VeloxBackend {
   private class ConvFunc() extends ConventionFunc.Override {
     override def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType] 
= {
       case a: AdaptiveSparkPlanExec if a.supportsColumnar =>
-        VeloxBatch
+        VeloxBatchType
       case i: InMemoryTableScanExec
           if i.supportsColumnar && i.relation.cacheBuilder.serializer
             .isInstanceOf[ColumnarCachedBatchSerializer] =>
-        VeloxBatch
+        VeloxBatchType
     }
   }
 }
@@ -96,7 +95,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
   val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX + 
".udfAllowTypeConversion"
 
   /** The columnar-batch type this backend is by default using. */
-  override def primaryBatchType: Convention.BatchType = VeloxBatch
+  override def primaryBatchType: Convention.BatchType = VeloxBatchType
 
   override def validateScanExec(
       format: ReadFileFormat,
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBatchType.scala
similarity index 61%
rename from 
backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
rename to 
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBatchType.scala
index 0761a1a7a3..b580afa83d 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/columnarbatch/VeloxBatch.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBatchType.scala
@@ -14,16 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.columnarbatch
+package org.apache.gluten.backendsapi.velox
 
-import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec, 
RowToVeloxColumnarExec, VeloxColumnarToRowExec}
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes
+import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec, 
RowToVeloxColumnarExec, VeloxColumnarToCarrierRowExec, VeloxColumnarToRowExec}
 import org.apache.gluten.extension.columnar.transition.{Convention, Transition}
 
-object VeloxBatch extends Convention.BatchType {
+object VeloxBatchType extends Convention.BatchType {
   override protected def registerTransitions(): Unit = {
-    fromRow(Convention.RowType.VanillaRow, RowToVeloxColumnarExec.apply)
-    toRow(Convention.RowType.VanillaRow, VeloxColumnarToRowExec.apply)
-    fromBatch(ArrowBatches.ArrowNativeBatch, 
ArrowColumnarToVeloxColumnarExec.apply)
-    toBatch(ArrowBatches.ArrowNativeBatch, Transition.empty)
+    fromRow(Convention.RowType.VanillaRowType, RowToVeloxColumnarExec.apply)
+    toRow(Convention.RowType.VanillaRowType, VeloxColumnarToRowExec.apply)
+    fromBatch(ArrowBatchTypes.ArrowNativeBatchType, 
ArrowColumnarToVeloxColumnarExec.apply)
+    toBatch(ArrowBatchTypes.ArrowNativeBatchType, Transition.empty)
+    toRow(VeloxCarrierRowType, VeloxColumnarToCarrierRowExec.apply)
   }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxCarrierRowType.scala
similarity index 70%
copy from 
backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
copy to 
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxCarrierRowType.scala
index 1aacc1b954..19b7bf17fe 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxCarrierRowType.scala
@@ -14,16 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution
+package org.apache.gluten.backendsapi.velox
 
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.Convention
 
-trait BaseArrowScanExec extends GlutenPlan {
-  final override def batchType(): Convention.BatchType = {
-    ArrowBatches.ArrowJavaBatch
-  }
-
-  final override def rowType0(): Convention.RowType = Convention.RowType.None
+object VeloxCarrierRowType extends Convention.RowType {
+  override protected[this] def registerTransitions(): Unit = {}
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
index af5e1533b2..6752a92baa 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala
@@ -17,8 +17,7 @@
 package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.backendsapi.ListenerApi
-import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, 
ArrowNativeBatch}
-import org.apache.gluten.columnarbatch.VeloxBatch
+import 
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType, 
ArrowNativeBatchType}
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.config.VeloxConfig._
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
@@ -169,9 +168,10 @@ class VeloxListenerApi extends ListenerApi with Logging {
 
     // Do row / batch type initializations.
     Convention.ensureSparkRowAndBatchTypesRegistered()
-    ArrowJavaBatch.ensureRegistered()
-    ArrowNativeBatch.ensureRegistered()
-    VeloxBatch.ensureRegistered()
+    VeloxCarrierRowType.ensureRegistered()
+    ArrowJavaBatchType.ensureRegistered()
+    ArrowNativeBatchType.ensureRegistered()
+    VeloxBatchType.ensureRegistered()
 
     // Register columnar shuffle so can be considered when
     // `org.apache.spark.shuffle.GlutenShuffleManager` is set as Spark shuffle 
manager.
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 443ad147de..f7246605b8 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.backendsapi.RuleApi
-import org.apache.gluten.columnarbatch.VeloxBatch
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar._
@@ -103,7 +102,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(c => 
HashAggregateIgnoreNullKeysRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(_ => CollectTailTransformerRule())
-    injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
+    injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatchType))
 
     // Gluten columnar: Fallback policies.
     injector.injectFallbackPolicy(c => p => 
ExpandFallbackPolicy(c.caller.isAqe(), p))
@@ -119,7 +118,7 @@ object VeloxRuleApi {
     // Gluten columnar: Final rules.
     injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
     injector.injectFinal(
-      c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), 
Set(VeloxBatch)))
+      c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), 
Set(VeloxBatchType)))
     injector.injectFinal(c => 
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
     injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session))
     injector.injectFinal(_ => RemoveFallbackTagRule())
@@ -193,7 +192,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(c => 
HashAggregateIgnoreNullKeysRule.apply(c.session))
     injector.injectPostTransform(_ => CollectLimitTransformerRule())
     injector.injectPostTransform(_ => CollectTailTransformerRule())
-    injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
+    injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatchType))
     injector.injectPostTransform(c => RemoveTopmostColumnarToRow(c.session, 
c.caller.isAqe()))
     SparkShimLoader.getSparkShims
       .getExtendedColumnarPostRules()
@@ -202,7 +201,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
     injector.injectPostTransform(c => 
RemoveGlutenTableCacheColumnarToRow(c.session))
     injector.injectPostTransform(
-      c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), 
Set(VeloxBatch)))
+      c => PreventBatchTypeMismatchInTableCache(c.caller.isCache(), 
Set(VeloxBatchType)))
     injector.injectPostTransform(c => 
GlutenAutoAdjustStageResourceProfile(c.glutenConf, c.session))
     injector.injectPostTransform(c => GlutenFallbackReporter(c.glutenConf, 
c.session))
     injector.injectPostTransform(_ => RemoveFallbackTagRule())
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
index 0ab51772e1..49d5659996 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
@@ -16,14 +16,15 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.columnarbatch.{VeloxBatch, VeloxColumnarBatches}
-import org.apache.gluten.columnarbatch.ArrowBatches.ArrowNativeBatch
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowNativeBatchType
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.columnarbatch.VeloxColumnarBatches
 
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
-  extends ColumnarToColumnarExec(ArrowNativeBatch, VeloxBatch) {
+  extends ColumnarToColumnarExec(ArrowNativeBatchType, VeloxBatchType) {
   override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
     in.map {
       b =>
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
index 3fb86a8aae..58a64d891f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowJavaBatchType
 import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.vectorized.ArrowWritableColumnVector
@@ -57,7 +57,7 @@ case class ColumnarRangeExec(
 ) extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements, 
outputAttributes, child) {
 
   override def batchType(): Convention.BatchType = {
-    ArrowJavaBatch
+    ArrowJavaBatchType
   }
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToCarrierRowExec.scala
similarity index 62%
copy from 
backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
copy to 
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToCarrierRowExec.scala
index 0ab51772e1..e4a2c7554a 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ArrowColumnarToVeloxColumnarExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToCarrierRowExec.scala
@@ -16,21 +16,15 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.columnarbatch.{VeloxBatch, VeloxColumnarBatches}
-import org.apache.gluten.columnarbatch.ArrowBatches.ArrowNativeBatch
+import org.apache.gluten.backendsapi.velox.{VeloxBatchType, 
VeloxCarrierRowType}
+import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.vectorized.ColumnarBatch
 
-case class ArrowColumnarToVeloxColumnarExec(override val child: SparkPlan)
-  extends ColumnarToColumnarExec(ArrowNativeBatch, VeloxBatch) {
-  override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
-    in.map {
-      b =>
-        val out = VeloxColumnarBatches.toVeloxBatch(b)
-        out
-    }
-  }
+case class VeloxColumnarToCarrierRowExec(override val child: SparkPlan)
+  extends ColumnarToCarrierRowExecBase {
+  override protected def fromBatchType(): Convention.BatchType = VeloxBatchType
+  override def rowType0(): Convention.RowType = VeloxCarrierRowType
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    ArrowColumnarToVeloxColumnarExec(child = newChild)
+    copy(child = newChild)
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index 7564500aae..5e9dd2f815 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.api.python
 
-import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.ArrowJavaBatchType
 import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.execution.ValidatablePlan
 import org.apache.gluten.extension.ValidationResult
@@ -213,7 +213,7 @@ case class ColumnarArrowEvalPythonExec(
   extends EvalPythonExec
   with ValidatablePlan {
 
-  override def batchType(): Convention.BatchType = ArrowJavaBatch
+  override def batchType(): Convention.BatchType = ArrowJavaBatchType
 
   override def rowType0(): Convention.RowType = Convention.RowType.None
 
@@ -234,7 +234,7 @@ case class ColumnarArrowEvalPythonExec(
   }
 
   override def requiredChildConvention(): Seq[ConventionReq] = List(
-    ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatch)))
+    ConventionReq.ofBatch(ConventionReq.BatchType.Is(ArrowJavaBatchType)))
 
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
index 1aacc1b954..ebccd6f5bb 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
@@ -16,13 +16,13 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.gluten.columnarbatch.ArrowBatches
+import org.apache.gluten.backendsapi.arrow.ArrowBatchTypes
 import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.Convention
 
 trait BaseArrowScanExec extends GlutenPlan {
   final override def batchType(): Convention.BatchType = {
-    ArrowBatches.ArrowJavaBatch
+    ArrowBatchTypes.ArrowJavaBatchType
   }
 
   final override def rowType0(): Convention.RowType = Convention.RowType.None
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
index 37b075e862..79f4b918e1 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
@@ -16,201 +16,222 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.backendsapi.velox.VeloxListenerApi
-import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, 
ArrowNativeBatch}
-import org.apache.gluten.columnarbatch.VeloxBatch
-import org.apache.gluten.execution.{ArrowColumnarToVeloxColumnarExec, 
LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec, 
VeloxColumnarToRowExec}
-import 
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
+import 
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType, 
ArrowNativeBatchType}
+import org.apache.gluten.backendsapi.velox.{VeloxBatchType, 
VeloxCarrierRowType, VeloxListenerApi}
+import org.apache.gluten.execution._
+import 
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatchType
 import org.apache.gluten.test.MockVeloxBackend
 
 import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec}
 import org.apache.spark.sql.test.SharedSparkSession
 
-class VeloxTransitionSuite extends SharedSparkSession {
-  import VeloxTransitionSuite._
+class VeloxTransitionSuite extends SharedSparkSession with TransitionSuiteBase 
{
+  import TransitionSuiteBase._
 
   private val api = new VeloxListenerApi()
 
   test("Vanilla C2R - outputs row") {
-    val in = BatchLeaf(VanillaBatch)
+    val in = BatchLeaf(VanillaBatchType)
     val out = BackendTransitions.insert(in, outputsColumnar = false)
-    assert(out == ColumnarToRowExec(BatchLeaf(VanillaBatch)))
+    assert(out == ColumnarToRowExec(BatchLeaf(VanillaBatchType)))
   }
 
   test("Vanilla C2R - requires row input") {
-    val in = RowUnary(Convention.RowType.VanillaRow, BatchLeaf(VanillaBatch))
+    val in = RowUnary(Convention.RowType.VanillaRowType, 
BatchLeaf(VanillaBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
-      out == RowUnary(Convention.RowType.VanillaRow, 
ColumnarToRowExec(BatchLeaf(VanillaBatch))))
+      out == RowUnary(
+        Convention.RowType.VanillaRowType,
+        ColumnarToRowExec(BatchLeaf(VanillaBatchType))))
   }
 
   test("Vanilla R2C - requires vanilla input") {
-    val in = BatchUnary(VanillaBatch, RowLeaf(Convention.RowType.VanillaRow))
+    val in = BatchUnary(VanillaBatchType, 
RowLeaf(Convention.RowType.VanillaRowType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
-    assert(
-      out == ColumnarToRowExec(
-        BatchUnary(VanillaBatch, 
RowToColumnarExec(RowLeaf(Convention.RowType.VanillaRow)))))
+    assert(out == ColumnarToRowExec(
+      BatchUnary(VanillaBatchType, 
RowToColumnarExec(RowLeaf(Convention.RowType.VanillaRowType)))))
   }
 
   test("ArrowNative C2R - outputs row") {
-    val in = BatchLeaf(ArrowNativeBatch)
+    val in = BatchLeaf(ArrowNativeBatchType)
     val out = BackendTransitions.insert(in, outputsColumnar = false)
-    assert(out == 
ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch))))
+    assert(out == 
ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatchType))))
   }
 
   test("ArrowNative C2R - requires row input") {
-    val in = RowUnary(Convention.RowType.VanillaRow, 
BatchLeaf(ArrowNativeBatch))
+    val in = RowUnary(Convention.RowType.VanillaRowType, 
BatchLeaf(ArrowNativeBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == RowUnary(
-        Convention.RowType.VanillaRow,
-        ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
+        Convention.RowType.VanillaRowType,
+        ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatchType)))))
   }
 
   test("ArrowNative R2C - requires Arrow input") {
-    val in = BatchUnary(ArrowNativeBatch, 
RowLeaf(Convention.RowType.VanillaRow))
+    val in = BatchUnary(ArrowNativeBatchType, 
RowLeaf(Convention.RowType.VanillaRowType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         LoadArrowDataExec(BatchUnary(
-          ArrowNativeBatch,
-          RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRow))))))
+          ArrowNativeBatchType,
+          
RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRowType))))))
   }
 
   test("ArrowNative-to-Velox C2C") {
-    val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowNativeBatch))
+    val in = BatchUnary(VeloxBatchType, BatchLeaf(ArrowNativeBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     // No explicit transition needed for ArrowNative-to-Velox.
     // FIXME: Add explicit transitions.
     //  See https://github.com/apache/incubator-gluten/issues/7313.
     assert(
       out == VeloxColumnarToRowExec(
-        BatchUnary(VeloxBatch, 
ArrowColumnarToVeloxColumnarExec(BatchLeaf(ArrowNativeBatch)))))
+        BatchUnary(
+          VeloxBatchType,
+          ArrowColumnarToVeloxColumnarExec(BatchLeaf(ArrowNativeBatchType)))))
   }
 
   test("Velox-to-ArrowNative C2C") {
-    val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))
+    val in = BatchUnary(ArrowNativeBatchType, BatchLeaf(VeloxBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
-        LoadArrowDataExec(BatchUnary(ArrowNativeBatch, 
BatchLeaf(VeloxBatch)))))
+        LoadArrowDataExec(BatchUnary(ArrowNativeBatchType, 
BatchLeaf(VeloxBatchType)))))
   }
 
   test("Vanilla-to-ArrowNative C2C") {
-    val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VanillaBatch))
+    val in = BatchUnary(ArrowNativeBatchType, BatchLeaf(VanillaBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         LoadArrowDataExec(BatchUnary(
-          ArrowNativeBatch,
-          
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch)))))))
+          ArrowNativeBatchType,
+          
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatchType)))))))
   }
 
   test("ArrowNative-to-Vanilla C2C") {
-    val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowNativeBatch))
+    val in = BatchUnary(VanillaBatchType, BatchLeaf(ArrowNativeBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
-        BatchUnary(VanillaBatch, 
LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
+        BatchUnary(VanillaBatchType, 
LoadArrowDataExec(BatchLeaf(ArrowNativeBatchType)))))
   }
 
   test("ArrowJava C2R - outputs row") {
-    val in = BatchLeaf(ArrowJavaBatch)
+    val in = BatchLeaf(ArrowJavaBatchType)
     val out = BackendTransitions.insert(in, outputsColumnar = false)
-    assert(out == ColumnarToRowExec(BatchLeaf(ArrowJavaBatch)))
+    assert(out == ColumnarToRowExec(BatchLeaf(ArrowJavaBatchType)))
   }
 
   test("ArrowJava C2R - requires row input") {
-    val in = RowUnary(Convention.RowType.VanillaRow, BatchLeaf(ArrowJavaBatch))
+    val in = RowUnary(Convention.RowType.VanillaRowType, 
BatchLeaf(ArrowJavaBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
-      out == RowUnary(Convention.RowType.VanillaRow, 
ColumnarToRowExec(BatchLeaf(ArrowJavaBatch))))
+      out == RowUnary(
+        Convention.RowType.VanillaRowType,
+        ColumnarToRowExec(BatchLeaf(ArrowJavaBatchType))))
   }
 
   test("ArrowJava R2C - requires Arrow input") {
-    val in = BatchUnary(ArrowJavaBatch, RowLeaf(Convention.RowType.VanillaRow))
+    val in = BatchUnary(ArrowJavaBatchType, 
RowLeaf(Convention.RowType.VanillaRowType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         BatchUnary(
-          ArrowJavaBatch,
-          
LoadArrowDataExec(RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRow))))))
+          ArrowJavaBatchType,
+          
LoadArrowDataExec(RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRowType))))))
   }
 
   test("ArrowJava-to-Velox C2C") {
-    val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowJavaBatch))
+    val in = BatchUnary(VeloxBatchType, BatchLeaf(ArrowJavaBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == VeloxColumnarToRowExec(
         BatchUnary(
-          VeloxBatch,
-          
ArrowColumnarToVeloxColumnarExec(OffloadArrowDataExec(BatchLeaf(ArrowJavaBatch))))))
+          VeloxBatchType,
+          
ArrowColumnarToVeloxColumnarExec(OffloadArrowDataExec(BatchLeaf(ArrowJavaBatchType))))))
   }
 
   test("Velox-to-ArrowJava C2C") {
-    val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VeloxBatch))
+    val in = BatchUnary(ArrowJavaBatchType, BatchLeaf(VeloxBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
-        BatchUnary(ArrowJavaBatch, LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
+        BatchUnary(ArrowJavaBatchType, 
LoadArrowDataExec(BatchLeaf(VeloxBatchType)))))
   }
 
   test("Vanilla-to-ArrowJava C2C") {
-    val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VanillaBatch))
+    val in = BatchUnary(ArrowJavaBatchType, BatchLeaf(VanillaBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
-      out == ColumnarToRowExec(
-        BatchUnary(
-          ArrowJavaBatch,
-          
LoadArrowDataExec(RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch)))))))
+      out == ColumnarToRowExec(BatchUnary(
+        ArrowJavaBatchType,
+        
LoadArrowDataExec(RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatchType)))))))
   }
 
   test("ArrowJava-to-Vanilla C2C") {
-    val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowJavaBatch))
+    val in = BatchUnary(VanillaBatchType, BatchLeaf(ArrowJavaBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
-    assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch, 
BatchLeaf(ArrowJavaBatch))))
+    assert(out == ColumnarToRowExec(BatchUnary(VanillaBatchType, 
BatchLeaf(ArrowJavaBatchType))))
   }
 
   test("Velox C2R - outputs row") {
-    val in = BatchLeaf(VeloxBatch)
+    val in = BatchLeaf(VeloxBatchType)
     val out = BackendTransitions.insert(in, outputsColumnar = false)
-    assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatch)))
+    assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatchType)))
   }
 
   test("Velox C2R - requires row input") {
-    val in = RowUnary(Convention.RowType.VanillaRow, BatchLeaf(VeloxBatch))
+    val in = RowUnary(Convention.RowType.VanillaRowType, 
BatchLeaf(VeloxBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
-      out == RowUnary(Convention.RowType.VanillaRow, 
VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))
+      out == RowUnary(
+        Convention.RowType.VanillaRowType,
+        VeloxColumnarToRowExec(BatchLeaf(VeloxBatchType))))
   }
 
   test("Velox R2C - outputs Velox") {
-    val in = RowLeaf(Convention.RowType.VanillaRow)
+    val in = RowLeaf(Convention.RowType.VanillaRowType)
     val out = BackendTransitions.insert(in, outputsColumnar = true)
-    assert(out == 
RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRow)))
+    assert(out == 
RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRowType)))
   }
 
   test("Velox R2C - requires Velox input") {
-    val in = BatchUnary(VeloxBatch, RowLeaf(Convention.RowType.VanillaRow))
+    val in = BatchUnary(VeloxBatchType, 
RowLeaf(Convention.RowType.VanillaRowType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == VeloxColumnarToRowExec(
-        BatchUnary(VeloxBatch, 
RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRow)))))
+        BatchUnary(
+          VeloxBatchType,
+          RowToVeloxColumnarExec(RowLeaf(Convention.RowType.VanillaRowType)))))
   }
 
   test("Vanilla-to-Velox C2C") {
-    val in = BatchUnary(VeloxBatch, BatchLeaf(VanillaBatch))
+    val in = BatchUnary(VeloxBatchType, BatchLeaf(VanillaBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == VeloxColumnarToRowExec(
-        BatchUnary(VeloxBatch, 
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
+        BatchUnary(
+          VeloxBatchType,
+          
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatchType))))))
   }
 
   test("Velox-to-Vanilla C2C") {
-    val in = BatchUnary(VanillaBatch, BatchLeaf(VeloxBatch))
+    val in = BatchUnary(VanillaBatchType, BatchLeaf(VeloxBatchType))
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
+    assert(
+      out == ColumnarToRowExec(
+        BatchUnary(VanillaBatchType, 
LoadArrowDataExec(BatchLeaf(VeloxBatchType)))))
+  }
+
+  test("Velox-to-CarrierRow C2R") {
+    val in =
+      RowToRow(VeloxCarrierRowType, Convention.RowType.VanillaRowType, 
BatchLeaf(VeloxBatchType))
     val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
-      out == ColumnarToRowExec(BatchUnary(VanillaBatch, 
LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
+      out == RowToRow(
+        VeloxCarrierRowType,
+        Convention.RowType.VanillaRowType,
+        VeloxColumnarToCarrierRowExec(BatchLeaf(VeloxBatchType))))
   }
 
   override protected def beforeAll(): Unit = {
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/columnarbatch/ArrowBatches.scala
 
b/gluten-arrow/src/main/scala/org/apache/gluten/backendsapi/arrow/ArrowBatchTypes.scala
similarity index 78%
rename from 
gluten-arrow/src/main/scala/org/apache/gluten/columnarbatch/ArrowBatches.scala
rename to 
gluten-arrow/src/main/scala/org/apache/gluten/backendsapi/arrow/ArrowBatchTypes.scala
index c23d6eea79..a0cbb7b912 100644
--- 
a/gluten-arrow/src/main/scala/org/apache/gluten/columnarbatch/ArrowBatches.scala
+++ 
b/gluten-arrow/src/main/scala/org/apache/gluten/backendsapi/arrow/ArrowBatchTypes.scala
@@ -14,27 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.columnarbatch
+package org.apache.gluten.backendsapi.arrow
 
 import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec}
 import org.apache.gluten.extension.columnar.transition.{Convention, Transition}
-import 
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatch
+import 
org.apache.gluten.extension.columnar.transition.Convention.BatchType.VanillaBatchType
 
-object ArrowBatches {
+object ArrowBatchTypes {
 
   /**
    * ArrowJavaBatch stands for Gluten's Java Arrow-based columnar batch 
implementation.
    *
    * ArrowJavaBatch should have 
[[org.apache.gluten.vectorized.ArrowWritableColumnVector]]s
    * populated in it. ArrowJavaBatch can be offloaded to ArrowNativeBatch 
through API in
-   * [[ColumnarBatches]].
+   * [[org.apache.gluten.columnarbatch.ColumnarBatches]].
    *
    * ArrowJavaBatch is compatible with vanilla batch since it provides valid 
#get<type>(...)
    * implementations.
    */
-  object ArrowJavaBatch extends Convention.BatchType {
+  object ArrowJavaBatchType extends Convention.BatchType {
     override protected def registerTransitions(): Unit = {
-      toBatch(VanillaBatch, Transition.empty)
+      toBatch(VanillaBatchType, Transition.empty)
     }
   }
 
@@ -43,12 +43,12 @@ object ArrowBatches {
    *
    * ArrowNativeBatch should have 
[[org.apache.gluten.columnarbatch.IndicatorVector]] set as the
    * first vector. ArrowNativeBatch can be loaded to ArrowJavaBatch through 
API in
-   * [[ColumnarBatches]].
+   * [[org.apache.gluten.columnarbatch.ColumnarBatches]].
    */
-  object ArrowNativeBatch extends Convention.BatchType {
+  object ArrowNativeBatchType extends Convention.BatchType {
     override protected def registerTransitions(): Unit = {
-      fromBatch(ArrowJavaBatch, OffloadArrowDataExec.apply)
-      toBatch(ArrowJavaBatch, LoadArrowDataExec.apply)
+      fromBatch(ArrowJavaBatchType, OffloadArrowDataExec.apply)
+      toBatch(ArrowJavaBatchType, LoadArrowDataExec.apply)
     }
   }
 }
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
 
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
index 36f3c48d4e..1f1750113a 100644
--- 
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
+++ 
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/LoadArrowDataExec.scala
@@ -16,16 +16,16 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, 
ArrowNativeBatch}
+import 
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType, 
ArrowNativeBatchType}
 import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-/** Converts input data with batch type [[ArrowNativeBatch]] to type 
[[ArrowJavaBatch]]. */
+/** Converts input data with batch type [[ArrowNativeBatchType]] to type 
[[ArrowJavaBatchType]]. */
 case class LoadArrowDataExec(override val child: SparkPlan)
-  extends ColumnarToColumnarExec(ArrowNativeBatch, ArrowJavaBatch) {
+  extends ColumnarToColumnarExec(ArrowNativeBatchType, ArrowJavaBatchType) {
   override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
     in.map(b => ColumnarBatches.load(ArrowBufferAllocators.contextInstance, b))
   }
diff --git 
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
 
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
index 7b2184a240..6e548adbf6 100644
--- 
a/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
+++ 
b/gluten-arrow/src/main/scala/org/apache/gluten/execution/OffloadArrowDataExec.scala
@@ -16,16 +16,16 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, 
ArrowNativeBatch}
+import 
org.apache.gluten.backendsapi.arrow.ArrowBatchTypes.{ArrowJavaBatchType, 
ArrowNativeBatchType}
 import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-/** Converts input data with batch type [[ArrowJavaBatch]] to type 
[[ArrowNativeBatch]]. */
+/** Converts input data with batch type [[ArrowJavaBatchType]] to type 
[[ArrowNativeBatchType]]. */
 case class OffloadArrowDataExec(override val child: SparkPlan)
-  extends ColumnarToColumnarExec(ArrowJavaBatch, ArrowNativeBatch) {
+  extends ColumnarToColumnarExec(ArrowJavaBatchType, ArrowNativeBatchType) {
   override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] = {
     in.map(b => ColumnarBatches.offload(ArrowBufferAllocators.contextInstance, 
b))
   }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index efecb48102..a4058e5c7b 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -78,7 +78,7 @@ object GlutenPlanModel {
 
     override val rowType0: Convention.RowType = {
       val out = req.req.requiredRowType match {
-        case ConventionReq.RowType.Any => Convention.RowType.VanillaRow
+        case ConventionReq.RowType.Any => Convention.RowType.VanillaRowType
         case ConventionReq.RowType.Is(r) => r
       }
       out
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index e8aa8d2ad6..75ca8a775b 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -33,9 +33,9 @@ sealed trait Convention {
 object Convention {
   def ensureSparkRowAndBatchTypesRegistered(): Unit = {
     RowType.None.ensureRegistered()
-    RowType.VanillaRow.ensureRegistered()
+    RowType.VanillaRowType.ensureRegistered()
     BatchType.None.ensureRegistered()
-    BatchType.VanillaBatch.ensureRegistered()
+    BatchType.VanillaBatchType.ensureRegistered()
   }
 
   implicit class ConventionOps(val conv: Convention) extends AnyVal {
@@ -130,7 +130,7 @@ object Convention {
     final case object None extends RowType {
       override protected[this] def registerTransitions(): Unit = {}
     }
-    final case object VanillaRow extends RowType {
+    final case object VanillaRowType extends RowType {
       override protected[this] def registerTransitions(): Unit = {}
     }
   }
@@ -151,10 +151,10 @@ object Convention {
     final case object None extends BatchType {
       override protected[this] def registerTransitions(): Unit = {}
     }
-    final case object VanillaBatch extends BatchType {
+    final case object VanillaBatchType extends BatchType {
       override protected[this] def registerTransitions(): Unit = {
-        fromRow(RowType.VanillaRow, RowToColumnarExec.apply)
-        toRow(RowType.VanillaRow, ColumnarToRowExec.apply)
+        fromRow(RowType.VanillaRowType, RowToColumnarExec.apply)
+        toRow(RowType.VanillaRowType, ColumnarToRowExec.apply)
       }
     }
   }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index f7e837b25d..83711e71f6 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -99,7 +99,7 @@ object ConventionFunc {
         case k: Convention.KnownRowType =>
           k.rowType()
         case _ if SparkShimLoader.getSparkShims.supportsRowBased(plan) =>
-          Convention.RowType.VanillaRow
+          Convention.RowType.VanillaRowType
         case _ =>
           Convention.RowType.None
       }
@@ -131,7 +131,7 @@ object ConventionFunc {
         case k: Convention.KnownBatchType =>
           k.batchType()
         case _ if plan.supportsColumnar =>
-          Convention.BatchType.VanillaBatch
+          Convention.BatchType.VanillaBatchType
         case _ =>
           Convention.BatchType.None
       }
@@ -166,13 +166,13 @@ object ConventionFunc {
       case RowToColumnarLike(_) =>
         Seq(
           ConventionReq.of(
-            ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+            ConventionReq.RowType.Is(Convention.RowType.VanillaRowType),
             ConventionReq.BatchType.Any))
       case ColumnarToRowExec(_) =>
         Seq(
           ConventionReq.of(
             ConventionReq.RowType.Any,
-            ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatch)))
+            ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatchType)))
       case write: DataWritingCommandExec if 
SparkShimLoader.getSparkShims.isPlannedV1Write(write) =>
         // To align with 
ApplyColumnarRulesAndInsertTransitions#insertTransitions
         Seq(ConventionReq.any)
@@ -183,7 +183,7 @@ object ConventionFunc {
         Seq.tabulate(u.children.size)(
           _ =>
             ConventionReq.of(
-              ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+              ConventionReq.RowType.Is(Convention.RowType.VanillaRowType),
               ConventionReq.BatchType.Any))
       case other =>
         // In the normal case, children's convention should follow parent 
node's convention.
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
index 0f20656537..741fe04e65 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -52,8 +52,8 @@ object ConventionReq {
   ) extends ConventionReq
 
   val any: ConventionReq = of(RowType.Any, BatchType.Any)
-  val vanillaRow: ConventionReq = 
ofRow(RowType.Is(Convention.RowType.VanillaRow))
-  val vanillaBatch: ConventionReq = 
ofBatch(BatchType.Is(Convention.BatchType.VanillaBatch))
+  val vanillaRow: ConventionReq = 
ofRow(RowType.Is(Convention.RowType.VanillaRowType))
+  val vanillaBatch: ConventionReq = 
ofBatch(BatchType.Is(Convention.BatchType.VanillaBatchType))
 
   def get(plan: SparkPlan): Seq[ConventionReq] = 
ConventionFunc.create().conventionReqOf(plan)
   def of(rowType: RowType, batchType: BatchType): ConventionReq = 
Impl(rowType, batchType)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index e6bde44704..3e55b0e11a 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -86,7 +86,7 @@ object RemoveTransitions extends Rule[SparkPlan] {
 
 object Transitions {
   def insert(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
-    InsertTransitions.create(outputsColumnar, 
BatchType.VanillaBatch).apply(plan)
+    InsertTransitions.create(outputsColumnar, 
BatchType.VanillaBatchType).apply(plan)
   }
 
   def toRowPlan(plan: SparkPlan): SparkPlan = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToCarrierRowExecBase.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToCarrierRowExecBase.scala
new file mode 100644
index 0000000000..979ddd6a71
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToCarrierRowExecBase.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.columnar.batchcarrier.{BatchCarrierRow, 
PlaceholderRow, TerminalRow}
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.ColumnarToRowTransition
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/** The operator that converts columnar batches to [[BatchCarrierRow]]s. */
+abstract class ColumnarToCarrierRowExecBase extends ColumnarToRowTransition 
with GlutenPlan {
+
+  override def batchType(): Convention.BatchType = Convention.BatchType.None
+
+  override def requiredChildConvention(): Seq[ConventionReq] = {
+    List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(fromBatchType())))
+  }
+
+  protected def fromBatchType(): Convention.BatchType
+
+  override lazy val metrics: Map[String, SQLMetric] =
+    Map(
+      "numInputBatches" -> SQLMetrics.createMetric(sparkContext, "number of 
input batches"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows")
+    )
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val numInputBatches = longMetric("numInputBatches")
+    val numOutputRows = longMetric("numOutputRows")
+
+    child.executeColumnar().mapPartitions {
+      itr =>
+        itr.flatMap {
+          b: ColumnarBatch =>
+            numInputBatches += 1
+            val numRows = b.numRows()
+            if (numRows == 0) {
+              Nil
+            } else {
+              val carrierRows = new Array[BatchCarrierRow](numRows)
+              for (i <- 0 until numRows - 1) {
+                carrierRows(i) = new PlaceholderRow()
+              }
+              carrierRows(numRows - 1) = new TerminalRow(b)
+              numOutputRows += carrierRows.length
+              carrierRows
+            }
+        }
+    }
+  }
+
+  override def output: Seq[Attribute] = child.output
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
index 5beaf49572..15103bbd84 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
@@ -42,7 +42,7 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
 
   override def batchType(): Convention.BatchType = Convention.BatchType.None
 
-  override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
+  override def rowType0(): Convention.RowType = 
Convention.RowType.VanillaRowType
 
   override def requiredChildConvention(): Seq[ConventionReq] = {
     List(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/batchcarrier/BatchCarrierRow.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/batchcarrier/BatchCarrierRow.scala
new file mode 100644
index 0000000000..21ef26405f
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/batchcarrier/BatchCarrierRow.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.batchcarrier
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+import org.apache.spark.sql.types.{DataType, Decimal}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}
+
+/**
+ * An internal-row abstraction that is designed for columnar-based 
computations to bypass Spark's
+ * row-based APIs / SPIs with zero copy.
+ *
+ * Two implementations are pre-defined:
+ *
+ *   - TerminalRow
+ *   - PlaceholderRow
+ *
+ * To bypass Spark's row APIs, one single columnar batch will be converted to 
a series of
+ * PassiveRows, followed by one SentinelRow that actually wraps that columnar 
batch. The total
+ * number of PlaceholderRows + the TerminalRow equates to size of the original 
columnar batch.
+ */
+sealed abstract class BatchCarrierRow extends InternalRow {
+  override def numFields: Int = throw unsupported()
+
+  override def setNullAt(i: Int): Unit = throw unsupported()
+
+  override def update(i: Int, value: Any): Unit = throw unsupported()
+
+  override def copy(): InternalRow = throw unsupported()
+
+  override def isNullAt(ordinal: Int): Boolean = throw unsupported()
+
+  override def getBoolean(ordinal: Int): Boolean = throw unsupported()
+
+  override def getByte(ordinal: Int): Byte = throw unsupported()
+
+  override def getShort(ordinal: Int): Short = throw unsupported()
+
+  override def getInt(ordinal: Int): Int = throw unsupported()
+
+  override def getLong(ordinal: Int): Long = throw unsupported()
+
+  override def getFloat(ordinal: Int): Float = throw unsupported()
+
+  override def getDouble(ordinal: Int): Double = throw unsupported()
+
+  override def getDecimal(ordinal: Int, precision: Int, scale: Int): Decimal = 
throw unsupported()
+
+  override def getUTF8String(ordinal: Int): UTF8String = throw unsupported()
+
+  override def getBinary(ordinal: Int): Array[Byte] = throw unsupported()
+
+  override def getInterval(ordinal: Int): CalendarInterval = throw 
unsupported()
+
+  override def getStruct(ordinal: Int, numFields: Int): InternalRow = throw 
unsupported()
+
+  override def getArray(ordinal: Int): ArrayData = throw unsupported()
+
+  override def getMap(ordinal: Int): MapData = throw unsupported()
+
+  override def get(ordinal: Int, dataType: DataType): AnyRef = throw 
unsupported()
+
+  private def unsupported() = {
+    new UnsupportedOperationException(
+      "Underlying columnar data is inaccessible from BatchCarrierRow")
+  }
+}
+
+/**
+ * A [[BatchCarrierRow]] implementation that is backed by a
+ * [[org.apache.spark.sql.vectorized.ColumnarBatch]].
+ */
+class TerminalRow(val batch: ColumnarBatch) extends BatchCarrierRow
+
+/**
+ * A [[BatchCarrierRow]] implementation with no data. The only function of 
this row implementation
+ * is to provide row metadata to the receiver and to support correct 
row-counting.
+ */
+class PlaceholderRow extends BatchCarrierRow
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/LegacyCoster.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/LegacyCoster.scala
index a8e1524fc9..8c01c0bde7 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/LegacyCoster.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/LegacyCoster.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.cost
 
+import org.apache.gluten.execution.ColumnarToCarrierRowExecBase
 import 
org.apache.gluten.extension.columnar.transition.{ColumnarToColumnarLike, 
ColumnarToRowLike, RowToColumnarLike}
 import org.apache.gluten.utils.PlanUtil
 
@@ -33,7 +34,8 @@ object LegacyCoster extends LongCoster {
   // much as possible.
   private def selfCostOf0(node: SparkPlan): Long = {
     node match {
-      case ColumnarWriteFilesExec.OnNoopLeafPath(_) => 0
+      case ColumnarWriteFilesExec.OnNoopLeafPath(_) => 0L
+      case _: ColumnarToCarrierRowExecBase => 0L
       case ColumnarToRowLike(_) => 10L
       case RowToColumnarLike(_) => 10L
       case ColumnarToColumnarLike(_) => 5L
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/RoughCoster.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/RoughCoster.scala
index caee696df6..ba26528499 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/RoughCoster.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/cost/RoughCoster.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.cost
 
-import org.apache.gluten.execution.RowToColumnarExecBase
+import org.apache.gluten.execution.{ColumnarToCarrierRowExecBase, 
RowToColumnarExecBase}
 import 
org.apache.gluten.extension.columnar.transition.{ColumnarToColumnarLike, 
ColumnarToRowLike, RowToColumnarLike}
 import org.apache.gluten.utils.PlanUtil
 
@@ -42,6 +42,7 @@ object RoughCoster extends LongCoster {
         // Avoid moving computation back to native when transition has complex 
types in schema.
         // Such transitions are observed to be extremely expensive as of now.
         Long.MaxValue
+      case _: ColumnarToCarrierRowExecBase => 0L
       case ColumnarToRowLike(_) => 10L
       case RowToColumnarLike(_) => 10L
       case ColumnarToColumnarLike(_) => 5L
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index f2b1bd0900..fd5549f8cf 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -42,7 +42,7 @@ object PlanUtil {
   }
 
   def isVanillaColumnarOp(plan: SparkPlan): Boolean = {
-    Convention.get(plan).batchType == Convention.BatchType.VanillaBatch
+    Convention.get(plan).batchType == Convention.BatchType.VanillaBatchType
   }
 
   def isGlutenColumnarOp(plan: SparkPlan): Boolean = {
@@ -50,6 +50,6 @@ object PlanUtil {
   }
 
   private def isGlutenBatchType(batchType: Convention.BatchType) = {
-    batchType != Convention.BatchType.None && batchType != 
Convention.BatchType.VanillaBatch
+    batchType != Convention.BatchType.None && batchType != 
Convention.BatchType.VanillaBatchType
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index 87a2f6d9c2..998f4f86b5 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -71,7 +71,7 @@ abstract class ColumnarWriteFilesExec protected (
    */
   override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
   override def rowType0(): RowType = {
-    RowType.VanillaRow
+    RowType.VanillaRowType
   }
 
   override def output: Seq[Attribute] = Seq.empty
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index d58026b123..21ad159680 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -61,7 +61,7 @@ case class FakeRowAdaptor(child: SparkPlan)
 
   override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
 
-  override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
+  override def rowType0(): Convention.RowType = 
Convention.RowType.VanillaRowType
 
   override protected def doExecute(): RDD[InternalRow] = {
     doExecuteColumnar().map(cb => new FakeRowEnhancement(cb))
@@ -86,8 +86,6 @@ case class FakeRowAdaptor(child: SparkPlan)
     copy(child = newChild)
 }
 
-case class MATERIALIZE_TAG()
-
 object GlutenWriterColumnarRules {
   // TODO: support ctas in Spark3.4, see 
https://github.com/apache/spark/pull/39220
   // TODO: support dynamic partition and bucket write
diff --git 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/MiscColumnarRulesSuite.scala
 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/MiscColumnarRulesSuite.scala
index 085f9d551b..81068c994f 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/MiscColumnarRulesSuite.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/MiscColumnarRulesSuite.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar
 import org.apache.gluten.component.WithDummyBackend
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.PreventBatchTypeMismatchInTableCache
 import org.apache.gluten.extension.columnar.transition.Convention
-import 
org.apache.gluten.extension.columnar.transition.TransitionSuite.BatchToRow
+import 
org.apache.gluten.extension.columnar.transition.TransitionSuiteBase.BatchToRow
 
 import org.apache.spark.sql.test.SharedSparkSession
 
@@ -28,8 +28,8 @@ class MiscColumnarRulesSuite extends SharedSparkSession with 
WithDummyBackend {
   test("Fix ColumnarToRowRemovalGuard not able to be copied") {
     val dummyPlan =
       BatchToRow(
-        Convention.BatchType.VanillaBatch,
-        Convention.RowType.VanillaRow,
+        Convention.BatchType.VanillaBatchType,
+        Convention.RowType.VanillaRowType,
         spark.range(1).queryExecution.sparkPlan)
     val cloned =
       PreventBatchTypeMismatchInTableCache(isCalledByTableCachePlaning = true, 
Set.empty)
diff --git 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
index e4e1e88372..716451e2f2 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -18,18 +18,14 @@ package org.apache.gluten.extension.columnar.transition
 
 import org.apache.gluten.component.WithDummyBackend
 import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.{ColumnarToColumnarExec, GlutenPlan}
 
 import org.apache.spark.SparkConf
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.vectorized.ColumnarBatch
 
-class TransitionSuite extends SharedSparkSession with WithDummyBackend {
+class TransitionSuite extends SharedSparkSession with TransitionSuiteBase with 
WithDummyBackend {
   import TransitionSuite._
+  import TransitionSuiteBase._
 
   override protected def sparkConf: SparkConf =
     super.sparkConf
@@ -120,6 +116,8 @@ class TransitionSuite extends SharedSparkSession with 
WithDummyBackend {
 }
 
 object TransitionSuite extends TransitionSuiteBase {
+  import TransitionSuiteBase._
+
   private def insertTransitions(plan: SparkPlan, req: ConventionReq): 
SparkPlan = {
     InsertTransitions(req).apply(plan)
   }
@@ -161,54 +159,4 @@ object TransitionSuite extends TransitionSuiteBase {
       toBatch(BatchTypeB, RowToBatch(this, BatchTypeB, _))
     }
   }
-
-  case class RowToBatch(
-      fromRowType: Convention.RowType,
-      toBatchType: Convention.BatchType,
-      override val child: SparkPlan)
-    extends RowToColumnarTransition
-    with GlutenPlan {
-    override def batchType(): Convention.BatchType = toBatchType
-    override def rowType0(): Convention.RowType = Convention.RowType.None
-    override def requiredChildConvention(): Seq[ConventionReq] = {
-      List(ConventionReq.ofRow(ConventionReq.RowType.Is(fromRowType)))
-    }
-
-    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
-      copy(child = newChild)
-    override protected def doExecute(): RDD[InternalRow] =
-      throw new UnsupportedOperationException()
-    override def output: Seq[Attribute] = child.output
-  }
-
-  case class BatchToRow(
-      fromBatchType: Convention.BatchType,
-      toRowType: Convention.RowType,
-      override val child: SparkPlan)
-    extends ColumnarToRowTransition
-    with GlutenPlan {
-    override def batchType(): Convention.BatchType = Convention.BatchType.None
-    override def rowType0(): Convention.RowType = toRowType
-    override def requiredChildConvention(): Seq[ConventionReq] = {
-      List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(fromBatchType)))
-    }
-
-    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
-      copy(child = newChild)
-    override protected def doExecute(): RDD[InternalRow] =
-      throw new UnsupportedOperationException()
-    override def output: Seq[Attribute] = child.output
-  }
-
-  case class BatchToBatch(
-      from: Convention.BatchType,
-      to: Convention.BatchType,
-      override val child: SparkPlan)
-    extends ColumnarToColumnarExec(from, to) {
-    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
-      copy(child = newChild)
-    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
-    override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] =
-      throw new UnsupportedOperationException()
-  }
 }
diff --git 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
index e67d03d9ba..3724008509 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
@@ -16,14 +16,17 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.execution.{ColumnarToColumnarExec, GlutenPlan}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{BinaryExecNode, LeafExecNode, 
SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{BinaryExecNode, 
ColumnarToRowTransition, LeafExecNode, RowToColumnarTransition, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
-trait TransitionSuiteBase {
+trait TransitionSuiteBase {}
+
+object TransitionSuiteBase {
   case class BatchLeaf(override val batchType: Convention.BatchType)
     extends LeafExecNode
     with GlutenPlan {
@@ -104,4 +107,70 @@ trait TransitionSuiteBase {
     override def output: Seq[Attribute] = left.output ++ right.output
   }
 
+  case class RowToBatch(
+      fromRowType: Convention.RowType,
+      toBatchType: Convention.BatchType,
+      override val child: SparkPlan)
+    extends RowToColumnarTransition
+    with GlutenPlan {
+    override def batchType(): Convention.BatchType = toBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
+    override def requiredChildConvention(): Seq[ConventionReq] = {
+      List(ConventionReq.ofRow(ConventionReq.RowType.Is(fromRowType)))
+    }
+
+    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+      copy(child = newChild)
+    override protected def doExecute(): RDD[InternalRow] =
+      throw new UnsupportedOperationException()
+    override def output: Seq[Attribute] = child.output
+  }
+
+  case class BatchToRow(
+      fromBatchType: Convention.BatchType,
+      toRowType: Convention.RowType,
+      override val child: SparkPlan)
+    extends ColumnarToRowTransition
+    with GlutenPlan {
+    override def batchType(): Convention.BatchType = Convention.BatchType.None
+    override def rowType0(): Convention.RowType = toRowType
+    override def requiredChildConvention(): Seq[ConventionReq] = {
+      List(ConventionReq.ofBatch(ConventionReq.BatchType.Is(fromBatchType)))
+    }
+
+    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+      copy(child = newChild)
+    override protected def doExecute(): RDD[InternalRow] =
+      throw new UnsupportedOperationException()
+    override def output: Seq[Attribute] = child.output
+  }
+
+  case class BatchToBatch(
+      from: Convention.BatchType,
+      to: Convention.BatchType,
+      override val child: SparkPlan)
+    extends ColumnarToColumnarExec(from, to) {
+    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+      copy(child = newChild)
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch] =
+      throw new UnsupportedOperationException()
+  }
+
+  case class RowToRow(
+      from: Convention.RowType,
+      to: Convention.RowType,
+      override val child: SparkPlan)
+    extends UnaryExecNode
+    with GlutenPlan {
+    override def batchType(): Convention.BatchType = Convention.BatchType.None
+    override def rowType0(): Convention.RowType = to
+    override def requiredChildConvention(): Seq[ConventionReq] = {
+      List(ConventionReq.ofRow(ConventionReq.RowType.Is(from)))
+    }
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override def output: Seq[Attribute] = child.output
+    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+      copy(child = newChild)
+  }
 }
diff --git 
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
 
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
index 679a69334a..81b4c01b46 100644
--- 
a/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
+++ 
b/shims/common/src/main/scala/org/apache/spark/sql/execution/datasources/FakeRow.scala
@@ -24,6 +24,8 @@ import org.apache.spark.unsafe.types.{CalendarInterval, 
UTF8String}
 
 trait IFakeRowAdaptor
 
+/** Deprecated: Moving to new API `BatchCarrierRow` in `gluten-substrait`. */
+@deprecated
 class FakeRow(@transient var batch: ColumnarBatch) extends InternalRow {
   override def numFields: Int = throw new UnsupportedOperationException()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to