This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 353169e5cd [GLUTEN-6920][CORE] Redesign and move trait `GlutenPlan` to 
`gluten-core` (#8036)
353169e5cd is described below

commit 353169e5cd3541cf56ee8e5bdd01e73a5d157aaf
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Nov 27 13:37:06 2024 +0800

    [GLUTEN-6920][CORE] Redesign and move trait `GlutenPlan` to `gluten-core` 
(#8036)
---
 .../extension/FallbackBroadcastHashJoinRules.scala |  8 +-
 .../GlutenClickHouseTPCDSMetricsSuite.scala        |  2 +-
 .../metrics/GlutenClickHouseTPCHMetricsSuite.scala |  2 +-
 ...ckHouseTPCHColumnarShuffleParquetAQESuite.scala |  2 +-
 .../GlutenClickHouseTPCHSaltNullParquetSuite.scala |  2 +-
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |  1 -
 .../execution/ColumnarPartialProjectExec.scala     | 12 ++-
 .../gluten/execution/RowToVeloxColumnarExec.scala  |  1 -
 .../gluten/execution/VeloxResizeBatchesExec.scala  |  7 +-
 .../ArrowConvertorRule.scala                       |  3 +-
 .../api/python/ColumnarArrowEvalPythonExec.scala   | 10 ++-
 .../spark/sql/execution/BaseArrowScanExec.scala    |  4 +-
 .../apache/gluten/execution/FallbackSuite.scala    |  1 -
 .../gluten/execution/ColumnarToColumnarExec.scala  | 14 +---
 .../org/apache/gluten/execution/GlutenPlan.scala   | 60 +++++++++++++
 .../enumerated/planner/plan/GlutenPlanModel.scala  |  5 +-
 .../enumerated/planner/property/Conv.scala         |  2 +-
 .../extension/columnar/transition/Convention.scala | 26 +++---
 .../columnar/transition/ConventionFunc.scala       | 73 +++++++---------
 .../columnar/transition/ConventionReq.scala        |  6 +-
 .../columnar/transition/Transitions.scala          |  8 +-
 .../BasicPhysicalOperatorTransformer.scala         |  9 +-
 .../CartesianProductExecTransformer.scala          |  5 +-
 .../gluten/execution/ColumnarCoalesceExec.scala    |  9 +-
 .../gluten/execution/ColumnarToRowExecBase.scala   | 19 ++---
 .../gluten/execution/RowToColumnarExecBase.scala   |  9 +-
 .../TakeOrderedAndProjectExecTransformer.scala     |  7 +-
 .../gluten/execution/WholeStageTransformer.scala   | 87 ++++++++++++++++++-
 .../org/apache/gluten/extension/GlutenPlan.scala   | 98 ----------------------
 .../extension/columnar/enumerated/RasOffload.scala |  6 +-
 .../extension/columnar/enumerated/RemoveSort.scala |  3 +-
 .../columnar/heuristic/ExpandFallbackPolicy.scala  |  4 +-
 .../columnar/offload/OffloadSingleNodeRules.scala  |  4 +-
 .../extension/columnar/validator/Validators.scala  |  2 +-
 .../execution/ColumnarBroadcastExchangeExec.scala  |  9 +-
 .../ColumnarCollapseTransformStages.scala          | 15 +++-
 .../execution/ColumnarShuffleExchangeExec.scala    | 10 ++-
 .../execution/ColumnarSubqueryBroadcastExec.scala  |  7 +-
 .../sql/execution/ColumnarWriteFilesExec.scala     | 17 ++--
 .../spark/sql/execution/GlutenExplainUtils.scala   |  3 +-
 .../sql/execution/GlutenFallbackReporter.scala     |  2 +-
 .../spark/sql/execution/GlutenImplicits.scala      |  6 +-
 .../datasources/GlutenWriterColumnarRules.scala    |  5 +-
 .../columnar/transition/TransitionSuite.scala      |  4 +-
 .../columnar/transition/TransitionSuiteBase.scala  |  5 +-
 .../org/apache/gluten/test/FallbackUtil.scala      |  2 +-
 .../org/apache/spark/sql/GlutenQueryTest.scala     |  2 +-
 .../sql/execution/FallbackStrategiesSuite.scala    | 11 ++-
 .../sql/statistics/SparkFunctionStatistics.scala   |  2 +-
 .../sql/execution/FallbackStrategiesSuite.scala    | 11 ++-
 .../sql/statistics/SparkFunctionStatistics.scala   |  2 +-
 .../sql/execution/FallbackStrategiesSuite.scala    | 11 ++-
 .../spark/sql/sources/GlutenInsertSuite.scala      |  2 +-
 .../sql/statistics/SparkFunctionStatistics.scala   |  2 +-
 .../sql/execution/FallbackStrategiesSuite.scala    | 11 ++-
 .../spark/sql/sources/GlutenInsertSuite.scala      |  2 +-
 .../sql/statistics/SparkFunctionStatistics.scala   |  2 +-
 .../sql/GlutenExpressionDataTypesValidation.scala  |  6 +-
 58 files changed, 381 insertions(+), 279 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
index 4af2d0fc0c..6a788617a6 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala
@@ -116,19 +116,19 @@ case class 
FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
             if (FallbackTags.nonEmpty(bnlj)) {
               ValidationResult.failed("broadcast join is already tagged as not 
transformable")
             } else {
-              val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+              val bnljTransformer = 
BackendsApiManager.getSparkPlanExecApiInstance
                 .genBroadcastNestedLoopJoinExecTransformer(
                   bnlj.left,
                   bnlj.right,
                   bnlj.buildSide,
                   bnlj.joinType,
                   bnlj.condition)
-              val isTransformable = transformer.doValidate()
-              if (isTransformable.ok()) {
+              val isBnljTransformable = bnljTransformer.doValidate()
+              if (isBnljTransformable.ok()) {
                 val exchangeTransformer = ColumnarBroadcastExchangeExec(mode, 
child)
                 exchangeTransformer.doValidate()
               } else {
-                isTransformable
+                isBnljTransformable
               }
             }
           }
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
index cbf1caf44e..02445270d4 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution.metrics
 
 import org.apache.gluten.execution.{ColumnarNativeIterator, 
GlutenClickHouseTPCDSAbstractSuite, WholeStageTransformer}
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.Attribute
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
index 3cfb8cc4fc..015da0dfae 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.execution.metrics
 
 import org.apache.gluten.execution._
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.expressions.Attribute
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index 885e50b046..1c3e4de4c6 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -18,7 +18,7 @@ package org.apache.gluten.execution.tpch
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution._
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.utils.Arm
 
 import org.apache.spark.SparkConf
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
index 4a2b7040fa..fbefc054fa 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHSaltNullParquetSuite.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.execution.tpch
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.clickhouse.CHConf
 import org.apache.gluten.execution._
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.DataFrame
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 0a62a41baa..7841e6cd94 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -19,7 +19,6 @@ package org.apache.gluten.backendsapi.velox
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.RuleApi
 import org.apache.gluten.columnarbatch.VeloxBatch
-import org.apache.gluten.datasource.ArrowConvertorRule
 import org.apache.gluten.extension._
 import org.apache.gluten.extension.columnar._
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
index 84de41daa0..20744f531b 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarPartialProjectExec.scala
@@ -17,9 +17,11 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
 import org.apache.gluten.expression.{ArrowProjection, ExpressionUtils}
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.sql.shims.SparkShimLoader
@@ -31,7 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, 
Attribute, AttributeRef
 import org.apache.spark.sql.execution.{ExplainUtils, ProjectExec, SparkPlan, 
UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.hive.HiveUdfUtil
-import org.apache.spark.sql.types.{BinaryType, BooleanType, ByteType, 
DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, 
NullType, ShortType, StringType, TimestampType, YearMonthIntervalType}
+import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 import scala.collection.mutable.ListBuffer
@@ -51,7 +53,7 @@ import scala.collection.mutable.ListBuffer
 case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
     replacedAliasUdf: Seq[Alias])
   extends UnaryExecNode
-  with GlutenPlan {
+  with ValidatablePlan {
 
   private val projectAttributes: ListBuffer[Attribute] = ListBuffer()
   private val projectIndexInChild: ListBuffer[Int] = ListBuffer()
@@ -73,6 +75,10 @@ case class ColumnarPartialProjectExec(original: ProjectExec, 
child: SparkPlan)(
 
   override def output: Seq[Attribute] = child.output ++ 
replacedAliasUdf.map(_.toAttribute)
 
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
   final override def doExecute(): RDD[InternalRow] = {
     throw new UnsupportedOperationException(
       s"${this.getClass.getSimpleName} doesn't support doExecute")
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
index a853778484..9921ffbfab 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/RowToVeloxColumnarExec.scala
@@ -43,7 +43,6 @@ import org.apache.arrow.memory.ArrowBuf
 import scala.collection.mutable.ListBuffer
 
 case class RowToVeloxColumnarExec(child: SparkPlan) extends 
RowToColumnarExecBase(child = child) {
-
   override def doExecuteColumnarInternal(): RDD[ColumnarBatch] = {
     val numInputRows = longMetric("numInputRows")
     val numOutputBatches = longMetric("numOutputBatches")
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
index 5283ab61e3..9ed687d337 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxResizeBatchesExec.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.utils.VeloxBatchResizer
 
@@ -53,6 +54,10 @@ case class VeloxResizeBatchesExec(
     "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to append 
/ split batches")
   )
 
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
   override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
similarity index 97%
rename from 
backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
rename to 
backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
index b1b0b813f6..5e02cf54b0 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/datasource/ArrowConvertorRule.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/extension/ArrowConvertorRule.scala
@@ -14,9 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.datasource
+package org.apache.gluten.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.datasource.ArrowCSVFileFormat
 import org.apache.gluten.datasource.v2.ArrowCSVTable
 import org.apache.gluten.sql.shims.SparkShimLoader
 
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
index a2eee53660..548dec13be 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/api/python/ColumnarArrowEvalPythonExec.scala
@@ -19,9 +19,9 @@ package org.apache.spark.api.python
 import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
 import org.apache.gluten.columnarbatch.ColumnarBatches
 import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
-import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
+import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
 import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
 import org.apache.gluten.utils.PullOutProjectHelper
@@ -213,11 +213,13 @@ case class ColumnarArrowEvalPythonExec(
     evalType: Int)
   extends EvalPythonExec
   with GlutenPlan
-  with KnownChildrenConventions {
+  with KnownChildConvention {
 
   override def batchType(): Convention.BatchType = ArrowJavaBatch
 
-  override def requiredChildrenConventions(): Seq[ConventionReq] = List(
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
+  override def requiredChildConvention(): Seq[ConventionReq] = List(
     ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(ArrowJavaBatch)))
 
   override lazy val metrics = Map(
diff --git 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
index 6617e8b138..1aacc1b954 100644
--- 
a/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
+++ 
b/backends-velox/src/main/scala/org/apache/spark/sql/execution/BaseArrowScanExec.scala
@@ -17,11 +17,13 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.columnarbatch.ArrowBatches
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.Convention
 
 trait BaseArrowScanExec extends GlutenPlan {
   final override def batchType(): Convention.BatchType = {
     ArrowBatches.ArrowJavaBatch
   }
+
+  final override def rowType0(): Convention.RowType = Convention.RowType.None
 }
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala 
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
index 0f7b70bf00..0f94b8648e 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/FallbackSuite.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.execution.{ColumnarBroadcastExchangeExec, 
ColumnarShuffleExchangeExec, SparkPlan}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
index 7ca4b36b06..e27b6a4e2f 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/execution/ColumnarToColumnarExec.scala
@@ -30,9 +30,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 abstract class ColumnarToColumnarExec(from: Convention.BatchType, to: 
Convention.BatchType)
   extends ColumnarToColumnarTransition
-  with Convention.KnownBatchType
-  with Convention.KnownRowTypeForSpark33AndLater
-  with ConventionReq.KnownChildrenConventions {
+  with GlutenPlan {
 
   def child: SparkPlan
   protected def mapIterator(in: Iterator[ColumnarBatch]): 
Iterator[ColumnarBatch]
@@ -46,21 +44,13 @@ abstract class ColumnarToColumnarExec(from: 
Convention.BatchType, to: Convention
       "selfTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to 
convert batches")
     )
 
-  final override val supportsColumnar: Boolean = {
-    batchType() != Convention.BatchType.None
-  }
-
   override def batchType(): Convention.BatchType = to
 
-  final override val supportsRowBased: Boolean = {
-    rowType() != Convention.RowType.None
-  }
-
   override def rowType0(): Convention.RowType = {
     Convention.RowType.None
   }
 
-  override def requiredChildrenConventions(): Seq[ConventionReq] = List(
+  override def requiredChildConvention(): Seq[ConventionReq] = List(
     ConventionReq.of(ConventionReq.RowType.Any, 
ConventionReq.BatchType.Is(from)))
 
   override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala 
b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
new file mode 100644
index 0000000000..460326d8d9
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/execution/GlutenPlan.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
+
+import org.apache.spark.sql.execution.SparkPlan
+
+trait GlutenPlan
+  extends SparkPlan
+  with Convention.KnownBatchType
+  with Convention.KnownRowTypeForSpark33AndLater
+  with GlutenPlan.SupportsRowBasedCompatible
+  with ConventionReq.KnownChildConvention {
+
+  final override val supportsColumnar: Boolean = {
+    batchType() != Convention.BatchType.None
+  }
+
+  override def batchType(): Convention.BatchType
+
+  final override val supportsRowBased: Boolean = {
+    rowType() != Convention.RowType.None
+  }
+
+  override def rowType0(): Convention.RowType
+
+  override def requiredChildConvention(): Seq[ConventionReq] = {
+    // In the normal case, children's convention should follow parent node's 
convention.
+    val childReq = Convention.of(rowType(), batchType()).asReq()
+    Seq.tabulate(children.size)(
+      _ => {
+        childReq
+      })
+  }
+}
+
+object GlutenPlan {
+  // To be compatible with Spark (version < 3.3)
+  trait SupportsRowBasedCompatible {
+    def supportsRowBased(): Boolean = {
+      throw new GlutenException("Illegal state: The method is not expected to 
be called")
+    }
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
index 568ea50396..20a238a47d 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/plan/GlutenPlanModel.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.enumerated.planner.plan
 
+import org.apache.gluten.execution.GlutenPlan
 import 
org.apache.gluten.extension.columnar.enumerated.planner.metadata.GlutenMetadata
 import org.apache.gluten.extension.columnar.enumerated.planner.property.{Conv, 
ConvDef}
 import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
@@ -37,13 +38,15 @@ object GlutenPlanModel {
     PlanModelImpl
   }
 
+  // TODO: Make this inherit from GlutenPlan.
   case class GroupLeafExec(
       groupId: Int,
       metadata: GlutenMetadata,
       constraintSet: PropertySet[SparkPlan])
     extends LeafExecNode
     with Convention.KnownBatchType
-    with Convention.KnownRowTypeForSpark33AndLater {
+    with Convention.KnownRowTypeForSpark33AndLater
+    with GlutenPlan.SupportsRowBasedCompatible {
     private val req: Conv.Req = 
constraintSet.get(ConvDef).asInstanceOf[Conv.Req]
 
     override protected def doExecute(): RDD[InternalRow] = throw new 
IllegalStateException()
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
index 7b2b801ac9..cfb32e7644 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/planner/property/Conv.scala
@@ -92,7 +92,7 @@ object ConvDef extends PropertyDef[SparkPlan, Conv] {
   override def getChildrenConstraints(
       constraint: Property[SparkPlan],
       plan: SparkPlan): Seq[Conv] = {
-    val out = List.tabulate(plan.children.size)(_ => 
Conv.req(ConventionReq.get(plan)))
+    val out = ConventionReq.get(plan).map(Conv.req)
     out
   }
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
index b57f3e0c0a..b9fbe023b2 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Convention.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.exception.GlutenException
-
 import org.apache.spark.sql.execution.{ColumnarToRowExec, RowToColumnarExec, 
SparkPlan}
 import org.apache.spark.util.SparkVersionUtil
 
@@ -55,6 +53,19 @@ object Convention {
       }
       Convention.of(rowType(), batchType())
     }
+
+    def asReq(): ConventionReq = {
+      val rowTypeReq = conv.rowType match {
+        case Convention.RowType.None => ConventionReq.RowType.Any
+        case r => ConventionReq.RowType.Is(r)
+      }
+
+      val batchTypeReq = conv.batchType match {
+        case Convention.BatchType.None => ConventionReq.BatchType.Any
+        case b => ConventionReq.BatchType.Is(b)
+      }
+      ConventionReq.of(rowTypeReq, batchTypeReq)
+    }
   }
 
   private case class Impl(override val rowType: RowType, override val 
batchType: BatchType)
@@ -142,19 +153,10 @@ object Convention {
     def batchType(): BatchType
   }
 
-  sealed trait KnownRowType extends KnownRowType.SupportsRowBasedCompatible {
+  sealed trait KnownRowType {
     def rowType(): RowType
   }
 
-  object KnownRowType {
-    // To be compatible with Spark (version < 3.3)
-    sealed trait SupportsRowBasedCompatible {
-      def supportsRowBased(): Boolean = {
-        throw new GlutenException("Illegal state: The method is not expected 
to be called")
-      }
-    }
-  }
-
   trait KnownRowTypeForSpark33AndLater extends KnownRowType {
     this: SparkPlan =>
     import KnownRowTypeForSpark33AndLater._
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
index 5cb3d44a15..2be9271a1f 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionFunc.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.extension.columnar.transition
 
 import org.apache.gluten.backend.Backend
-import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
+import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan, UnionExec}
@@ -28,14 +28,14 @@ import 
org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 /** ConventionFunc is a utility to derive [[Convention]] or [[ConventionReq]] 
from a query plan. */
 sealed trait ConventionFunc {
   def conventionOf(plan: SparkPlan): Convention
-  def conventionReqOf(plan: SparkPlan): ConventionReq
+  def conventionReqOf(plan: SparkPlan): Seq[ConventionReq]
 }
 
 object ConventionFunc {
   trait Override {
     def rowTypeOf: PartialFunction[SparkPlan, Convention.RowType] = 
PartialFunction.empty
     def batchTypeOf: PartialFunction[SparkPlan, Convention.BatchType] = 
PartialFunction.empty
-    def conventionReqOf: PartialFunction[SparkPlan, ConventionReq] = 
PartialFunction.empty
+    def conventionReqOf: PartialFunction[SparkPlan, Seq[ConventionReq]] = 
PartialFunction.empty
   }
 
   object Override {
@@ -72,7 +72,6 @@ object ConventionFunc {
   }
 
   private class BuiltinFunc(o: Override) extends ConventionFunc {
-    import BuiltinFunc._
     override def conventionOf(plan: SparkPlan): Convention = {
       val conv = conventionOf0(plan)
       conv
@@ -158,61 +157,45 @@ object ConventionFunc {
       }
     }
 
-    override def conventionReqOf(plan: SparkPlan): ConventionReq = {
+    override def conventionReqOf(plan: SparkPlan): Seq[ConventionReq] = {
       val req = o.conventionReqOf.applyOrElse(plan, conventionReqOf0)
+      assert(req.size == plan.children.size)
       req
     }
 
-    private def conventionReqOf0(plan: SparkPlan): ConventionReq = plan match {
-      case k: KnownChildrenConventions =>
-        val reqs = k.requiredChildrenConventions().distinct
-        // This can be a temporary restriction.
-        assert(
-          reqs.size == 1,
-          "KnownChildrenConventions#requiredChildrenConventions should output 
the same element" +
-            " for all children")
-        reqs.head
+    private def conventionReqOf0(plan: SparkPlan): Seq[ConventionReq] = plan 
match {
+      case k: KnownChildConvention =>
+        val reqs = k.requiredChildConvention()
+        reqs
       case RowToColumnarLike(_) =>
-        ConventionReq.of(
-          ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
-          ConventionReq.BatchType.Any)
+        Seq(
+          ConventionReq.of(
+            ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+            ConventionReq.BatchType.Any))
       case ColumnarToRowExec(_) =>
-        ConventionReq.of(
-          ConventionReq.RowType.Any,
-          ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatch))
+        Seq(
+          ConventionReq.of(
+            ConventionReq.RowType.Any,
+            ConventionReq.BatchType.Is(Convention.BatchType.VanillaBatch)))
       case write: DataWritingCommandExec if 
SparkShimLoader.getSparkShims.isPlannedV1Write(write) =>
         // To align with 
ApplyColumnarRulesAndInsertTransitions#insertTransitions
-        ConventionReq.any
+        Seq(ConventionReq.any)
       case u: UnionExec =>
         // We force vanilla union to output row data to get the best 
compatibility with vanilla
         // Spark.
         // As a result it's a common practice to rewrite it with GlutenPlan 
for offloading.
-        ConventionReq.of(
-          ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
-          ConventionReq.BatchType.Any)
+        Seq.tabulate(u.children.size)(
+          _ =>
+            ConventionReq.of(
+              ConventionReq.RowType.Is(Convention.RowType.VanillaRow),
+              ConventionReq.BatchType.Any))
       case other =>
         // In the normal case, children's convention should follow parent 
node's convention.
-        // Note, we don't have to consider C2R / R2C here since they are 
already removed by
-        // RemoveTransitions.
-        val thisConv = conventionOf0(other)
-        thisConv.asReq()
-    }
-  }
-
-  private object BuiltinFunc {
-    implicit private class ConventionOps(conv: Convention) {
-      def asReq(): ConventionReq = {
-        val rowTypeReq = conv.rowType match {
-          case Convention.RowType.None => ConventionReq.RowType.Any
-          case r => ConventionReq.RowType.Is(r)
-        }
-
-        val batchTypeReq = conv.batchType match {
-          case Convention.BatchType.None => ConventionReq.BatchType.Any
-          case b => ConventionReq.BatchType.Is(b)
-        }
-        ConventionReq.of(rowTypeReq, batchTypeReq)
-      }
+        val childReq = conventionOf0(other).asReq()
+        Seq.tabulate(other.children.size)(
+          _ => {
+            childReq
+          })
     }
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
index a081f21434..86637f2d5a 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/ConventionReq.scala
@@ -55,12 +55,12 @@ object ConventionReq {
   val row: ConventionReq = ofRow(RowType.Is(Convention.RowType.VanillaRow))
   val vanillaBatch: ConventionReq = 
ofBatch(BatchType.Is(Convention.BatchType.VanillaBatch))
 
-  def get(plan: SparkPlan): ConventionReq = 
ConventionFunc.create().conventionReqOf(plan)
+  def get(plan: SparkPlan): Seq[ConventionReq] = 
ConventionFunc.create().conventionReqOf(plan)
   def of(rowType: RowType, batchType: BatchType): ConventionReq = 
Impl(rowType, batchType)
   def ofRow(rowType: RowType): ConventionReq = Impl(rowType, BatchType.Any)
   def ofBatch(batchType: BatchType): ConventionReq = Impl(RowType.Any, 
batchType)
 
-  trait KnownChildrenConventions {
-    def requiredChildrenConventions(): Seq[ConventionReq]
+  trait KnownChildConvention {
+    def requiredChildConvention(): Seq[ConventionReq]
   }
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
index 10d50f453d..cec09ee7a1 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transition/Transitions.scala
@@ -42,13 +42,13 @@ case class InsertTransitions(convReq: ConventionReq) 
extends Rule[SparkPlan] {
     if (node.children.isEmpty) {
       return node
     }
-    val convReq = convFunc.conventionReqOf(node)
-    val newChildren = node.children.map {
-      child =>
+    val convReqs = convFunc.conventionReqOf(node)
+    val newChildren = node.children.zip(convReqs).map {
+      case (child, convReq) =>
         val from = convFunc.conventionOf(child)
         if (from.isNone) {
           // For example, a union op with row child and columnar child at the 
same time,
-          // The plan is actually not executable and we cannot tell about its 
convention.
+          // The plan is actually not executable, and we cannot tell about its 
convention.
           child
         } else {
           val transition =
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 2830ef404c..f9755605ca 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -19,7 +19,8 @@ package org.apache.gluten.execution
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter, 
ExpressionTransformer}
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.`type`.TypeBuilder
 import org.apache.gluten.substrait.SubstraitContext
@@ -261,13 +262,17 @@ abstract class ProjectExecTransformerBase(val list: 
Seq[NamedExpression], val in
 }
 
 // An alternatives for UnionExec.
-case class ColumnarUnionExec(children: Seq[SparkPlan]) extends GlutenPlan {
+case class ColumnarUnionExec(children: Seq[SparkPlan]) extends ValidatablePlan 
{
   children.foreach {
     case w: WholeStageTransformer =>
       w.setOutputSchemaForPlan(output)
     case _ =>
   }
 
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
   override def output: Seq[Attribute] = {
     children.map(_.output).transpose.map {
       attrs =>
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 28fb691896..3e3169aa55 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -18,7 +18,8 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.expression.ExpressionConverter
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.rel.RelBuilder
@@ -45,6 +46,8 @@ import java.io.{IOException, ObjectOutputStream}
  */
 case class ColumnarCartesianProductBridge(child: SparkPlan) extends 
UnaryExecNode with GlutenPlan {
   override def output: Seq[Attribute] = child.output
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+  override def rowType0(): Convention.RowType = Convention.RowType.None
   override protected def doExecute(): RDD[InternalRow] =
     throw new UnsupportedOperationException()
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = 
child.executeColumnar()
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
index 3b13207c93..107bf544cd 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCoalesceExec.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 import org.apache.spark.rdd.RDD
@@ -28,7 +29,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class ColumnarCoalesceExec(numPartitions: Int, child: SparkPlan)
   extends UnaryExecNode
-  with GlutenPlan {
+  with ValidatablePlan {
 
   override def output: Seq[Attribute] = child.output
 
@@ -36,6 +37,10 @@ case class ColumnarCoalesceExec(numPartitions: Int, child: 
SparkPlan)
     if (numPartitions == 1) SinglePartition else 
UnknownPartitioning(numPartitions)
   }
 
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
   override protected def doExecute(): RDD[InternalRow] = {
     throw new UnsupportedOperationException()
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
index fae3115981..8bf1fbe447 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarToRowExecBase.scala
@@ -17,9 +17,8 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
-import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
+import 
org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildConvention
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
@@ -30,8 +29,8 @@ import 
org.apache.spark.sql.execution.{ColumnarToRowTransition, SparkPlan}
 
 abstract class ColumnarToRowExecBase(child: SparkPlan)
   extends ColumnarToRowTransition
-  with GlutenPlan
-  with KnownChildrenConventions {
+  with KnownChildConvention
+  with ValidatablePlan {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics =
@@ -47,6 +46,12 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
 
   override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
 
+  override def requiredChildConvention(): Seq[ConventionReq] = {
+    List(
+      ConventionReq.ofBatch(
+        
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
+  }
+
   override def doExecuteBroadcast[T](): Broadcast[T] = {
     // Require for explicit implementation, otherwise throw error.
     super.doExecuteBroadcast[T]()
@@ -57,10 +62,4 @@ abstract class ColumnarToRowExecBase(child: SparkPlan)
   override def doExecute(): RDD[InternalRow] = {
     doExecuteInternal()
   }
-
-  override def requiredChildrenConventions(): Seq[ConventionReq] = {
-    List(
-      ConventionReq.ofBatch(
-        
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
-  }
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
index 2a52616361..9f36b8fc67 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RowToColumnarExecBase.scala
@@ -17,8 +17,7 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
 
 import org.apache.spark.broadcast
 import org.apache.spark.rdd.RDD
@@ -46,8 +45,14 @@ abstract class RowToColumnarExecBase(child: SparkPlan)
 
   final override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+
   override def rowType0(): Convention.RowType = Convention.RowType.None
 
+  override def requiredChildConvention(): Seq[ConventionReq] = {
+    Seq(ConventionReq.row)
+  }
+
   final override def doExecute(): RDD[InternalRow] = {
     child.execute()
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
index c960bda249..f19960ec1c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
@@ -17,7 +17,8 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -40,9 +41,11 @@ case class TakeOrderedAndProjectExecTransformer(
     child: SparkPlan,
     offset: Int = 0)
   extends UnaryExecNode
-  with GlutenPlan {
+  with ValidatablePlan {
   override def outputPartitioning: Partitioning = SinglePartition
   override def outputOrdering: Seq[SortOrder] = sortOrder
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+  override def rowType0(): Convention.RowType = Convention.RowType.None
 
   override def output: Seq[Attribute] = {
     projectList.map(_.toAttribute)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index 6414b67a80..dbfc11c136 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -18,14 +18,17 @@ package org.apache.gluten.execution
 
 import org.apache.gluten.{GlutenConfig, GlutenNumaBindingInfo}
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.exception.{GlutenException, GlutenNotSupportException}
 import org.apache.gluten.expression._
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.metrics.{GlutenTimeMetric, MetricsUpdater}
 import org.apache.gluten.substrait.`type`.{TypeBuilder, TypeNode}
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.plan.{PlanBuilder, PlanNode}
 import org.apache.gluten.substrait.rel.{LocalFilesNode, RelNode, SplitInfo}
+import org.apache.gluten.test.TestStats
 import org.apache.gluten.utils.SubstraitPlanPrinterUtil
 
 import org.apache.spark._
@@ -53,7 +56,74 @@ case class TransformContext(outputAttributes: 
Seq[Attribute], root: RelNode)
 
 case class WholeStageTransformContext(root: PlanNode, substraitContext: 
SubstraitContext = null)
 
-trait TransformSupport extends GlutenPlan {
+/**
+ * Base interface for a Gluten query plan that is also open to validation 
calls.
+ *
+ * Since https://github.com/apache/incubator-gluten/pull/2185.
+ */
+trait ValidatablePlan extends GlutenPlan with LogLevelUtil {
+  protected def glutenConf: GlutenConfig = GlutenConfig.getConf
+
+  protected lazy val enableNativeValidation = glutenConf.enableNativeValidation
+
+  /**
+   * Validate whether this SparkPlan supports to be transformed into substrait 
node in Native Code.
+   */
+  final def doValidate(): ValidationResult = {
+    val schemaValidationResult = BackendsApiManager.getValidatorApiInstance
+      .doSchemaValidate(schema)
+      .map {
+        reason =>
+          ValidationResult.failed(s"Found schema check failure for $schema, 
due to: $reason")
+      }
+      .getOrElse(ValidationResult.succeeded)
+    if (!schemaValidationResult.ok()) {
+      TestStats.addFallBackClassName(this.getClass.toString)
+      return schemaValidationResult
+    }
+    try {
+      TransformerState.enterValidation
+      val res = doValidateInternal()
+      if (!res.ok()) {
+        TestStats.addFallBackClassName(this.getClass.toString)
+      }
+      res
+    } catch {
+      case e @ (_: GlutenNotSupportException | _: 
UnsupportedOperationException) =>
+        if (!e.isInstanceOf[GlutenNotSupportException]) {
+          logDebug(s"Just a warning. This exception perhaps needs to be 
fixed.", e)
+        }
+        // FIXME: Use a validation-specific method to catch validation failures
+        TestStats.addFallBackClassName(this.getClass.toString)
+        logValidationMessage(
+          s"Validation failed with exception for plan: $nodeName, due to: 
${e.getMessage}",
+          e)
+        ValidationResult.failed(e.getMessage)
+    } finally {
+      TransformerState.finishValidation
+    }
+  }
+
+  protected def doValidateInternal(): ValidationResult = 
ValidationResult.succeeded
+
+  private def logValidationMessage(msg: => String, e: Throwable): Unit = {
+    if (glutenConf.printStackOnValidationFailure) {
+      logOnLevel(glutenConf.validationLogLevel, msg, e)
+    } else {
+      logOnLevel(glutenConf.validationLogLevel, msg)
+    }
+  }
+}
+
+/** Base interface for a query plan that can be interpreted to Substrait 
representation. */
+trait TransformSupport extends ValidatablePlan {
+  override def batchType(): Convention.BatchType = {
+    BackendsApiManager.getSettings.primaryBatchType
+  }
+
+  override def rowType0(): Convention.RowType = {
+    Convention.RowType.None
+  }
 
   final override def doExecute(): RDD[InternalRow] = {
     throw new UnsupportedOperationException(
@@ -68,6 +138,17 @@ trait TransformSupport extends GlutenPlan {
    */
   def columnarInputRDDs: Seq[RDD[ColumnarBatch]]
 
+  // Since https://github.com/apache/incubator-gluten/pull/2185.
+  protected def doNativeValidation(context: SubstraitContext, node: RelNode): 
ValidationResult = {
+    if (node != null && glutenConf.enableNativeValidation) {
+      val planNode = PlanBuilder.makePlan(context, Lists.newArrayList(node))
+      BackendsApiManager.getValidatorApiInstance
+        .doNativeValidateWithFailureReason(planNode)
+    } else {
+      ValidationResult.succeeded
+    }
+  }
+
   final def transform(context: SubstraitContext): TransformContext = {
     if (isCanonicalizedPlan) {
       throw new IllegalStateException(
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 3639ac522f..b49917c59f 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -16,25 +16,12 @@
  */
 package org.apache.gluten.extension
 
-import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.expression.TransformerState
 import org.apache.gluten.extension.columnar.FallbackTag
 import org.apache.gluten.extension.columnar.FallbackTag.{Appendable, Converter}
 import org.apache.gluten.extension.columnar.FallbackTags.add
-import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.extension.columnar.validator.Validator
-import org.apache.gluten.logging.LogLevelUtil
-import org.apache.gluten.substrait.SubstraitContext
-import org.apache.gluten.substrait.plan.PlanBuilder
-import org.apache.gluten.substrait.rel.RelNode
-import org.apache.gluten.test.TestStats
 
 import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.execution.SparkPlan
-
-import com.google.common.collect.Lists
 
 sealed trait ValidationResult {
   def ok(): Boolean
@@ -80,88 +67,3 @@ object ValidationResult {
     }
   }
 }
-
-/** Every Gluten Operator should extend this trait. */
-trait GlutenPlan
-  extends SparkPlan
-  with Convention.KnownBatchType
-  with Convention.KnownRowTypeForSpark33AndLater
-  with LogLevelUtil {
-  protected lazy val enableNativeValidation = glutenConf.enableNativeValidation
-
-  protected def glutenConf: GlutenConfig = GlutenConfig.getConf
-
-  /**
-   * Validate whether this SparkPlan supports to be transformed into substrait 
node in Native Code.
-   */
-  final def doValidate(): ValidationResult = {
-    val schemaValidationResult = BackendsApiManager.getValidatorApiInstance
-      .doSchemaValidate(schema)
-      .map {
-        reason =>
-          ValidationResult.failed(s"Found schema check failure for $schema, 
due to: $reason")
-      }
-      .getOrElse(ValidationResult.succeeded)
-    if (!schemaValidationResult.ok()) {
-      TestStats.addFallBackClassName(this.getClass.toString)
-      return schemaValidationResult
-    }
-    try {
-      TransformerState.enterValidation
-      val res = doValidateInternal()
-      if (!res.ok()) {
-        TestStats.addFallBackClassName(this.getClass.toString)
-      }
-      res
-    } catch {
-      case e @ (_: GlutenNotSupportException | _: 
UnsupportedOperationException) =>
-        if (!e.isInstanceOf[GlutenNotSupportException]) {
-          logDebug(s"Just a warning. This exception perhaps needs to be 
fixed.", e)
-        }
-        // FIXME: Use a validation-specific method to catch validation failures
-        TestStats.addFallBackClassName(this.getClass.toString)
-        logValidationMessage(
-          s"Validation failed with exception for plan: $nodeName, due to: 
${e.getMessage}",
-          e)
-        ValidationResult.failed(e.getMessage)
-    } finally {
-      TransformerState.finishValidation
-    }
-  }
-
-  final override val supportsColumnar: Boolean = {
-    batchType() != Convention.BatchType.None
-  }
-
-  override def batchType(): Convention.BatchType = {
-    BackendsApiManager.getSettings.primaryBatchType
-  }
-
-  final override val supportsRowBased: Boolean = {
-    rowType() != Convention.RowType.None
-  }
-
-  override def rowType0(): Convention.RowType = {
-    Convention.RowType.None
-  }
-
-  protected def doValidateInternal(): ValidationResult = 
ValidationResult.succeeded
-
-  protected def doNativeValidation(context: SubstraitContext, node: RelNode): 
ValidationResult = {
-    if (node != null && glutenConf.enableNativeValidation) {
-      val planNode = PlanBuilder.makePlan(context, Lists.newArrayList(node))
-      BackendsApiManager.getValidatorApiInstance
-        .doNativeValidateWithFailureReason(planNode)
-    } else {
-      ValidationResult.succeeded
-    }
-  }
-
-  private def logValidationMessage(msg: => String, e: Throwable): Unit = {
-    if (glutenConf.printStackOnValidationFailure) {
-      logOnLevel(glutenConf.validationLogLevel, msg, e)
-    } else {
-      logOnLevel(glutenConf.validationLogLevel, msg)
-    }
-  }
-}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
index 49401797f9..982abc16c5 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.enumerated
 
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{GlutenPlan, ValidatablePlan}
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 import org.apache.gluten.extension.columnar.rewrite.RewriteSingleNode
 import org.apache.gluten.extension.columnar.validator.Validator
@@ -119,7 +119,9 @@ object RasOffload {
             validator.validate(from) match {
               case Validator.Passed =>
                 val offloadedPlan = base.offload(from)
-                val offloadedNodes = offloadedPlan.collect[GlutenPlan] { case 
t: GlutenPlan => t }
+                val offloadedNodes = offloadedPlan.collect[ValidatablePlan] {
+                  case t: ValidatablePlan => t
+                }
                 val outComes = 
offloadedNodes.map(_.doValidate()).filter(!_.ok())
                 if (outComes.nonEmpty) {
                   // 5. If native validation fails on at least one of the 
offloaded nodes, return
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveSort.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveSort.scala
index 5b5d5e541e..fd56240616 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveSort.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RemoveSort.scala
@@ -16,8 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.enumerated
 
-import org.apache.gluten.execution.{HashAggregateExecBaseTransformer, 
ShuffledHashJoinExecTransformerBase, SortExecTransformer}
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{GlutenPlan, 
HashAggregateExecBaseTransformer, ShuffledHashJoinExecTransformerBase, 
SortExecTransformer}
 import org.apache.gluten.ras.path.Pattern._
 import org.apache.gluten.ras.path.Pattern.Matchers._
 import org.apache.gluten.ras.rule.{RasRule, Shape}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
index 44ed81f565..3418d3dddc 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/ExpandFallbackPolicy.scala
@@ -17,12 +17,11 @@
 package org.apache.gluten.extension.columnar.heuristic
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.columnar.{FallbackTag, FallbackTags}
 import org.apache.gluten.extension.columnar.FallbackTags.add
 import org.apache.gluten.extension.columnar.transition.{BackendTransitions, 
ColumnarToRowLike, RowToColumnarLike}
 import org.apache.gluten.utils.PlanUtil
-
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.execution._
@@ -35,6 +34,7 @@ import org.apache.spark.sql.execution.exchange.Exchange
 
 
 
+
 // format: off
 /**
  * Note, this rule should only fallback to row-based plan if there is no harm.
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 0fee61acac..fa698cd244 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
-import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.FallbackTags
 import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.sql.shims.SparkShimLoader
@@ -330,8 +329,7 @@ object OffloadOthers {
             child)
         case p if !p.isInstanceOf[GlutenPlan] =>
           logDebug(s"Transformation for ${p.getClass} is currently not 
supported.")
-          val children = plan.children
-          p.withNewChildren(children)
+          p
         case other => other
       }
     }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 7f41e81072..7e7d732c29 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.validator
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi}
 import org.apache.gluten.exception.GlutenNotSupportException
-import org.apache.gluten.execution.{BasicScanExecTransformer, 
ColumnarCoalesceExec, ColumnarUnionExec, ExpandExecTransformer, 
HashAggregateExecBaseTransformer, LimitExecTransformer, ProjectExecTransformer, 
ScanTransformerFactory, SortExecTransformer, 
TakeOrderedAndProjectExecTransformer, WindowExecTransformer, 
WindowGroupLimitExecTransformer, WriteFilesExecTransformer}
+import org.apache.gluten.execution._
 import org.apache.gluten.expression.ExpressionUtils
 import org.apache.gluten.extension.columnar.FallbackTags
 import org.apache.gluten.extension.columnar.offload.OffloadJoin
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index 01a4380a14..1de490ad61 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.metrics.GlutenTimeMetric
 import org.apache.gluten.sql.shims.SparkShimLoader
 
@@ -41,7 +42,7 @@ import scala.util.control.NonFatal
 
 case class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
   extends BroadcastExchangeLike
-  with GlutenPlan {
+  with ValidatablePlan {
 
   // Note: "metrics" is made transient to avoid sending driver-side metrics to 
tasks.
   @transient override lazy val metrics: Map[String, SQLMetric] =
@@ -125,6 +126,10 @@ case class ColumnarBroadcastExchangeExec(mode: 
BroadcastMode, child: SparkPlan)
 
   override def outputPartitioning: Partitioning = BroadcastPartitioning(mode)
 
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
   override def doCanonicalize(): SparkPlan = {
     val canonicalized =
       
BackendsApiManager.getSparkPlanExecApiInstance.doCanonicalizeForBroadcastMode(mode)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index 9ec078e003..67895b439c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution._
-import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.rel.RelBuilder
@@ -173,13 +173,22 @@ case class ColumnarCollapseTransformStages(
   }
 }
 
+// TODO: Make this inherit from GlutenPlan.
 case class ColumnarInputAdapter(child: SparkPlan)
   extends InputAdapterGenerateTreeStringShim
-  with Convention.KnownBatchType {
+  with Convention.KnownBatchType
+  with Convention.KnownRowTypeForSpark33AndLater
+  with GlutenPlan.SupportsRowBasedCompatible
+  with ConventionReq.KnownChildConvention {
   override def output: Seq[Attribute] = child.output
-  override val supportsColumnar: Boolean = true
+  final override val supportsColumnar: Boolean = true
+  final override val supportsRowBased: Boolean = false
+  override def rowType0(): Convention.RowType = Convention.RowType.None
   override def batchType(): Convention.BatchType =
     BackendsApiManager.getSettings.primaryBatchType
+  override def requiredChildConvention(): Seq[ConventionReq] = Seq(
+    ConventionReq.ofBatch(
+      
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
   override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = 
child.executeColumnar()
   override def outputPartitioning: Partitioning = child.outputPartitioning
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index d4b33be292..007a93bd06 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -18,7 +18,9 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
+import org.apache.gluten.execution.ValidatablePlan
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark._
@@ -45,7 +47,7 @@ case class ColumnarShuffleExchangeExec(
     projectOutputAttributes: Seq[Attribute],
     advisoryPartitionSize: Option[Long] = None)
   extends ShuffleExchangeLike
-  with GlutenPlan {
+  with ValidatablePlan {
   private[sql] lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
 
@@ -148,6 +150,10 @@ case class ColumnarShuffleExchangeExec(
     super.stringArgs ++ Iterator(s"[shuffle_writer_type=$shuffleWriterType]")
   }
 
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
   override def doExecute(): RDD[InternalRow] = {
     throw new UnsupportedOperationException()
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
index 2c1edd04bb..6275fbb3aa 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarSubqueryBroadcastExec.scala
@@ -17,7 +17,8 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
 import org.apache.gluten.metrics.GlutenTimeMetric
 
 import org.apache.spark.rdd.RDD
@@ -107,6 +108,10 @@ case class ColumnarSubqueryBroadcastExec(
     relationFuture
   }
 
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
   override protected def doExecute(): RDD[InternalRow] = {
     throw new UnsupportedOperationException(
       "SubqueryBroadcastExec does not support the execute() code path.")
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
index 25d6c4ed61..45f2026370 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarWriteFilesExec.scala
@@ -18,9 +18,9 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.{Convention, 
ConventionReq}
 import org.apache.gluten.extension.columnar.transition.Convention.RowType
-import org.apache.gluten.extension.columnar.transition.ConventionReq
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.TaskContext
@@ -44,17 +44,19 @@ abstract class ColumnarWriteFilesExec protected (
     override val right: SparkPlan)
   extends BinaryExecNode
   with GlutenPlan
-  with ConventionReq.KnownChildrenConventions
   with ColumnarWriteFilesExec.ExecuteWriteCompatible {
 
   val child: SparkPlan = left
 
   override lazy val references: AttributeSet = AttributeSet.empty
 
-  override def requiredChildrenConventions(): Seq[ConventionReq] = {
-    List(
-      ConventionReq.ofBatch(
-        
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType)))
+  override def requiredChildConvention(): Seq[ConventionReq] = {
+    val req = ConventionReq.ofBatch(
+      
ConventionReq.BatchType.Is(BackendsApiManager.getSettings.primaryBatchType))
+    Seq.tabulate(2)(
+      _ => {
+        req
+      })
   }
 
   /**
@@ -67,6 +69,7 @@ abstract class ColumnarWriteFilesExec protected (
    *
    * Since https://github.com/apache/incubator-gluten/pull/6745.
    */
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
   override def rowType0(): RowType = {
     RowType.VanillaRow
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index d6167c931c..7a33350524 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -16,8 +16,7 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.gluten.execution.WholeStageTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
 import org.apache.gluten.extension.columnar.FallbackTags
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.utils.PlanUtil
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
index 481e16b0a5..c28395941b 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.events.GlutenPlanFallbackEvent
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.extension.columnar.FallbackTags
 import org.apache.gluten.logging.LogLevelUtil
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 435bf9239e..709673feab 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -16,16 +16,14 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.gluten.execution.WholeStageTransformer
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
 import org.apache.gluten.utils.PlanUtil
-
 import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AQEShuffleReadExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, 
AdaptiveSparkPlanExec, QueryStageExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, 
ExecutedCommandExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index fb42c55ba0..126417bf18 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -17,9 +17,8 @@
 package org.apache.spark.sql.execution.datasources
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.ColumnarToRowExecBase
+import org.apache.gluten.execution.{ColumnarToRowExecBase, GlutenPlan}
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
-import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.transition.{Convention, 
Transitions}
 
 import org.apache.spark.rdd.RDD
@@ -61,6 +60,8 @@ case class FakeRowAdaptor(child: SparkPlan)
 
   override def output: Seq[Attribute] = child.output
 
+  override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+
   override def rowType0(): Convention.RowType = Convention.RowType.VanillaRow
 
   override protected def doExecute(): RDD[InternalRow] = {
diff --git 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
index 5daca9bede..fec36ac1ac 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuite.scala
@@ -17,8 +17,7 @@
 package org.apache.gluten.extension.columnar.transition
 
 import org.apache.gluten.exception.GlutenException
-import org.apache.gluten.execution.ColumnarToColumnarExec
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{ColumnarToColumnarExec, GlutenPlan}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -117,6 +116,7 @@ object TransitionSuite extends TransitionSuiteBase {
     extends RowToColumnarTransition
     with GlutenPlan {
     override def batchType(): Convention.BatchType = toBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
     override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
       copy(child = newChild)
     override protected def doExecute(): RDD[InternalRow] =
diff --git 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
index 43805b3d65..7ab9b5d0cf 100644
--- 
a/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
+++ 
b/gluten-substrait/src/test/scala/org/apache/gluten/extension/columnar/transition/TransitionSuiteBase.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.extension.columnar.transition
 
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
@@ -27,6 +27,7 @@ trait TransitionSuiteBase {
   case class BatchLeaf(override val batchType: Convention.BatchType)
     extends LeafExecNode
     with GlutenPlan {
+    override def rowType0(): Convention.RowType = Convention.RowType.None
 
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
 
@@ -36,6 +37,7 @@ trait TransitionSuiteBase {
   case class BatchUnary(override val batchType: Convention.BatchType, override 
val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
+    override def rowType0(): Convention.RowType = Convention.RowType.None
 
     override protected def withNewChildInternal(newChild: SparkPlan): 
SparkPlan =
       copy(child = newChild)
@@ -51,6 +53,7 @@ trait TransitionSuiteBase {
       override val right: SparkPlan)
     extends BinaryExecNode
     with GlutenPlan {
+    override def rowType0(): Convention.RowType = Convention.RowType.None
 
     override protected def withNewChildrenInternal(
         newLeft: SparkPlan,
diff --git 
a/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala 
b/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
index 3d26dd16c4..2a0ecaf92c 100644
--- a/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
+++ b/gluten-substrait/src/test/scala/org/apache/gluten/test/FallbackUtil.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.test
 
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.execution._
diff --git 
a/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala 
b/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
index 164083a8d8..8507233a57 100644
--- a/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
+++ b/gluten-substrait/src/test/scala/org/apache/spark/sql/GlutenQueryTest.scala
@@ -21,8 +21,8 @@ package org.apache.spark.sql
  *   1. We need to modify the way org.apache.spark.sql.CHQueryTest#compare 
compares double
  */
 import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.execution.TransformSupport
-import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.SPARK_VERSION_SHORT
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 7d4315e8d7..754c292418 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,13 +17,14 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{BasicScanExecTransformer, GlutenPlan}
+import org.apache.gluten.extension.GlutenSessionExtensions
 import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.RemoveFallbackTagRule
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, 
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
+import org.apache.gluten.extension.columnar.transition.{Convention, 
InsertBackendTransitions}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -200,6 +201,8 @@ private object FallbackStrategiesSuite {
 
 // For replacing LeafOp.
   case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
+    override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
   }
@@ -208,6 +211,8 @@ private object FallbackStrategiesSuite {
   case class UnaryOp1Transformer(override val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
+    override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override def output: Seq[Attribute] = child.output
     override protected def withNewChildInternal(newChild: SparkPlan): 
UnaryOp1Transformer =
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
index 7a3c4d1056..e525eb1a9c 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.statistics
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
 
 import org.apache.spark.sql.{GlutenTestConstants, QueryTest, SparkSession}
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index e8cc7898c2..88e0ecf65a 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,13 +17,14 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{BasicScanExecTransformer, GlutenPlan}
+import org.apache.gluten.extension.GlutenSessionExtensions
 import org.apache.gluten.extension.columnar.{FallbackTags, 
RemoveFallbackTagRule}
 import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, 
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
+import org.apache.gluten.extension.columnar.transition.{Convention, 
InsertBackendTransitions}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -229,6 +230,8 @@ private object FallbackStrategiesSuite {
 
   // For replacing LeafOp.
   case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
+    override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
   }
@@ -237,6 +240,8 @@ private object FallbackStrategiesSuite {
   case class UnaryOp1Transformer(override val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
+    override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override def output: Seq[Attribute] = child.output
     override protected def withNewChildInternal(newChild: SparkPlan): 
UnaryOp1Transformer =
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
index 40874cd69d..34e97273d6 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.statistics
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
 
 import org.apache.spark.sql.{GlutenTestConstants, QueryTest, SparkSession}
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index e8cc7898c2..88e0ecf65a 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,13 +17,14 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{BasicScanExecTransformer, GlutenPlan}
+import org.apache.gluten.extension.GlutenSessionExtensions
 import org.apache.gluten.extension.columnar.{FallbackTags, 
RemoveFallbackTagRule}
 import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, 
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
+import org.apache.gluten.extension.columnar.transition.{Convention, 
InsertBackendTransitions}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -229,6 +230,8 @@ private object FallbackStrategiesSuite {
 
   // For replacing LeafOp.
   case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
+    override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
   }
@@ -237,6 +240,8 @@ private object FallbackStrategiesSuite {
   case class UnaryOp1Transformer(override val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
+    override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override def output: Seq[Attribute] = child.output
     override protected def withNewChildInternal(newChild: SparkPlan): 
UnaryOp1Transformer =
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index 74c4df1977..e6cc2937a4 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.sources
 
 import org.apache.gluten.execution.{ProjectExecTransformer, 
SortExecTransformer}
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.OutputMetrics
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
index e5e122c906..f7cc114859 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.statistics
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
 
 import org.apache.spark.sql.{GlutenTestConstants, QueryTest, SparkSession}
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 1d45e8a672..8908047a33 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,13 +17,14 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{GlutenPlan, GlutenSessionExtensions}
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.{BasicScanExecTransformer, GlutenPlan}
+import org.apache.gluten.extension.GlutenSessionExtensions
 import org.apache.gluten.extension.columnar.{FallbackTags, 
RemoveFallbackTagRule}
 import 
org.apache.gluten.extension.columnar.ColumnarRuleApplier.ColumnarRuleCall
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.RemoveTopmostColumnarToRow
 import org.apache.gluten.extension.columnar.heuristic.{ExpandFallbackPolicy, 
HeuristicApplier}
-import org.apache.gluten.extension.columnar.transition.InsertBackendTransitions
+import org.apache.gluten.extension.columnar.transition.{Convention, 
InsertBackendTransitions}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{GlutenSQLTestsTrait, SparkSession}
@@ -230,6 +231,8 @@ private object FallbackStrategiesSuite {
 
 // For replacing LeafOp.
   case class LeafOpTransformer() extends LeafExecNode with GlutenPlan {
+    override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override def output: Seq[Attribute] = Seq.empty
   }
@@ -238,6 +241,8 @@ private object FallbackStrategiesSuite {
   case class UnaryOp1Transformer(override val child: SparkPlan)
     extends UnaryExecNode
     with GlutenPlan {
+    override def batchType(): Convention.BatchType = 
BackendsApiManager.getSettings.primaryBatchType
+    override def rowType0(): Convention.RowType = Convention.RowType.None
     override protected def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException()
     override def output: Seq[Attribute] = child.output
     override protected def withNewChildInternal(newChild: SparkPlan): 
UnaryOp1Transformer =
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
index 1cb905e10a..38e032aec3 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/sources/GlutenInsertSuite.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.sources
 
 import org.apache.gluten.GlutenColumnarWriteTestSupport
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.execution.SortExecTransformer
-import org.apache.gluten.extension.GlutenPlan
 
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.OutputMetrics
diff --git 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
index e5e122c906..f7cc114859 100644
--- 
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
+++ 
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/statistics/SparkFunctionStatistics.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.statistics
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
 
 import org.apache.spark.sql.{GlutenTestConstants, QueryTest, SparkSession}
diff --git 
a/gluten-ut/test/src/test/scala/org/apache/spark/sql/GlutenExpressionDataTypesValidation.scala
 
b/gluten-ut/test/src/test/scala/org/apache/spark/sql/GlutenExpressionDataTypesValidation.scala
index d2a9611471..2088c90c01 100644
--- 
a/gluten-ut/test/src/test/scala/org/apache/spark/sql/GlutenExpressionDataTypesValidation.scala
+++ 
b/gluten-ut/test/src/test/scala/org/apache/spark/sql/GlutenExpressionDataTypesValidation.scala
@@ -17,8 +17,7 @@
 package org.apache.spark.sql
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.{ProjectExecTransformer, 
WholeStageTransformerSuite}
-import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.execution.{ProjectExecTransformer, TransformSupport, 
WholeStageTransformerSuite}
 import org.apache.gluten.utils.{BackendTestUtils, SystemParameters}
 
 import org.apache.spark.SparkConf
@@ -100,7 +99,8 @@ class GlutenExpressionDataTypesValidation extends 
WholeStageTransformerSuite {
       case _ => throw new UnsupportedOperationException("Not supported type: " 
+ t)
     }
   }
-  def generateGlutenProjectPlan(expr: Expression): GlutenPlan = {
+
+  def generateGlutenProjectPlan(expr: Expression): TransformSupport = {
     val namedExpr = Seq(Alias(expr, "r")())
     ProjectExecTransformer(namedExpr, DummyPlan())
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to