This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 10182a52d [CORE] Rework planner C2R / R2C code with new transition 
facilities (#5767)
10182a52d is described below

commit 10182a52d409cb659169cbfe1d7f1869d9205dce
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon May 20 15:46:39 2024 +0800

    [CORE] Rework planner C2R / R2C code with new transition facilities (#5767)
---
 .../clickhouse/CHSparkPlanExecApi.scala            |  24 +--
 .../gluten/backendsapi/clickhouse/package.scala    |  37 ++++
 ...nClickHouseTPCDSParquetGraceHashJoinSuite.scala |   2 +-
 ...nClickHouseTPCDSParquetSortMergeJoinSuite.scala |   2 +-
 .../GlutenClickHouseTPCDSParquetSuite.scala        |   2 +-
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |  50 ++---
 .../apache/gluten/backendsapi/velox/package.scala  |  43 ++++
 .../datasource/v2/ArrowBatchScanExec.scala         |   6 +
 .../python/ColumnarArrowEvalPythonExec.scala       |   2 +-
 .../sql/execution/ArrowFileSourceScanExec.scala    |   6 +
 .../sql/execution/VeloxParquetWriteSuite.scala     |   2 +-
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  21 +-
 .../gluten/expression/ExpressionConverter.scala    |  19 +-
 .../gluten/extension/ColumnarOverrides.scala       |   3 +-
 .../org/apache/gluten/extension/GlutenPlan.scala   |  17 +-
 .../extension/columnar/ColumnarTransitions.scala   | 109 ----------
 .../extension/columnar/ExpandFallbackPolicy.scala  |  15 +-
 .../extension/columnar/MiscColumnarRules.scala     |  32 +--
 .../columnar/enumerated/EnumeratedApplier.scala    |   8 +-
 .../columnar/enumerated/PushFilterToScan.scala     |   6 +
 .../columnar/heuristic/HeuristicApplier.scala      |   8 +-
 .../extension/columnar/transition/Convention.scala | 113 ++++++++++
 .../columnar/transition/ConventionFunc.scala       | 115 ++++++++++
 .../columnar/transition/ConventionReq.scala        |  54 +++++
 .../extension/columnar/transition/Transition.scala | 186 ++++++++++++++++
 .../columnar/transition/Transitions.scala          | 164 +++++++++++++++
 .../extension/columnar/transition/package.scala    |  58 +++++
 .../gluten/planner/cost/GlutenCostModel.scala      |   7 +-
 .../gluten/planner/property/Convention.scala       |  26 +--
 .../scala/org/apache/gluten/utils/PlanUtil.scala   |  46 +---
 .../ColumnarCollapseTransformStages.scala          |  15 +-
 .../datasources/GlutenWriterColumnarRules.scala    |   3 +-
 .../execution/WholeStageTransformerSuite.scala     |   3 +-
 .../columnar/transition/TransitionSuite.scala      | 234 +++++++++++++++++++++
 .../org/apache/gluten/test}/FallbackUtil.scala     |   8 +-
 .../apache/gluten/columnarbatch/ArrowBatch.scala   |  41 ++++
 gluten-ut/pom.xml                                  |   7 +
 .../sql/execution/FallbackStrategiesSuite.scala    |   2 +-
 .../benchmarks/ParquetReadBenchmark.scala          |   5 +-
 .../spark/sql/GlutenStringFunctionsSuite.scala     |   2 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |   3 +-
 .../benchmarks/ParquetReadBenchmark.scala          |   5 +-
 .../spark/sql/GlutenStringFunctionsSuite.scala     |   2 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |   3 +-
 .../benchmarks/ParquetReadBenchmark.scala          |   4 +-
 .../spark/sql/GlutenStringFunctionsSuite.scala     |   2 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |   3 +-
 .../benchmarks/ParquetReadBenchmark.scala          |   5 +-
 .../org/apache/gluten/sql/shims/SparkShims.scala   |   2 +
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |   6 +
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |   8 +-
 51 files changed, 1212 insertions(+), 334 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 465041621..cb706d817 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -25,6 +25,7 @@ import 
org.apache.gluten.expression.ConverterUtils.FunctionConfig
 import org.apache.gluten.extension.{CountDistinctWithoutExpand, 
FallbackBroadcastHashJoin, FallbackBroadcastHashJoinPrepQueryStage, 
RewriteToDateExpresstionRule}
 import org.apache.gluten.extension.columnar.AddTransformHintRule
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode, WindowFunctionNode}
 import org.apache.gluten.utils.CHJoinValidateUtil
@@ -71,6 +72,9 @@ import scala.collection.mutable.ArrayBuffer
 
 class CHSparkPlanExecApi extends SparkPlanExecApi {
 
+  /** The columnar-batch type this backend is using. */
+  override def batchType: Convention.BatchType = CHBatch
+
   /** Transform GetArrayItem to Substrait. */
   override def genGetArrayItemExpressionNode(
       substraitExprName: String,
@@ -89,26 +93,6 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
       ConverterUtils.getTypeNode(original.dataType, original.nullable))
   }
 
