This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new e5b4b4ce08 [GLUTEN-6920][CORE] Move API `Backend#defaultBatchType` 
down to `BackendSettingsApi` in module gluten-substrait (#8016)
e5b4b4ce08 is described below

commit e5b4b4ce081f99090ac532b3d16ea31b2de12aac
Author: Hongze Zhang <[email protected]>
AuthorDate: Mon Nov 25 14:29:54 2024 +0800

    [GLUTEN-6920][CORE] Move API `Backend#defaultBatchType` down to 
`BackendSettingsApi` in module gluten-substrait (#8016)
---
 .../gluten/backendsapi/clickhouse/CHBackend.scala  | 17 ++++++-
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  |  3 +-
 .../gluten/backendsapi/velox/VeloxBackend.scala    | 10 +++--
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  5 ++-
 .../execution/ColumnarPartialProjectExec.scala     |  2 -
 .../gluten/execution/VeloxResizeBatchesExec.scala  |  1 -
 .../api/python/ColumnarArrowEvalPythonExec.scala   |  3 +-
 .../sql/execution/ArrowFileSourceScanExec.scala    |  2 -
 .../spark/sql/execution/BaseArrowScanExec.scala    |  3 +-
 .../columnar/transition/VeloxTransitionSuite.scala | 46 +++++++++----------
 .../scala/org/apache/gluten/backend/Backend.scala  |  5 +--
 .../gluten/execution/ColumnarToColumnarExec.scala  | 21 ++++++---
 .../gluten/extension/GlutenColumnarRule.scala      |  9 ++--
 .../columnar/enumerated/EnumeratedTransform.scala  |  4 +-
 .../enumerated/planner/cost/LongCostModel.scala    |  9 ++--
 .../enumerated/planner/plan/GlutenPlanModel.scala  | 19 ++++----
 .../enumerated/planner/property/Conv.scala         | 12 +++++
 .../extension/columnar/transition/Convention.scala | 52 +++++++++++++++++++++-
 .../columnar/transition/ConventionFunc.scala       | 52 ++++++++++++++--------
 .../columnar/transition/ConventionReq.scala        | 13 +++---
 .../extension/columnar/transition/Transition.scala | 19 +++++---
 .../columnar/transition/Transitions.scala          | 40 ++++++++---------
 .../gluten/extension/injector/GlutenInjector.scala |  4 +-
 .../org/apache/gluten/iterator/Iterators.scala     |  2 +-
 .../org/apache/spark/util/SparkVersionUtil.scala   | 18 ++++----
 .../gluten/backendsapi/BackendSettingsApi.scala    |  5 +++
 .../BasicPhysicalOperatorTransformer.scala         |  2 -
 .../CartesianProductExecTransformer.scala          |  1 -
 .../gluten/execution/ColumnarCoalesceExec.scala    |  2 -
 .../gluten/execution/ColumnarToRowExecBase.scala   | 11 +++--
 .../gluten/execution/RowToColumnarExecBase.scala   |  5 ++-
 .../TakeOrderedAndProjectExecTransformer.scala     |  1 -
 .../gluten/execution/WholeStageTransformer.scala   |  2 -
 .../org/apache/gluten/extension/GlutenPlan.scala   | 30 ++++++++-----
 .../extension/columnar/MiscColumnarRules.scala     |  4 +-
 .../columnar/heuristic/ExpandFallbackPolicy.scala  |  7 ++-
 .../columnar/transition/BackendTransitions.scala   | 22 ++++++---
 .../execution/ColumnarBroadcastExchangeExec.scala  |  2 -
 .../ColumnarCollapseTransformStages.scala          |  5 +--
 .../execution/ColumnarShuffleExchangeExec.scala    |  1 -
 .../sql/execution/ColumnarWriteFilesExec.scala     | 14 +++---
 .../datasources/GlutenWriterColumnarRules.scala    |  6 ++-
 .../columnar/transition/TransitionSuite.scala      | 15 +++----
 .../columnar/transition/TransitionSuiteBase.scala  | 11 ++---
 .../sql/execution/FallbackStrategiesSuite.scala    | 20 ++++-----
 .../sql/execution/FallbackStrategiesSuite.scala    | 20 ++++-----
 .../sql/execution/FallbackStrategiesSuite.scala    | 20 ++++-----
 .../benchmarks/ParquetReadBenchmark.scala          |  3 +-
 .../sql/execution/FallbackStrategiesSuite.scala    | 20 ++++-----
 .../sql/execution/AbstractFileSourceScanExec.scala |  7 ++-
 .../sql/execution/AbstractFileSourceScanExec.scala |  7 ++-
 .../sql/execution/AbstractFileSourceScanExec.scala | 12 ++---
 .../sql/execution/AbstractFileSourceScanExec.scala | 12 ++---
 53 files changed, 360 insertions(+), 278 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 061ec9856e..f6cacba42b 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.columnarbatch.CHBatch
 import org.apache.gluten.execution.WriteFilesExecTransformer
 import org.apache.gluten.expression.WindowFunctionsBuilder
 import org.apache.gluten.extension.ValidationResult
-import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionFunc}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat._
 
@@ -34,6 +34,8 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -45,10 +47,11 @@ import java.util.Locale
 import scala.util.control.Breaks.{break, breakable}
 
 class CHBackend extends SubstraitBackend {
+  import CHBackend._
   override def name(): String = CHConf.BACKEND_NAME
-  override def defaultBatchType: Convention.BatchType = CHBatch
   override def buildInfo(): Backend.BuildInfo =
     Backend.BuildInfo("ClickHouse", CH_BRANCH, CH_COMMIT, "UNKNOWN")
+  override def convFuncOverride(): ConventionFunc.Override = new ConvFunc()
   override def iteratorApi(): IteratorApi = new CHIteratorApi
   override def sparkPlanExecApi(): SparkPlanExecApi = new CHSparkPlanExecApi
   override def transformerApi(): TransformerApi = new CHTransformerApi
@@ -59,7 +62,17 @@ class CHBackend extends SubstraitBackend {
   override def settings(): BackendSettingsApi = CHBackendSettings
 }
 
+object CHBackend {
+  private class ConvFunc() extends ConventionFunc.Override {
+    override def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType] 
= {
+      case a: AdaptiveSparkPlanExec if a.supportsColumnar =>
+        CHBatch
+    }
+  }
+}
+
 object CHBackendSettings extends BackendSettingsApi with Logging {
+  override def primaryBatchType: Convention.BatchType = CHBatch
 
   private val GLUTEN_CLICKHOUSE_SEP_SCAN_RDD = 
"spark.gluten.sql.columnar.separate.scan.rdd.for.ch"
   private val GLUTEN_CLICKHOUSE_SEP_SCAN_RDD_DEFAULT = "false"
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 081e815588..f6d2b85d9d 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.RuleApi
+import org.apache.gluten.columnarbatch.CHBatch
 import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar._
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
@@ -114,7 +115,7 @@ object CHRuleApi {
         intercept(
           
SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)(
             c.session)))
-    injector.injectPostTransform(c => InsertTransitions(c.outputsColumnar))
+    injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, CHBatch))
 
     // Gluten columnar: Fallback policies.
     injector.injectFallbackPolicy(
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 03d5aa2549..e05fd92e32 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -37,6 +37,7 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
 import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, 
CharVarcharUtils}
 import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer, 
