This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ed1f7b613 [CORE] Untangle AddTransformHintRule to extract pre-validation code out (#5514)
ed1f7b613 is described below

commit ed1f7b6130515b51dcefa3e47a7f461744e6dd5f
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Apr 25 11:48:23 2024 +0800

    [CORE] Untangle AddTransformHintRule to extract pre-validation code out (#5514)
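
    The per-operator enable/support checks that were previously inlined in
    AddTransformHintRule now run as a shared, composable validator pipeline
    before each operator's own doValidate() call. A condensed sketch of the
    new flow, pieced together from the patch below (not a verbatim excerpt;
    the scan-only step is an implicit builder extension defined in
    AddTransformHintRule):

        val validator = Validators
          .builder()
          .fallbackByHint()              // node already tagged TRANSFORM_UNSUPPORTED
          .fallbackIfScanOnlyWithFilterPushed(glutenConf.enableScanOnly)
          .fallbackComplexExpressions()  // expression-tree depth threshold
          .fallbackByBackendSettings()   // BackendSettingsApi capabilities
          .fallbackByUserOptions()       // GlutenConfig enable/disable switches
          .build()

        validator.validate(plan) match {
          case Validator.Failed(reason) => TransformHints.tagNotTransformable(plan, reason)
          case Validator.Passed => // fall through to the operator's transformer validation
        }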
---
 .../extension/columnar/TransformHintRule.scala     | 576 ++++++++-------------
 .../columnar/enumerated/ConditionedRule.scala      |  70 +++
 .../columnar/enumerated/EnumeratedTransform.scala  |   8 +
 .../extension/columnar/validator/Validator.scala   |  42 ++
 .../extension/columnar/validator/Validators.scala  | 211 ++++++++
 .../datasources/GlutenWriterColumnarRules.scala    |   2 +
 .../spark/sql/gluten/GlutenFallbackSuite.scala     |   6 +-
 .../spark/sql/gluten/GlutenFallbackSuite.scala     |   6 +-
 .../spark/sql/gluten/GlutenFallbackSuite.scala     |   6 +-
 9 files changed, 556 insertions(+), 371 deletions(-)
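
The patch also lands ConditionedRule, which can gate an enumerated RAS rule with
validators on both its input and its output. It is marked TODO / not yet in use;
below is a sketch of the intended wiring, assuming a rasRule value of type
RasRule[SparkPlan] (hypothetical here), with both conditions built from the new
public builder:

    import org.apache.gluten.extension.columnar.enumerated.ConditionedRule
    import org.apache.gluten.extension.columnar.validator.Validators

    val pre = Validators.builder().fallbackByHint().build()
    val post = Validators.builder().fallbackByBackendSettings().build()
    // A Validator converts implicitly to PreCondition/PostCondition through
    // the FromValidator implicit classes defined in ConditionedRule.
    val gated = ConditionedRule.wrap(rasRule, pre, post)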

diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
index 8f2607a97..1bd0d3b93 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
@@ -20,9 +20,9 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
-import org.apache.gluten.expression.ExpressionUtils.getExpressionTreeDepth
 import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
 import org.apache.gluten.extension.columnar.TransformHints.EncodeTransformableTagImplicits
+import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
 import org.apache.gluten.sql.shims.SparkShimLoader
 
 import org.apache.spark.api.python.EvalPythonExecTransformer
@@ -295,45 +295,16 @@ case class FallbackEmptySchemaRelation() extends Rule[SparkPlan] {
 // If false is returned or any unsupported exception is thrown, a row guard will
 // be added on the top of that plan to prevent actual conversion.
 case class AddTransformHintRule() extends Rule[SparkPlan] {
-  val columnarConf: GlutenConfig = GlutenConfig.getConf
-  val scanOnly: Boolean = columnarConf.enableScanOnly
-  val enableColumnarShuffle: Boolean =
-    !scanOnly && BackendsApiManager.getSettings.supportColumnarShuffleExec()
-  val enableColumnarSort: Boolean = !scanOnly && columnarConf.enableColumnarSort
-  val enableColumnarWindow: Boolean = !scanOnly && columnarConf.enableColumnarWindow
-  val enableColumnarWindowGroupLimit: Boolean = !scanOnly &&
-    columnarConf.enableColumnarWindowGroupLimit
-  val enableColumnarSortMergeJoin: Boolean = !scanOnly &&
-    BackendsApiManager.getSettings.supportSortMergeJoinExec()
-  val enableColumnarBatchScan: Boolean = columnarConf.enableColumnarBatchScan
-  val enableColumnarFileScan: Boolean = columnarConf.enableColumnarFileScan
-  val enableColumnarHiveTableScan: Boolean = columnarConf.enableColumnarHiveTableScan
-  val enableColumnarProject: Boolean = !scanOnly && columnarConf.enableColumnarProject
-  val enableColumnarFilter: Boolean = columnarConf.enableColumnarFilter
-  val fallbackExpressionsThreshold: Int = columnarConf.fallbackExpressionsThreshold
-  val enableColumnarHashAgg: Boolean = !scanOnly && columnarConf.enableColumnarHashAgg
-  val enableColumnarUnion: Boolean = !scanOnly && columnarConf.enableColumnarUnion
-  val enableColumnarExpand: Boolean = !scanOnly && columnarConf.enableColumnarExpand
-  val enableColumnarShuffledHashJoin: Boolean =
-    !scanOnly && columnarConf.enableColumnarShuffledHashJoin
-  val enableColumnarBroadcastExchange: Boolean = !scanOnly &&
-    columnarConf.enableColumnarBroadcastExchange
-  val enableColumnarBroadcastJoin: Boolean = !scanOnly &&
-    columnarConf.enableColumnarBroadcastJoin
-  val enableColumnarLimit: Boolean = !scanOnly && columnarConf.enableColumnarLimit
-  val enableColumnarGenerate: Boolean = !scanOnly && columnarConf.enableColumnarGenerate
-  val enableColumnarCoalesce: Boolean = !scanOnly && columnarConf.enableColumnarCoalesce
-  val enableTakeOrderedAndProject: Boolean =
-    !scanOnly && columnarConf.enableTakeOrderedAndProject &&
-      enableColumnarSort && enableColumnarLimit && enableColumnarShuffle && enableColumnarProject
-  val enableColumnarWrite: Boolean = BackendsApiManager.getSettings.enableNativeWriteFiles()
-  val enableCartesianProduct: Boolean =
-    BackendsApiManager.getSettings.supportCartesianProductExec() &&
-      columnarConf.cartesianProductTransformerEnabled
-  val enableBroadcastNestedLoopJoin: Boolean =
-    BackendsApiManager.getSettings.supportBroadcastNestedLoopJoinExec() &&
-      columnarConf.broadcastNestedLoopJoinTransformerTransformerEnabled &&
-      enableColumnarBroadcastJoin
+  import AddTransformHintRule._
+  private val glutenConf: GlutenConfig = GlutenConfig.getConf
+  private val validator = Validators
+    .builder()
+    .fallbackByHint()
+    .fallbackIfScanOnlyWithFilterPushed(glutenConf.enableScanOnly)
+    .fallbackComplexExpressions()
+    .fallbackByBackendSettings()
+    .fallbackByUserOptions()
+    .build()
 
   def apply(plan: SparkPlan): SparkPlan = {
     addTransformableTags(plan)
@@ -348,365 +319,202 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
   }
 
   private def addTransformableTag(plan: SparkPlan): Unit = {
-    if (TransformHints.isNotTransformable(plan)) {
-      logDebug(
-        s"Skip adding transformable tag, since plan already tagged as " +
-          s"${TransformHints.getHint(plan)}: ${plan.toString()}")
-      return
+    val outcome = validator.validate(plan)
+    outcome match {
+      case Validator.Failed(reason) =>
+        TransformHints.tagNotTransformable(plan, reason)
+        return
+      case Validator.Passed =>
     }
+
     try {
       plan match {
         case plan: BatchScanExec =>
-          if (!enableColumnarBatchScan) {
-            TransformHints.tagNotTransformable(plan, "columnar BatchScan is disabled")
-          } else {
-            // IF filter expressions aren't empty, we need to transform the inner operators.
-            if (plan.runtimeFilters.isEmpty) {
-              val transformer =
-                ScanTransformerFactory
-                  .createBatchScanTransformer(plan, validation = true)
-                  .asInstanceOf[BasicScanExecTransformer]
-              transformer.doValidate().tagOnFallback(plan)
-            }
+          // If filter expressions aren't empty, we need to transform the inner operators.
+          if (plan.runtimeFilters.isEmpty) {
+            val transformer =
+              ScanTransformerFactory
+                .createBatchScanTransformer(plan, validation = true)
+                .asInstanceOf[BasicScanExecTransformer]
+            transformer.doValidate().tagOnFallback(plan)
           }
         case plan: FileSourceScanExec =>
-          if (!enableColumnarFileScan) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar FileScan is not enabled in FileSourceScanExec")
-          } else {
-            // IF filter expressions aren't empty, we need to transform the inner operators.
-            if (plan.partitionFilters.isEmpty) {
-              val transformer =
-                ScanTransformerFactory.createFileSourceScanTransformer(plan, validation = true)
-              transformer.doValidate().tagOnFallback(plan)
-            }
+          // If filter expressions aren't empty, we need to transform the inner operators.
+          if (plan.partitionFilters.isEmpty) {
+            val transformer =
+              ScanTransformerFactory.createFileSourceScanTransformer(plan, validation = true)
+            transformer.doValidate().tagOnFallback(plan)
           }
         case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
-          if (!enableColumnarHiveTableScan) {
-            TransformHints.tagNotTransformable(plan, "columnar hive table scan is disabled")
-          } else {
-            HiveTableScanExecTransformer.validate(plan).tagOnFallback(plan)
-          }
+          HiveTableScanExecTransformer.validate(plan).tagOnFallback(plan)
         case plan: ProjectExec =>
-          if (!enableColumnarProject) {
-            TransformHints.tagNotTransformable(plan, "columnar project is disabled")
-          } else if (
-            plan.projectList.size > 0 && plan.projectList
-              .map(getExpressionTreeDepth(_))
-              .max >= fallbackExpressionsThreshold
-          ) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "Fall back project plan because its" +
-                " max nested expressions number reaches the configured threshold")
-          } else {
-            val transformer = ProjectExecTransformer(plan.projectList, plan.child)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = ProjectExecTransformer(plan.projectList, plan.child)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: FilterExec =>
-          val childIsScan = plan.child.isInstanceOf[FileSourceScanExec] ||
-            plan.child.isInstanceOf[BatchScanExec]
-          if (!enableColumnarFilter) {
-            TransformHints.tagNotTransformable(plan, "columnar Filter is not enabled in FilterExec")
-          } else if (getExpressionTreeDepth(plan.condition) >= fallbackExpressionsThreshold) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "Fall back filter plan because its" +
-                " nested expressions number reaches the configured threshold")
-          } else if (scanOnly && !childIsScan) {
-            // When scanOnly is enabled, filter after scan will be offloaded.
-            TransformHints.tagNotTransformable(
-              plan,
-              "ScanOnly enabled and plan child is not Scan in FilterExec")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-              .genFilterExecTransformer(plan.condition, plan.child)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+            .genFilterExecTransformer(plan.condition, plan.child)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: HashAggregateExec =>
-          if (!enableColumnarHashAgg) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar HashAggregate is not enabled in HashAggregateExec")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-              .genHashAggregateExecTransformer(
-                plan.requiredChildDistributionExpressions,
-                plan.groupingExpressions,
-                plan.aggregateExpressions,
-                plan.aggregateAttributes,
-                plan.initialInputBufferOffset,
-                plan.resultExpressions,
-                plan.child
-              )
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+            .genHashAggregateExecTransformer(
+              plan.requiredChildDistributionExpressions,
+              plan.groupingExpressions,
+              plan.aggregateExpressions,
+              plan.aggregateAttributes,
+              plan.initialInputBufferOffset,
+              plan.resultExpressions,
+              plan.child
+            )
+          transformer.doValidate().tagOnFallback(plan)
         case plan: SortAggregateExec =>
-          if (!BackendsApiManager.getSettings.replaceSortAggWithHashAgg) {
-            TransformHints.tagNotTransformable(plan, "replaceSortAggWithHashAgg is not enabled")
-          } else if (!enableColumnarHashAgg) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar HashAgg is not enabled in SortAggregateExec")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-              .genHashAggregateExecTransformer(
-                plan.requiredChildDistributionExpressions,
-                plan.groupingExpressions,
-                plan.aggregateExpressions,
-                plan.aggregateAttributes,
-                plan.initialInputBufferOffset,
-                plan.resultExpressions,
-                plan.child
-              )
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+            .genHashAggregateExecTransformer(
+              plan.requiredChildDistributionExpressions,
+              plan.groupingExpressions,
+              plan.aggregateExpressions,
+              plan.aggregateAttributes,
+              plan.initialInputBufferOffset,
+              plan.resultExpressions,
+              plan.child
+            )
+          transformer.doValidate().tagOnFallback(plan)
         case plan: ObjectHashAggregateExec =>
-          if (!enableColumnarHashAgg) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar HashAgg is not enabled in ObjectHashAggregateExec")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-              .genHashAggregateExecTransformer(
-                plan.requiredChildDistributionExpressions,
-                plan.groupingExpressions,
-                plan.aggregateExpressions,
-                plan.aggregateAttributes,
-                plan.initialInputBufferOffset,
-                plan.resultExpressions,
-                plan.child
-              )
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+            .genHashAggregateExecTransformer(
+              plan.requiredChildDistributionExpressions,
+              plan.groupingExpressions,
+              plan.aggregateExpressions,
+              plan.aggregateAttributes,
+              plan.initialInputBufferOffset,
+              plan.resultExpressions,
+              plan.child
+            )
+          transformer.doValidate().tagOnFallback(plan)
         case plan: UnionExec =>
-          if (!enableColumnarUnion) {
-            TransformHints.tagNotTransformable(plan, "columnar Union is not enabled in UnionExec")
-          } else {
-            val transformer = ColumnarUnionExec(plan.children)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = ColumnarUnionExec(plan.children)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: ExpandExec =>
-          if (!enableColumnarExpand) {
-            TransformHints.tagNotTransformable(plan, "columnar Expand is not enabled in ExpandExec")
-          } else {
-            val transformer = ExpandExecTransformer(plan.projections, plan.output, plan.child)
-            transformer.doValidate().tagOnFallback(plan)
-          }
-
+          val transformer = ExpandExecTransformer(plan.projections, plan.output, plan.child)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: WriteFilesExec =>
-          if (!enableColumnarWrite || !BackendsApiManager.getSettings.supportTransformWriteFiles) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar Write is not enabled in WriteFilesExec")
-          } else {
-            val transformer = WriteFilesExecTransformer(
-              plan.child,
-              plan.fileFormat,
-              plan.partitionColumns,
-              plan.bucketSpec,
-              plan.options,
-              plan.staticPartitions)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = WriteFilesExecTransformer(
+            plan.child,
+            plan.fileFormat,
+            plan.partitionColumns,
+            plan.bucketSpec,
+            plan.options,
+            plan.staticPartitions)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: SortExec =>
-          if (!enableColumnarSort) {
-            TransformHints.tagNotTransformable(plan, "columnar Sort is not enabled in SortExec")
-          } else {
-            val transformer =
-              SortExecTransformer(plan.sortOrder, plan.global, plan.child, plan.testSpillFrequency)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer =
+            SortExecTransformer(plan.sortOrder, plan.global, plan.child, plan.testSpillFrequency)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: ShuffleExchangeExec =>
-          if (!enableColumnarShuffle) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar Shuffle is not enabled in ShuffleExchangeExec")
-          } else {
-            val transformer = ColumnarShuffleExchangeExec(plan, plan.child, plan.child.output)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = ColumnarShuffleExchangeExec(plan, plan.child, plan.child.output)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: ShuffledHashJoinExec =>
-          if (!enableColumnarShuffledHashJoin) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar shufflehashjoin is not enabled in ShuffledHashJoinExec")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-              .genShuffledHashJoinExecTransformer(
-                plan.leftKeys,
-                plan.rightKeys,
-                plan.joinType,
-                plan.buildSide,
-                plan.condition,
-                plan.left,
-                plan.right,
-                plan.isSkewJoin)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+            .genShuffledHashJoinExecTransformer(
+              plan.leftKeys,
+              plan.rightKeys,
+              plan.joinType,
+              plan.buildSide,
+              plan.condition,
+              plan.left,
+              plan.right,
+              plan.isSkewJoin)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: BroadcastExchangeExec =>
-          // columnar broadcast is enabled only when columnar bhj is enabled.
-          if (!enableColumnarBroadcastExchange) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar BroadcastExchange is not enabled in BroadcastExchangeExec")
-          } else {
-            val transformer = ColumnarBroadcastExchangeExec(plan.mode, plan.child)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = ColumnarBroadcastExchangeExec(plan.mode, plan.child)
+          transformer.doValidate().tagOnFallback(plan)
         case bhj: BroadcastHashJoinExec =>
-          if (!enableColumnarBroadcastJoin) {
-            TransformHints.tagNotTransformable(
-              bhj,
-              "columnar BroadcastJoin is not enabled in BroadcastHashJoinExec")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-              .genBroadcastHashJoinExecTransformer(
-                bhj.leftKeys,
-                bhj.rightKeys,
-                bhj.joinType,
-                bhj.buildSide,
-                bhj.condition,
-                bhj.left,
-                bhj.right,
-                isNullAwareAntiJoin = bhj.isNullAwareAntiJoin)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+            .genBroadcastHashJoinExecTransformer(
+              bhj.leftKeys,
+              bhj.rightKeys,
+              bhj.joinType,
+              bhj.buildSide,
+              bhj.condition,
+              bhj.left,
+              bhj.right,
+              isNullAwareAntiJoin = bhj.isNullAwareAntiJoin)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: SortMergeJoinExec =>
-          if (!enableColumnarSortMergeJoin) {
-            TransformHints.tagNotTransformable(plan, "columnar sort merge join is not enabled")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-              .genSortMergeJoinExecTransformer(
-                plan.leftKeys,
-                plan.rightKeys,
-                plan.joinType,
-                plan.condition,
-                plan.left,
-                plan.right,
-                plan.isSkewJoin)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+            .genSortMergeJoinExecTransformer(
+              plan.leftKeys,
+              plan.rightKeys,
+              plan.joinType,
+              plan.condition,
+              plan.left,
+              plan.right,
+              plan.isSkewJoin)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: CartesianProductExec =>
-          if (!enableCartesianProduct) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "conversion to CartesianProductTransformer is not enabled.")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-              .genCartesianProductExecTransformer(plan.left, plan.right, plan.condition)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+            .genCartesianProductExecTransformer(plan.left, plan.right, plan.condition)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: BroadcastNestedLoopJoinExec =>
-          if (!enableBroadcastNestedLoopJoin) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "conversion to BroadcastNestedLoopJoinTransformer is not enabled.")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance
-              .genBroadcastNestedLoopJoinExecTransformer(
-                plan.left,
-                plan.right,
-                plan.buildSide,
-                plan.joinType,
-                plan.condition)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+            .genBroadcastNestedLoopJoinExecTransformer(
+              plan.left,
+              plan.right,
+              plan.buildSide,
+              plan.joinType,
+              plan.condition)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: WindowExec =>
-          if (!enableColumnarWindow) {
-            TransformHints.tagNotTransformable(plan, "columnar window is not enabled in WindowExec")
-          } else {
-            val transformer = WindowExecTransformer(
-              plan.windowExpression,
-              plan.partitionSpec,
-              plan.orderSpec,
-              plan.child)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = WindowExecTransformer(
+            plan.windowExpression,
+            plan.partitionSpec,
+            plan.orderSpec,
+            plan.child)
+          transformer.doValidate().tagOnFallback(plan)
         case plan if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) =>
-          if (!enableColumnarWindowGroupLimit) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar window group limit is not enabled in WindowGroupLimitExec")
-          } else {
-            val windowGroupLimitPlan = SparkShimLoader.getSparkShims
-              .getWindowGroupLimitExecShim(plan)
-              .asInstanceOf[WindowGroupLimitExecShim]
-            val transformer = WindowGroupLimitExecTransformer(
-              windowGroupLimitPlan.partitionSpec,
-              windowGroupLimitPlan.orderSpec,
-              windowGroupLimitPlan.rankLikeFunction,
-              windowGroupLimitPlan.limit,
-              windowGroupLimitPlan.mode,
-              windowGroupLimitPlan.child
-            )
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val windowGroupLimitPlan = SparkShimLoader.getSparkShims
+            .getWindowGroupLimitExecShim(plan)
+            .asInstanceOf[WindowGroupLimitExecShim]
+          val transformer = WindowGroupLimitExecTransformer(
+            windowGroupLimitPlan.partitionSpec,
+            windowGroupLimitPlan.orderSpec,
+            windowGroupLimitPlan.rankLikeFunction,
+            windowGroupLimitPlan.limit,
+            windowGroupLimitPlan.mode,
+            windowGroupLimitPlan.child
+          )
+          transformer.doValidate().tagOnFallback(plan)
         case plan: CoalesceExec =>
-          if (!enableColumnarCoalesce) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar coalesce is not enabled in CoalesceExec")
-          } else {
-            val transformer = CoalesceExecTransformer(plan.numPartitions, plan.child)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = CoalesceExecTransformer(plan.numPartitions, plan.child)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: GlobalLimitExec =>
-          if (!enableColumnarLimit) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar limit is not enabled in GlobalLimitExec")
-          } else {
-            val (limit, offset) =
-              SparkShimLoader.getSparkShims.getLimitAndOffsetFromGlobalLimit(plan)
-            val transformer = LimitTransformer(plan.child, offset, limit)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val (limit, offset) =
+            SparkShimLoader.getSparkShims.getLimitAndOffsetFromGlobalLimit(plan)
+          val transformer = LimitTransformer(plan.child, offset, limit)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: LocalLimitExec =>
-          if (!enableColumnarLimit) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar limit is not enabled in LocalLimitExec")
-          } else {
-            val transformer = LimitTransformer(plan.child, 0L, plan.limit)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = LimitTransformer(plan.child, 0L, plan.limit)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: GenerateExec =>
-          if (!enableColumnarGenerate) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar generate is not enabled in GenerateExec")
-          } else {
-            val transformer = BackendsApiManager.getSparkPlanExecApiInstance.genGenerateTransformer(
-              plan.generator,
-              plan.requiredChildOutput,
-              plan.outer,
-              plan.generatorOutput,
-              plan.child)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val transformer = BackendsApiManager.getSparkPlanExecApiInstance.genGenerateTransformer(
+            plan.generator,
+            plan.requiredChildOutput,
+            plan.outer,
+            plan.generatorOutput,
+            plan.child)
+          transformer.doValidate().tagOnFallback(plan)
         case plan: EvalPythonExec =>
           val transformer = EvalPythonExecTransformer(plan.udfs, plan.resultAttrs, plan.child)
           transformer.doValidate().tagOnFallback(plan)
-        case _: AQEShuffleReadExec =>
-        // Considered transformable by default.
         case plan: TakeOrderedAndProjectExec =>
-          if (!enableTakeOrderedAndProject) {
-            TransformHints.tagNotTransformable(
-              plan,
-              "columnar topK is not enabled in TakeOrderedAndProjectExec")
-          } else {
-            val (limit, offset) =
-              SparkShimLoader.getSparkShims.getLimitAndOffsetFromTopK(plan)
-            val transformer = TakeOrderedAndProjectExecTransformer(
-              limit,
-              plan.sortOrder,
-              plan.projectList,
-              plan.child,
-              offset)
-            transformer.doValidate().tagOnFallback(plan)
-          }
+          val (limit, offset) =
+            SparkShimLoader.getSparkShims.getLimitAndOffsetFromTopK(plan)
+          val transformer = TakeOrderedAndProjectExecTransformer(
+            limit,
+            plan.sortOrder,
+            plan.projectList,
+            plan.child,
+            offset)
+          transformer.doValidate().tagOnFallback(plan)
         case _ =>
         // Currently we assume a plan to be transformable by default.
       }
@@ -723,6 +531,44 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
   }
 }
 
+object AddTransformHintRule {
+  implicit private class ValidatorBuilderImplicits(builder: Validators.Builder) {
+
+    /**
+     * Fails validation on non-scan plan nodes if Gluten is running in scan-only mode. As an
+     * exception, a filter directly on top of a scan still passes validation, because such
+     * filters can be pushed down into the scan, so their conditions are processed in the scan
+     * anyway.
+     */
+    def fallbackIfScanOnlyWithFilterPushed(scanOnly: Boolean): Validators.Builder = {
+      builder.add(new FallbackIfScanOnlyWithFilterPushed(scanOnly))
+      builder
+    }
+  }
+
+  private class FallbackIfScanOnlyWithFilterPushed(scanOnly: Boolean) extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = {
+      if (!scanOnly) {
+        return pass()
+      }
+      // Scan-only mode
+      plan match {
+        case _: BatchScanExec => pass()
+        case _: FileSourceScanExec => pass()
+        case p if HiveTableScanExecTransformer.isHiveTableScan(p) => pass()
+        case filter: FilterExec =>
+          val childIsScan = filter.child.isInstanceOf[FileSourceScanExec] ||
+            filter.child.isInstanceOf[BatchScanExec]
+          if (childIsScan) {
+            pass()
+          } else {
+            fail(filter)
+          }
+        case other => fail(other)
+      }
+    }
+  }
+}
+
 case class RemoveTransformHintRule() extends Rule[SparkPlan] {
   override def apply(plan: SparkPlan): SparkPlan = {
     plan.foreach(TransformHints.untag)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/ConditionedRule.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/ConditionedRule.scala
new file mode 100644
index 000000000..092d67efc
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/ConditionedRule.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.enumerated
+
+import org.apache.gluten.extension.columnar.validator.Validator
+import org.apache.gluten.ras.rule.{RasRule, Shape}
+
+import org.apache.spark.sql.execution.SparkPlan
+
+object ConditionedRule {
+  trait PreCondition {
+    def apply(node: SparkPlan): Boolean
+  }
+
+  object PreCondition {
+    implicit class FromValidator(validator: Validator) extends PreCondition {
+      override def apply(node: SparkPlan): Boolean = {
+        validator.validate(node) match {
+          case Validator.Passed => true
+          case Validator.Failed(reason) => false
+        }
+      }
+    }
+  }
+
+  trait PostCondition {
+    def apply(node: SparkPlan): Boolean
+  }
+
+  object PostCondition {
+    implicit class FromValidator(validator: Validator) extends PostCondition {
+      override def apply(node: SparkPlan): Boolean = {
+        validator.validate(node) match {
+          case Validator.Passed => true
+          case Validator.Failed(reason) => false
+        }
+      }
+    }
+  }
+
+  def wrap(
+      rule: RasRule[SparkPlan],
+      pre: ConditionedRule.PreCondition,
+      post: ConditionedRule.PostCondition): RasRule[SparkPlan] = {
+    new RasRule[SparkPlan] {
+      override def shift(node: SparkPlan): Iterable[SparkPlan] = {
+        val out = List(node)
+          .filter(pre.apply)
+          .flatMap(rule.shift)
+          .filter(post.apply)
+        out
+      }
+      override def shape(): Shape[SparkPlan] = rule.shape()
+    }
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index 4ce768a06..27dc1be3d 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -17,6 +17,7 @@
 package org.apache.gluten.extension.columnar.enumerated
 
 import org.apache.gluten.extension.columnar.{TransformExchange, TransformJoin, TransformOthers, TransformSingleNode}
+import org.apache.gluten.extension.columnar.validator.Validator
 import org.apache.gluten.planner.GlutenOptimization
 import org.apache.gluten.planner.property.Conventions
 import org.apache.gluten.ras.property.PropertySet
@@ -67,4 +68,11 @@ object EnumeratedTransform {
 
     override def shape(): Shape[SparkPlan] = Shapes.fixedHeight(1)
   }
+
+  // TODO: Currently not in use. Prepared for future development.
+  implicit private class RasRuleImplicits(rasRule: RasRule[SparkPlan]) {
+    def withValidator(pre: Validator, post: Validator): RasRule[SparkPlan] = {
+      ConditionedRule.wrap(rasRule, pre, post)
+    }
+  }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
new file mode 100644
index 000000000..000dbac7b
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.validator
+
+import org.apache.spark.sql.execution.SparkPlan
+
+trait Validator {
+  import Validator._
+  def validate(plan: SparkPlan): OutCome
+
+  final def pass(): OutCome = {
+    Passed
+  }
+
+  final def fail(p: SparkPlan): OutCome = {
+    Validator.Failed(s"[${getClass.getSimpleName}] Validation failed on node ${p.nodeName}")
+  }
+
+  final def fail(reason: String): OutCome = {
+    Validator.Failed(reason)
+  }
+}
+
+object Validator {
+  sealed trait OutCome
+  case object Passed extends OutCome
+  case class Failed private (reason: String) extends OutCome
+}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
new file mode 100644
index 000000000..57bcc7e09
--- /dev/null
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.validator
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi}
+import org.apache.gluten.expression.ExpressionUtils
+import org.apache.gluten.extension.columnar.{TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.window.WindowExec
+import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+object Validators {
+  def builder(): Builder = Builder()
+
+  class Builder private {
+    private val conf = GlutenConfig.getConf
+    private val settings = BackendsApiManager.getSettings
+    private val buffer: ListBuffer[Validator] = mutable.ListBuffer()
+
+    /** Fails validation if a plan node was already tagged with TRANSFORM_UNSUPPORTED. */
+    def fallbackByHint(): Builder = {
+      buffer += FallbackByHint
+      this
+    }
+
+    /**
+     * Fails validation if a plan node includes an expression that is considered too complex
+     * to be executed by the native library. By default, a depth threshold from the
+     * configuration is used to make the decision.
+     */
+    def fallbackComplexExpressions(): Builder = {
+      buffer += new FallbackComplexExpressions(conf.fallbackExpressionsThreshold)
+      this
+    }
+
+    /** Fails validation on non-scan plan nodes if Gluten is running in scan-only mode. */
+    def fallbackIfScanOnly(): Builder = {
+      buffer += new FallbackIfScanOnly(conf.enableScanOnly)
+      this
+    }
+
+    /**
+     * Fails validation if native execution of a plan node is not supported by the current
+     * backend implementation, as reported by the active BackendSettings.
+     */
+    def fallbackByBackendSettings(): Builder = {
+      buffer += new FallbackByBackendSettings(settings)
+      this
+    }
+
+    /**
+     * Fails validation if native execution of a plan node is disabled by Gluten/Spark
+     * configuration.
+     */
+    def fallbackByUserOptions(): Builder = {
+      buffer += new FallbackByUserOptions(conf)
+      this
+    }
+
+    /** Adds a custom validator to the pipeline. */
+    def add(validator: Validator): Builder = {
+      buffer += validator
+      this
+    }
+
+    def build(): Validator = {
+      if (buffer.isEmpty) {
+        NoopValidator
+      } else {
+        new ValidatorPipeline(buffer)
+      }
+    }
+  }
+
+  private object Builder {
+    def apply(): Builder = new Builder()
+  }
+
+  private object FallbackByHint extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = {
+      if (TransformHints.isNotTransformable(plan)) {
+        val hint = TransformHints.getHint(plan).asInstanceOf[TRANSFORM_UNSUPPORTED]
+        return fail(hint.reason.getOrElse("Reason not recorded"))
+      }
+      pass()
+    }
+  }
+
+  private class FallbackComplexExpressions(threshold: Int) extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = {
+      if (plan.expressions.exists(e => ExpressionUtils.getExpressionTreeDepth(e) > threshold)) {
+        return fail(
+          s"Disabled because at least one present expression exceeded depth threshold: " +
+            s"${plan.nodeName}")
+      }
+      pass()
+    }
+  }
+
+  private class FallbackIfScanOnly(scanOnly: Boolean) extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = plan match {
+      case _: BatchScanExec => pass()
+      case _: FileSourceScanExec => pass()
+      case p if HiveTableScanExecTransformer.isHiveTableScan(p) => pass()
+      case p if scanOnly => fail(p)
+      case _ => pass()
+    }
+  }
+
+  private class FallbackByBackendSettings(settings: BackendSettingsApi) extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = plan match {
+      case p: ShuffleExchangeExec if !settings.supportColumnarShuffleExec() => fail(p)
+      case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() => fail(p)
+      case p: WriteFilesExec
+          if !(settings.enableNativeWriteFiles() && settings.supportTransformWriteFiles) =>
+        fail(p)
+      case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg =>
+        fail(p)
+      case p: CartesianProductExec if !settings.supportCartesianProductExec() => fail(p)
+      case p: BroadcastNestedLoopJoinExec if !settings.supportBroadcastNestedLoopJoinExec() =>
+        fail(p)
+      case p: TakeOrderedAndProjectExec if !settings.supportColumnarShuffleExec() => fail(p)
+      case _ => pass()
+    }
+  }
+
+  private class FallbackByUserOptions(conf: GlutenConfig) extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = plan match {
+      case p: SortExec if !conf.enableColumnarSort => fail(p)
+      case p: WindowExec if !conf.enableColumnarWindow => fail(p)
+      case p: SortMergeJoinExec if !conf.enableColumnarSortMergeJoin => fail(p)
+      case p: BatchScanExec if !conf.enableColumnarBatchScan => fail(p)
+      case p: FileSourceScanExec if !conf.enableColumnarFileScan => fail(p)
+      case p: ProjectExec if !conf.enableColumnarProject => fail(p)
+      case p: FilterExec if !conf.enableColumnarFilter => fail(p)
+      case p: UnionExec if !conf.enableColumnarUnion => fail(p)
+      case p: ExpandExec if !conf.enableColumnarExpand => fail(p)
+      case p: ShuffledHashJoinExec if !conf.enableColumnarShuffledHashJoin => fail(p)
+      case p: ShuffleExchangeExec if !conf.enableColumnarShuffle => fail(p)
+      case p: BroadcastExchangeExec if !conf.enableColumnarBroadcastExchange => fail(p)
+      case p @ (_: LocalLimitExec | _: GlobalLimitExec) if !conf.enableColumnarLimit => fail(p)
+      case p: GenerateExec if !conf.enableColumnarGenerate => fail(p)
+      case p: CoalesceExec if !conf.enableColumnarCoalesce => fail(p)
+      case p: CartesianProductExec if !conf.cartesianProductTransformerEnabled => fail(p)
+      case p: TakeOrderedAndProjectExec
+          if !(conf.enableTakeOrderedAndProject && conf.enableColumnarSort &&
+            conf.enableColumnarShuffle && conf.enableColumnarProject) =>
+        fail(p)
+      case p: BroadcastHashJoinExec if !conf.enableColumnarBroadcastJoin =>
+        fail(p)
+      case p: BroadcastNestedLoopJoinExec
+          if !(conf.enableColumnarBroadcastJoin &&
+            conf.broadcastNestedLoopJoinTransformerTransformerEnabled) =>
+        fail(p)
+      case p @ (_: HashAggregateExec | _: SortAggregateExec | _: ObjectHashAggregateExec)
+          if !conf.enableColumnarHashAgg =>
+        fail(p)
+      case p
+          if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(
+            plan) && !conf.enableColumnarWindowGroupLimit =>
+        fail(p)
+      case p
+          if HiveTableScanExecTransformer.isHiveTableScan(p) && !conf.enableColumnarHiveTableScan =>
+        fail(p)
+      case _ => pass()
+    }
+  }
+
+  private class ValidatorPipeline(validators: Seq[Validator]) extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = {
+      val init: Validator.OutCome = pass()
+      val finalOut = validators.foldLeft(init) {
+        case (out, validator) =>
+          out match {
+            case Validator.Passed => validator.validate(plan)
+            case Validator.Failed(_) => out
+          }
+      }
+      finalOut
+    }
+  }
+
+  private object NoopValidator extends Validator {
+    override def validate(plan: SparkPlan): Validator.OutCome = pass()
+  }
+}
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 5b14eef18..f1adb09e2 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -162,6 +162,8 @@ object GlutenWriterColumnarRules {
         session.sparkContext.setLocalProperty(
           "staticPartitionWriteOnly",
           BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
+        // FIXME: We should only rely on a context property when there is no other approach.
+        //  Check whether these options can be passed in some other way.
         session.sparkContext.setLocalProperty("isNativeAppliable", format.isDefined.toString)
         session.sparkContext.setLocalProperty("nativeFormat", format.getOrElse(""))
         if (format.isDefined) {
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 2e91e6c86..b85dd6a35 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -43,7 +43,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
         }
       }
      val msgRegex = """Validation failed for plan: Scan parquet default\.t\[QueryId=[0-9]+\],""" +
-        """ due to: columnar FileScan is not enabled in FileSourceScanExec\."""
+        """ due to: \[FallbackByUserOptions\] Validation failed on node Scan parquet default\.t\."""
      assert(testAppender.loggingEvents.exists(_.getMessage.getFormattedMessage.matches(msgRegex)))
     }
   }
@@ -90,7 +90,9 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
         assert(execution.get.numFallbackNodes == 1)
         val fallbackReason = execution.get.fallbackNodeToReason.head
         assert(fallbackReason._1.contains("Scan parquet default.t"))
-        assert(fallbackReason._2.contains("columnar FileScan is not enabled in FileSourceScanExec"))
+        assert(
+          fallbackReason._2.contains(
+            "[FallbackByUserOptions] Validation failed on node Scan parquet default.t"))
       }
     }
 
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 54f236b7b..9e8c7e542 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -45,7 +45,8 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
       }
      val msgRegex =
        """Validation failed for plan: Scan parquet spark_catalog.default\.t\[QueryId=[0-9]+\],""" +
-          """ due to: columnar FileScan is not enabled in FileSourceScanExec\."""
+          """ due to: \[FallbackByUserOptions\] Validation failed on node Scan parquet""" +
+          """ spark_catalog\.default\.t\.""".stripMargin
      assert(testAppender.loggingEvents.exists(_.getMessage.getFormattedMessage.matches(msgRegex)))
     }
   }
@@ -92,7 +93,8 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
         assert(execution.get.numFallbackNodes == 1)
         val fallbackReason = execution.get.fallbackNodeToReason.head
        assert(fallbackReason._1.contains("Scan parquet spark_catalog.default.t"))
-        assert(fallbackReason._2.contains("columnar FileScan is not enabled in FileSourceScanExec"))
+        assert(fallbackReason._2.contains(
+          "[FallbackByUserOptions] Validation failed on node Scan parquet spark_catalog.default.t"))
       }
     }
 
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 54f236b7b..9e8c7e542 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -45,7 +45,8 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
       }
      val msgRegex =
        """Validation failed for plan: Scan parquet spark_catalog.default\.t\[QueryId=[0-9]+\],""" +
-          """ due to: columnar FileScan is not enabled in FileSourceScanExec\."""
+          """ due to: \[FallbackByUserOptions\] Validation failed on node Scan parquet""" +
+          """ spark_catalog\.default\.t\.""".stripMargin
      assert(testAppender.loggingEvents.exists(_.getMessage.getFormattedMessage.matches(msgRegex)))
     }
   }
@@ -92,7 +93,8 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelp
         assert(execution.get.numFallbackNodes == 1)
         val fallbackReason = execution.get.fallbackNodeToReason.head
        assert(fallbackReason._1.contains("Scan parquet spark_catalog.default.t"))
-        assert(fallbackReason._2.contains("columnar FileScan is not enabled in FileSourceScanExec"))
+        assert(fallbackReason._2.contains(
+          "[FallbackByUserOptions] Validation failed on node Scan parquet spark_catalog.default.t"))
       }
     }
 

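Note for backend and rule authors: custom checks can be chained into the new
pipeline through the builder's add() hook. A minimal sketch under assumptions:
the FallbackOddCoalesce validator below is hypothetical; only Validator,
Validator.OutCome, Validators.builder(), add() and build() come from this patch.

    import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
    import org.apache.spark.sql.execution.{CoalesceExec, SparkPlan}

    // Hypothetical check: refuse native offload of odd-numbered coalesces.
    object FallbackOddCoalesce extends Validator {
      override def validate(plan: SparkPlan): Validator.OutCome = plan match {
        case c: CoalesceExec if c.numPartitions % 2 == 1 => fail(c)
        case _ => pass()
      }
    }

    val validator = Validators
      .builder()
      .fallbackByHint()
      .add(FallbackOddCoalesce) // validators run in order; the first failure wins
      .build()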

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