-  /**
-   * Generate ColumnarToRowExecBase.
-   *
-   * @param child
-   * @return
-   */
-  override def genColumnarToRowExec(child: SparkPlan): ColumnarToRowExecBase = 
{
-    CHColumnarToRowExec(child)
-  }
-
-  /**
-   * Generate RowToColumnarExec.
-   *
-   * @param child
-   * @return
-   */
-  override def genRowToColumnarExec(child: SparkPlan): RowToColumnarExecBase = 
{
-    RowToCHNativeColumnarExec(child)
-  }
-
   override def genProjectExecTransformer(
       projectList: Seq[NamedExpression],
       child: SparkPlan): ProjectExecTransformer = {
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/package.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/package.scala
new file mode 100644
index 000000000..8704fac7b
--- /dev/null
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/package.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi
+
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.sql.execution.{CHColumnarToRowExec, 
RowToCHNativeColumnarExec, SparkPlan}
+
+package object clickhouse {
+  case object CHBatch extends Convention.BatchType {
+    fromRow(
+      () =>
+        (plan: SparkPlan) => {
+          RowToCHNativeColumnarExec(plan)
+        })
+
+    toRow(
+      () =>
+        (plan: SparkPlan) => {
+          CHColumnarToRowExec(plan)
+        })
+  }
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
index 0fe04ea2a..0b7ad9a6d 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetGraceHashJoinSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, 
Not}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
index bbedcda18..b1b9841a3 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSortMergeJoinSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.SparkConf
 
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
index 27efba648..a63e47888 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCDSParquetSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, 
Not}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index cf7d38d62..dc6bafdea 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -21,12 +21,13 @@ import org.apache.gluten.backendsapi.SparkPlanExecApi
 import org.apache.gluten.datasource.ArrowConvertorRule
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
-import org.apache.gluten.execution.datasource.v2.ArrowBatchScanExec
 import org.apache.gluten.expression._
 import org.apache.gluten.expression.ConverterUtils.FunctionConfig
 import org.apache.gluten.expression.aggregate.{HLLAdapter, 
VeloxBloomFilterAggregate, VeloxCollectList, VeloxCollectSet}
-import org.apache.gluten.extension.{ArrowScanReplaceRule, 
BloomFilterMightContainJointRewriteRule, CollectRewriteRule, 
FlushableHashAggregateRule, HLLRewriteRule}
+import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar.TransformHints
+import org.apache.gluten.extension.columnar.transition.Convention
+import 
org.apache.gluten.extension.columnar.transition.ConventionFunc.BatchOverride
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode, IfThenNode}
 import org.apache.gluten.vectorized.{ColumnarBatchSerializer, 
ColumnarBatchSerializeResult}
@@ -50,6 +51,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BuildSideRelation, 
HashedRelationBroadcastMode}
@@ -73,6 +75,22 @@ import scala.collection.mutable.ListBuffer
 
 class VeloxSparkPlanExecApi extends SparkPlanExecApi {
 
+  /** The columnar-batch type this backend is using. */
+  override def batchType: Convention.BatchType = {
+    VeloxBatch
+  }
+
+  /**
+   * Overrides 
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is 
using to
+   * determine the convention (its row-based processing / columnar-batch 
processing support) of a
+   * plan with a user-defined function that accepts a plan then returns batch 
type it outputs.
+   */
+  override def batchTypeFunc(): BatchOverride = {
+    case i: InMemoryTableScanExec
+        if 
i.relation.cacheBuilder.serializer.isInstanceOf[ColumnarCachedBatchSerializer] 
=>
+      VeloxBatch
+  }
+
   /**
    * Transform GetArrayItem to Substrait.
    *
@@ -275,28 +293,6 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     GenericExpressionTransformer(substraitExprName, children, expr)
   }
 
-  /**
-   * * Plans.
-   */
-
-  /**
-   * Generate ColumnarToRowExecBase.
-   *
-   * @param child
-   * @return
-   */
-  override def genColumnarToRowExec(child: SparkPlan): ColumnarToRowExecBase =
-    VeloxColumnarToRowExec(child)
-
-  /**
-   * Generate RowToColumnarExec.
-   *
-   * @param child
-   * @return
-   */
-  override def genRowToColumnarExec(child: SparkPlan): RowToColumnarExecBase =
-    RowToVeloxColumnarExec(child)
-
   /**
    * Generate FilterExecTransformer.
    *
@@ -857,10 +853,4 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       case other => other
     }
   }
-
-  override def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): 
Boolean = plan match {
-    case _: ArrowFileSourceScanExec => true
-    case _: ArrowBatchScanExec => true
-    case _ => false
-  }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/package.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/package.scala
new file mode 100644
index 000000000..8ab68b7fe
--- /dev/null
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/package.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.backendsapi
+
+import org.apache.gluten.columnarbatch.ArrowBatch
+import org.apache.gluten.execution.{RowToVeloxColumnarExec, 
VeloxColumnarToRowExec}
+import org.apache.gluten.extension.columnar.transition.{Convention, 
TransitionDef}
+
+import org.apache.spark.sql.execution.SparkPlan
+
+package object velox {
+  case object VeloxBatch extends Convention.BatchType {
+    fromRow(
+      () =>
+        (plan: SparkPlan) => {
+          RowToVeloxColumnarExec(plan)
+        })
+
+    toRow(
+      () =>
+        (plan: SparkPlan) => {
+          VeloxColumnarToRowExec(plan)
+        })
+
+    // Velox batch is considered one-way compatible with Arrow batch.
+    // This is practically achieved by utilizing C++ API 
VeloxColumnarBatch::from at runtime.
+    fromBatch(ArrowBatch, TransitionDef.empty)
+  }
+}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
index 3c1c53820..ee0acbf3f 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/datasource/v2/ArrowBatchScanExec.scala
@@ -16,7 +16,9 @@
  */
 package org.apache.gluten.execution.datasource.v2
 
+import org.apache.gluten.columnarbatch.ArrowBatch
 import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -31,6 +33,10 @@ case class ArrowBatchScanExec(original: BatchScanExec)
 
   @transient lazy val batch: Batch = original.batch
 
+  override protected def batchType0(): Convention.BatchType = {
+    ArrowBatch
+  }
+
   override lazy val readerFactory: PartitionReaderFactory = 
original.readerFactory
 
   override lazy val inputRDD: RDD[InternalRow] = original.inputRDD
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
index d3112c974..fd8dfc25b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/python/ColumnarArrowEvalPythonExec.scala
@@ -293,7 +293,7 @@ case class ColumnarArrowEvalPythonExec(
               e =>
                 if (!e.isInstanceOf[AttributeReference]) {
                   throw new GlutenException(
-                    "ColumnarArrowEvalPythonExec should only has 
[AttributeReference] inputs.")
+                    "ColumnarArrowEvalPythonExec should only have 
[AttributeReference] inputs.")
                 } else if (allInputs.exists(_.semanticEquals(e))) {
                   allInputs.indexWhere(_.semanticEquals(e))
                 } else {
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
index 133bf88b3..e3298d704 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
@@ -16,7 +16,9 @@
  */
 package org.apache.spark.sql.execution
 
+import org.apache.gluten.columnarbatch.ArrowBatch
 import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -39,6 +41,10 @@ case class ArrowFileSourceScanExec(original: 
FileSourceScanExec)
 
   override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
 
+  override protected def batchType0(): Convention.BatchType = {
+    ArrowBatch
+  }
+
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numOutputRows = longMetric("numOutputRows")
     val scanTime = longMetric("scanTime")
diff --git 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
index c50c7bd75..2e4436a59 100644
--- 
a/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/spark/sql/execution/VeloxParquetWriteSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.execution.VeloxWholeStageTransformerSuite
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.SparkConf
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index c694d5804..a6228e671 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -19,6 +19,7 @@ package org.apache.gluten.backendsapi
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.expression._
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionFunc}
 import org.apache.gluten.substrait.expression.{ExpressionBuilder, 
ExpressionNode, WindowFunctionNode}
 
 import org.apache.spark.ShuffleDependency
@@ -55,21 +56,15 @@ import scala.collection.JavaConverters._
 
 trait SparkPlanExecApi {
 
-  /**
-   * Generate ColumnarToRowExecBase.
-   *
-   * @param child
-   * @return
-   */
-  def genColumnarToRowExec(child: SparkPlan): ColumnarToRowExecBase
+  /** The columnar-batch type this backend is using. */
+  def batchType: Convention.BatchType
 
   /**
-   * Generate RowToColumnarExec.
-   *
-   * @param child
-   * @return
+   * Overrides 
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is 
using to
+   * determine the convention (its row-based processing / columnar-batch 
processing support) of a
+   * plan with a user-defined function that accepts a plan then returns batch 
type it outputs.
    */
-  def genRowToColumnarExec(child: SparkPlan): RowToColumnarExecBase
+  def batchTypeFunc(): ConventionFunc.BatchOverride = PartialFunction.empty
 
   /**
    * Generate FilterExecTransformer.
@@ -735,6 +730,4 @@ trait SparkPlanExecApi {
     arrowEvalPythonExec
 
   def maybeCollapseTakeOrderedAndProject(plan: SparkPlan): SparkPlan = plan
-
-  def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = false
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index 5aacbed05..b64a23e86 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -19,10 +19,10 @@ package org.apache.gluten.expression
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.execution.{ColumnarToRowExecBase, 
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.test.TestStats
-import org.apache.gluten.utils.{DecimalArithmeticUtil, PlanUtil}
+import org.apache.gluten.utils.DecimalArithmeticUtil
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
@@ -680,20 +680,7 @@ object ExpressionConverter extends SQLConfHelper with 
Logging {
 
     def convertBroadcastExchangeToColumnar(
         exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
-      val newChild = exchange.child match {
-        // get WholeStageTransformer directly
-        case c2r: ColumnarToRowExecBase => c2r.child
-        // in fallback case
-        case plan: UnaryExecNode if !PlanUtil.isGlutenColumnarOp(plan) =>
-          plan.child match {
-            case _: ColumnarToRowExec =>
-              val wholeStageTransformer = 
exchange.find(_.isInstanceOf[WholeStageTransformer])
-              wholeStageTransformer.getOrElse(
-                
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan))
-            case _ =>
-              
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(plan)
-          }
-      }
+      val newChild = Transitions.toBackendBatchPlan(exchange.child)
       ColumnarBroadcastExchangeExec(exchange.mode, newChild)
     }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
index 63127727b..067976b63 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.{GlutenConfig, 
GlutenSparkExtensionsInjector}
 import org.apache.gluten.extension.columnar._
 import org.apache.gluten.extension.columnar.enumerated.EnumeratedApplier
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.Transitions
 import org.apache.gluten.utils.LogLevelUtil
 
 import org.apache.spark.broadcast.Broadcast
@@ -115,7 +116,7 @@ case class ColumnarOverrideRules(session: SparkSession)
   override def postColumnarTransitions: Rule[SparkPlan] = plan => {
     val outputsColumnar = OutputsColumnarTester.inferOutputsColumnar(plan)
     val unwrapped = OutputsColumnarTester.unwrap(plan)
-    val vanillaPlan = ColumnarTransitions.insertTransitions(unwrapped, 
outputsColumnar)
+    val vanillaPlan = Transitions.insertTransitions(unwrapped, outputsColumnar)
     val applier: ColumnarRuleApplier = if (GlutenConfig.getConf.enableRas) {
       new EnumeratedApplier(session)
     } else {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 85901d21d..033e44b8c 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -20,6 +20,7 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.expression.TransformerState
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.plan.PlanBuilder
 import org.apache.gluten.substrait.rel.RelNode
@@ -50,7 +51,7 @@ object ValidationResult {
 }
 
 /** Every Gluten Operator should extend this trait. */
-trait GlutenPlan extends SparkPlan with LogLevelUtil {
+trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with 
LogLevelUtil {
 
   private lazy val validationLogLevel = glutenConf.validationLogLevel
   private lazy val printStackOnValidationFailure = 
glutenConf.printStackOnValidationFailure
@@ -85,6 +86,20 @@ trait GlutenPlan extends SparkPlan with LogLevelUtil {
     }
   }
 
+  final override def batchType(): Convention.BatchType = {
+    if (!supportsColumnar) {
+      throw new UnsupportedOperationException(
+        s"Node $nodeName doesn't support columnar-batch processing")
+    }
+    val batchType = batchType0()
+    assert(batchType != Convention.BatchType.None)
+    batchType
+  }
+
+  protected def batchType0(): Convention.BatchType = {
+    BackendsApiManager.getSparkPlanExecApiInstance.batchType
+  }
+
   protected def doValidateInternal(): ValidationResult = ValidationResult.ok
 
   protected def doNativeValidation(context: SubstraitContext, node: RelNode): 
ValidationResult = {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
deleted file mode 100644
index 5dd266433..000000000
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gluten.extension.columnar
-
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.utils.PlanUtil
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
-import org.apache.spark.sql.execution.{ApplyColumnarRulesAndInsertTransitions, 
ColumnarToRowExec, ColumnarToRowTransition, RowToColumnarExec, 
RowToColumnarTransition, SparkPlan}
-
-/** See rule code from vanilla Spark: 
[[ApplyColumnarRulesAndInsertTransitions]]. */
-case class InsertTransitions(outputsColumnar: Boolean) extends Rule[SparkPlan] 
{
-  private object RemoveRedundantTransitions extends Rule[SparkPlan] {
-    override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-      case ColumnarToRowExec(RowToColumnarExec(child)) => child
-      case RowToColumnarExec(ColumnarToRowExec(child)) => child
-    }
-  }
-
-  private val rules = List(
-    ApplyColumnarRulesAndInsertTransitions(List(), outputsColumnar),
-    RemoveRedundantTransitions)
-  override def apply(plan: SparkPlan): SparkPlan = rules.foldLeft(plan) {
-    case (p, r) => r.apply(p)
-  }
-}
-
-object RemoveTransitions extends Rule[SparkPlan] {
-  import ColumnarTransitions._
-  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
-    case ColumnarToRowLike(child) => child
-    case RowToColumnarLike(child) => child
-  }
-}
-
-// This rule will try to add RowToColumnarExecBase and ColumnarToRowExec
-// to support vanilla columnar operators.
-case class InsertColumnarToColumnarTransitions(session: SparkSession) extends 
Rule[SparkPlan] {
-  @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
-  private def replaceWithVanillaColumnarToRow(p: SparkPlan): SparkPlan = 
p.transformUp {
-    case plan if PlanUtil.isGlutenColumnarOp(plan) =>
-      plan.withNewChildren(plan.children.map {
-        case child if PlanUtil.isVanillaColumnarOp(child) =>
-          BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(
-            ColumnarToRowExec(child))
-        case other => other
-      })
-  }
-
-  private def replaceWithVanillaRowToColumnar(p: SparkPlan): SparkPlan = 
p.transformUp {
-    case plan if PlanUtil.isVanillaColumnarOp(plan) =>
-      plan.withNewChildren(plan.children.map {
-        case child if PlanUtil.isGlutenColumnarOp(child) =>
-          RowToColumnarExec(
-            
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child))
-        case other => other
-      })
-  }
-
-  def apply(plan: SparkPlan): SparkPlan = {
-    val newPlan = 
replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))
-    planChangeLogger.logRule(ruleName, plan, newPlan)
-    newPlan
-  }
-}
-
-object ColumnarTransitions {
-  def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan 
= {
-    InsertTransitions(outputsColumnar).apply(plan)
-  }
-
-  // Extractor for Spark/Gluten's C2R
-  object ColumnarToRowLike {
-    def unapply(plan: SparkPlan): Option[SparkPlan] = {
-      plan match {
-        case c2r: ColumnarToRowTransition =>
-          Some(c2r.child)
-        case _ => None
-      }
-    }
-  }
-
-  // Extractor for Spark/Gluten's R2C
-  object RowToColumnarLike {
-    def unapply(plan: SparkPlan): Option[SparkPlan] = {
-      plan match {
-        case c2r: RowToColumnarTransition =>
-          Some(c2r.child)
-        case _ => None
-      }
-    }
-  }
-}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
index 471141f49..6f8d7cde7 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution.BroadcastHashJoinExecTransformerBase
 import org.apache.gluten.extension.GlutenPlan
-import 
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPostOverrides
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, 
RowToColumnarLike, Transitions}
 import org.apache.gluten.utils.PlanUtil
 
 import org.apache.spark.rdd.RDD
