This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new ed1f7b613 [CORE] Untangle AddTransformHintRule to extract
pre-validation code out (#5514)
ed1f7b613 is described below
commit ed1f7b6130515b51dcefa3e47a7f461744e6dd5f
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Apr 25 11:48:23 2024 +0800
[CORE] Untangle AddTransformHintRule to extract pre-validation code out
(#5514)
---
.../extension/columnar/TransformHintRule.scala | 576 ++++++++-------------
.../columnar/enumerated/ConditionedRule.scala | 70 +++
.../columnar/enumerated/EnumeratedTransform.scala | 8 +
.../extension/columnar/validator/Validator.scala | 42 ++
.../extension/columnar/validator/Validators.scala | 211 ++++++++
.../datasources/GlutenWriterColumnarRules.scala | 2 +
.../spark/sql/gluten/GlutenFallbackSuite.scala | 6 +-
.../spark/sql/gluten/GlutenFallbackSuite.scala | 6 +-
.../spark/sql/gluten/GlutenFallbackSuite.scala | 6 +-
9 files changed, 556 insertions(+), 371 deletions(-)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
index 8f2607a97..1bd0d3b93 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/TransformHintRule.scala
@@ -20,9 +20,9 @@ import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
-import org.apache.gluten.expression.ExpressionUtils.getExpressionTreeDepth
import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
import
org.apache.gluten.extension.columnar.TransformHints.EncodeTransformableTagImplicits
+import org.apache.gluten.extension.columnar.validator.{Validator, Validators}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.api.python.EvalPythonExecTransformer
@@ -295,45 +295,16 @@ case class FallbackEmptySchemaRelation() extends
Rule[SparkPlan] {
// If false is returned or any unsupported exception is thrown, a row guard
will
// be added on the top of that plan to prevent actual conversion.
case class AddTransformHintRule() extends Rule[SparkPlan] {
- val columnarConf: GlutenConfig = GlutenConfig.getConf
- val scanOnly: Boolean = columnarConf.enableScanOnly
- val enableColumnarShuffle: Boolean =
- !scanOnly && BackendsApiManager.getSettings.supportColumnarShuffleExec()
- val enableColumnarSort: Boolean = !scanOnly &&
columnarConf.enableColumnarSort
- val enableColumnarWindow: Boolean = !scanOnly &&
columnarConf.enableColumnarWindow
- val enableColumnarWindowGroupLimit: Boolean = !scanOnly &&
- columnarConf.enableColumnarWindowGroupLimit
- val enableColumnarSortMergeJoin: Boolean = !scanOnly &&
- BackendsApiManager.getSettings.supportSortMergeJoinExec()
- val enableColumnarBatchScan: Boolean = columnarConf.enableColumnarBatchScan
- val enableColumnarFileScan: Boolean = columnarConf.enableColumnarFileScan
- val enableColumnarHiveTableScan: Boolean =
columnarConf.enableColumnarHiveTableScan
- val enableColumnarProject: Boolean = !scanOnly &&
columnarConf.enableColumnarProject
- val enableColumnarFilter: Boolean = columnarConf.enableColumnarFilter
- val fallbackExpressionsThreshold: Int =
columnarConf.fallbackExpressionsThreshold
- val enableColumnarHashAgg: Boolean = !scanOnly &&
columnarConf.enableColumnarHashAgg
- val enableColumnarUnion: Boolean = !scanOnly &&
columnarConf.enableColumnarUnion
- val enableColumnarExpand: Boolean = !scanOnly &&
columnarConf.enableColumnarExpand
- val enableColumnarShuffledHashJoin: Boolean =
- !scanOnly && columnarConf.enableColumnarShuffledHashJoin
- val enableColumnarBroadcastExchange: Boolean = !scanOnly &&
- columnarConf.enableColumnarBroadcastExchange
- val enableColumnarBroadcastJoin: Boolean = !scanOnly &&
- columnarConf.enableColumnarBroadcastJoin
- val enableColumnarLimit: Boolean = !scanOnly &&
columnarConf.enableColumnarLimit
- val enableColumnarGenerate: Boolean = !scanOnly &&
columnarConf.enableColumnarGenerate
- val enableColumnarCoalesce: Boolean = !scanOnly &&
columnarConf.enableColumnarCoalesce
- val enableTakeOrderedAndProject: Boolean =
- !scanOnly && columnarConf.enableTakeOrderedAndProject &&
- enableColumnarSort && enableColumnarLimit && enableColumnarShuffle &&
enableColumnarProject
- val enableColumnarWrite: Boolean =
BackendsApiManager.getSettings.enableNativeWriteFiles()
- val enableCartesianProduct: Boolean =
- BackendsApiManager.getSettings.supportCartesianProductExec() &&
- columnarConf.cartesianProductTransformerEnabled
- val enableBroadcastNestedLoopJoin: Boolean =
- BackendsApiManager.getSettings.supportBroadcastNestedLoopJoinExec() &&
- columnarConf.broadcastNestedLoopJoinTransformerTransformerEnabled &&
- enableColumnarBroadcastJoin
+ import AddTransformHintRule._
+ private val glutenConf: GlutenConfig = GlutenConfig.getConf
+ private val validator = Validators
+ .builder()
+ .fallbackByHint()
+ .fallbackIfScanOnlyWithFilterPushed(glutenConf.enableScanOnly)
+ .fallbackComplexExpressions()
+ .fallbackByBackendSettings()
+ .fallbackByUserOptions()
+ .build()
def apply(plan: SparkPlan): SparkPlan = {
addTransformableTags(plan)
@@ -348,365 +319,202 @@ case class AddTransformHintRule() extends
Rule[SparkPlan] {
}
private def addTransformableTag(plan: SparkPlan): Unit = {
- if (TransformHints.isNotTransformable(plan)) {
- logDebug(
- s"Skip adding transformable tag, since plan already tagged as " +
- s"${TransformHints.getHint(plan)}: ${plan.toString()}")
- return
+ val outcome = validator.validate(plan)
+ outcome match {
+ case Validator.Failed(reason) =>
+ TransformHints.tagNotTransformable(plan, reason)
+ return
+ case Validator.Passed =>
}
+
try {
plan match {
case plan: BatchScanExec =>
- if (!enableColumnarBatchScan) {
- TransformHints.tagNotTransformable(plan, "columnar BatchScan is
disabled")
- } else {
- // IF filter expressions aren't empty, we need to transform the
inner operators.
- if (plan.runtimeFilters.isEmpty) {
- val transformer =
- ScanTransformerFactory
- .createBatchScanTransformer(plan, validation = true)
- .asInstanceOf[BasicScanExecTransformer]
- transformer.doValidate().tagOnFallback(plan)
- }
+ // If filter expressions aren't empty, we need to transform the
inner operators.
+ if (plan.runtimeFilters.isEmpty) {
+ val transformer =
+ ScanTransformerFactory
+ .createBatchScanTransformer(plan, validation = true)
+ .asInstanceOf[BasicScanExecTransformer]
+ transformer.doValidate().tagOnFallback(plan)
}
case plan: FileSourceScanExec =>
- if (!enableColumnarFileScan) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar FileScan is not enabled in FileSourceScanExec")
- } else {
- // IF filter expressions aren't empty, we need to transform the
inner operators.
- if (plan.partitionFilters.isEmpty) {
- val transformer =
- ScanTransformerFactory.createFileSourceScanTransformer(plan,
validation = true)
- transformer.doValidate().tagOnFallback(plan)
- }
+ // If filter expressions aren't empty, we need to transform the
inner operators.
+ if (plan.partitionFilters.isEmpty) {
+ val transformer =
+ ScanTransformerFactory.createFileSourceScanTransformer(plan,
validation = true)
+ transformer.doValidate().tagOnFallback(plan)
}
case plan if HiveTableScanExecTransformer.isHiveTableScan(plan) =>
- if (!enableColumnarHiveTableScan) {
- TransformHints.tagNotTransformable(plan, "columnar hive table scan
is disabled")
- } else {
- HiveTableScanExecTransformer.validate(plan).tagOnFallback(plan)
- }
+ HiveTableScanExecTransformer.validate(plan).tagOnFallback(plan)
case plan: ProjectExec =>
- if (!enableColumnarProject) {
- TransformHints.tagNotTransformable(plan, "columnar project is
disabled")
- } else if (
- plan.projectList.size > 0 && plan.projectList
- .map(getExpressionTreeDepth(_))
- .max >= fallbackExpressionsThreshold
- ) {
- TransformHints.tagNotTransformable(
- plan,
- "Fall back project plan because its" +
- " max nested expressions number reaches the configured
threshold")
- } else {
- val transformer = ProjectExecTransformer(plan.projectList,
plan.child)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = ProjectExecTransformer(plan.projectList,
plan.child)
+ transformer.doValidate().tagOnFallback(plan)
case plan: FilterExec =>
- val childIsScan = plan.child.isInstanceOf[FileSourceScanExec] ||
- plan.child.isInstanceOf[BatchScanExec]
- if (!enableColumnarFilter) {
- TransformHints.tagNotTransformable(plan, "columnar Filter is not
enabled in FilterExec")
- } else if (getExpressionTreeDepth(plan.condition) >=
fallbackExpressionsThreshold) {
- TransformHints.tagNotTransformable(
- plan,
- "Fall back filter plan because its" +
- " nested expressions number reaches the configured threshold")
- } else if (scanOnly && !childIsScan) {
- // When scanOnly is enabled, filter after scan will be offloaded.
- TransformHints.tagNotTransformable(
- plan,
- "ScanOnly enabled and plan child is not Scan in FilterExec")
- } else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
- .genFilterExecTransformer(plan.condition, plan.child)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ .genFilterExecTransformer(plan.condition, plan.child)
+ transformer.doValidate().tagOnFallback(plan)
case plan: HashAggregateExec =>
- if (!enableColumnarHashAgg) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar HashAggregate is not enabled in HashAggregateExec")
- } else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
- .genHashAggregateExecTransformer(
- plan.requiredChildDistributionExpressions,
- plan.groupingExpressions,
- plan.aggregateExpressions,
- plan.aggregateAttributes,
- plan.initialInputBufferOffset,
- plan.resultExpressions,
- plan.child
- )
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ .genHashAggregateExecTransformer(
+ plan.requiredChildDistributionExpressions,
+ plan.groupingExpressions,
+ plan.aggregateExpressions,
+ plan.aggregateAttributes,
+ plan.initialInputBufferOffset,
+ plan.resultExpressions,
+ plan.child
+ )
+ transformer.doValidate().tagOnFallback(plan)
case plan: SortAggregateExec =>
- if (!BackendsApiManager.getSettings.replaceSortAggWithHashAgg) {
- TransformHints.tagNotTransformable(plan,
"replaceSortAggWithHashAgg is not enabled")
- } else if (!enableColumnarHashAgg) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar HashAgg is not enabled in SortAggregateExec")
- } else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
- .genHashAggregateExecTransformer(
- plan.requiredChildDistributionExpressions,
- plan.groupingExpressions,
- plan.aggregateExpressions,
- plan.aggregateAttributes,
- plan.initialInputBufferOffset,
- plan.resultExpressions,
- plan.child
- )
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ .genHashAggregateExecTransformer(
+ plan.requiredChildDistributionExpressions,
+ plan.groupingExpressions,
+ plan.aggregateExpressions,
+ plan.aggregateAttributes,
+ plan.initialInputBufferOffset,
+ plan.resultExpressions,
+ plan.child
+ )
+ transformer.doValidate().tagOnFallback(plan)
case plan: ObjectHashAggregateExec =>
- if (!enableColumnarHashAgg) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar HashAgg is not enabled in ObjectHashAggregateExec")
- } else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
- .genHashAggregateExecTransformer(
- plan.requiredChildDistributionExpressions,
- plan.groupingExpressions,
- plan.aggregateExpressions,
- plan.aggregateAttributes,
- plan.initialInputBufferOffset,
- plan.resultExpressions,
- plan.child
- )
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ .genHashAggregateExecTransformer(
+ plan.requiredChildDistributionExpressions,
+ plan.groupingExpressions,
+ plan.aggregateExpressions,
+ plan.aggregateAttributes,
+ plan.initialInputBufferOffset,
+ plan.resultExpressions,
+ plan.child
+ )
+ transformer.doValidate().tagOnFallback(plan)
case plan: UnionExec =>
- if (!enableColumnarUnion) {
- TransformHints.tagNotTransformable(plan, "columnar Union is not
enabled in UnionExec")
- } else {
- val transformer = ColumnarUnionExec(plan.children)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = ColumnarUnionExec(plan.children)
+ transformer.doValidate().tagOnFallback(plan)
case plan: ExpandExec =>
- if (!enableColumnarExpand) {
- TransformHints.tagNotTransformable(plan, "columnar Expand is not
enabled in ExpandExec")
- } else {
- val transformer = ExpandExecTransformer(plan.projections,
plan.output, plan.child)
- transformer.doValidate().tagOnFallback(plan)
- }
-
+ val transformer = ExpandExecTransformer(plan.projections,
plan.output, plan.child)
+ transformer.doValidate().tagOnFallback(plan)
case plan: WriteFilesExec =>
- if (!enableColumnarWrite ||
!BackendsApiManager.getSettings.supportTransformWriteFiles) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar Write is not enabled in WriteFilesExec")
- } else {
- val transformer = WriteFilesExecTransformer(
- plan.child,
- plan.fileFormat,
- plan.partitionColumns,
- plan.bucketSpec,
- plan.options,
- plan.staticPartitions)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = WriteFilesExecTransformer(
+ plan.child,
+ plan.fileFormat,
+ plan.partitionColumns,
+ plan.bucketSpec,
+ plan.options,
+ plan.staticPartitions)
+ transformer.doValidate().tagOnFallback(plan)
case plan: SortExec =>
- if (!enableColumnarSort) {
- TransformHints.tagNotTransformable(plan, "columnar Sort is not
enabled in SortExec")
- } else {
- val transformer =
- SortExecTransformer(plan.sortOrder, plan.global, plan.child,
plan.testSpillFrequency)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer =
+ SortExecTransformer(plan.sortOrder, plan.global, plan.child,
plan.testSpillFrequency)
+ transformer.doValidate().tagOnFallback(plan)
case plan: ShuffleExchangeExec =>
- if (!enableColumnarShuffle) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar Shuffle is not enabled in ShuffleExchangeExec")
- } else {
- val transformer = ColumnarShuffleExchangeExec(plan, plan.child,
plan.child.output)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = ColumnarShuffleExchangeExec(plan, plan.child,
plan.child.output)
+ transformer.doValidate().tagOnFallback(plan)
case plan: ShuffledHashJoinExec =>
- if (!enableColumnarShuffledHashJoin) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar shufflehashjoin is not enabled in
ShuffledHashJoinExec")
- } else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
- .genShuffledHashJoinExecTransformer(
- plan.leftKeys,
- plan.rightKeys,
- plan.joinType,
- plan.buildSide,
- plan.condition,
- plan.left,
- plan.right,
- plan.isSkewJoin)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ .genShuffledHashJoinExecTransformer(
+ plan.leftKeys,
+ plan.rightKeys,
+ plan.joinType,
+ plan.buildSide,
+ plan.condition,
+ plan.left,
+ plan.right,
+ plan.isSkewJoin)
+ transformer.doValidate().tagOnFallback(plan)
case plan: BroadcastExchangeExec =>
- // columnar broadcast is enabled only when columnar bhj is enabled.
- if (!enableColumnarBroadcastExchange) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar BroadcastExchange is not enabled in
BroadcastExchangeExec")
- } else {
- val transformer = ColumnarBroadcastExchangeExec(plan.mode,
plan.child)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = ColumnarBroadcastExchangeExec(plan.mode,
plan.child)
+ transformer.doValidate().tagOnFallback(plan)
case bhj: BroadcastHashJoinExec =>
- if (!enableColumnarBroadcastJoin) {
- TransformHints.tagNotTransformable(
- bhj,
- "columnar BroadcastJoin is not enabled in BroadcastHashJoinExec")
- } else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
- .genBroadcastHashJoinExecTransformer(
- bhj.leftKeys,
- bhj.rightKeys,
- bhj.joinType,
- bhj.buildSide,
- bhj.condition,
- bhj.left,
- bhj.right,
- isNullAwareAntiJoin = bhj.isNullAwareAntiJoin)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ .genBroadcastHashJoinExecTransformer(
+ bhj.leftKeys,
+ bhj.rightKeys,
+ bhj.joinType,
+ bhj.buildSide,
+ bhj.condition,
+ bhj.left,
+ bhj.right,
+ isNullAwareAntiJoin = bhj.isNullAwareAntiJoin)
+ transformer.doValidate().tagOnFallback(plan)
case plan: SortMergeJoinExec =>
- if (!enableColumnarSortMergeJoin) {
- TransformHints.tagNotTransformable(plan, "columnar sort merge join
is not enabled")
- } else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
- .genSortMergeJoinExecTransformer(
- plan.leftKeys,
- plan.rightKeys,
- plan.joinType,
- plan.condition,
- plan.left,
- plan.right,
- plan.isSkewJoin)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ .genSortMergeJoinExecTransformer(
+ plan.leftKeys,
+ plan.rightKeys,
+ plan.joinType,
+ plan.condition,
+ plan.left,
+ plan.right,
+ plan.isSkewJoin)
+ transformer.doValidate().tagOnFallback(plan)
case plan: CartesianProductExec =>
- if (!enableCartesianProduct) {
- TransformHints.tagNotTransformable(
- plan,
- "conversion to CartesianProductTransformer is not enabled.")
- } else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
- .genCartesianProductExecTransformer(plan.left, plan.right,
plan.condition)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ .genCartesianProductExecTransformer(plan.left, plan.right,
plan.condition)
+ transformer.doValidate().tagOnFallback(plan)
case plan: BroadcastNestedLoopJoinExec =>
- if (!enableBroadcastNestedLoopJoin) {
- TransformHints.tagNotTransformable(
- plan,
- "conversion to BroadcastNestedLoopJoinTransformer is not
enabled.")
- } else {
- val transformer = BackendsApiManager.getSparkPlanExecApiInstance
- .genBroadcastNestedLoopJoinExecTransformer(
- plan.left,
- plan.right,
- plan.buildSide,
- plan.joinType,
- plan.condition)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = BackendsApiManager.getSparkPlanExecApiInstance
+ .genBroadcastNestedLoopJoinExecTransformer(
+ plan.left,
+ plan.right,
+ plan.buildSide,
+ plan.joinType,
+ plan.condition)
+ transformer.doValidate().tagOnFallback(plan)
case plan: WindowExec =>
- if (!enableColumnarWindow) {
- TransformHints.tagNotTransformable(plan, "columnar window is not
enabled in WindowExec")
- } else {
- val transformer = WindowExecTransformer(
- plan.windowExpression,
- plan.partitionSpec,
- plan.orderSpec,
- plan.child)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = WindowExecTransformer(
+ plan.windowExpression,
+ plan.partitionSpec,
+ plan.orderSpec,
+ plan.child)
+ transformer.doValidate().tagOnFallback(plan)
case plan if
SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) =>
- if (!enableColumnarWindowGroupLimit) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar window group limit is not enabled in
WindowGroupLimitExec")
- } else {
- val windowGroupLimitPlan = SparkShimLoader.getSparkShims
- .getWindowGroupLimitExecShim(plan)
- .asInstanceOf[WindowGroupLimitExecShim]
- val transformer = WindowGroupLimitExecTransformer(
- windowGroupLimitPlan.partitionSpec,
- windowGroupLimitPlan.orderSpec,
- windowGroupLimitPlan.rankLikeFunction,
- windowGroupLimitPlan.limit,
- windowGroupLimitPlan.mode,
- windowGroupLimitPlan.child
- )
- transformer.doValidate().tagOnFallback(plan)
- }
+ val windowGroupLimitPlan = SparkShimLoader.getSparkShims
+ .getWindowGroupLimitExecShim(plan)
+ .asInstanceOf[WindowGroupLimitExecShim]
+ val transformer = WindowGroupLimitExecTransformer(
+ windowGroupLimitPlan.partitionSpec,
+ windowGroupLimitPlan.orderSpec,
+ windowGroupLimitPlan.rankLikeFunction,
+ windowGroupLimitPlan.limit,
+ windowGroupLimitPlan.mode,
+ windowGroupLimitPlan.child
+ )
+ transformer.doValidate().tagOnFallback(plan)
case plan: CoalesceExec =>
- if (!enableColumnarCoalesce) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar coalesce is not enabled in CoalesceExec")
- } else {
- val transformer = CoalesceExecTransformer(plan.numPartitions,
plan.child)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = CoalesceExecTransformer(plan.numPartitions,
plan.child)
+ transformer.doValidate().tagOnFallback(plan)
case plan: GlobalLimitExec =>
- if (!enableColumnarLimit) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar limit is not enabled in GlobalLimitExec")
- } else {
- val (limit, offset) =
-
SparkShimLoader.getSparkShims.getLimitAndOffsetFromGlobalLimit(plan)
- val transformer = LimitTransformer(plan.child, offset, limit)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val (limit, offset) =
+
SparkShimLoader.getSparkShims.getLimitAndOffsetFromGlobalLimit(plan)
+ val transformer = LimitTransformer(plan.child, offset, limit)
+ transformer.doValidate().tagOnFallback(plan)
case plan: LocalLimitExec =>
- if (!enableColumnarLimit) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar limit is not enabled in LocalLimitExec")
- } else {
- val transformer = LimitTransformer(plan.child, 0L, plan.limit)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer = LimitTransformer(plan.child, 0L, plan.limit)
+ transformer.doValidate().tagOnFallback(plan)
case plan: GenerateExec =>
- if (!enableColumnarGenerate) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar generate is not enabled in GenerateExec")
- } else {
- val transformer =
BackendsApiManager.getSparkPlanExecApiInstance.genGenerateTransformer(
- plan.generator,
- plan.requiredChildOutput,
- plan.outer,
- plan.generatorOutput,
- plan.child)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val transformer =
BackendsApiManager.getSparkPlanExecApiInstance.genGenerateTransformer(
+ plan.generator,
+ plan.requiredChildOutput,
+ plan.outer,
+ plan.generatorOutput,
+ plan.child)
+ transformer.doValidate().tagOnFallback(plan)
case plan: EvalPythonExec =>
val transformer = EvalPythonExecTransformer(plan.udfs,
plan.resultAttrs, plan.child)
transformer.doValidate().tagOnFallback(plan)
- case _: AQEShuffleReadExec =>
- // Considered transformable by default.
case plan: TakeOrderedAndProjectExec =>
- if (!enableTakeOrderedAndProject) {
- TransformHints.tagNotTransformable(
- plan,
- "columnar topK is not enabled in TakeOrderedAndProjectExec")
- } else {
- val (limit, offset) =
- SparkShimLoader.getSparkShims.getLimitAndOffsetFromTopK(plan)
- val transformer = TakeOrderedAndProjectExecTransformer(
- limit,
- plan.sortOrder,
- plan.projectList,
- plan.child,
- offset)
- transformer.doValidate().tagOnFallback(plan)
- }
+ val (limit, offset) =
+ SparkShimLoader.getSparkShims.getLimitAndOffsetFromTopK(plan)
+ val transformer = TakeOrderedAndProjectExecTransformer(
+ limit,
+ plan.sortOrder,
+ plan.projectList,
+ plan.child,
+ offset)
+ transformer.doValidate().tagOnFallback(plan)
case _ =>
// Currently we assume a plan to be transformable by default.
}
@@ -723,6 +531,44 @@ case class AddTransformHintRule() extends Rule[SparkPlan] {
}
}
+object AddTransformHintRule {
+ implicit private class ValidatorBuilderImplicits(builder:
Validators.Builder) {
+
+ /**
+ * Fails validation on non-scan plan nodes if Gluten is running as
scan-only mode. Also, passes
+ * validation on filter for the exception that filter + scan is detected.
Because filters can be
+ * pushed into scan then the filter conditions will be processed only in
scan.
+ */
+ def fallbackIfScanOnlyWithFilterPushed(scanOnly: Boolean):
Validators.Builder = {
+ builder.add(new FallbackIfScanOnlyWithFilterPushed(scanOnly))
+ builder
+ }
+ }
+
+ private class FallbackIfScanOnlyWithFilterPushed(scanOnly: Boolean) extends
Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = {
+ if (!scanOnly) {
+ return pass()
+ }
+ // Scan-only mode
+ plan match {
+ case _: BatchScanExec => pass()
+ case _: FileSourceScanExec => pass()
+ case p if HiveTableScanExecTransformer.isHiveTableScan(p) => pass()
+ case filter: FilterExec =>
+ val childIsScan = filter.child.isInstanceOf[FileSourceScanExec] ||
+ filter.child.isInstanceOf[BatchScanExec]
+ if (childIsScan) {
+ pass()
+ } else {
+ fail(filter)
+ }
+ case other => fail(other)
+ }
+ }
+ }
+}
+
case class RemoveTransformHintRule() extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = {
plan.foreach(TransformHints.untag)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/ConditionedRule.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/ConditionedRule.scala
new file mode 100644
index 000000000..092d67efc
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/ConditionedRule.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.enumerated
+
+import org.apache.gluten.extension.columnar.validator.Validator
+import org.apache.gluten.ras.rule.{RasRule, Shape}
+
+import org.apache.spark.sql.execution.SparkPlan
+
+object ConditionedRule {
+ trait PreCondition {
+ def apply(node: SparkPlan): Boolean
+ }
+
+ object PreCondition {
+ implicit class FromValidator(validator: Validator) extends PreCondition {
+ override def apply(node: SparkPlan): Boolean = {
+ validator.validate(node) match {
+ case Validator.Passed => true
+ case Validator.Failed(reason) => false
+ }
+ }
+ }
+ }
+
+ trait PostCondition {
+ def apply(node: SparkPlan): Boolean
+ }
+
+ object PostCondition {
+ implicit class FromValidator(validator: Validator) extends PostCondition {
+ override def apply(node: SparkPlan): Boolean = {
+ validator.validate(node) match {
+ case Validator.Passed => true
+ case Validator.Failed(reason) => false
+ }
+ }
+ }
+ }
+
+ def wrap(
+ rule: RasRule[SparkPlan],
+ pre: ConditionedRule.PreCondition,
+ post: ConditionedRule.PostCondition): RasRule[SparkPlan] = {
+ new RasRule[SparkPlan] {
+ override def shift(node: SparkPlan): Iterable[SparkPlan] = {
+ val out = List(node)
+ .filter(pre.apply)
+ .flatMap(rule.shift)
+ .filter(post.apply)
+ out
+ }
+ override def shape(): Shape[SparkPlan] = rule.shape()
+ }
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index 4ce768a06..27dc1be3d 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -17,6 +17,7 @@
package org.apache.gluten.extension.columnar.enumerated
import org.apache.gluten.extension.columnar.{TransformExchange, TransformJoin,
TransformOthers, TransformSingleNode}
+import org.apache.gluten.extension.columnar.validator.Validator
import org.apache.gluten.planner.GlutenOptimization
import org.apache.gluten.planner.property.Conventions
import org.apache.gluten.ras.property.PropertySet
@@ -67,4 +68,11 @@ object EnumeratedTransform {
override def shape(): Shape[SparkPlan] = Shapes.fixedHeight(1)
}
+
+ // TODO: Currently not in use. Prepared for future development.
+ implicit private class RasRuleImplicits(rasRule: RasRule[SparkPlan]) {
+ def withValidator(pre: Validator, post: Validator): RasRule[SparkPlan] = {
+ ConditionedRule.wrap(rasRule, pre, post)
+ }
+ }
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
new file mode 100644
index 000000000..000dbac7b
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validator.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.validator
+
+import org.apache.spark.sql.execution.SparkPlan
+
+trait Validator {
+ import Validator._
+ def validate(plan: SparkPlan): OutCome
+
+ final def pass(): OutCome = {
+ Passed
+ }
+
+ final def fail(p: SparkPlan): OutCome = {
+ Validator.Failed(s"[${getClass.getSimpleName}] Validation failed on node
${p.nodeName}")
+ }
+
+ final def fail(reason: String): OutCome = {
+ Validator.Failed(reason)
+ }
+}
+
+object Validator {
+ sealed trait OutCome
+ case object Passed extends OutCome
+ case class Failed private (reason: String) extends OutCome
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
new file mode 100644
index 000000000..57bcc7e09
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.validator
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi}
+import org.apache.gluten.expression.ExpressionUtils
+import org.apache.gluten.extension.columnar.{TRANSFORM_UNSUPPORTED,
TransformHints}
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
+import org.apache.spark.sql.execution.datasources.WriteFilesExec
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins._
+import org.apache.spark.sql.execution.window.WindowExec
+import org.apache.spark.sql.hive.HiveTableScanExecTransformer
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+object Validators {
+ def builder(): Builder = Builder()
+
+ class Builder private {
+ private val conf = GlutenConfig.getConf
+ private val settings = BackendsApiManager.getSettings
+ private val buffer: ListBuffer[Validator] = mutable.ListBuffer()
+
+ /** Fails validation if a plan node was already tagged with
TRANSFORM_UNSUPPORTED. */
+ def fallbackByHint(): Builder = {
+ buffer += FallbackByHint
+ this
+ }
+
+ /**
+ * Fails validation if a plan node includes an expression that is
considered too complex to
+   * be executed by the native library. By default, we use a threshold
option in config to make the
+ * decision.
+ */
+ def fallbackComplexExpressions(): Builder = {
+ buffer += new
FallbackComplexExpressions(conf.fallbackExpressionsThreshold)
+ this
+ }
+
+    /** Fails validation on non-scan plan nodes if Gluten is running in
scan-only mode. */
+ def fallbackIfScanOnly(): Builder = {
+ buffer += new FallbackIfScanOnly(conf.enableScanOnly)
+ this
+ }
+
+ /**
+ * Fails validation if native-execution of a plan node is not supported by
current backend
+ * implementation by checking the active BackendSettings.
+ */
+ def fallbackByBackendSettings(): Builder = {
+ buffer += new FallbackByBackendSettings(settings)
+ this
+ }
+
+ /**
+ * Fails validation if native-execution of a plan node is disabled by
Gluten/Spark
+ * configuration.
+ */
+ def fallbackByUserOptions(): Builder = {
+ buffer += new FallbackByUserOptions(conf)
+ this
+ }
+
+ /** Add a custom validator to pipeline. */
+ def add(validator: Validator): Builder = {
+ buffer += validator
+ this
+ }
+
+ def build(): Validator = {
+ if (buffer.isEmpty) {
+ NoopValidator
+ } else {
+ new ValidatorPipeline(buffer)
+ }
+ }
+ }
+
+ private object Builder {
+ def apply(): Builder = new Builder()
+ }
+
+ private object FallbackByHint extends Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = {
+ if (TransformHints.isNotTransformable(plan)) {
+ val hint =
TransformHints.getHint(plan).asInstanceOf[TRANSFORM_UNSUPPORTED]
+ return fail(hint.reason.getOrElse("Reason not recorded"))
+ }
+ pass()
+ }
+ }
+
+ private class FallbackComplexExpressions(threshold: Int) extends Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = {
+ if (plan.expressions.exists(e =>
ExpressionUtils.getExpressionTreeDepth(e) > threshold)) {
+ return fail(
+ s"Disabled because at least one present expression exceeded depth
threshold: " +
+ s"${plan.nodeName}")
+ }
+ pass()
+ }
+ }
+
+ private class FallbackIfScanOnly(scanOnly: Boolean) extends Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = plan match {
+ case _: BatchScanExec => pass()
+ case _: FileSourceScanExec => pass()
+ case p if HiveTableScanExecTransformer.isHiveTableScan(p) => pass()
+ case p if scanOnly => fail(p)
+ case _ => pass()
+ }
+ }
+
+ private class FallbackByBackendSettings(settings: BackendSettingsApi)
extends Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = plan match {
+ case p: ShuffleExchangeExec if !settings.supportColumnarShuffleExec() =>
fail(p)
+ case p: SortMergeJoinExec if !settings.supportSortMergeJoinExec() =>
fail(p)
+ case p: WriteFilesExec
+ if !(settings.enableNativeWriteFiles() &&
settings.supportTransformWriteFiles) =>
+ fail(p)
+ case p: SortAggregateExec if !settings.replaceSortAggWithHashAgg =>
+ fail(p)
+ case p: CartesianProductExec if !settings.supportCartesianProductExec()
=> fail(p)
+ case p: BroadcastNestedLoopJoinExec if
!settings.supportBroadcastNestedLoopJoinExec() =>
+ fail(p)
+ case p: TakeOrderedAndProjectExec if
!settings.supportColumnarShuffleExec() => fail(p)
+ case _ => pass()
+ }
+ }
+
+ private class FallbackByUserOptions(conf: GlutenConfig) extends Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = plan match {
+ case p: SortExec if !conf.enableColumnarSort => fail(p)
+ case p: WindowExec if !conf.enableColumnarWindow => fail(p)
+ case p: SortMergeJoinExec if !conf.enableColumnarSortMergeJoin => fail(p)
+ case p: BatchScanExec if !conf.enableColumnarBatchScan => fail(p)
+ case p: FileSourceScanExec if !conf.enableColumnarFileScan => fail(p)
+ case p: ProjectExec if !conf.enableColumnarProject => fail(p)
+ case p: FilterExec if !conf.enableColumnarFilter => fail(p)
+ case p: UnionExec if !conf.enableColumnarUnion => fail(p)
+ case p: ExpandExec if !conf.enableColumnarExpand => fail(p)
+ case p: ShuffledHashJoinExec if !conf.enableColumnarShuffledHashJoin =>
fail(p)
+ case p: ShuffleExchangeExec if !conf.enableColumnarShuffle => fail(p)
+ case p: BroadcastExchangeExec if !conf.enableColumnarBroadcastExchange
=> fail(p)
+ case p @ (_: LocalLimitExec | _: GlobalLimitExec) if
!conf.enableColumnarLimit => fail(p)
+ case p: GenerateExec if !conf.enableColumnarGenerate => fail(p)
+ case p: CoalesceExec if !conf.enableColumnarCoalesce => fail(p)
+ case p: CartesianProductExec if !conf.cartesianProductTransformerEnabled
=> fail(p)
+ case p: TakeOrderedAndProjectExec
+ if !(conf.enableTakeOrderedAndProject && conf.enableColumnarSort &&
+ conf.enableColumnarShuffle && conf.enableColumnarProject) =>
+ fail(p)
+ case p: BroadcastHashJoinExec if !conf.enableColumnarBroadcastJoin =>
+ fail(p)
+ case p: BroadcastNestedLoopJoinExec
+ if !(conf.enableColumnarBroadcastJoin &&
+ conf.broadcastNestedLoopJoinTransformerTransformerEnabled) =>
+ fail(p)
+ case p @ (_: HashAggregateExec | _: SortAggregateExec | _:
ObjectHashAggregateExec)
+ if !conf.enableColumnarHashAgg =>
+ fail(p)
+ case p
+ if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(
+ plan) && !conf.enableColumnarWindowGroupLimit =>
+ fail(p)
+ case p
+ if HiveTableScanExecTransformer.isHiveTableScan(p) &&
!conf.enableColumnarHiveTableScan =>
+ fail(p)
+ case _ => pass()
+ }
+ }
+
+ private class ValidatorPipeline(validators: Seq[Validator]) extends
Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = {
+ val init: Validator.OutCome = pass()
+ val finalOut = validators.foldLeft(init) {
+ case (out, validator) =>
+ out match {
+ case Validator.Passed => validator.validate(plan)
+ case Validator.Failed(_) => out
+ }
+ }
+ finalOut
+ }
+ }
+
+ private object NoopValidator extends Validator {
+ override def validate(plan: SparkPlan): Validator.OutCome = pass()
+ }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
index 5b14eef18..f1adb09e2 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenWriterColumnarRules.scala
@@ -162,6 +162,8 @@ object GlutenWriterColumnarRules {
session.sparkContext.setLocalProperty(
"staticPartitionWriteOnly",
BackendsApiManager.getSettings.staticPartitionWriteOnly().toString)
+    // FIXME: We should only rely on context properties when no other
approach is available.
+    // Investigate whether there is another way to pass these options.
session.sparkContext.setLocalProperty("isNativeAppliable",
format.isDefined.toString)
session.sparkContext.setLocalProperty("nativeFormat",
format.getOrElse(""))
if (format.isDefined) {
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 2e91e6c86..b85dd6a35 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -43,7 +43,7 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
}
}
val msgRegex = """Validation failed for plan: Scan parquet
default\.t\[QueryId=[0-9]+\],""" +
- """ due to: columnar FileScan is not enabled in FileSourceScanExec\."""
+ """ due to: \[FallbackByUserOptions\] Validation failed on node Scan
parquet default\.t\."""
assert(testAppender.loggingEvents.exists(_.getMessage.getFormattedMessage.matches(msgRegex)))
}
}
@@ -90,7 +90,9 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
assert(execution.get.numFallbackNodes == 1)
val fallbackReason = execution.get.fallbackNodeToReason.head
assert(fallbackReason._1.contains("Scan parquet default.t"))
- assert(fallbackReason._2.contains("columnar FileScan is not enabled in
FileSourceScanExec"))
+ assert(
+ fallbackReason._2.contains(
+ "[FallbackByUserOptions] Validation failed on node Scan parquet
default.t"))
}
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 54f236b7b..9e8c7e542 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -45,7 +45,8 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
}
val msgRegex =
"""Validation failed for plan: Scan parquet
spark_catalog.default\.t\[QueryId=[0-9]+\],""" +
- """ due to: columnar FileScan is not enabled in
FileSourceScanExec\."""
+ """ due to: \[FallbackByUserOptions\] Validation failed on node Scan
parquet""" +
+ """ spark_catalog\.default\.t\.""".stripMargin
assert(testAppender.loggingEvents.exists(_.getMessage.getFormattedMessage.matches(msgRegex)))
}
}
@@ -92,7 +93,8 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
assert(execution.get.numFallbackNodes == 1)
val fallbackReason = execution.get.fallbackNodeToReason.head
assert(fallbackReason._1.contains("Scan parquet
spark_catalog.default.t"))
- assert(fallbackReason._2.contains("columnar FileScan is not enabled in
FileSourceScanExec"))
+ assert(fallbackReason._2.contains(
+ "[FallbackByUserOptions] Validation failed on node Scan parquet
spark_catalog.default.t"))
}
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index 54f236b7b..9e8c7e542 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -45,7 +45,8 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
}
val msgRegex =
"""Validation failed for plan: Scan parquet
spark_catalog.default\.t\[QueryId=[0-9]+\],""" +
- """ due to: columnar FileScan is not enabled in
FileSourceScanExec\."""
+ """ due to: \[FallbackByUserOptions\] Validation failed on node Scan
parquet""" +
+ """ spark_catalog\.default\.t\.""".stripMargin
assert(testAppender.loggingEvents.exists(_.getMessage.getFormattedMessage.matches(msgRegex)))
}
}
@@ -92,7 +93,8 @@ class GlutenFallbackSuite extends GlutenSQLTestsTrait with
AdaptiveSparkPlanHelp
assert(execution.get.numFallbackNodes == 1)
val fallbackReason = execution.get.fallbackNodeToReason.head
assert(fallbackReason._1.contains("Scan parquet
spark_catalog.default.t"))
- assert(fallbackReason._2.contains("columnar FileScan is not enabled in
FileSourceScanExec"))
+ assert(fallbackReason._2.contains(
+ "[FallbackByUserOptions] Validation failed on node Scan parquet
spark_catalog.default.t"))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]