SparkPlan}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import 
org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand
 import org.apache.spark.sql.execution.datasources.{FileFormat, 
InsertIntoHadoopFsRelationCommand}
@@ -52,10 +53,9 @@ import scala.util.control.Breaks.breakable
 class VeloxBackend extends SubstraitBackend {
   import VeloxBackend._
   override def name(): String = VeloxBackend.BACKEND_NAME
-  override def defaultBatchType: Convention.BatchType = VeloxBatch
-  override def convFuncOverride(): ConventionFunc.Override = new ConvFunc()
   override def buildInfo(): Backend.BuildInfo =
     Backend.BuildInfo("Velox", VELOX_BRANCH, VELOX_REVISION, 
VELOX_REVISION_TIME)
+  override def convFuncOverride(): ConventionFunc.Override = new ConvFunc()
   override def iteratorApi(): IteratorApi = new VeloxIteratorApi
   override def sparkPlanExecApi(): SparkPlanExecApi = new VeloxSparkPlanExecApi
   override def transformerApi(): TransformerApi = new VeloxTransformerApi
@@ -72,6 +72,8 @@ object VeloxBackend {
 
   private class ConvFunc() extends ConventionFunc.Override {
     override def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType] 
= {
+      case a: AdaptiveSparkPlanExec if a.supportsColumnar =>
+        VeloxBatch
       case i: InMemoryTableScanExec
           if i.supportsColumnar && i.relation.cacheBuilder.serializer
             .isInstanceOf[ColumnarCachedBatchSerializer] =>
@@ -81,13 +83,15 @@ object VeloxBackend {
 }
 
 object VeloxBackendSettings extends BackendSettingsApi {
-
   val SHUFFLE_SUPPORTED_CODEC = Set("lz4", "zstd")
   val GLUTEN_VELOX_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + 
".udfLibraryPaths"
   val GLUTEN_VELOX_DRIVER_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + 
".driver.udfLibraryPaths"
   val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + 
".internal.udfLibraryPaths"
   val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX + 
".udfAllowTypeConversion"
 
+  /** The columnar-batch type this backend is by default using. */
+  override def primaryBatchType: Convention.BatchType = VeloxBatch
+
   override def validateScanExec(
       format: ReadFileFormat,
       fields: Array[StructField],
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index c5f46dae67..0a62a41baa 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.velox
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.RuleApi
+import org.apache.gluten.columnarbatch.VeloxBatch
 import org.apache.gluten.datasource.ArrowConvertorRule
 import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar._
@@ -101,7 +102,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => EliminateLocalSort)
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
-    injector.injectPostTransform(c => InsertTransitions(c.outputsColumnar))
+    injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
 
     // Gluten columnar: Fallback policies.
     injector.injectFallbackPolicy(
@@ -187,7 +188,7 @@ object VeloxRuleApi {
     injector.injectPostTransform(_ => EliminateLocalSort)
     injector.injectPostTransform(_ => CollapseProjectExecTransformer)
     injector.injectPostTransform(c => 
FlushableHashAggregateRule.apply(c.session))
-    injector.injectPostTransform(c => InsertTransitions(c.outputsColumnar))
+    injector.injectPostTransform(c => 
InsertTransitions.create(c.outputsColumnar, VeloxBatch))
     injector.injectPostTransform(
       c => RemoveTopmostColumnarToRow(c.session, c.ac.isAdaptiveContext()))
     SparkShimLoader.getSparkShims
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index d993e399db..84de41daa0 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -82,8 +82,6 @@ case class ColumnarPartialProjectExec(original: ProjectExec, 
child: SparkPlan)(
     replacedAliasUdf :: Nil
   }
 
-  final override val supportsColumnar: Boolean = true
-
   private def validateExpression(expr: Expression): Boolean = {
     expr.deterministic && !expr.isInstanceOf[LambdaFunction] && expr.children
       .forall(validateExpression)
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index 995582024b..5283ab61e3 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -53,7 +53,6 @@ case class VeloxResizeBatchesExec(
     "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append 
/ split batches")
   )
 
-  override def supportsColumnar: Boolean = true
   override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index f1f5eb9062..a2eee53660 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -214,9 +214,8 @@ case class ColumnarArrowEvalPythonExec(
   extends EvalPythonExec
   with GlutenPlan
   with KnownChildrenConventions {
-  override def supportsColumnar: Boolean = true
 
-  override protected def batchType0(): Convention.BatchType = ArrowJavaBatch
+  override def batchType(): Convention.BatchType = ArrowJavaBatch
 
   override def requiredChildrenConventions(): Seq[ConventionReq] = List(
     ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(ArrowJavaBatch)))
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
index 85bc682234..16b8fb0e9f 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/ArrowFileSourceScanExec.scala
@@ -33,8 +33,6 @@ case class ArrowFileSourceScanExec(original: 
FileSourceScanExec)
 
   override def output: Seq[Attribute] = original.output
 
-  override def supportsColumnar: Boolean = original.supportsColumnar
-
   override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
index 38a6d1803d..6617e8b138 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
@@ -21,8 +21,7 @@ import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.Convention
 
 trait BaseArrowScanExec extends GlutenPlan {
-
-  final override protected def batchType0(): Convention.BatchType = {
+  final override def batchType(): Convention.BatchType = {
     ArrowBatches.ArrowJavaBatch
   }
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
index d12faae0f7..e14ffd43d8 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/transition/VeloxTransitionSuite.scala
@@ -33,37 +33,37 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("Vanilla C2R - outputs row") {
     val in = BatchLeaf(VanillaBatch)
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == ColumnarToRowExec(BatchLeaf(VanillaBatch)))
   }
 
   test("Vanilla C2R - requires row input") {
     val in = RowUnary(BatchLeaf(VanillaBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == RowUnary(ColumnarToRowExec(BatchLeaf(VanillaBatch))))
   }
 
   test("Vanilla R2C - requires vanilla input") {
     val in = BatchUnary(VanillaBatch, RowLeaf())
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch, 
RowToColumnarExec(RowLeaf()))))
   }
 
   test("ArrowNative C2R - outputs row") {
     val in = BatchLeaf(ArrowNativeBatch)
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == 
ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch))))
   }
 
   test("ArrowNative C2R - requires row input") {
     val in = RowUnary(BatchLeaf(ArrowNativeBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == 
RowUnary(ColumnarToRowExec(LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
   }
 
   test("ArrowNative R2C - requires Arrow input") {
     val in = BatchUnary(ArrowNativeBatch, RowLeaf())
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         LoadArrowDataExec(BatchUnary(ArrowNativeBatch, 
RowToVeloxColumnarExec(RowLeaf())))))
@@ -71,7 +71,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("ArrowNative-to-Velox C2C") {
     val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowNativeBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     // No explicit transition needed for ArrowNative-to-Velox.
     // FIXME: Add explicit transitions.
     //  See https://github.com/apache/incubator-gluten/issues/7313.
@@ -82,7 +82,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("Velox-to-ArrowNative C2C") {
     val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VeloxBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         LoadArrowDataExec(BatchUnary(ArrowNativeBatch, 
BatchLeaf(VeloxBatch)))))
@@ -90,7 +90,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("Vanilla-to-ArrowNative C2C") {
     val in = BatchUnary(ArrowNativeBatch, BatchLeaf(VanillaBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         LoadArrowDataExec(BatchUnary(
@@ -100,7 +100,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("ArrowNative-to-Vanilla C2C") {
     val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowNativeBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         BatchUnary(VanillaBatch, 
LoadArrowDataExec(BatchLeaf(ArrowNativeBatch)))))
@@ -108,19 +108,19 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("ArrowJava C2R - outputs row") {
     val in = BatchLeaf(ArrowJavaBatch)
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == ColumnarToRowExec(BatchLeaf(ArrowJavaBatch)))
   }
 
   test("ArrowJava C2R - requires row input") {
     val in = RowUnary(BatchLeaf(ArrowJavaBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == RowUnary(ColumnarToRowExec(BatchLeaf(ArrowJavaBatch))))
   }
 
   test("ArrowJava R2C - requires Arrow input") {
     val in = BatchUnary(ArrowJavaBatch, RowLeaf())
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         BatchUnary(ArrowJavaBatch, 
LoadArrowDataExec(RowToVeloxColumnarExec(RowLeaf())))))
@@ -128,7 +128,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("ArrowJava-to-Velox C2C") {
     val in = BatchUnary(VeloxBatch, BatchLeaf(ArrowJavaBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == VeloxColumnarToRowExec(
         BatchUnary(
@@ -138,7 +138,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("Velox-to-ArrowJava C2C") {
     val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VeloxBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         BatchUnary(ArrowJavaBatch, LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
@@ -146,7 +146,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("Vanilla-to-ArrowJava C2C") {
     val in = BatchUnary(ArrowJavaBatch, BatchLeaf(VanillaBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(
         BatchUnary(
@@ -156,37 +156,37 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("ArrowJava-to-Vanilla C2C") {
     val in = BatchUnary(VanillaBatch, BatchLeaf(ArrowJavaBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == ColumnarToRowExec(BatchUnary(VanillaBatch, 
BatchLeaf(ArrowJavaBatch))))
   }
 
   test("Velox C2R - outputs row") {
     val in = BatchLeaf(VeloxBatch)
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == VeloxColumnarToRowExec(BatchLeaf(VeloxBatch)))
   }
 
   test("Velox C2R - requires row input") {
     val in = RowUnary(BatchLeaf(VeloxBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == RowUnary(VeloxColumnarToRowExec(BatchLeaf(VeloxBatch))))
   }
 
   test("Velox R2C - outputs Velox") {
     val in = RowLeaf()
-    val out = Transitions.insertTransitions(in, outputsColumnar = true)
+    val out = BackendTransitions.insert(in, outputsColumnar = true)
     assert(out == RowToVeloxColumnarExec(RowLeaf()))
   }
 
   test("Velox R2C - requires Velox input") {
     val in = BatchUnary(VeloxBatch, RowLeaf())
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(out == VeloxColumnarToRowExec(BatchUnary(VeloxBatch, 
RowToVeloxColumnarExec(RowLeaf()))))
   }
 
   test("Vanilla-to-Velox C2C") {
     val in = BatchUnary(VeloxBatch, BatchLeaf(VanillaBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == VeloxColumnarToRowExec(
         BatchUnary(VeloxBatch, 
RowToVeloxColumnarExec(ColumnarToRowExec(BatchLeaf(VanillaBatch))))))
@@ -194,7 +194,7 @@ class VeloxTransitionSuite extends SharedSparkSession {
 
   test("Velox-to-Vanilla C2C") {
     val in = BatchUnary(VanillaBatch, BatchLeaf(VeloxBatch))
-    val out = Transitions.insertTransitions(in, outputsColumnar = false)
+    val out = BackendTransitions.insert(in, outputsColumnar = false)
     assert(
       out == ColumnarToRowExec(BatchUnary(VanillaBatch, 
LoadArrowDataExec(BatchLeaf(VeloxBatch)))))
   }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala 
b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
index 346181e140..f406a6ac4d 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/backend/Backend.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.backend
 
-import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionFunc}
+import org.apache.gluten.extension.columnar.transition.ConventionFunc
 import org.apache.gluten.extension.injector.Injector
 
 import org.apache.spark.SparkContext
@@ -39,9 +39,6 @@ trait Backend {
   def onExecutorStart(pc: PluginContext): Unit = {}
   def onExecutorShutdown(): Unit = {}
 
-  /** The columnar-batch type this backend is by default using. */
-  def defaultBatchType: Convention.BatchType
-
   /**
    * Overrides 
[[org.apache.gluten.extension.columnar.transition.ConventionFunc]] Gluten is 
using to
    * determine the convention (its row-based processing / columnar-batch 
processing support) of a
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index d0eacc1e4d..7ca4b36b06 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -17,8 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
-import 
org.apache.gluten.extension.columnar.transition.Convention.KnownBatchType
-import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
 import org.apache.gluten.iterator.Iterators
 
 import org.apache.spark.rdd.RDD
@@ -32,8 +30,9 @@ import java.util.concurrent.atomic.AtomicLong
 
 abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: 
Convention.BatchType)
   extends ColumnarToColumnarTransition
-  with KnownBatchType
-  with KnownChildrenConventions {
+  with Convention.KnownBatchType
+  with Convention.KnownRowTypeForSpark33AndLater
+  with ConventionReq.KnownChildrenConventions {
 
   def child: SparkPlan
   protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch]
@@ -47,8 +46,20 @@ abstract class ColumnarToColumnarExec(from: 
Convention.BatchType, to: Convention
       "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
convert batches")
     )
 
-  override def supportsColumnar: Boolean = true
+  final override val supportsColumnar: Boolean = {
+    batchType() != Convention.BatchType.None
+  }
+
   override def batchType(): Convention.BatchType = to
+
+  final override val supportsRowBased: Boolean = {
+    rowType() != Convention.RowType.None
+  }
+
+  override def rowType0(): Convention.RowType = {
+    Convention.RowType.None
+  }
+
   override def requiredChildrenConventions(): Seq[ConventionReq] = List(
     ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(from)))
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
index 3378344253..5b440302a0 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenColumnarRule.scala
@@ -31,11 +31,9 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 object GlutenColumnarRule {
-
   // Utilities to infer columnar rule's caller's property:
   // ApplyColumnarRulesAndInsertTransitions#outputsColumnar.
-
-  case class DummyRowOutputExec(override val child: SparkPlan) extends 
UnaryExecNode {
+  private case class DummyRowOutputExec(override val child: SparkPlan) extends 
UnaryExecNode {
     override def supportsColumnar: Boolean = false
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
@@ -47,7 +45,7 @@ object GlutenColumnarRule {
       copy(child = newChild)
   }
 
-  case class DummyColumnarOutputExec(override val child: SparkPlan) extends 
UnaryExecNode {
+  private case class DummyColumnarOutputExec(override val child: SparkPlan) 
extends UnaryExecNode {
     override def supportsColumnar: Boolean = true
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override protected def doExecuteColumnar(): RDD[ColumnarBatch] =
@@ -99,9 +97,8 @@ case class GlutenColumnarRule(
           "This should not happen. Please leave an issue at" +
             " https://github.com/apache/incubator-gluten.")
     }
-    val vanillaPlan = Transitions.insertTransitions(originalPlan, 
outputsColumnar)
+    val vanillaPlan = Transitions.insert(originalPlan, outputsColumnar)
     val applier = applierBuilder.apply(session)
     applier.apply(vanillaPlan, outputsColumnar)
   }
-
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index fad0ae386c..67399e25d4 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -56,10 +56,10 @@ case class EnumeratedTransform(costModel: 
CostModel[SparkPlan], rules: Seq[RasRu
       .create()
   }
 
-  private val reqConvention = Conv.any
+  private val convReq = Conv.any
 
   override def apply(plan: SparkPlan): SparkPlan = {
-    val constraintSet = PropertySet(List(reqConvention))
+    val constraintSet = PropertySet(Seq(convReq))
     val planner = optimization.newPlanner(plan, constraintSet)
     val out = planner.plan()
     out
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala
index 1cfe132d84..393ac35de4 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/cost/LongCostModel.scala
@@ -69,19 +69,20 @@ object LongCostModel extends Logging {
    */
   sealed trait Kind {
     import Kind._
-    values.synchronized {
+    all.synchronized {
       val n = name()
-      if (values.contains(n)) {
+      if (all.contains(n)) {
         throw new GlutenException(s"Cost mode kind $n already registered")
       }
-      values += n -> this
+      all += n -> this
     }
 
     def name(): String
   }
 
   object Kind {
-    val values: mutable.Map[String, Kind] = mutable.Map()
+    private val all: mutable.Map[String, Kind] = mutable.Map()
+    def values(): Map[String, Kind] = all.toMap
   }
 
   /**
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index fa92eacd4d..568ea50396 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -19,7 +19,6 @@ package 
org.apache.gluten.extension.columnar.enumerated.planner.plan
 import 
org.apache.gluten.extension.columnar.enumerated.planner.metadata.GlutenMetadata
 import org.apache.gluten.extension.columnar.enumerated.planner.property.{Conv, 
ConvDef}
 import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
-import 
org.apache.gluten.extension.columnar.transition.Convention.{KnownBatchType, 
KnownRowType}
 import org.apache.gluten.ras.{Metadata, PlanModel}
 import org.apache.gluten.ras.property.PropertySet
 import org.apache.gluten.sql.shims.SparkShimLoader
@@ -43,17 +42,13 @@ object GlutenPlanModel {
       metadata: GlutenMetadata,
       constraintSet: PropertySet[SparkPlan])
     extends LeafExecNode
-    with KnownBatchType
-    with KnownRowType {
+    with Convention.KnownBatchType
+    with Convention.KnownRowTypeForSpark33AndLater {
     private val req: Conv.Req = 
constraintSet.get(ConvDef).asInstanceOf[Conv.Req]
 
     override protected def doExecute(): RDD[InternalRow] = throw new 
IllegalStateException()
     override def output: Seq[Attribute] = metadata.schema().output
 
-    override def supportsColumnar: Boolean = {
-      batchType != Convention.BatchType.None
-    }
-
     override val batchType: Convention.BatchType = {
       val out = req.req.requiredBatchType match {
         case ConventionReq.BatchType.Any => Convention.BatchType.None
@@ -62,13 +57,21 @@ object GlutenPlanModel {
       out
     }
 
-    override val rowType: Convention.RowType = {
+    final override val supportsColumnar: Boolean = {
+      batchType != Convention.BatchType.None
+    }
+
+    override val rowType0: Convention.RowType = {
       val out = req.req.requiredRowType match {
         case ConventionReq.RowType.Any => Convention.RowType.None
         case ConventionReq.RowType.Is(r) => r
       }
       out
     }
+
+    final override val supportsRowBased: Boolean = {
+      rowType() != Convention.RowType.None
+    }
   }
 
   private object PlanModelImpl extends PlanModel[SparkPlan] {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
index 831b212e1f..7b2b801ac9 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
@@ -29,6 +29,18 @@ sealed trait Conv extends Property[SparkPlan] {
   }
 
   override def satisfies(other: Property[SparkPlan]): Boolean = {
+    // The following enforces strict type checking against `this` and `other`
+    // to make sure:
+    //
+    //  1. `this`, which came from user implementation of 
PropertyDef.getProperty, must be a `Prop`
+    //  2. `other` which came from user implementation of 
PropertyDef.getChildrenConstraints,
+    //     must be a `Req`
+    //
+    // If the user implementation doesn't follow the criteria, cast error will 
be thrown.
+    //
+    // This can be a common practice to implement a safe Property for RAS.
+    //
+    // TODO: Add a similar case to RAS UTs.
     val req = other.asInstanceOf[Req]
     if (req.isAny) {
       return true
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index 840b62fb67..b57f3e0c0a 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -16,10 +16,15 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
+import org.apache.gluten.exception.GlutenException
+
 import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, 
SparkPlan}
+import org.apache.spark.util.SparkVersionUtil
 
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable
+
 /**
  * Convention of a query plan consists of the row data type and columnar data 
type it supports to
  * output.
@@ -74,6 +79,7 @@ object Convention {
   }
 
   trait BatchType extends TransitionGraph.Vertex with Serializable {
+    import BatchType._
     private val initialized: AtomicBoolean = new AtomicBoolean(false)
 
     final def ensureRegistered(): Unit = {
@@ -84,7 +90,8 @@ object Convention {
       register()
     }
 
-    final private def register(): Unit = {
+    final private def register(): Unit = BatchType.synchronized {
+      assert(all.add(this))
       Transition.graph.addVertex(this)
       registerTransitions()
     }
@@ -117,6 +124,8 @@ object Convention {
   }
 
   object BatchType {
+    private val all: mutable.Set[BatchType] = mutable.Set()
+    def values(): Set[BatchType] = all.toSet
     // None indicates that the plan doesn't support batch-based processing.
     final case object None extends BatchType {
       override protected[this] def registerTransitions(): Unit = {}
@@ -133,7 +142,46 @@ object Convention {
     def batchType(): BatchType
   }
 
-  trait KnownRowType {
+  sealed trait KnownRowType extends KnownRowType.SupportsRowBasedCompatible {
     def rowType(): RowType
   }
+
+  object KnownRowType {
+    // To be compatible with Spark (version < 3.3)
+    sealed trait SupportsRowBasedCompatible {
+      def supportsRowBased(): Boolean = {
+        throw new GlutenException("Illegal state: The method is not expected 
to be called")
+      }
+    }
+  }
+
+  trait KnownRowTypeForSpark33AndLater extends KnownRowType {
+    this: SparkPlan =>
+    import KnownRowTypeForSpark33AndLater._
+
+    final override def rowType(): RowType = {
+      if (lteSpark32) {
+        // It's known that in Spark 3.2, one Spark plan node is considered 
either only having
+        // row-based support or only having columnar support at a time.
+        // Hence, if the plan supports columnar output, we'd disable its 
row-based support.
+        // The same for the opposite.
+        if (supportsColumnar) {
+          Convention.RowType.None
+        } else {
+          Convention.RowType.VanillaRow
+        }
+      } else {
+        rowType0()
+      }
+    }
+
+    def rowType0(): RowType
+  }
+
+  object KnownRowTypeForSpark33AndLater {
+    private val lteSpark32: Boolean = {
+      val v = SparkVersionUtil.majorMinorVersion()
+      SparkVersionUtil.compareMajorMinorVersion(v, (3, 2)) <= 0
+    }
+  }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index c3feefe943..5cb3d44a15 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -21,7 +21,7 @@ import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildr
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnionExec}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.QueryStageExec
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 
@@ -89,17 +89,6 @@ object ConventionFunc {
         }
       case q: QueryStageExec => conventionOf0(q.plan)
       case r: ReusedExchangeExec => conventionOf0(r.child)
-      case a: AdaptiveSparkPlanExec =>
-        val rowType = rowTypeOf(a)
-        val batchType = if (a.supportsColumnar) {
-          // By default, we execute columnar AQE with backend batch output.
-          // See 
org.apache.gluten.extension.columnar.transition.InsertTransitions.apply
-          Backend.get().defaultBatchType
-        } else {
-          Convention.BatchType.None
-        }
-        val conv = Convention.of(rowType, batchType)
-        conv
       case other =>
         val conv = Convention.of(rowTypeOf(other), batchTypeOf(other))
         conv
@@ -119,12 +108,24 @@ object ConventionFunc {
         case _ =>
           Convention.RowType.None
       }
-      assert(
-        out == Convention.RowType.None || 
plan.isInstanceOf[Convention.KnownRowType] ||
-          SparkShimLoader.getSparkShims.supportsRowBased(plan))
+      checkRowType(plan, out)
       out
     }
 
+    private def checkRowType(plan: SparkPlan, rowType: Convention.RowType): 
Unit = {
+      if (SparkShimLoader.getSparkShims.supportsRowBased(plan)) {
+        assert(
+          rowType != Convention.RowType.None,
+          s"Plan ${plan.nodeName} supports row-based execution, " +
+            s"however #rowTypeOf returns None")
+      } else {
+        assert(
+          rowType == Convention.RowType.None,
+          s"Plan ${plan.nodeName} doesn't support row-based " +
+            s"execution, however #rowTypeOf returns $rowType")
+      }
+    }
+
     private def batchTypeOf(plan: SparkPlan): Convention.BatchType = {
       val out = o.batchTypeOf.applyOrElse(plan, batchTypeOf0)
       out
@@ -139,10 +140,24 @@ object ConventionFunc {
         case _ =>
           Convention.BatchType.None
       }
-      assert(out == Convention.BatchType.None || plan.supportsColumnar)
+      checkBatchType(plan, out)
       out
     }
 
+    private def checkBatchType(plan: SparkPlan, batchType: 
Convention.BatchType): Unit = {
+      if (plan.supportsColumnar) {
+        assert(
+          batchType != Convention.BatchType.None,
+          s"Plan ${plan.nodeName} supports columnar " +
+            s"execution, however #batchTypeOf returns None")
+      } else {
+        assert(
+          batchType == Convention.BatchType.None,
+          s"Plan ${plan.nodeName} doesn't support " +
+            s"columnar execution, however #batchTypeOf returns $batchType")
+      }
+    }
+
     override def conventionReqOf(plan: SparkPlan): ConventionReq = {
       val req = o.conventionReqOf.applyOrElse(plan, conventionReqOf0)
       req
@@ -169,14 +184,15 @@ object ConventionFunc {
         // To align with 
ApplyColumnarRulesAndInsertTransitions#insertTransitions
         ConventionReq.any
       case u: UnionExec =>
-        // We force vanilla union to output row data to get best compatibility 
with vanilla Spark.
+        // We force vanilla union to output row data to get the best 
compatibility with vanilla
+        // Spark.
         // As a result it's a common practice to rewrite it with GlutenPlan 
for offloading.
         ConventionReq.of(
           ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
           ConventionReq.BatchType.Any)
       case other =>
         // In the normal case, children's convention should follow parent 
node's convention.
-        // Note, we don't have consider C2R / R2C here since they are already 
removed by
+        // Note, we don't have to consider C2R / R2C here since they are 
already removed by
         // RemoveTransitions.
         val thisConv = conventionOf0(other)
         thisConv.asReq()
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
index ce613bf7db..a081f21434 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.backend.Backend
-
 import org.apache.spark.sql.execution.SparkPlan
 
 /**
@@ -53,15 +51,14 @@ object ConventionReq {
       override val requiredBatchType: BatchType
   ) extends ConventionReq
 
-  val any: ConventionReq = Impl(RowType.Any, BatchType.Any)
-  val row: ConventionReq = Impl(RowType.Is(Convention.RowType.VanillaRow), 
BatchType.Any)
-  val vanillaBatch: ConventionReq =
-    Impl(RowType.Any, BatchType.Is(Convention.BatchType.VanillaBatch))
-  lazy val backendBatch: ConventionReq =
-    Impl(RowType.Any, BatchType.Is(Backend.get().defaultBatchType))
+  val any: ConventionReq = of(RowType.Any, BatchType.Any)
+  val row: ConventionReq = ofRow(RowType.Is(Convention.RowType.VanillaRow))
+  val vanillaBatch: ConventionReq = 
ofBatch(BatchType.Is(Convention.BatchType.VanillaBatch))
 
   def get(plan: SparkPlan): ConventionReq = 
ConventionFunc.create().conventionReqOf(plan)
   def of(rowType: RowType, batchType: BatchType): ConventionReq = 
Impl(rowType, batchType)
+  def ofRow(rowType: RowType): ConventionReq = Impl(rowType, BatchType.Any)
+  def ofBatch(batchType: BatchType): ConventionReq = Impl(RowType.Any, 
batchType)
 
   trait KnownChildrenConventions {
     def requiredChildrenConventions(): Seq[ConventionReq]
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
index ced9378ad6..0a7f635b8b 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transition.scala
@@ -101,22 +101,27 @@ object Transition {
           case (ConventionReq.RowType.Is(toRowType), 
ConventionReq.BatchType.Any) =>
             from.rowType match {
               case Convention.RowType.None =>
+                // Input query plan doesn't have recognizable row-based output,
+                // find columnar-to-row transition.
                 graph.transitionOfOption(from.batchType, 
toRowType).getOrElse(orElse)
-              case fromRowType =>
+              case fromRowType if toRowType == fromRowType =>
                 // We have only one single built-in row type.
-                assert(toRowType == fromRowType)
                 Transition.empty
+              case _ =>
+                throw new UnsupportedOperationException(
+                  "Row-to-row transition is not yet supported")
             }
           case (ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(toBatchType)) =>
             from.batchType match {
               case Convention.BatchType.None =>
+                // Input query plan doesn't have recognizable columnar output,
+                // find row-to-columnar transition.
                 graph.transitionOfOption(from.rowType, 
toBatchType).getOrElse(orElse)
+              case fromBatchType if toBatchType == fromBatchType =>
+                Transition.empty
               case fromBatchType =>
-                if (toBatchType == fromBatchType) {
-                  Transition.empty
-                } else {
-                  graph.transitionOfOption(fromBatchType, 
toBatchType).getOrElse(orElse)
-                }
+                // Find columnar-to-columnar transition.
+                graph.transitionOfOption(fromBatchType, 
toBatchType).getOrElse(orElse)
             }
           case (ConventionReq.RowType.Any, ConventionReq.BatchType.Any) =>
             Transition.empty
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index 2f2840b52b..10d50f453d 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -16,24 +16,22 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.backend.Backend
+import org.apache.gluten.extension.columnar.transition.Convention.BatchType
 
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
 
 import scala.annotation.tailrec
 
-case class InsertTransitions(outputsColumnar: Boolean) extends Rule[SparkPlan] 
{
+case class InsertTransitions(convReq: ConventionReq) extends Rule[SparkPlan] {
   private val convFunc = ConventionFunc.create()
 
   override def apply(plan: SparkPlan): SparkPlan = {
     // Remove all transitions at first.
     val removed = RemoveTransitions.apply(plan)
     val filled = fillWithTransitions(removed)
-    if (!outputsColumnar) {
-      return Transitions.toRowPlan(filled)
-    }
-    Transitions.toBackendBatchPlan(filled)
+    val out = Transitions.enforceReq(filled, convReq)
+    out
   }
 
   private def fillWithTransitions(plan: SparkPlan): SparkPlan = 
plan.transformUp {
@@ -63,6 +61,17 @@ case class InsertTransitions(outputsColumnar: Boolean) 
extends Rule[SparkPlan] {
   }
 }
 
+object InsertTransitions {
+  def create(outputsColumnar: Boolean, batchType: BatchType): 
InsertTransitions = {
+    val conventionReq = if (outputsColumnar) {
+      ConventionReq.ofBatch(ConventionReq.BatchType.Is(batchType))
+    } else {
+      ConventionReq.row
+    }
+    InsertTransitions(conventionReq)
+  }
+}
+
 object RemoveTransitions extends Rule[SparkPlan] {
   override def apply(plan: SparkPlan): SparkPlan = plan.transformDown { case p 
=> removeForNode(p) }
 
@@ -76,8 +85,8 @@ object RemoveTransitions extends Rule[SparkPlan] {
 }
 
 object Transitions {
-  def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan 
= {
-    InsertTransitions(outputsColumnar).apply(plan)
+  def insert(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+    InsertTransitions.create(outputsColumnar, 
BatchType.VanillaBatch).apply(plan)
   }
 
   def toRowPlan(plan: SparkPlan): SparkPlan = {
@@ -88,24 +97,13 @@ object Transitions {
         ConventionReq.BatchType.Any))
   }
 
-  def toBackendBatchPlan(plan: SparkPlan): SparkPlan = {
-    val backendBatchType = Backend.get().defaultBatchType
-    val out = toBatchPlan(plan, backendBatchType)
-    out
-  }
-
-  def toVanillaBatchPlan(plan: SparkPlan): SparkPlan = {
-    val out = toBatchPlan(plan, Convention.BatchType.VanillaBatch)
-    out
-  }
-
-  private def toBatchPlan(plan: SparkPlan, toBatchType: Convention.BatchType): 
SparkPlan = {
+  def toBatchPlan(plan: SparkPlan, toBatchType: Convention.BatchType): 
SparkPlan = {
     enforceReq(
       plan,
       ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(toBatchType)))
   }
 
-  private def enforceReq(plan: SparkPlan, req: ConventionReq): SparkPlan = {
+  def enforceReq(plan: SparkPlan, req: ConventionReq): SparkPlan = {
     val convFunc = ConventionFunc.create()
     val removed = RemoveTransitions.removeForNode(plan)
     val transition = Transition
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
index 89314dfcf3..efe584d441 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/injector/GlutenInjector.scala
@@ -150,8 +150,8 @@ object GlutenInjector {
     private def findCostModel(
         registry: LongCostModel.Registry,
         aliasOrClass: String): CostModel[SparkPlan] = {
-      if (LongCostModel.Kind.values.contains(aliasOrClass)) {
-        val kind = LongCostModel.Kind.values(aliasOrClass)
+      if (LongCostModel.Kind.values().contains(aliasOrClass)) {
+        val kind = LongCostModel.Kind.values()(aliasOrClass)
         val model = registry.get(kind)
         return model
       }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/iterator/Iterators.scala 
b/gluten-core/src/main/scala/org/apache/gluten/iterator/Iterators.scala
index 2de1c7b4ed..ef8ca2c974 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/iterator/Iterators.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/iterator/Iterators.scala
@@ -41,7 +41,7 @@ object Iterators {
   }
 
   def wrap[A](in: Iterator[A]): WrapperBuilder[A] = {
-    wrap(V1, in)
+    wrap(DEFAULT_VERSION, in)
   }
 
   def wrap[A](version: Version, in: Iterator[A]): WrapperBuilder[A] = {
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
 b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
similarity index 64%
copy from 
backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
copy to gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
index 38a6d1803d..6864d3caa0 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/util/SparkVersionUtil.scala
@@ -14,15 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution
+package org.apache.spark.util
 
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
-
-trait BaseArrowScanExec extends GlutenPlan {
+object SparkVersionUtil {
+  def majorMinorVersion(): (Int, Int) = {
+    VersionUtils.majorMinorVersion(org.apache.spark.SPARK_VERSION)
+  }
 
-  final override protected def batchType0(): Convention.BatchType = {
-    ArrowBatches.ArrowJavaBatch
+  // Returns X. X < 0 if one < other, x == 0 if one == other, x > 0 if one > 
other.
+  def compareMajorMinorVersion(one: (Int, Int), other: (Int, Int)): Int = {
+    val base = 1000
+    assert(one._2 < base && other._2 < base)
+    one._1 * base + one._2 - (other._1 * base + other._2)
   }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 506424b79c..1eb69da6e5 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.catalog.BucketSpec
@@ -28,6 +29,10 @@ import 
org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopF
 import org.apache.spark.sql.types.StructField
 
 trait BackendSettingsApi {
+
+  /** The columnar-batch type this backend is by default using. */
+  def primaryBatchType: Convention.BatchType
+
   def validateScanExec(
       format: ReadFileFormat,
       fields: Array[StructField],
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 76505da3e0..dbe667ebb2 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -263,8 +263,6 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) 
extends GlutenPlan {
     case _ =>
   }
 
-  override def supportsColumnar: Boolean = true
-
   override def output: Seq[Attribute] = {
     children.map(_.output).transpose.map {
       attrs =>
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 7f7e54e9c7..28fb691896 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -45,7 +45,6 @@ import java.io.{IOException, ObjectOutputStream}
  */
 case class ColumnarCartesianProductBridge(child: SparkPlan) extends 
UnaryExecNode with GlutenPlan {
   override def output: Seq[Attribute] = child.output
-  override def supportsColumnar: Boolean = true
   override protected def doExecute(): RDD[InternalRow] =
     throw new UnsupportedOperationException()
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = 
child.executeColumnar()
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
index f40a7f8f07..3b13207c93 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
@@ -30,8 +30,6 @@ case class ColumnarCoalesceExec(numPartitions: Int, child: 
SparkPlan)
   extends UnaryExecNode
   with GlutenPlan {
 
-  override def supportsColumnar: Boolean = true
-
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
index fd86106bf3..fae3115981 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.ConventionReq
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
 import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
 
 import org.apache.spark.broadcast.Broadcast
@@ -43,6 +43,10 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
 
   final override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def batchType(): Convention.BatchType = Convention.BatchType.None
+
+  override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
+
   override def doExecuteBroadcast[T](): Broadcast[T] = {
     // Require for explicit implementation, otherwise throw error.
     super.doExecuteBroadcast[T]()
@@ -55,7 +59,8 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
   }
 
   override def requiredChildrenConventions(): Seq[ConventionReq] = {
-    List(ConventionReq.backendBatch)
+    List(
+      ConventionReq.ofBatch(
+        
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
   }
-
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
index f4dd160b58..2a52616361 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
@@ -45,6 +46,8 @@ abstract class RowToColumnarExecBase(child: SparkPlan)
 
   final override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
   final override def doExecute(): RDD[InternalRow] = {
     child.execute()
   }
@@ -54,8 +57,6 @@ abstract class RowToColumnarExecBase(child: SparkPlan)
     super.doExecuteBroadcast[T]()
   }
 
-  final override def supportsColumnar: Boolean = true
-
   def doExecuteColumnarInternal(): RDD[ColumnarBatch]
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
index c4e192d183..c960bda249 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
@@ -43,7 +43,6 @@ case class TakeOrderedAndProjectExecTransformer(
   with GlutenPlan {
   override def outputPartitioning: Partitioning = SinglePartition
   override def outputOrdering: Seq[SortOrder] = sortOrder
-  override def supportsColumnar: Boolean = true
 
   override def output: Seq[Attribute] = {
     projectList.map(_.toAttribute)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index beb7fe5f99..e8a42883a5 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -60,8 +60,6 @@ trait TransformSupport extends GlutenPlan {
       s"${this.getClass.getSimpleName} doesn't support doExecute")
   }
 
-  final override lazy val supportsColumnar: Boolean = true
-
   /**
    * Returns all the RDDs of ColumnarBatch which generates the input rows.
    *
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index a6ca9b1dcc..3639ac522f 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -17,11 +17,11 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Backend
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.expression.TransformerState
 import org.apache.gluten.extension.columnar.FallbackTag
+import org.apache.gluten.extension.columnar.FallbackTag.{Appendable, Converter}
 import org.apache.gluten.extension.columnar.FallbackTags.add
 import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.extension.columnar.validator.Validator
@@ -34,7 +34,6 @@ import org.apache.gluten.test.TestStats
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.execution.SparkPlan
 
-import FallbackTag.{Appendable, Converter}
 import com.google.common.collect.Lists
 
 sealed trait ValidationResult {
@@ -83,7 +82,11 @@ object ValidationResult {
 }
 
 /** Every Gluten Operator should extend this trait. */
-trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with LogLevelUtil {
+trait GlutenPlan
+  extends SparkPlan
+  with Convention.KnownBatchType
+  with Convention.KnownRowTypeForSpark33AndLater
+  with LogLevelUtil {
   protected lazy val enableNativeValidation = glutenConf.enableNativeValidation
 
   protected def glutenConf: GlutenConfig = GlutenConfig.getConf
@@ -126,17 +129,20 @@ trait GlutenPlan extends SparkPlan with Convention.KnownBatchType with LogLevelU
     }
   }
 
-  final override def batchType(): Convention.BatchType = {
-    if (!supportsColumnar) {
-      return Convention.BatchType.None
-    }
-    val batchType = batchType0()
-    assert(batchType != Convention.BatchType.None)
-    batchType
+  final override val supportsColumnar: Boolean = {
+    batchType() != Convention.BatchType.None
+  }
+
+  override def batchType(): Convention.BatchType = {
+    BackendsApiManager.getSettings.primaryBatchType
+  }
+
+  final override val supportsRowBased: Boolean = {
+    rowType() != Convention.RowType.None
   }
 
-  protected def batchType0(): Convention.BatchType = {
-    Backend.get().defaultBatchType
+  override def rowType0(): Convention.RowType = {
+    Convention.RowType.None
   }
 
   protected def doValidateInternal(): ValidationResult = ValidationResult.succeeded
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index a199b5920c..e11c613954 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.extension.columnar
 
+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, Transitions}
 import org.apache.gluten.utils.PlanUtil
 
@@ -106,7 +107,8 @@ object MiscColumnarRules {
 
     private def toColumnarBroadcastExchange(
         exchange: BroadcastExchangeExec): ColumnarBroadcastExchangeExec = {
-      val newChild = Transitions.toBackendBatchPlan(exchange.child)
+      val newChild =
+        Transitions.toBatchPlan(exchange.child, BackendsApiManager.getSettings.primaryBatchType)
       ColumnarBroadcastExchangeExec(exchange.mode, newChild)
     }
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
index e1c8b6f41f..44ed81f565 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.{FallbackTag, FallbackTags}
 import org.apache.gluten.extension.columnar.FallbackTags.add
-import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, RowToColumnarLike, Transitions}
+import org.apache.gluten.extension.columnar.transition.{BackendTransitions, ColumnarToRowLike, RowToColumnarLike}
 import org.apache.gluten.utils.PlanUtil
 
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -32,6 +32,9 @@ import org.apache.spark.sql.execution.command.ExecutedCommandExec
 import org.apache.spark.sql.execution.exchange.Exchange
 
 
+
+
+
 // format: off
 /**
  * Note, this rule should only fallback to row-based plan if there is no harm.
@@ -226,7 +229,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean, originalPlan: SparkP
       case _ =>
     }
 
-    val planWithTransitions = Transitions.insertTransitions(originalPlan, outputsColumnar)
+    val planWithTransitions = BackendTransitions.insert(originalPlan, outputsColumnar)
     planWithTransitions
   }
 
diff --git a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/BackendTransitions.scala
similarity index 56%
copy from backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
copy to gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/BackendTransitions.scala
index 38a6d1803d..86d4b40d55 100644
--- a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/transition/BackendTransitions.scala
@@ -14,15 +14,23 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.execution
+package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.backendsapi.BackendsApiManager
 
-trait BaseArrowScanExec extends GlutenPlan {
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
 
-  final override protected def batchType0(): Convention.BatchType = {
-    ArrowBatches.ArrowJavaBatch
+case class InsertBackendTransitions(outputsColumnar: Boolean) extends Rule[SparkPlan] {
+  def apply(plan: SparkPlan): SparkPlan = {
+    InsertTransitions
+      .create(outputsColumnar, BackendsApiManager.getSettings.primaryBatchType)
+      .apply(plan)
+  }
+}
+
+object BackendTransitions {
+  def insert(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
+    InsertBackendTransitions(outputsColumnar)(plan)
   }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index d55733fe4e..01a4380a14 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -121,8 +121,6 @@ case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
   @transient
   private val timeout: Long = SQLConf.get.broadcastTimeout
 
-  override def supportsColumnar: Boolean = true
-
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index 5ea5e4159d..9ec078e003 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backend.Backend
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution._
 import org.apache.gluten.extension.columnar.transition.Convention
@@ -178,9 +177,9 @@ case class ColumnarInputAdapter(child: SparkPlan)
   extends InputAdapterGenerateTreeStringShim
   with Convention.KnownBatchType {
   override def output: Seq[Attribute] = child.output
-  override def supportsColumnar: Boolean = true
+  override val supportsColumnar: Boolean = true
   override def batchType(): Convention.BatchType =
-    Backend.get().defaultBatchType
+    BackendsApiManager.getSettings.primaryBatchType
   override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = child.executeColumnar()
   override def outputPartitioning: Partitioning = child.outputPartitioning
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index 4f62377b09..d4b33be292 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -126,7 +126,6 @@ case class ColumnarShuffleExchangeExec(
 
   override def nodeName: String = "ColumnarExchange"
 
-  override def supportsColumnar: Boolean = true
   override def numMappers: Int = shuffleDependency.rdd.getNumPartitions
 
   override def numPartitions: Int = shuffleDependency.partitioner.numPartitions
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index fcd82d8c19..25d6c4ed61 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -19,9 +19,8 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention.{KnownRowType, RowType}
+import org.apache.gluten.extension.columnar.transition.Convention.RowType
 import org.apache.gluten.extension.columnar.transition.ConventionReq
-import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.TaskContext
@@ -45,18 +44,17 @@ abstract class ColumnarWriteFilesExec protected (
     override val right: SparkPlan)
   extends BinaryExecNode
   with GlutenPlan
-  with KnownChildrenConventions
-  with KnownRowType
+  with ConventionReq.KnownChildrenConventions
   with ColumnarWriteFilesExec.ExecuteWriteCompatible {
 
   val child: SparkPlan = left
 
   override lazy val references: AttributeSet = AttributeSet.empty
 
-  override def supportsColumnar: Boolean = true
-
   override def requiredChildrenConventions(): Seq[ConventionReq] = {
-    List(ConventionReq.backendBatch)
+    List(
+      ConventionReq.ofBatch(
+        ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
   }
 
   /**
@@ -69,7 +67,7 @@ abstract class ColumnarWriteFilesExec protected (
    *
    * Since https://github.com/apache/incubator-gluten/pull/6745.
    */
-  override def rowType(): RowType = {
+  override def rowType0(): RowType = {
     RowType.VanillaRow
   }
 
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 917f6c03df..fb42c55ba0 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.ColumnarToRowExecBase
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Transitions
+import org.apache.gluten.extension.columnar.transition.{Convention, Transitions}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SparkSession
@@ -61,6 +61,8 @@ case class FakeRowAdaptor(child: SparkPlan)
 
   override def output: Seq[Attribute] = child.output
 
+  override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
+
   override protected def doExecute(): RDD[InternalRow] = {
     doExecuteColumnar().map(cb => new FakeRow(cb))
   }
@@ -74,7 +76,7 @@ case class FakeRowAdaptor(child: SparkPlan)
     if (child.supportsColumnar) {
       child.executeColumnar()
     } else {
-      val r2c = Transitions.toBackendBatchPlan(child)
+      val r2c = Transitions.toBatchPlan(child, BackendsApiManager.getSettings.primaryBatchType)
       r2c.executeColumnar()
     }
   }
diff --git a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
index 9712bd2c21..5daca9bede 100644
--- a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
+++ b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -32,7 +32,7 @@ class TransitionSuite extends SharedSparkSession {
   test("Trivial C2R") {
     val in = BatchLeaf(TypeA)
     val out = ConventionFunc.ignoreBackend {
-      Transitions.insertTransitions(in, outputsColumnar = false)
+      Transitions.insert(in, outputsColumnar = false)
     }
     assert(out == BatchToRow(TypeA, BatchLeaf(TypeA)))
   }
@@ -40,7 +40,7 @@ class TransitionSuite extends SharedSparkSession {
   test("Insert C2R") {
     val in = RowUnary(BatchLeaf(TypeA))
     val out = ConventionFunc.ignoreBackend {
-      Transitions.insertTransitions(in, outputsColumnar = false)
+      Transitions.insert(in, outputsColumnar = false)
     }
     assert(out == RowUnary(BatchToRow(TypeA, BatchLeaf(TypeA))))
   }
@@ -48,7 +48,7 @@ class TransitionSuite extends SharedSparkSession {
   test("Insert R2C") {
     val in = BatchUnary(TypeA, RowLeaf())
     val out = ConventionFunc.ignoreBackend {
-      Transitions.insertTransitions(in, outputsColumnar = false)
+      Transitions.insert(in, outputsColumnar = false)
     }
     assert(out == BatchToRow(TypeA, BatchUnary(TypeA, RowToBatch(TypeA, RowLeaf()))))
   }
@@ -56,7 +56,7 @@ class TransitionSuite extends SharedSparkSession {
   test("Insert C2R2C") {
     val in = BatchUnary(TypeA, BatchLeaf(TypeB))
     val out = ConventionFunc.ignoreBackend {
-      Transitions.insertTransitions(in, outputsColumnar = false)
+      Transitions.insert(in, outputsColumnar = false)
     }
     assert(
       out == BatchToRow(
@@ -67,7 +67,7 @@ class TransitionSuite extends SharedSparkSession {
   test("Insert C2C") {
     val in = BatchUnary(TypeA, BatchLeaf(TypeC))
     val out = ConventionFunc.ignoreBackend {
-      Transitions.insertTransitions(in, outputsColumnar = false)
+      Transitions.insert(in, outputsColumnar = false)
     }
     assert(
       out == BatchToRow(
@@ -79,7 +79,7 @@ class TransitionSuite extends SharedSparkSession {
     val in = BatchUnary(TypeA, BatchLeaf(TypeD))
     assertThrows[GlutenException] {
       ConventionFunc.ignoreBackend {
-        Transitions.insertTransitions(in, outputsColumnar = false)
+        Transitions.insert(in, outputsColumnar = false)
       }
     }
   }
@@ -116,8 +116,7 @@ object TransitionSuite extends TransitionSuiteBase {
   case class RowToBatch(toBatchType: Convention.BatchType, override val child: SparkPlan)
     extends RowToColumnarTransition
     with GlutenPlan {
-    override def supportsColumnar: Boolean = true
-    override protected def batchType0(): Convention.BatchType = toBatchType
+    override def batchType(): Convention.BatchType = toBatchType
     override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
       copy(child = newChild)
     override protected def doExecute(): RDD[InternalRow] =
diff --git a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
index d82cc3aac9..43805b3d65 100644
--- a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
+++ b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
@@ -24,22 +24,18 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{BinaryExecNode, LeafExecNode, SparkPlan, UnaryExecNode}
 
 trait TransitionSuiteBase {
-  case class BatchLeaf(override val batchType0: Convention.BatchType)
+  case class BatchLeaf(override val batchType: Convention.BatchType)
     extends LeafExecNode
     with GlutenPlan {
-    override def supportsColumnar: Boolean = true
 
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
 
     override def output: Seq[Attribute] = List.empty
   }
 
-  case class BatchUnary(
-      override val batchType0: Convention.BatchType,
-      override val child: SparkPlan)
+  case class BatchUnary(override val batchType: Convention.BatchType, override val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
-    override def supportsColumnar: Boolean = true
 
     override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
       copy(child = newChild)
@@ -50,12 +46,11 @@ trait TransitionSuiteBase {
   }
 
   case class BatchBinary(
-      override val batchType0: Convention.BatchType,
+      override val batchType: Convention.BatchType,
       override val left: SparkPlan,
       override val right: SparkPlan)
     extends BinaryExecNode
     with GlutenPlan {
-    override def supportsColumnar: Boolean = true
 
     override protected def withNewChildrenInternal(
         newLeft: SparkPlan,
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 7abe8228fa..7d4315e8d7 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.RemoveFallbackTagRule
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertTransitions
+import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -44,7 +44,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
       assert(outputPlan == originalPlan)
@@ -61,7 +61,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
@@ -79,7 +79,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to get the plan with columnar rule applied.
@@ -99,7 +99,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
@@ -119,7 +119,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to get the plan with columnar rule applied.
@@ -199,17 +199,13 @@ private object FallbackStrategiesSuite {
   }
 
 // For replacing LeafOp.
-  case class LeafOpTransformer(override val supportsColumnar: Boolean = true)
-    extends LeafExecNode
-    with GlutenPlan {
+  case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
   }
 
 // For replacing UnaryOp1.
-  case class UnaryOp1Transformer(
-      override val child: SparkPlan,
-      override val supportsColumnar: Boolean = true)
+  case class UnaryOp1Transformer(override val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index d03619a2e0..e8cc7898c2 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.extension.columnar.{FallbackTags, RemoveFallbackTagRule
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertTransitions
+import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -43,7 +43,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
       assert(outputPlan == originalPlan)
@@ -60,7 +60,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
@@ -78,7 +78,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to get the plan with columnar rule applied.
@@ -98,7 +98,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
@@ -118,7 +118,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to get the plan with columnar rule applied.
@@ -228,17 +228,13 @@ private object FallbackStrategiesSuite {
   }
 
   // For replacing LeafOp.
-  case class LeafOpTransformer(override val supportsColumnar: Boolean = true)
-    extends LeafExecNode
-    with GlutenPlan {
+  case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
   }
 
   // For replacing UnaryOp1.
-  case class UnaryOp1Transformer(
-      override val child: SparkPlan,
-      override val supportsColumnar: Boolean = true)
+  case class UnaryOp1Transformer(override val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index d03619a2e0..e8cc7898c2 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.extension.columnar.{FallbackTags, RemoveFallbackTagRule
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertTransitions
+import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -43,7 +43,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
       assert(outputPlan == originalPlan)
@@ -60,7 +60,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
@@ -78,7 +78,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to get the plan with columnar rule applied.
@@ -98,7 +98,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
@@ -118,7 +118,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to get the plan with columnar rule applied.
@@ -228,17 +228,13 @@ private object FallbackStrategiesSuite {
   }
 
   // For replacing LeafOp.
-  case class LeafOpTransformer(override val supportsColumnar: Boolean = true)
-    extends LeafExecNode
-    with GlutenPlan {
+  case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
   }
 
   // For replacing UnaryOp1.
-  case class UnaryOp1Transformer(
-      override val child: SparkPlan,
-      override val supportsColumnar: Boolean = true)
+  case class UnaryOp1Transformer(override val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
index 640b8cdc6f..cdfb63aa20 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/benchmarks/ParquetReadBenchmark.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.benchmarks
 
 import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.{FileSourceScanExecTransformer, WholeStageTransformer}
 import org.apache.gluten.extension.columnar.transition.Transitions
 import org.apache.gluten.jni.JniLibLoader
@@ -125,7 +126,7 @@ object ParquetReadBenchmark extends SqlBasedBenchmark {
 
     // generate ColumnarToRow
     val columnarToRowPlan =
-      Transitions.toBackendBatchPlan(newWholeStage)
+      Transitions.toBatchPlan(newWholeStage, BackendsApiManager.getSettings.primaryBatchType)
 
     val newWholeStageRDD = newWholeStage.executeColumnar()
     val newColumnarToRowRDD = columnarToRowPlan.execute()
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index f43652a7d4..1d45e8a672 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -23,7 +23,7 @@ import org.apache.gluten.extension.columnar.{FallbackTags, RemoveFallbackTagRule
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertTransitions
+import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -44,7 +44,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
       assert(outputPlan == originalPlan)
@@ -61,7 +61,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
@@ -79,7 +79,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOp()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to get the plan with columnar rule applied.
@@ -99,7 +99,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to fall back the entire plan.
@@ -119,7 +119,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
             _ => {
               UnaryOp2(UnaryOp1Transformer(UnaryOp2(UnaryOp1Transformer(LeafOpTransformer()))))
             },
-          c => InsertTransitions(c.outputsColumnar)))
+          c => InsertBackendTransitions(c.outputsColumnar)))
         .enableAdaptiveContext()
       val outputPlan = rule.apply(originalPlan, false)
       // Expect to get the plan with columnar rule applied.
@@ -229,17 +229,13 @@ private object FallbackStrategiesSuite {
   }
 
 // For replacing LeafOp.
-  case class LeafOpTransformer(override val supportsColumnar: Boolean = true)
-    extends LeafExecNode
-    with GlutenPlan {
+  case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
   }
 
 // For replacing UnaryOp1.
-  case class UnaryOp1Transformer(
-      override val child: SparkPlan,
-      override val supportsColumnar: Boolean = true)
+  case class UnaryOp1Transformer(override val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
     override protected def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException()
diff --git a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 2bdde3b4aa..a3bd5079b0 100644
--- a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++ b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -71,10 +71,9 @@ abstract class AbstractFileSourceScanExec(
     disableBucketedScan: Boolean = false)
   extends DataSourceScanExec {
 
-  // Note that some vals referring the file-based relation are lazy intentionally
-  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
-  override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  override def supportsColumnar: Boolean = {
+    // The value should be defined in GlutenPlan.
+    throw new UnsupportedOperationException("Unreachable code")
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {
diff --git a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 6b3d679fcd..c885f0cf44 100644
--- a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -75,10 +75,9 @@ abstract class AbstractFileSourceScanExec(
   lazy val metadataColumns: Seq[AttributeReference] =
     output.collect { case FileSourceMetadataAttribute(attr) => attr }
 
-  // Note that some vals referring the file-based relation are lazy intentionally
-  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
-  override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  override def supportsColumnar: Boolean = {
+    // The value should be defined in GlutenPlan.
+    throw new UnsupportedOperationException("Unreachable code")
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {
diff --git a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 5e75186186..53ea6f543a 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -67,15 +67,9 @@ abstract class AbstractFileSourceScanExec(
     override val disableBucketedScan: Boolean = false)
   extends FileSourceScanLike {
 
-  // Note that some vals referring the file-based relation are lazy intentionally
-  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
-  override lazy val supportsColumnar: Boolean = {
-    val conf = relation.sparkSession.sessionState.conf
-    // Only output columnar if there is WSCG to read it.
-    val requiredWholeStageCodegenSettings =
-      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
-    requiredWholeStageCodegenSettings &&
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  override def supportsColumnar: Boolean = {
+    // The value should be defined in GlutenPlan.
+    throw new UnsupportedOperationException("Unreachable code")
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
index 32cbd435b0..c8dbcc2fed 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/AbstractFileSourceScanExec.scala
@@ -67,15 +67,9 @@ abstract class AbstractFileSourceScanExec(
     override val disableBucketedScan: Boolean = false)
   extends FileSourceScanLike {
 
-  // Note that some vals referring the file-based relation are lazy intentionally
-  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
-  override lazy val supportsColumnar: Boolean = {
-    val conf = relation.sparkSession.sessionState.conf
-    // Only output columnar if there is WSCG to read it.
-    val requiredWholeStageCodegenSettings =
-      conf.wholeStageEnabled && !WholeStageCodegenExec.isTooManyFields(conf, schema)
-    requiredWholeStageCodegenSettings &&
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  override def supportsColumnar: Boolean = {
+    // The value should be defined in GlutenPlan.
+    throw new UnsupportedOperationException("Unreachable code")
   }
 
   private lazy val needsUnsafeRowConversion: Boolean = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to