@@ -78,7 +78,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, 
originalPlan: SparkP
         case _: CommandResultExec | _: ExecutedCommandExec => // ignore
         // we plan exchange to columnar exchange in columnar rules and the 
exchange does not
         // support columnar, so the output columnar is always false in AQE 
postStageCreationRules
-        case ColumnarToRowExec(s: Exchange) if isAdaptiveContext =>
+        case ColumnarToRowLike(s: Exchange) if isAdaptiveContext =>
           countFallbackInternal(s)
         case u: UnaryExecNode
             if !PlanUtil.isGlutenColumnarOp(u) && 
PlanUtil.isGlutenTableCache(u.child) =>
@@ -86,15 +86,15 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, 
originalPlan: SparkP
           // which is a kind of `ColumnarToRowExec`.
           transitionCost = transitionCost + 1
           countFallbackInternal(u.child)
-        case ColumnarToRowExec(p: GlutenPlan) =>
+        case ColumnarToRowLike(p: GlutenPlan) =>
           logDebug(s"Find a columnar to row for gluten plan:\n$p")
           transitionCost = transitionCost + 1
           countFallbackInternal(p)
-        case r: RowToColumnarExec =>
+        case RowToColumnarLike(child) =>
           if (!ignoreRowToColumnar) {
             transitionCost = transitionCost + 1
           }
-          countFallbackInternal(r.child)
+          countFallbackInternal(child)
         case leafPlan: LeafExecNode if PlanUtil.isGlutenTableCache(leafPlan) =>
         case leafPlan: LeafExecNode if !PlanUtil.isGlutenColumnarOp(leafPlan) 
=>
           // Possible fallback for leaf node.
@@ -236,9 +236,8 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, 
originalPlan: SparkP
   }
 
   private def fallbackToRowBasedPlan(outputsColumnar: Boolean): SparkPlan = {
-    val transformPostOverrides = TransformPostOverrides()
-    val planWithTransitions = 
ColumnarTransitions.insertTransitions(originalPlan, outputsColumnar)
-    transformPostOverrides.apply(planWithTransitions)
+    val planWithTransitions = Transitions.insertTransitions(originalPlan, 
outputsColumnar)
+    planWithTransitions
   }
 
   private def countTransitionCostForVanillaSparkPlan(plan: SparkPlan): Int = {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index 08c63000e..fab973ffb 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -16,8 +16,7 @@
  */
 package org.apache.gluten.extension.columnar
 
-import org.apache.gluten.backendsapi.BackendsApiManager
-import 
org.apache.gluten.extension.columnar.ColumnarTransitions.ColumnarToRowLike
+import org.apache.gluten.extension.columnar.transition.ColumnarToRowLike
 import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
 
 import org.apache.spark.sql.SparkSession
@@ -59,35 +58,6 @@ object MiscColumnarRules {
     }
   }
 
-  // This rule will try to convert the row-to-columnar and columnar-to-row
-  // into native implementations.
-  case class TransformPostOverrides() extends Rule[SparkPlan] {
-    @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
-    def replaceWithTransformerPlan(plan: SparkPlan): SparkPlan = 
plan.transformDown {
-      case RowToColumnarExec(child) =>
-        logDebug(s"ColumnarPostOverrides RowToColumnarExec(${child.getClass})")
-        
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(child)
-      case c2r @ ColumnarToRowExec(child)
-          if PlanUtil.outputNativeColumnarData(child) &&
-            !PlanUtil.outputNativeColumnarSparkCompatibleData(child) =>
-        logDebug(s"ColumnarPostOverrides ColumnarToRowExec(${child.getClass})")
-        val nativeC2r = 
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child)
-        if (nativeC2r.doValidate().isValid) {
-          nativeC2r
-        } else {
-          c2r
-        }
-    }
-
-    // apply for the physical not final plan
-    def apply(plan: SparkPlan): SparkPlan = {
-      val newPlan = replaceWithTransformerPlan(plan)
-      planChangeLogger.logRule(ruleName, plan, newPlan)
-      newPlan
-    }
-  }
-
   // Remove topmost columnar-to-row otherwise AQE throws error.
   // See: 
org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec#newQueryStage
   //
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 92d64abf3..3f8ee8706 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -19,7 +19,8 @@ package org.apache.gluten.extension.columnar.enumerated
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar._
-import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, TransformPostOverrides}
+import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow}
+import org.apache.gluten.extension.columnar.transition.{InsertTransitions, 
RemoveTransitions}
 import org.apache.gluten.extension.columnar.util.AdaptiveContext
 import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
@@ -151,10 +152,7 @@ class EnumeratedApplier(session: SparkSession)
    */
   private def postRules(): List[SparkSession => Rule[SparkPlan]] =
     List(
-      (_: SparkSession) => TransformPostOverrides(),
-      (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
-      (s: SparkSession) => RemoveTopmostColumnarToRow(s, 
adaptiveContext.isAdaptiveContext())
-    ) :::
+      (s: SparkSession) => RemoveTopmostColumnarToRow(s, 
adaptiveContext.isAdaptiveContext())) :::
       
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() 
:::
       List((_: SparkSession) => 
ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
       SparkRuleUtil.extendedColumnarRules(session, 
GlutenConfig.getConf.extendedColumnarPostRules)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
index 7306b734a..388668287 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
@@ -71,11 +71,17 @@ class PushFilterToScan(validator: Validator) extends 
RasRule[SparkPlan] {
   private object FilterAndScan {
     def unapply(node: SparkPlan): Option[(FilterExec, SparkPlan)] = node match 
{
       case f @ FilterExec(cond, ColumnarToRowExec(scan)) =>
+        ensureScan(scan)
         Some(f, scan)
       case f @ FilterExec(cond, scan) =>
+        ensureScan(scan)
         Some(f, scan)
       case _ =>
         None
     }
+
+    private def ensureScan(node: SparkPlan): Unit = {
+      assert(node.isInstanceOf[FileSourceScanExec] || 
node.isInstanceOf[BatchScanExec])
+    }
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 0e905ced1..2b5b18abb 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -19,8 +19,9 @@ package org.apache.gluten.extension.columnar.heuristic
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar._
-import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, TransformPostOverrides, TransformPreOverrides}
+import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, TransformPreOverrides}
 import 
org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
+import org.apache.gluten.extension.columnar.transition.{InsertTransitions, 
RemoveTransitions}
 import org.apache.gluten.extension.columnar.util.AdaptiveContext
 import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
@@ -146,10 +147,7 @@ class HeuristicApplier(session: SparkSession)
    */
   private def postRules(): List[SparkSession => Rule[SparkPlan]] =
     List(
-      (_: SparkSession) => TransformPostOverrides(),
-      (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
-      (s: SparkSession) => RemoveTopmostColumnarToRow(s, 
adaptiveContext.isAdaptiveContext())
-    ) :::
+      (s: SparkSession) => RemoveTopmostColumnarToRow(s, 
adaptiveContext.isAdaptiveContext())) :::
       
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() 
:::
       List((_: SparkSession) => 
ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
       SparkRuleUtil.extendedColumnarRules(session, 
GlutenConfig.getConf.extendedColumnarPostRules)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
new file mode 100644
index 000000000..2774497d9
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, 
SparkPlan}
+
+/**
+ * Convention of a query plan consists of the row data type and columnar data 
type it supports to
+ * output.
+ */
+sealed trait Convention {
+  // Row type the plan outputs; RowType.None when it doesn't support row-based processing.
+  def rowType: Convention.RowType
+  // Batch type the plan outputs; BatchType.None when it doesn't support batch-based processing.
+  def batchType: Convention.BatchType
+}
+
+object Convention {
+  /** Extension methods for inspecting and combining [[Convention]]s. */
+  implicit class ConventionOps(val conv: Convention) extends AnyVal {
+
+    /** True when the convention has neither a row type nor a batch type. */
+    def isNone: Boolean =
+      conv.rowType == RowType.None && conv.batchType == BatchType.None
+
+    /**
+     * Combines two conventions component-wise: a row / batch type is kept only when both sides
+     * carry the same one, otherwise that component becomes `None`.
+     */
+    def &&(other: Convention): Convention = {
+      def commonRowType: RowType =
+        if (conv.rowType == other.rowType) conv.rowType else RowType.None
+      def commonBatchType: BatchType =
+        if (conv.batchType == other.batchType) conv.batchType else BatchType.None
+      Convention.of(commonRowType, commonBatchType)
+    }
+  }
+
+  private case class Impl(override val rowType: RowType, override val 
batchType: BatchType)
+    extends Convention
+
+  def get(plan: SparkPlan): Convention = {
+    ConventionFunc.create().conventionOf(plan)
+  }
+
+  def of(rowType: RowType, batchType: BatchType): Convention = {
+    Impl(rowType, batchType)
+  }
+
+  sealed trait RowType
+
+  object RowType {
+    // None indicates that the plan doesn't support row-based processing.
+    final case object None extends RowType
+    final case object VanillaRow extends RowType
+  }
+
+  // A columnar batch type. The final helpers below register transitions that involve this
+  // batch type with the shared Transition.factory.
+  trait BatchType {
+    // Registers the transition that converts row output into this batch type.
+    final def fromRow(transitionDef: TransitionDef): Unit = {
+      Transition.factory.update().defineFromRowTransition(this, transitionDef)
+    }
+
+    // Registers the transition that converts this batch type into row output.
+    final def toRow(transitionDef: TransitionDef): Unit = {
+      Transition.factory.update().defineToRowTransition(this, transitionDef)
+    }
+
+    // Registers the direct transition from batch type `from` to this batch type.
+    // `from` must be a different batch type.
+    final def fromBatch(from: BatchType, transitionDef: TransitionDef): Unit = 
{
+      assert(from != this)
+      Transition.factory.update().defineBatchTransition(from, this, 
transitionDef)
+    }
+
+    // Registers the direct transition from this batch type to batch type `to`.
+    // `to` must be a different batch type.
+    final def toBatch(to: BatchType, transitionDef: TransitionDef): Unit = {
+      assert(to != this)
+      Transition.factory.update().defineBatchTransition(this, to, 
transitionDef)
+    }
+  }
+
+  object BatchType {
+    // None indicates that the plan doesn't support batch-based processing.
+    final case object None extends BatchType
+    // VanillaBatch registers its row <-> batch transitions in its own initializer. They wrap
+    // the plan with Spark's RowToColumnarExec and ColumnarToRowExec respectively.
+    final case object VanillaBatch extends BatchType {
+      // Row -> vanilla batch.
+      fromRow(
+        () =>
+          (plan: SparkPlan) => {
+            RowToColumnarExec(plan)
+          })
+
+      // Vanilla batch -> row.
+      toRow(
+        () =>
+          (plan: SparkPlan) => {
+            ColumnarToRowExec(plan)
+          })
+    }
+  }
+
+  trait KnownBatchType {
+    def batchType(): BatchType
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
new file mode 100644
index 000000000..28bd1d12c
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.transition.Convention.{BatchType, 
RowType}
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+
+/** ConventionFunc is a utility to derive [[Convention]] from a query plan. */
+trait ConventionFunc {
+  def conventionOf(plan: SparkPlan): Convention
+}
+
+object ConventionFunc {
+  type BatchOverride = PartialFunction[SparkPlan, BatchType]
+
+  // For testing, to make things work without a backend loaded.
+  private var ignoreBackend: Boolean = false
+
+  // Visible for testing
+  //
+  // Runs `body` with the ignoreBackend flag set. While the flag is set, create() builds its
+  // function with an empty batch override instead of querying BackendsApiManager. The flag is
+  // reset in the finally block even if `body` throws. A call made while the flag is already set
+  // is rejected by the assertion.
+  def ignoreBackend[T](body: => T): T = synchronized {
+    assert(!ignoreBackend)
+    ignoreBackend = true
+    try {
+      body
+    } finally {
+      ignoreBackend = false
+    }
+  }
+
+  /**
+   * Creates a [[ConventionFunc]]. While the testing flag `ignoreBackend` is set, the function is
+   * built with an empty batch override; otherwise the override is taken from the backend's
+   * `batchTypeFunc()`.
+   */
+  def create(): ConventionFunc = {
+    // Only the flag check runs under the lock; the backend is queried outside of it.
+    val testingFunc: Option[ConventionFunc] = synchronized {
+      if (ignoreBackend) {
+        // For testing
+        Some(new BuiltinFunc(PartialFunction.empty))
+      } else {
+        None
+      }
+    }
+    testingFunc.getOrElse {
+      new BuiltinFunc(BackendsApiManager.getSparkPlanExecApiInstance.batchTypeFunc())
+    }
+  }
+
+  /**
+   * Built-in [[ConventionFunc]]. The batch type of a columnar plan is resolved by the override
+   * `o` when it is defined for that plan, then by [[Convention.KnownBatchType]], and otherwise
+   * defaults to the vanilla batch type.
+   */
+  private class BuiltinFunc(o: BatchOverride) extends ConventionFunc {
+
+    override def conventionOf(plan: SparkPlan): Convention = conventionOf0(plan)
+
+    private def conventionOf0(plan: SparkPlan): Convention = plan match {
+      case p if canPropagateConvention(p) =>
+        // Such a node takes over its children's conventions, merged with `&&` if they differ.
+        val distinctConvs = p.children.map(conventionOf0).distinct
+        distinctConvs.size match {
+          case n if n > 1 =>
+            distinctConvs.reduce(_ && _)
+          case n =>
+            assert(n == 1)
+            distinctConvs.head
+        }
+      case q: QueryStageExec =>
+        conventionOf0(q.plan)
+      case r: ReusedExchangeExec =>
+        conventionOf0(r.child)
+      case a: AdaptiveSparkPlanExec =>
+        // By default, we execute columnar AQE with backend batch output.
+        // See org.apache.gluten.extension.columnar.transition.InsertTransitions.apply
+        def aqeBatchType: BatchType =
+          if (a.supportsColumnar) {
+            BackendsApiManager.getSparkPlanExecApiInstance.batchType
+          } else {
+            BatchType.None
+          }
+        Convention.of(rowTypeOf(a), aqeBatchType)
+      case other =>
+        Convention.of(rowTypeOf(other), batchTypeOf(other))
+    }
+
+    // Vanilla row type when the shim reports row-based support for the plan, otherwise None.
+    private def rowTypeOf(plan: SparkPlan): RowType =
+      if (SparkShimLoader.getSparkShims.supportsRowBased(plan)) {
+        RowType.VanillaRow
+      } else {
+        RowType.None
+      }
+
+    // None for non-columnar plans; otherwise the override's answer, else the default below.
+    private def batchTypeOf(plan: SparkPlan): BatchType = {
+      // Consulted only when the override is not defined for the plan.
+      def defaultBatchType(p: SparkPlan): BatchType = p match {
+        case known: Convention.KnownBatchType => known.batchType()
+        case _ => BatchType.VanillaBatch
+      }
+      if (plan.supportsColumnar) {
+        o.applyOrElse(plan, defaultBatchType _)
+      } else {
+        BatchType.None
+      }
+    }
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
new file mode 100644
index 000000000..aac2084a7
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+/**
+ * ConventionReq describes a requirement on [[Convention]]. It is mostly used to determine the
+ * conventions that a parent plan node accepts from its children.
+ */
+sealed trait ConventionReq {
+  def requiredRowType: ConventionReq.RowType
+  def requiredBatchType: ConventionReq.BatchType
+}
+
+object ConventionReq {
+  // Requirement on the row type of a convention.
+  sealed trait RowType
+
+  object RowType {
+    // Any: the row type is not constrained.
+    final case object Any extends RowType
+    // Is(t): the row type must be `t`. Convention.RowType.None is rejected by the assertion.
+    final case class Is(t: Convention.RowType) extends RowType {
+      assert(t != Convention.RowType.None)
+    }
+  }
+
+  // Requirement on the batch type of a convention.
+  sealed trait BatchType
+
+  object BatchType {
+    // Any: the batch type is not constrained.
+    final case object Any extends BatchType
+    // Is(t): the batch type must be `t`. Convention.BatchType.None is rejected by the assertion.
+    final case class Is(t: Convention.BatchType) extends BatchType {
+      assert(t != Convention.BatchType.None)
+    }
+  }
+
+  private case class Impl(
+      override val requiredRowType: RowType,
+      override val requiredBatchType: BatchType
+  ) extends ConventionReq
+
+  // Requirement that constrains neither the row type nor the batch type.
+  val any: ConventionReq = Impl(RowType.Any, BatchType.Any)
+  // Builds a requirement from the given row and batch requirements.
+  def of(rowType: RowType, batchType: BatchType): ConventionReq = new 
Impl(rowType, batchType)
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
new file mode 100644
index 000000000..9b745f94d
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.gluten.exception.GlutenException
+
+import org.apache.spark.sql.execution.SparkPlan
+
+import scala.collection.mutable
+
+/**
+ * Transition is a simple function that converts a query plan so that it satisfies a required
+ * [[ConventionReq]].
+ *
+ * Transitions can be registered through the utility APIs in
+ * [[org.apache.gluten.extension.columnar.transition.Convention.BatchType]]'s definition.
+ */
+trait Transition {
+  def apply(plan: SparkPlan): SparkPlan
+}
+
+trait TransitionDef {
+  def create(): Transition
+}
+
+object TransitionDef {
+  val empty: TransitionDef = () => Transition.empty
+}
+
+object Transition {
+  val empty: Transition = (plan: SparkPlan) => plan
+  val factory: Factory = Factory.newBuiltin()
+
+  // Builds the exception reporting that no transition was found between `plan` and its child.
+  // The exception is returned, not thrown.
+  def notFound(plan: SparkPlan): GlutenException = {
+    new GlutenException(s"No viable transition found from plan's child to 
itself: $plan")
+  }
+
+  // Builds the exception reporting that `plan` cannot be transited to `required`.
+  // The exception is returned, not thrown.
+  def notFound(plan: SparkPlan, required: ConventionReq): GlutenException = {
+    new GlutenException(s"No viable transition to [$required] found for plan: 
$plan")
+  }
+
+  // Composition of two transitions: applies `first`, then feeds its result to `second`.
+  private class ChainedTransition(first: Transition, second: Transition) extends Transition {
+    override def apply(plan: SparkPlan): SparkPlan = {
+      val intermediate = first(plan)
+      second(intermediate)
+    }
+  }
+
+  private def chain(first: Transition, second: Transition): Transition = {
+    new ChainedTransition(first, second)
+  }
+
+  // Looks up the transition that takes a plan from one convention to a required one.
+  trait Factory {
+    // Returns the transition from `from` to `to`; throws `otherwise` when the fallback of the
+    // lookup below is evaluated.
+    final def findTransition(
+        from: Convention,
+        to: ConventionReq,
+        otherwise: Exception): Transition = {
+      findTransition(from, to) {
+        throw otherwise
+      }
+    }
+
+    // Lookup to be implemented; `orElse` is a by-name fallback supplied by the caller.
+    protected def findTransition(from: Convention, to: ConventionReq)(
+        orElse: => Transition): Transition
+    // Gives package-private access to the mutable view used to register transitions.
+    private[transition] def update(): MutableFactory
+  }
+
+  // A Factory that additionally accepts transition registrations.
+  trait MutableFactory extends Factory {
+    // Registers the row-to-batch transition for batch type `to`.
+    def defineFromRowTransition(to: Convention.BatchType, transitionDef: 
TransitionDef): Unit
+    // Registers the batch-to-row transition for batch type `from`.
+    def defineToRowTransition(from: Convention.BatchType, transitionDef: 
TransitionDef): Unit
+    // Registers the direct batch-to-batch transition from `from` to `to`.
+    def defineBatchTransition(
+        from: Convention.BatchType,
+        to: Convention.BatchType,
+        transitionDef: TransitionDef): Unit
+  }
+
+  private object Factory {
+    def newBuiltin(): Factory = {
+      new BuiltinFactory
+    }
+
+    private class BuiltinFactory extends MutableFactory {
+      private val fromRowTransitions: mutable.Map[Convention.BatchType, 
TransitionDef] =
+        mutable.Map()
+      private val toRowTransitions: mutable.Map[Convention.BatchType, 
TransitionDef] = mutable.Map()
+      private val batchTransitions
+          : mutable.Map[(Convention.BatchType, Convention.BatchType), 
TransitionDef] =
+        mutable.Map()
+
+      // Registers the row-to-batch transition for `to`; a batch type may be registered once.
+      override def defineFromRowTransition(
+          to: Convention.BatchType,
+          transitionDef: TransitionDef): Unit = {
+        assert(fromRowTransitions.get(to).isEmpty)
+        fromRowTransitions.update(to, transitionDef)
+      }
+
+      // Registers the batch-to-row transition for `from`; a batch type may be registered once.
+      override def defineToRowTransition(
+          from: Convention.BatchType,
+          transitionDef: TransitionDef): Unit = {
+        assert(toRowTransitions.get(from).isEmpty)
+        toRowTransitions.update(from, transitionDef)
+      }
+
+      // Registers the direct transition for the (from, to) pair; a pair may be registered once.
+      override def defineBatchTransition(
+          from: Convention.BatchType,
+          to: Convention.BatchType,
+          transitionDef: TransitionDef): Unit = {
+        val key = (from, to)
+        assert(batchTransitions.get(key).isEmpty)
+        batchTransitions.update(key, transitionDef)
+      }
+
+      /**
+       * Finds the transition that makes a plan of convention `from` satisfy requirement `to`.
+       *
+       *   - A requirement that `from` already meets, or that constrains nothing, yields
+       *     [[Transition.empty]].
+       *   - A row requirement on a plan without row output uses the to-row transition registered
+       *     for the plan's batch type.
+       *   - A batch requirement on a plan without batch output uses the from-row transition
+       *     registered for the required batch type.
+       *   - A batch requirement on a plan with a different batch type uses the registered
+       *     batch-to-batch transition, or else a bridged batch-to-row-to-batch transition.
+       *
+       * `orElse` is evaluated whenever a needed transition has not been registered. Requiring
+       * both a row type and a batch type that `from` does not already provide is not supported
+       * and throws UnsupportedOperationException.
+       */
+      override def findTransition(from: Convention, to: ConventionReq)(
+          orElse: => Transition): Transition = {
+        assert(
+          !from.isNone,
+          "#findTransition called with on a plan that doesn't support either row or columnar " +
+            "output")
+        (to.requiredRowType, to.requiredBatchType) match {
+          case (ConventionReq.RowType.Is(toRowType), ConventionReq.BatchType.Is(toBatchType)) =>
+            if (from.rowType == toRowType && from.batchType == toBatchType) {
+              Transition.empty
+            } else {
+              throw new UnsupportedOperationException(
+                "Transiting to plan that both have row and columnar-batch output is not yet " +
+                  "supported")
+            }
+          case (ConventionReq.RowType.Is(toRowType), ConventionReq.BatchType.Any) =>
+            from.rowType match {
+              case Convention.RowType.None =>
+                toRowTransitions.get(from.batchType).map(_.create()).getOrElse(orElse)
+              case fromRowType =>
+                // We have only one single built-in row type.
+                assert(toRowType == fromRowType)
+                Transition.empty
+            }
+          case (ConventionReq.RowType.Any, ConventionReq.BatchType.Is(toBatchType)) =>
+            from.batchType match {
+              case Convention.BatchType.None =>
+                fromRowTransitions.get(toBatchType).map(_.create()).getOrElse(orElse)
+              case fromBatchType =>
+                if (toBatchType == fromBatchType) {
+                  Transition.empty
+                } else {
+                  // Batch type conversion needed.
+                  //
+                  // We first look up for batch-to-batch transition. If found one, return that
+                  // transition to caller. Otherwise, look for from/to row transitions, then
+                  // return a bridged batch-to-row-to-batch transition.
+                  if (batchTransitions.contains((fromBatchType, toBatchType))) {
+                    // 1. Found batch-to-batch transition.
+                    batchTransitions((fromBatchType, toBatchType)).create()
+                  } else {
+                    // 2. Otherwise, build up batch-to-row-to-batch transition.
+                    val batchToRow =
+                      toRowTransitions.get(fromBatchType).map(_.create()).getOrElse(orElse)
+                    val rowToBatch =
+                      fromRowTransitions.get(toBatchType).map(_.create()).getOrElse(orElse)
+                    chain(batchToRow, rowToBatch)
+                  }
+                }
+            }
+          case (ConventionReq.RowType.Any, ConventionReq.BatchType.Any) =>
+            Transition.empty
+          case _ =>
+            // Report the offending requirement itself; interpolating `ConventionReq` would
+            // print the companion object instead.
+            throw new UnsupportedOperationException(s"Illegal convention requirement: $to")
+        }
+      }
+
+      override private[transition] def update(): MutableFactory = this
+    }
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
new file mode 100644
index 000000000..e0758cff7
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{SparkPlan, UnionExec}
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
+
+import scala.annotation.tailrec
+
+case class InsertTransitions(outputsColumnar: Boolean) extends Rule[SparkPlan] 
{
+  import InsertTransitions._
+  private val convFunc = ConventionFunc.create()
+
+  override def apply(plan: SparkPlan): SparkPlan = {
+    // Remove all transitions at first.
+    val removed = RemoveTransitions.apply(plan)
+    val filled = fillWithTransitions(removed)
+    if (!outputsColumnar) {
+      return Transitions.toRowPlan(filled)
+    }
+    Transitions.toBackendBatchPlan(filled)
+  }
+
+  private def fillWithTransitions(plan: SparkPlan): SparkPlan = 
plan.transformUp {
+    case p => applyForNode(p)
+  }
+
+  private def applyForNode(node: SparkPlan): SparkPlan = {
+    if (node.children.isEmpty) {
+      return node
+    }
+    val convReq = childrenConvReqOf(node)
+    val newChildren = node.children.map {
+      child =>
+        val from = convFunc.conventionOf(child)
+        if (from.isNone) {
+          // For example, a union op with a row child and a columnar child at the 
same time.
+          // Such a plan is actually not executable and we cannot tell its 
convention.
+          child
+        } else {
+          val transition =
+            Transition.factory.findTransition(from, convReq, 
Transition.notFound(node))
+          val newChild = transition.apply(child)
+          newChild
+        }
+    }
+    node.withNewChildren(newChildren)
+  }
+
+  private def childrenConvReqOf(node: SparkPlan): ConventionReq = node match {
+    // TODO: Consider C2C transitions as well when we have some.
+    case ColumnarToRowLike(_) | RowToColumnarLike(_) =>
+      // We should not see C2R / R2C here since they are already removed by
+      // RemoveTransitions.
+      // It's the current rule's mission to add C2Rs / R2Cs on demand.
+      throw new IllegalStateException("Unreachable code")
+    case write: DataWritingCommandExec if 
SparkShimLoader.getSparkShims.isPlannedV1Write(write) =>
+      // To align with ApplyColumnarRulesAndInsertTransitions#insertTransitions
+      ConventionReq.any
+    case u: UnionExec =>
+      // We force vanilla union to output row data to get best compatibility 
with vanilla Spark.
+      // As a result it's a common practice to rewrite it with GlutenPlan for 
offloading.
+      ConventionReq.of(
+        ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+        ConventionReq.BatchType.Any)
+    case other =>
+      // In the normal case, children's convention should follow parent node's 
convention.
+      // Note, we don't have to consider C2R / R2C here since they are already 
removed by
+      // RemoveTransitions.
+      val thisConv = convFunc.conventionOf(other)
+      thisConv.asReq()
+  }
+}
+
+object InsertTransitions {
+  implicit private class ConventionOps(conv: Convention) {
+    def asReq(): ConventionReq = {
+      val rowTypeReq = conv.rowType match {
+        case Convention.RowType.None => ConventionReq.RowType.Any
+        case r => ConventionReq.RowType.Is(r)
+      }
+
+      val batchTypeReq = conv.batchType match {
+        case Convention.BatchType.None => ConventionReq.BatchType.Any
+        case b => ConventionReq.BatchType.Is(b)
+      }
+      ConventionReq.of(rowTypeReq, batchTypeReq)
+    }
+  }
+}
+
+object RemoveTransitions extends Rule[SparkPlan] {
+  override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { case p 
=> removeForNode(p) }
+
+  @tailrec
+  private[transition] def removeForNode(plan: SparkPlan): SparkPlan = plan 
match {
+    // TODO: Consider C2C transitions as well when we have some.
+    case ColumnarToRowLike(child) => removeForNode(child)
+    case RowToColumnarLike(child) => removeForNode(child)
+    case other => other
+  }
+}
+
+object Transitions {
+  def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan 
= {
+    val out = InsertTransitions(outputsColumnar).apply(plan)
+    out
+  }
+
+  def toRowPlan(plan: SparkPlan): SparkPlan = {
+    val convFunc = ConventionFunc.create()
+    val req = ConventionReq.of(
+      ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+      ConventionReq.BatchType.Any)
+    val removed = RemoveTransitions.removeForNode(plan)
+    val transition = Transition.factory.findTransition(
+      convFunc.conventionOf(removed),
+      req,
+      Transition.notFound(removed, req))
+    val out = transition.apply(removed)
+    out
+  }
+
+  def toBackendBatchPlan(plan: SparkPlan): SparkPlan = {
+    val backendBatchType = 
BackendsApiManager.getSparkPlanExecApiInstance.batchType
+    val out = toBatchPlan(plan, backendBatchType)
+    out
+  }
+
+  def toVanillaBatchPlan(plan: SparkPlan): SparkPlan = {
+    val out = toBatchPlan(plan, Convention.BatchType.VanillaBatch)
+    out
+  }
+
+  private def toBatchPlan(plan: SparkPlan, toBatchType: Convention.BatchType): 
SparkPlan = {
+    val convFunc = ConventionFunc.create()
+    val req = ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(toBatchType))
+    val removed = RemoveTransitions.removeForNode(plan)
+    val transition = Transition.factory.findTransition(
+      convFunc.conventionOf(removed),
+      req,
+      Transition.notFound(removed, req))
+    val out = transition.apply(removed)
+    out
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
new file mode 100644
index 000000000..b0d04c273
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/package.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar
+
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AQEShuffleReadExec
+import org.apache.spark.sql.execution.debug.DebugExec
+
+package object transition {
+  // These 5 plan operators (as of Spark 3.5) are operators that have the
+  // same convention as their children.
+  //
+  // Extend this list in shim layer once Spark has more.
+  def canPropagateConvention(plan: SparkPlan): Boolean = plan match {
+    case p: DebugExec => true
+    case p: UnionExec => true
+    case p: AQEShuffleReadExec => true
+    case p: InputAdapter => true
+    case p: WholeStageCodegenExec => true
+    case _ => false
+  }
+
+  // Extractor for Spark/Gluten's C2R
+  object ColumnarToRowLike {
+    def unapply(plan: SparkPlan): Option[SparkPlan] = {
+      plan match {
+        case c2r: ColumnarToRowTransition =>
+          Some(c2r.child)
+        case _ => None
+      }
+    }
+  }
+
+  // Extractor for Spark/Gluten's R2C
+  object RowToColumnarLike {
+    def unapply(plan: SparkPlan): Option[SparkPlan] = {
+      plan match {
+        case c2r: RowToColumnarTransition =>
+          Some(c2r.child)
+        case _ => None
+      }
+    }
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
index 2920c0a39..fa69eedb5 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.gluten.planner.cost
 
-import org.apache.gluten.extension.columnar.{ColumnarTransitions, OffloadJoin}
+import org.apache.gluten.extension.columnar.OffloadJoin
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, 
RowToColumnarLike}
 import org.apache.gluten.planner.plan.GlutenPlanModel.GroupLeafExec
 import org.apache.gluten.ras.{Cost, CostModel}
 import org.apache.gluten.utils.PlanUtil
@@ -63,8 +64,8 @@ object GlutenCostModel {
           infLongCost
         case ColumnarToRowExec(child) => 3L
         case RowToColumnarExec(child) => 3L
-        case ColumnarTransitions.ColumnarToRowLike(child) => 3L
-        case ColumnarTransitions.RowToColumnarLike(child) => 3L
+        case ColumnarToRowLike(child) => 3L
+        case RowToColumnarLike(child) => 3L
         case p if PlanUtil.isGlutenColumnarOp(p) => 2L
         case p if PlanUtil.isVanillaColumnarOp(p) => 3L
         // Other row ops. Usually a vanilla row op.
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/planner/property/Convention.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/planner/property/Convention.scala
index 36751f4ca..5fe96ab79 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/planner/property/Convention.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/planner/property/Convention.scala
@@ -16,9 +16,9 @@
  */
 package org.apache.gluten.planner.property
 
-import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.RowToColumnarExecBase
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.ColumnarTransitions
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, 
RowToColumnarLike, Transitions}
 import org.apache.gluten.planner.plan.GlutenPlanModel.GroupLeafExec
 import org.apache.gluten.ras.{Property, PropertyDef}
 import org.apache.gluten.ras.rule.{RasRule, Shape, Shapes}
@@ -62,8 +62,8 @@ object ConventionDef extends PropertyDef[SparkPlan, 
Convention] {
     case g: GroupLeafExec => g.propertySet.get(ConventionDef)
     case ColumnarToRowExec(child) => Conventions.ROW_BASED
     case RowToColumnarExec(child) => Conventions.VANILLA_COLUMNAR
-    case ColumnarTransitions.ColumnarToRowLike(child) => Conventions.ROW_BASED
-    case ColumnarTransitions.RowToColumnarLike(child) => 
Conventions.GLUTEN_COLUMNAR
+    case ColumnarToRowLike(child) => Conventions.ROW_BASED
+    case RowToColumnarLike(child) => Conventions.GLUTEN_COLUMNAR
     case q: QueryStageExec => conventionOf(q.plan)
     case r: ReusedExchangeExec => conventionOf(r.child)
     case a: AdaptiveSparkPlanExec => conventionOf(a.executedPlan)
@@ -82,8 +82,8 @@ object ConventionDef extends PropertyDef[SparkPlan, 
Convention] {
       constraint: Property[SparkPlan],
       plan: SparkPlan): Seq[Convention] = plan match {
     case ColumnarToRowExec(child) => Seq(Conventions.VANILLA_COLUMNAR)
-    case ColumnarTransitions.ColumnarToRowLike(child) => 
Seq(Conventions.GLUTEN_COLUMNAR)
-    case ColumnarTransitions.RowToColumnarLike(child) => 
Seq(Conventions.ROW_BASED)
+    case ColumnarToRowLike(child) => Seq(Conventions.GLUTEN_COLUMNAR)
+    case RowToColumnarLike(child) => Seq(Conventions.ROW_BASED)
     case p if canPropagateConvention(p) =>
       p.children.map(_ => constraint.asInstanceOf[Convention])
     case other =>
@@ -127,22 +127,18 @@ case class ConventionEnforcerRule(reqConv: Convention) 
extends RasRule[SparkPlan
       case (Conventions.ROW_BASED, Conventions.VANILLA_COLUMNAR) =>
         List(RowToColumnarExec(node))
       case (Conventions.GLUTEN_COLUMNAR, Conventions.ROW_BASED) =>
-        
List(BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(node))
+        List(Transitions.toRowPlan(node))
       case (Conventions.ROW_BASED, Conventions.GLUTEN_COLUMNAR) =>
-        val attempt = 
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(node)
-        if (attempt.doValidate().isValid) {
+        val attempt = Transitions.toBackendBatchPlan(node)
+        if (attempt.asInstanceOf[RowToColumnarExecBase].doValidate().isValid) {
           List(attempt)
         } else {
           List.empty
         }
       case (Conventions.VANILLA_COLUMNAR, Conventions.GLUTEN_COLUMNAR) =>
-        List(
-          BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(
-            ColumnarToRowExec(node)))
+        List(Transitions.toBackendBatchPlan(ColumnarToRowExec(node)))
       case (Conventions.GLUTEN_COLUMNAR, Conventions.VANILLA_COLUMNAR) =>
-        List(
-          RowToColumnarExec(
-            
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(node)))
+        List(RowToColumnarExec(Transitions.toRowPlan(node)))
       case _ => List.empty
     }
   }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala 
b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
index 4c02687a6..fe7eb5566 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/utils/PlanUtil.scala
@@ -17,18 +17,15 @@
 package org.apache.gluten.utils
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
-import org.apache.spark.sql.execution.exchange._
 
 object PlanUtil {
-  def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = {
-    // `ColumnarCachedBatchSerializer` is at velox module, so use class name 
here
-    i.relation.cacheBuilder.serializer.getClass.getSimpleName == 
"ColumnarCachedBatchSerializer" &&
-    i.supportsColumnar
+  private def isGlutenTableCacheInternal(i: InMemoryTableScanExec): Boolean = {
+    Convention.get(i).batchType == 
BackendsApiManager.getSparkPlanExecApiInstance.batchType
   }
 
   def isGlutenTableCache(plan: SparkPlan): Boolean = {
@@ -42,44 +39,11 @@ object PlanUtil {
     }
   }
 
-  def outputNativeColumnarData(plan: SparkPlan): Boolean = {
-    plan match {
-      case a: AQEShuffleReadExec => outputNativeColumnarData(a.child)
-      case s: QueryStageExec => outputNativeColumnarData(s.plan)
-      case s: ReusedExchangeExec => outputNativeColumnarData(s.child)
-      case s: InputAdapter => outputNativeColumnarData(s.child)
-      case s: WholeStageCodegenExec => outputNativeColumnarData(s.child)
-      case s: AdaptiveSparkPlanExec => outputNativeColumnarData(s.executedPlan)
-      case i: InMemoryTableScanExec => PlanUtil.isGlutenTableCache(i)
-      case _: GlutenPlan => true
-      case _ => false
-    }
-  }
-
-  def outputNativeColumnarSparkCompatibleData(plan: SparkPlan): Boolean = {
-    
BackendsApiManager.getSparkPlanExecApiInstance.outputNativeColumnarSparkCompatibleData(plan)
-  }
-
   def isVanillaColumnarOp(plan: SparkPlan): Boolean = {
-    plan match {
-      case i: InMemoryTableScanExec =>
-        if (PlanUtil.isGlutenTableCache(i)) {
-          // `InMemoryTableScanExec` do not need extra RowToColumnar or 
ColumnarToRow
-          false
-        } else {
-          !plan.isInstanceOf[GlutenPlan] && plan.supportsColumnar
-        }
-      case a: AQEShuffleReadExec => isVanillaColumnarOp(a.child)
-      case s: QueryStageExec => isVanillaColumnarOp(s.plan)
-      case _: RowToColumnarExec => false
-      case _: InputAdapter => false
-      case _: WholeStageCodegenExec => false
-      case r: ReusedExchangeExec => isVanillaColumnarOp(r.child)
-      case _ => !plan.isInstanceOf[GlutenPlan] && plan.supportsColumnar
-    }
+    Convention.get(plan).batchType == Convention.BatchType.VanillaBatch
   }
 
   def isGlutenColumnarOp(plan: SparkPlan): Boolean = {
-    plan.isInstanceOf[GlutenPlan]
+    Convention.get(plan).batchType == 
BackendsApiManager.getSparkPlanExecApiInstance.batchType
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index fce07eab4..23746846e 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution._
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.rel.RelBuilder
@@ -158,16 +159,18 @@ case class ColumnarCollapseTransformStages(
   }
 }
 
-case class ColumnarInputAdapter(child: SparkPlan) extends UnaryExecNode {
+case class ColumnarInputAdapter(child: SparkPlan)
+  extends UnaryExecNode
+  with Convention.KnownBatchType {
   override def output: Seq[Attribute] = child.output
-  override def supportsColumnar: Boolean = child.supportsColumnar
-  override protected def doExecute(): RDD[InternalRow] =
-    child.execute()
+  override def supportsColumnar: Boolean = true
+  override def batchType(): Convention.BatchType =
+    BackendsApiManager.getSparkPlanExecApiInstance.batchType
+  override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = 
child.executeColumnar()
   override def outputPartitioning: Partitioning = child.outputPartitioning
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-  override def vectorTypes: Option[Seq[String]] = child.vectorTypes
-  override protected[sql] def doExecuteBroadcast[T](): Broadcast[T] = 
child.executeBroadcast()
+  override protected[sql] def doExecuteBroadcast[T](): Broadcast[T] = 
child.doExecuteBroadcast()
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
     copy(child = newChild)
   // Node name's required to be "InputAdapter" to correctly draw UI graph.
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index f1adb09e2..859cca842 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.ColumnarToRowExecBase
 import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Transitions
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
@@ -68,7 +69,7 @@ case class FakeRowAdaptor(child: SparkPlan)
     if (child.supportsColumnar) {
       child.executeColumnar()
     } else {
-      val r2c = 
BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(child)
+      val r2c = Transitions.toBackendBatchPlan(child)
       r2c.executeColumnar()
     }
   }
diff --git 
a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
 
b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
index bb1867d96..8e8743857 100644
--- 
a/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
+++ 
b/gluten-core/src/test/scala/org/apache/gluten/execution/WholeStageTransformerSuite.scala
@@ -17,7 +17,8 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.utils.{Arm, FallbackUtil}
+import org.apache.gluten.test.FallbackUtil
+import org.apache.gluten.utils.Arm
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
diff --git 
a/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
 
b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
new file mode 100644
index 000000000..5c6d692ae
--- /dev/null
+++ 
b/gluten-core/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.transition
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.extension.GlutenPlan
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.test.SharedSparkSession
+
+class TransitionSuite extends SharedSparkSession {
+  import TransitionSuite._
+  test("Trivial C2R") {
+    val in = BatchLeaf(TypeA)
+    val out = ConventionFunc.ignoreBackend {
+      Transitions.insertTransitions(in, outputsColumnar = false)
+    }
+    assert(out == BatchToRow(TypeA, BatchLeaf(TypeA)))
+  }
+
+  test("Insert C2R") {
+    val in = RowUnary(BatchLeaf(TypeA))
+    val out = ConventionFunc.ignoreBackend {
+      Transitions.insertTransitions(in, outputsColumnar = false)
+    }
+    assert(out == RowUnary(BatchToRow(TypeA, BatchLeaf(TypeA))))
+  }
+
+  test("Insert R2C") {
+    val in = BatchUnary(TypeA, RowLeaf())
+    val out = ConventionFunc.ignoreBackend {
+      Transitions.insertTransitions(in, outputsColumnar = false)
+    }
+    assert(out == BatchToRow(TypeA, BatchUnary(TypeA, RowToBatch(TypeA, 
RowLeaf()))))
+  }
+
+  test("Insert C2R2C") {
+    val in = BatchUnary(TypeA, BatchLeaf(TypeB))
+    val out = ConventionFunc.ignoreBackend {
+      Transitions.insertTransitions(in, outputsColumnar = false)
+    }
+    assert(
+      out == BatchToRow(
+        TypeA,
+        BatchUnary(TypeA, RowToBatch(TypeA, BatchToRow(TypeB, 
BatchLeaf(TypeB))))))
+  }
+
+  test("Insert C2C") {
+    val in = BatchUnary(TypeA, BatchLeaf(TypeC))
+    val out = ConventionFunc.ignoreBackend {
+      Transitions.insertTransitions(in, outputsColumnar = false)
+    }
+    assert(
+      out == BatchToRow(
+        TypeA,
+        BatchUnary(TypeA, BatchToBatch(from = TypeC, to = TypeA, 
BatchLeaf(TypeC)))))
+  }
+
+  test("No transitions found") {
+    val in = BatchUnary(TypeA, BatchLeaf(TypeD))
+    assertThrows[GlutenException] {
+      ConventionFunc.ignoreBackend {
+        Transitions.insertTransitions(in, outputsColumnar = false)
+      }
+    }
+  }
+}
+
+object TransitionSuite {
+  object TypeA extends Convention.BatchType {
+    fromRow(
+      () =>
+        (plan: SparkPlan) => {
+          RowToBatch(this, plan)
+        })
+
+    toRow(
+      () =>
+        (plan: SparkPlan) => {
+          BatchToRow(this, plan)
+        })
+  }
+
+  object TypeB extends Convention.BatchType {
+    fromRow(
+      () =>
+        (plan: SparkPlan) => {
+          RowToBatch(this, plan)
+        })
+
+    toRow(
+      () =>
+        (plan: SparkPlan) => {
+          BatchToRow(this, plan)
+        })
+  }
+
+  object TypeC extends Convention.BatchType {
+    fromRow(
+      () =>
+        (plan: SparkPlan) => {
+          RowToBatch(this, plan)
+        })
+
+    toRow(
+      () =>
+        (plan: SparkPlan) => {
+          BatchToRow(this, plan)
+        })
+
+    fromBatch(
+      TypeA,
+      () =>
+        (plan: SparkPlan) => {
+          BatchToBatch(TypeA, this, plan)
+        })
+
+    toBatch(
+      TypeA,
+      () =>
+        (plan: SparkPlan) => {
+          BatchToBatch(this, TypeA, plan)
+        })
+  }
+
+  object TypeD extends Convention.BatchType {}
+
+  case class RowToBatch(toBatchType: Convention.BatchType, override val child: 
SparkPlan)
+    extends RowToColumnarTransition
+    with GlutenPlan {
+    override def supportsColumnar: Boolean = true
+    override protected def batchType0(): Convention.BatchType = toBatchType
+    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+      copy(child = newChild)
+    override protected def doExecute(): RDD[InternalRow] =
+      throw new UnsupportedOperationException()
+    override def output: Seq[Attribute] = child.output
+  }
+
+  case class BatchToRow(fromBatchType: Convention.BatchType, override val 
child: SparkPlan)
+    extends ColumnarToRowTransition {
+    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+      copy(child = newChild)
+    override protected def doExecute(): RDD[InternalRow] =
+      throw new UnsupportedOperationException()
+    override def output: Seq[Attribute] = child.output
+  }
+
+  case class BatchToBatch(
+      from: Convention.BatchType,
+      to: Convention.BatchType,
+      override val child: SparkPlan)
+    extends UnaryExecNode
+    with GlutenPlan {
+    override def supportsColumnar: Boolean = true
+    override protected def batchType0(): Convention.BatchType = to
+    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+      copy(child = newChild)
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override def output: Seq[Attribute] = child.output
+  }
+
+  case class BatchLeaf(override val batchType0: Convention.BatchType)
+    extends LeafExecNode
+    with GlutenPlan {
+    override def supportsColumnar: Boolean = true
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override def output: Seq[Attribute] = List.empty
+  }
+
+  case class BatchUnary(
+      override val batchType0: Convention.BatchType,
+      override val child: SparkPlan)
+    extends UnaryExecNode
+    with GlutenPlan {
+    override def supportsColumnar: Boolean = true
+    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+      copy(child = newChild)
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override def output: Seq[Attribute] = child.output
+  }
+
+  case class BatchBinary(
+      override val batchType0: Convention.BatchType,
+      override val left: SparkPlan,
+      override val right: SparkPlan)
+    extends BinaryExecNode
+    with GlutenPlan {
+    override def supportsColumnar: Boolean = true
+    override protected def withNewChildrenInternal(
+        newLeft: SparkPlan,
+        newRight: SparkPlan): SparkPlan = copy(left = newLeft, right = 
newRight)
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override def output: Seq[Attribute] = left.output ++ right.output
+  }
+
+  case class RowLeaf() extends LeafExecNode {
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override def output: Seq[Attribute] = List.empty
+  }
+
+  case class RowUnary(override val child: SparkPlan) extends UnaryExecNode {
+    override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
+      copy(child = newChild)
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override def output: Seq[Attribute] = child.output
+  }
+
+  case class RowBinary(override val left: SparkPlan, override val right: 
SparkPlan)
+    extends BinaryExecNode {
+    override protected def withNewChildrenInternal(
+        newLeft: SparkPlan,
+        newRight: SparkPlan): SparkPlan = copy(left = newLeft, right = 
newRight)
+    override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
+    override def output: Seq[Attribute] = left.output ++ right.output
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/utils/FallbackUtil.scala 
b/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
similarity index 92%
rename from 
gluten-core/src/main/scala/org/apache/gluten/utils/FallbackUtil.scala
rename to gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
index c40cdd675..d2626ab27 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/utils/FallbackUtil.scala
+++ b/gluten-core/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
@@ -14,7 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.utils
+package org.apache.gluten.test
+
+import org.apache.gluten.extension.GlutenPlan
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution._
@@ -66,12 +68,12 @@ object FallbackUtil extends Logging with 
AdaptiveSparkPlanHelper {
     var fallbackOperator: Seq[SparkPlan] = null
     if (plan.isInstanceOf[AdaptiveSparkPlanExec]) {
       fallbackOperator = collectWithSubqueries(plan) {
-        case plan if !PlanUtil.isGlutenColumnarOp(plan) && !skip(plan) =>
+        case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
           plan
       }
     } else {
       fallbackOperator = plan.collectWithSubqueries {
-        case plan if !PlanUtil.isGlutenColumnarOp(plan) && !skip(plan) =>
+        case plan if !plan.isInstanceOf[GlutenPlan] && !skip(plan) =>
           plan
       }
     }
diff --git 
a/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala 
b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala
new file mode 100644
index 000000000..3f40793d9
--- /dev/null
+++ b/gluten-data/src/main/java/org/apache/gluten/columnarbatch/ArrowBatch.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.columnarbatch
+
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
+
+/**
+ * ArrowBatch stands for Gluten's Arrow-based columnar batch implementation. 
Vanilla Spark's
+ * ColumnarBatch consisting of 
[[org.apache.spark.sql.vectorized.ArrowColumnVector]]s is still
+ * treated as [[Convention.BatchType.VanillaBatch]].
+ *
+ * As of now, ArrowBatch should have 
[[org.apache.gluten.vectorized.ArrowWritableColumnVector]]s
+ * populated in it. ArrowBatch can be loaded from / offloaded to the native C++ 
ArrowColumnarBatch
+ * through the API in [[ColumnarBatches]]. After being offloaded, a batch is no 
longer considered a
+ * legal ArrowBatch and cannot be accepted by a trivial ColumnarToRowExec. To 
follow that rule, any
+ * plan with this batch type must guarantee that it emits loaded batches only.
+ */
+object ArrowBatch extends Convention.BatchType {
+  toRow(
+    () =>
+      (plan: SparkPlan) => {
+        ColumnarToRowExec(plan)
+      })
+}
diff --git a/gluten-ut/pom.xml b/gluten-ut/pom.xml
index 8015b5cec..2396087fc 100644
--- a/gluten-ut/pom.xml
+++ b/gluten-ut/pom.xml
@@ -42,6 +42,13 @@
       <version>${project.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-core</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index e2a99ef7e..7c7aa0879 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,8 +18,8 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.execution.BasicScanExecTransformer
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.InsertTransitions
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.InsertTransitions
 import org.apache.gluten.utils.QueryPlanSelector
 
 import org.apache.spark.rdd.RDD
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index 89a0ee18a..ad08318bb 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.execution.benchmarks
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.{FileSourceScanExecTransformer, 
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
 import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
 import org.apache.gluten.vectorized.JniLibLoader
 
@@ -124,8 +124,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
     val newWholeStage = wholeStageTransform.withNewChildren(Seq(fileScan))
 
     // generate ColumnarToRow
-    val columnarToRowPlan =
-      
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(newWholeStage)
+    val columnarToRowPlan = Transitions.toRowPlan(newWholeStage)
 
     val newWholeStageRDD = newWholeStage.executeColumnar()
     val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
index c5e4fa0a4..c58284e44 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql
 
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper
 import org.apache.spark.sql.functions._
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 31517a141..fff883d49 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, 
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, 
TRANSFORM_UNSUPPORTED, TransformHints}
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.InsertTransitions
 import org.apache.gluten.utils.QueryPlanSelector
 
 import org.apache.spark.rdd.RDD
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index d99d52924..7d8a29204 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.execution.benchmarks
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.{FileSourceScanExecTransformer, 
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
 import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
 import org.apache.gluten.vectorized.JniLibLoader
 
@@ -124,8 +124,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
     val newWholeStage = wholeStageTransform.withNewChildren(Seq(fileScan))
 
     // generate ColumnarToRow
-    val columnarToRowPlan =
-      
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(newWholeStage)
+    val columnarToRowPlan = Transitions.toRowPlan(newWholeStage)
 
     val newWholeStageRDD = newWholeStage.executeColumnar()
     val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
index c5e4fa0a4..c58284e44 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql
 
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper
 import org.apache.spark.sql.functions._
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 079620bf8..7976288dd 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, 
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, 
TRANSFORM_UNSUPPORTED, TransformHints}
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.InsertTransitions
 import org.apache.gluten.utils.QueryPlanSelector
 
 import org.apache.spark.rdd.RDD
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index d99d52924..b5481f4d8 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.execution.benchmarks
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.{FileSourceScanExecTransformer, 
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
 import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
 import org.apache.gluten.vectorized.JniLibLoader
 
@@ -125,7 +125,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
 
     // generate ColumnarToRow
     val columnarToRowPlan =
-      
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(newWholeStage)
+      Transitions.toBackendBatchPlan(newWholeStage)
 
     val newWholeStageRDD = newWholeStage.executeColumnar()
     val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
index c5e4fa0a4..c58284e44 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenStringFunctionsSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql
 
-import org.apache.gluten.utils.FallbackUtil
+import org.apache.gluten.test.FallbackUtil
 
 import org.apache.spark.sql.catalyst.expressions.ExpressionEvalHelper
 import org.apache.spark.sql.functions._
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 079620bf8..7976288dd 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, 
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, 
TRANSFORM_UNSUPPORTED, TransformHints}
 import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.extension.columnar.transition.InsertTransitions
 import org.apache.gluten.utils.QueryPlanSelector
 
 import org.apache.spark.rdd.RDD
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index d99d52924..7d8a29204 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.execution.benchmarks
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.{FileSourceScanExecTransformer, 
WholeStageTransformer}
+import org.apache.gluten.extension.columnar.transition.Transitions
 import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
 import org.apache.gluten.vectorized.JniLibLoader
 
@@ -124,8 +124,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
     val newWholeStage = wholeStageTransform.withNewChildren(Seq(fileScan))
 
     // generate ColumnarToRow
-    val columnarToRowPlan =
-      
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(newWholeStage)
+    val columnarToRowPlan = Transitions.toRowPlan(newWholeStage)
 
     val newWholeStageRDD = newWholeStage.executeColumnar()
     val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index dbefc22ef..fd8cd24c3 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
GlobalLimitExec, SparkPlan, TakeOrderedAndProjectExec}
 import org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, 
PartitionDirectory, PartitionedFile, PartitioningAwareFileIndex, 
WriteJobDescription, WriteTaskResult}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
@@ -232,4 +233,5 @@ trait SparkShims {
 
   def dateTimestampFormatInReadIsDefaultValue(csvOptions: CSVOptions, 
timeZone: String): Boolean
 
+  def isPlannedV1Write(write: DataWritingCommandExec): Boolean = false
 }
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 0b045972a..6d70d67f3 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -40,11 +40,13 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
Scan}
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeLike
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -420,4 +422,8 @@ class Spark34Shims extends SparkShims {
     csvOptions.timestampFormatInRead == default.timestampFormatInRead &&
     csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
   }
+
+  override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = {
+    write.cmd.isInstanceOf[V1WriteCommand] && SQLConf.get.plannedWriteEnabled
+  }
 }
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index c839c8c2a..00f9d62fd 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -30,7 +30,7 @@ import 
org.apache.spark.sql.catalyst.{ExtendedAnalysisException, InternalRow}
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.csv.CSVOptions
 import org.apache.spark.sql.catalyst.expressions._
-import 
org.apache.spark.sql.catalyst.expressions.aggregate.{BloomFilterAggregate, 
RegrIntercept, RegrR2, RegrReplacement, RegrSlope, RegrSXY, 
TypedImperativeAggregate}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
Distribution, KeyGroupedPartitioning, Partitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -40,12 +40,14 @@ import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, 
Scan}
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, 
ShuffleExchangeLike}
 import org.apache.spark.sql.execution.window.{WindowGroupLimitExec, 
WindowGroupLimitExecShim}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.{BlockId, BlockManagerId}
@@ -448,4 +450,8 @@ class Spark35Shims extends SparkShims {
     csvOptions.timestampFormatInRead == default.timestampFormatInRead &&
     csvOptions.timestampNTZFormatInRead == default.timestampNTZFormatInRead
   }
+
+  override def isPlannedV1Write(write: DataWritingCommandExec): Boolean = {
+    write.cmd.isInstanceOf[V1WriteCommand] && SQLConf.get.plannedWriteEnabled
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to