This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 2f35c6348 [CORE] Minor code cleanups against fallback tagging (#6320)
2f35c6348 is described below
commit 2f35c63488b3449bd4c4f2d024c46fb1e519a2f0
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jul 12 09:38:45 2024 +0900
[CORE] Minor code cleanups against fallback tagging (#6320)
---
.../gluten/backendsapi/clickhouse/CHBackend.scala | 18 +--
.../CHBroadcastNestedLoopJoinExecTransformer.scala | 4 +-
.../execution/CHGenerateExecTransformer.scala | 2 +-
.../execution/CHHashJoinExecTransformer.scala | 6 +-
.../execution/CHSortMergeJoinExecTransformer.scala | 2 +-
.../extension/FallbackBroadcaseHashJoinRules.scala | 16 +--
.../spark/sql/execution/CHColumnarToRowExec.scala | 2 +-
.../gluten/backendsapi/velox/VeloxBackend.scala | 14 +--
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 8 +-
.../gluten/execution/GenerateExecTransformer.scala | 4 +-
.../gluten/execution/VeloxColumnarToRowExec.scala | 2 +-
.../gluten/validate/NativePlanValidationInfo.java | 16 ++-
.../gluten/backendsapi/BackendSettingsApi.scala | 4 +-
.../BasicPhysicalOperatorTransformer.scala | 2 +-
.../execution/BasicScanExecTransformer.scala | 2 +-
.../execution/BatchScanExecTransformer.scala | 6 +-
.../BroadcastNestedLoopJoinExecTransformer.scala | 17 +--
.../CartesianProductExecTransformer.scala | 2 +-
.../gluten/execution/ExpandExecTransformer.scala | 4 +-
.../execution/FileSourceScanExecTransformer.scala | 8 +-
.../execution/GenerateExecTransformerBase.scala | 2 +-
.../HashAggregateExecBaseTransformer.scala | 4 +-
.../gluten/execution/JoinExecTransformer.scala | 2 +-
.../gluten/execution/SampleExecTransformer.scala | 2 +-
.../gluten/execution/ScanTransformerFactory.scala | 4 +-
.../gluten/execution/SortExecTransformer.scala | 2 +-
.../execution/SortMergeJoinExecTransformer.scala | 4 +-
.../TakeOrderedAndProjectExecTransformer.scala | 6 +-
.../gluten/execution/WindowExecTransformer.scala | 2 +-
.../WindowGroupLimitExecTransformer.scala | 2 +-
.../execution/WriteFilesExecTransformer.scala | 4 +-
.../org/apache/gluten/extension/GlutenPlan.scala | 46 ++++----
.../columnar/CollapseProjectExecTransformer.scala | 4 +-
.../extension/columnar/ExpandFallbackPolicy.scala | 4 +-
.../{FallbackTagRule.scala => FallbackRules.scala} | 122 +++++++++++----------
.../extension/columnar/OffloadSingleNode.scala | 10 +-
.../RemoveNativeWriteFilesSortAndProject.scala | 2 +-
.../columnar/enumerated/PushFilterToScan.scala | 2 +-
.../extension/columnar/enumerated/RasOffload.scala | 2 +-
.../rewrite/RewriteSparkPlanRulesManager.scala | 8 +-
.../extension/columnar/validator/Validators.scala | 6 +-
.../execution/ColumnarShuffleExchangeExec.scala | 7 +-
.../spark/sql/execution/GlutenExplainUtils.scala | 6 +-
.../sql/execution/GlutenFallbackReporter.scala | 52 ++-------
.../python/EvalPythonExecTransformer.scala | 2 +-
.../sql/hive/HiveTableScanExecTransformer.scala | 2 +-
.../gluten/execution/DeltaScanTransformer.scala | 2 +-
.../sql/extension/CustomerColumnarPreRules.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 13 +--
.../sql/extension/CustomerColumnarPreRules.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 13 +--
.../sql/extension/CustomerColumnarPreRules.scala | 2 +-
.../sql/execution/FallbackStrategiesSuite.scala | 13 +--
.../sql/extension/CustomerColumnarPreRules.scala | 2 +-
54 files changed, 238 insertions(+), 259 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 6b23c6f39..341a3e0f0 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -172,20 +172,20 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
format match {
case ParquetReadFormat =>
if (validateFilePath) {
- ValidationResult.ok
+ ValidationResult.succeeded
} else {
- ValidationResult.notOk("Validate file path failed.")
+ ValidationResult.failed("Validate file path failed.")
}
- case OrcReadFormat => ValidationResult.ok
- case MergeTreeReadFormat => ValidationResult.ok
+ case OrcReadFormat => ValidationResult.succeeded
+ case MergeTreeReadFormat => ValidationResult.succeeded
case TextReadFormat =>
if (!hasComplexType) {
- ValidationResult.ok
+ ValidationResult.succeeded
} else {
- ValidationResult.notOk("Has complex type.")
+ ValidationResult.failed("Has complex type.")
}
- case JsonReadFormat => ValidationResult.ok
- case _ => ValidationResult.notOk(s"Unsupported file format $format")
+ case JsonReadFormat => ValidationResult.succeeded
+ case _ => ValidationResult.failed(s"Unsupported file format $format")
}
}
@@ -291,7 +291,7 @@ object CHBackendSettings extends BackendSettingsApi with
Logging {
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
options: Map[String, String]): ValidationResult =
- ValidationResult.notOk("CH backend is unsupported.")
+ ValidationResult.failed("CH backend is unsupported.")
override def enableNativeWriteFiles(): Boolean = {
GlutenConfig.getConf.enableNativeWriter.getOrElse(false)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
index 35be8ee0b..9c0f41361 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHBroadcastNestedLoopJoinExecTransformer.scala
@@ -98,12 +98,12 @@ case class CHBroadcastNestedLoopJoinExecTransformer(
case _: InnerLike =>
case _ =>
if (joinType == LeftSemi || condition.isDefined) {
- return ValidationResult.notOk(
+ return ValidationResult.failed(
s"Broadcast Nested Loop join is not supported join type $joinType
with conditions")
}
}
- ValidationResult.ok
+ ValidationResult.succeeded
}
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala
index 733c0a472..44cb0deca 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHGenerateExecTransformer.scala
@@ -64,7 +64,7 @@ case class CHGenerateExecTransformer(
override protected def doGeneratorValidate(
generator: Generator,
outer: Boolean): ValidationResult =
- ValidationResult.ok
+ ValidationResult.succeeded
override protected def getRelNode(
context: SubstraitContext,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
index da9d9c758..48870892d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHHashJoinExecTransformer.scala
@@ -60,7 +60,7 @@ case class CHShuffledHashJoinExecTransformer(
right.outputSet,
condition)
if (shouldFallback) {
- return ValidationResult.notOk("ch join validate fail")
+ return ValidationResult.failed("ch join validate fail")
}
super.doValidateInternal()
}
@@ -118,10 +118,10 @@ case class CHBroadcastHashJoinExecTransformer(
condition)
if (shouldFallback) {
- return ValidationResult.notOk("ch join validate fail")
+ return ValidationResult.failed("ch join validate fail")
}
if (isNullAwareAntiJoin) {
- return ValidationResult.notOk("ch does not support NAAJ")
+ return ValidationResult.failed("ch does not support NAAJ")
}
super.doValidateInternal()
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHSortMergeJoinExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHSortMergeJoinExecTransformer.scala
index e2b586551..670fbed69 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHSortMergeJoinExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHSortMergeJoinExecTransformer.scala
@@ -50,7 +50,7 @@ case class CHSortMergeJoinExecTransformer(
right.outputSet,
condition)
if (shouldFallback) {
- return ValidationResult.notOk("ch join validate fail")
+ return ValidationResult.failed("ch join validate fail")
}
super.doValidateInternal()
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala
index c7f9b47de..842dc7615 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcaseHashJoinRules.scala
@@ -57,12 +57,12 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session:
SparkSession) extend
!columnarConf.enableColumnarBroadcastExchange ||
!columnarConf.enableColumnarBroadcastJoin
) {
- ValidationResult.notOk(
+ ValidationResult.failed(
"columnar broadcast exchange is disabled or " +
"columnar broadcast join is disabled")
} else {
if (FallbackTags.nonEmpty(bhj)) {
- ValidationResult.notOk("broadcast join is already tagged as
not transformable")
+ ValidationResult.failed("broadcast join is already tagged as
not transformable")
} else {
val bhjTransformer =
BackendsApiManager.getSparkPlanExecApiInstance
.genBroadcastHashJoinExecTransformer(
@@ -75,7 +75,7 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session:
SparkSession) extend
bhj.right,
bhj.isNullAwareAntiJoin)
val isBhjTransformable = bhjTransformer.doValidate()
- if (isBhjTransformable.isValid) {
+ if (isBhjTransformable.ok()) {
val exchangeTransformer =
ColumnarBroadcastExchangeExec(mode, child)
exchangeTransformer.doValidate()
} else {
@@ -111,12 +111,12 @@ case class
FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend
!GlutenConfig.getConf.enableColumnarBroadcastExchange ||
!GlutenConfig.getConf.enableColumnarBroadcastJoin
) {
- ValidationResult.notOk(
+ ValidationResult.failed(
"columnar broadcast exchange is disabled or " +
"columnar broadcast join is disabled")
} else {
if (FallbackTags.nonEmpty(bnlj)) {
- ValidationResult.notOk("broadcast join is already tagged as not
transformable")
+ ValidationResult.failed("broadcast join is already tagged as not
transformable")
} else {
val transformer = BackendsApiManager.getSparkPlanExecApiInstance
.genBroadcastNestedLoopJoinExecTransformer(
@@ -126,7 +126,7 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session:
SparkSession) extend
bnlj.joinType,
bnlj.condition)
val isTransformable = transformer.doValidate()
- if (isTransformable.isValid) {
+ if (isTransformable.ok()) {
val exchangeTransformer = ColumnarBroadcastExchangeExec(mode,
child)
exchangeTransformer.doValidate()
} else {
@@ -242,7 +242,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession)
extends Rule[SparkPl
maybeExchange match {
case Some(exchange @ BroadcastExchangeExec(_, _)) =>
isTransformable.tagOnFallback(plan)
- if (!isTransformable.isValid) {
+ if (!isTransformable.ok) {
FallbackTags.add(exchange, isTransformable)
}
case None =>
@@ -280,7 +280,7 @@ case class FallbackBroadcastHashJoin(session: SparkSession)
extends Rule[SparkPl
plan,
"it's a materialized broadcast exchange or reused broadcast
exchange")
case ColumnarBroadcastExchangeExec(mode, child) =>
- if (!isTransformable.isValid) {
+ if (!isTransformable.ok) {
throw new IllegalStateException(
s"BroadcastExchange has already been" +
s" transformed to columnar version but BHJ is determined as"
+
diff --git
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarToRowExec.scala
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarToRowExec.scala
index 522c7d68f..29fa0d0ab 100644
---
a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarToRowExec.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarToRowExec.scala
@@ -58,7 +58,7 @@ case class CHColumnarToRowExec(child: SparkPlan) extends
ColumnarToRowExecBase(c
s"${field.dataType} is not supported in ColumnarToRowExecBase.")
}
}
- ValidationResult.ok
+ ValidationResult.succeeded
}
override def doExecuteInternal(): RDD[InternalRow] = {
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 8d98c111a..31f0324e3 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -79,9 +79,9 @@ object VeloxBackendSettings extends BackendSettingsApi {
// Collect unsupported types.
val unsupportedDataTypeReason = fields.collect(validatorFunc)
if (unsupportedDataTypeReason.isEmpty) {
- ValidationResult.ok
+ ValidationResult.succeeded
} else {
- ValidationResult.notOk(
+ ValidationResult.failed(
s"Found unsupported data type in $format:
${unsupportedDataTypeReason.mkString(", ")}.")
}
}
@@ -135,10 +135,10 @@ object VeloxBackendSettings extends BackendSettingsApi {
} else {
validateTypes(parquetTypeValidatorWithComplexTypeFallback)
}
- case DwrfReadFormat => ValidationResult.ok
+ case DwrfReadFormat => ValidationResult.succeeded
case OrcReadFormat =>
if (!GlutenConfig.getConf.veloxOrcScanEnabled) {
- ValidationResult.notOk(s"Velox ORC scan is turned off.")
+ ValidationResult.failed(s"Velox ORC scan is turned off.")
} else {
val typeValidator: PartialFunction[StructField, String] = {
case StructField(_, arrayType: ArrayType, _, _)
@@ -164,7 +164,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
validateTypes(orcTypeValidatorWithComplexTypeFallback)
}
}
- case _ => ValidationResult.notOk(s"Unsupported file format for $format.")
+ case _ => ValidationResult.failed(s"Unsupported file format for
$format.")
}
}
@@ -284,8 +284,8 @@ object VeloxBackendSettings extends BackendSettingsApi {
.orElse(validateDataTypes())
.orElse(validateWriteFilesOptions())
.orElse(validateBucketSpec()) match {
- case Some(reason) => ValidationResult.notOk(reason)
- case _ => ValidationResult.ok
+ case Some(reason) => ValidationResult.failed(reason)
+ case _ => ValidationResult.succeeded
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 2b9d01738..37b46df3e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -366,7 +366,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++
child.output
val projectTransformer = ProjectExecTransformer(projectList, child)
val validationResult = projectTransformer.doValidate()
- if (validationResult.isValid) {
+ if (validationResult.ok()) {
val newChild = maybeAddAppendBatchesExec(projectTransformer)
ColumnarShuffleExchangeExec(shuffle, newChild,
newChild.output.drop(1))
} else {
@@ -392,7 +392,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
val projectTransformer = ProjectExecTransformer(projectList, child)
val projectBeforeSortValidationResult =
projectTransformer.doValidate()
// Make sure we support offload hash expression
- val projectBeforeSort = if
(projectBeforeSortValidationResult.isValid) {
+ val projectBeforeSort = if
(projectBeforeSortValidationResult.ok()) {
projectTransformer
} else {
val project = ProjectExec(projectList, child)
@@ -405,7 +405,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
val dropSortColumnTransformer =
ProjectExecTransformer(projectList.drop(1), sortByHashCode)
val validationResult = dropSortColumnTransformer.doValidate()
- if (validationResult.isValid) {
+ if (validationResult.ok()) {
val newChild =
maybeAddAppendBatchesExec(dropSortColumnTransformer)
ColumnarShuffleExchangeExec(shuffle, newChild, newChild.output)
} else {
@@ -888,7 +888,7 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
case p @ LimitTransformer(SortExecTransformer(sortOrder, _, child, _),
0, count) =>
val global = child.outputPartitioning.satisfies(AllTuples)
val topN = TopNTransformer(count, sortOrder, global, child)
- if (topN.doValidate().isValid) {
+ if (topN.doValidate().ok()) {
topN
} else {
p
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
index 8ceea8c14..c7b81d55f 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/GenerateExecTransformer.scala
@@ -72,11 +72,11 @@ case class GenerateExecTransformer(
generator: Generator,
outer: Boolean): ValidationResult = {
if (!supportsGenerate(generator, outer)) {
- ValidationResult.notOk(
+ ValidationResult.failed(
s"Velox backend does not support this generator:
${generator.getClass.getSimpleName}" +
s", outer: $outer")
} else {
- ValidationResult.ok
+ ValidationResult.succeeded
}
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index 1a5425520..2c46893e4 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -66,7 +66,7 @@ case class VeloxColumnarToRowExec(child: SparkPlan) extends
ColumnarToRowExecBas
s"VeloxColumnarToRowExec.")
}
}
- ValidationResult.ok
+ ValidationResult.succeeded
}
override def doExecuteInternal(): RDD[InternalRow] = {
diff --git
a/gluten-core/src/main/java/org/apache/gluten/validate/NativePlanValidationInfo.java
b/gluten-core/src/main/java/org/apache/gluten/validate/NativePlanValidationInfo.java
index 12f050c66..9cfad44d6 100644
---
a/gluten-core/src/main/java/org/apache/gluten/validate/NativePlanValidationInfo.java
+++
b/gluten-core/src/main/java/org/apache/gluten/validate/NativePlanValidationInfo.java
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.validate;
+import org.apache.gluten.extension.ValidationResult;
+
import java.util.Vector;
public class NativePlanValidationInfo {
@@ -30,11 +32,13 @@ public class NativePlanValidationInfo {
}
}
- public boolean isSupported() {
- return isSupported == 1;
- }
-
- public Vector<String> getFallbackInfo() {
- return fallbackInfo;
+ public ValidationResult asResult() {
+ if (isSupported == 1) {
+ return ValidationResult.succeeded();
+ }
+ return ValidationResult.failed(
+ String.format(
+ "Native validation failed: %n%s",
+ fallbackInfo.stream().reduce((l, r) -> l + "\n" + r)));
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 07ead8860..8b4c18b01 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -35,12 +35,12 @@ trait BackendSettingsApi {
format: ReadFileFormat,
fields: Array[StructField],
partTable: Boolean,
- paths: Seq[String]): ValidationResult = ValidationResult.ok
+ paths: Seq[String]): ValidationResult = ValidationResult.succeeded
def supportWriteFilesExec(
format: FileFormat,
fields: Array[StructField],
bucketSpec: Option[BucketSpec],
- options: Map[String, String]): ValidationResult = ValidationResult.ok
+ options: Map[String, String]): ValidationResult =
ValidationResult.succeeded
def supportNativeWrite(fields: Array[StructField]): Boolean = true
def supportNativeMetadataColumns(): Boolean = false
def supportNativeRowIndexColumn(): Boolean = false
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 97b4c3a3f..8e87baf53 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -113,7 +113,7 @@ abstract class FilterExecTransformerBase(val cond:
Expression, val input: SparkP
if (remainingCondition == null) {
// All the filters can be pushed down and the computing of this Filter
// is not needed.
- return ValidationResult.ok
+ return ValidationResult.succeeded
}
val substraitContext = new SubstraitContext
val operatorId = substraitContext.nextOperatorId(this.nodeName)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index 64071fb14..99f145eea 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -88,7 +88,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport
with BaseDataSource
val validationResult = BackendsApiManager.getSettings
.supportFileFormatRead(fileFormat, fields, getPartitionSchema.nonEmpty,
getInputFilePaths)
- if (!validationResult.isValid) {
+ if (!validationResult.ok()) {
return validationResult
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 6bff68895..4860847de 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -133,18 +133,18 @@ abstract class BatchScanExecTransformerBase(
override def doValidateInternal(): ValidationResult = {
if (pushedAggregate.nonEmpty) {
- return ValidationResult.notOk(s"Unsupported aggregation push down for
$scan.")
+ return ValidationResult.failed(s"Unsupported aggregation push down for
$scan.")
}
if (
SparkShimLoader.getSparkShims.findRowIndexColumnIndexInSchema(schema) >
0 &&
!BackendsApiManager.getSettings.supportNativeRowIndexColumn()
) {
- return ValidationResult.notOk("Unsupported row index column scan in
native.")
+ return ValidationResult.failed("Unsupported row index column scan in
native.")
}
if (hasUnsupportedColumns) {
- return ValidationResult.notOk(s"Unsupported columns scan in native.")
+ return ValidationResult.failed(s"Unsupported columns scan in native.")
}
super.doValidateInternal()
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
index b90c1ad8b..ae407b3b3 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/BroadcastNestedLoopJoinExecTransformer.scala
@@ -163,34 +163,35 @@ abstract class BroadcastNestedLoopJoinExecTransformer(
def validateJoinTypeAndBuildSide(): ValidationResult = {
val result = joinType match {
- case _: InnerLike | LeftOuter | RightOuter => ValidationResult.ok
+ case _: InnerLike | LeftOuter | RightOuter => ValidationResult.succeeded
case _ =>
- ValidationResult.notOk(s"$joinType join is not supported with
BroadcastNestedLoopJoin")
+ ValidationResult.failed(s"$joinType join is not supported with
BroadcastNestedLoopJoin")
}
- if (!result.isValid) {
+ if (!result.ok()) {
return result
}
(joinType, buildSide) match {
case (LeftOuter, BuildLeft) | (RightOuter, BuildRight) =>
- ValidationResult.notOk(s"$joinType join is not supported with
$buildSide")
- case _ => ValidationResult.ok // continue
+ ValidationResult.failed(s"$joinType join is not supported with
$buildSide")
+ case _ => ValidationResult.succeeded // continue
}
}
override protected def doValidateInternal(): ValidationResult = {
if
(!GlutenConfig.getConf.broadcastNestedLoopJoinTransformerTransformerEnabled) {
- return ValidationResult.notOk(
+ return ValidationResult.failed(
s"Config
${GlutenConfig.BROADCAST_NESTED_LOOP_JOIN_TRANSFORMER_ENABLED.key} not enabled")
}
if (substraitJoinType == CrossRel.JoinType.UNRECOGNIZED) {
- return ValidationResult.notOk(s"$joinType join is not supported with
BroadcastNestedLoopJoin")
+ return ValidationResult.failed(
+ s"$joinType join is not supported with BroadcastNestedLoopJoin")
}
val validateResult = validateJoinTypeAndBuildSide()
- if (!validateResult.isValid) {
+ if (!validateResult.ok()) {
return validateResult
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
index 91831f184..0dd110fa5 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/CartesianProductExecTransformer.scala
@@ -112,7 +112,7 @@ case class CartesianProductExecTransformer(
override protected def doValidateInternal(): ValidationResult = {
if (!BackendsApiManager.getSettings.supportCartesianProductExec()) {
- return ValidationResult.notOk("Cartesian product is not supported in
this backend")
+ return ValidationResult.failed("Cartesian product is not supported in
this backend")
}
val substraitContext = new SubstraitContext
val expressionNode = condition.map {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/ExpandExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/ExpandExecTransformer.scala
index 362debb53..63f76a25a 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/ExpandExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/ExpandExecTransformer.scala
@@ -95,10 +95,10 @@ case class ExpandExecTransformer(
override protected def doValidateInternal(): ValidationResult = {
if (!BackendsApiManager.getSettings.supportExpandExec()) {
- return ValidationResult.notOk("Current backend does not support expand")
+ return ValidationResult.failed("Current backend does not support expand")
}
if (projections.isEmpty) {
- return ValidationResult.notOk("Current backend does not support empty
projections in expand")
+ return ValidationResult.failed("Current backend does not support empty
projections in expand")
}
val substraitContext = new SubstraitContext
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index 4f120488c..3b8ed1167 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -130,23 +130,23 @@ abstract class FileSourceScanExecTransformerBase(
if (
!metadataColumns.isEmpty &&
!BackendsApiManager.getSettings.supportNativeMetadataColumns()
) {
- return ValidationResult.notOk(s"Unsupported metadata columns scan in
native.")
+ return ValidationResult.failed(s"Unsupported metadata columns scan in
native.")
}
if (
SparkShimLoader.getSparkShims.findRowIndexColumnIndexInSchema(schema) >
0 &&
!BackendsApiManager.getSettings.supportNativeRowIndexColumn()
) {
- return ValidationResult.notOk("Unsupported row index column scan in
native.")
+ return ValidationResult.failed("Unsupported row index column scan in
native.")
}
if (hasUnsupportedColumns) {
- return ValidationResult.notOk(s"Unsupported columns scan in native.")
+ return ValidationResult.failed(s"Unsupported columns scan in native.")
}
if (hasFieldIds) {
// Spark read schema expects field Ids , the case didn't support yet by
native.
- return ValidationResult.notOk(
+ return ValidationResult.failed(
s"Unsupported matching schema column names " +
s"by field ids in native scan.")
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/GenerateExecTransformerBase.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/GenerateExecTransformerBase.scala
index b5c9b85ae..af4a92f19 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/GenerateExecTransformerBase.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/GenerateExecTransformerBase.scala
@@ -67,7 +67,7 @@ abstract class GenerateExecTransformerBase(
override protected def doValidateInternal(): ValidationResult = {
val validationResult = doGeneratorValidate(generator, outer)
- if (!validationResult.isValid) {
+ if (!validationResult.ok()) {
return validationResult
}
val context = new SubstraitContext
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
index b200426d9..9a28af801 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/HashAggregateExecBaseTransformer.scala
@@ -117,7 +117,7 @@ abstract class HashAggregateExecBaseTransformer(
val unsupportedAggExprs = aggregateAttributes.filterNot(attr =>
checkType(attr.dataType))
if (unsupportedAggExprs.nonEmpty) {
- return ValidationResult.notOk(
+ return ValidationResult.failed(
"Found unsupported data type in aggregation expression: " +
unsupportedAggExprs
.map(attr => s"${attr.name}#${attr.exprId.id}:${attr.dataType}")
@@ -125,7 +125,7 @@ abstract class HashAggregateExecBaseTransformer(
}
val unsupportedGroupExprs = groupingExpressions.filterNot(attr =>
checkType(attr.dataType))
if (unsupportedGroupExprs.nonEmpty) {
- return ValidationResult.notOk(
+ return ValidationResult.failed(
"Found unsupported data type in grouping expression: " +
unsupportedGroupExprs
.map(attr => s"${attr.name}#${attr.exprId.id}:${attr.dataType}")
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
index cd22c5785..86e6c1f41 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
@@ -210,7 +210,7 @@ trait HashJoinLikeExecTransformer extends BaseJoinExec with
TransformSupport {
// Firstly, need to check if the Substrait plan for this operator can be
successfully generated.
if (substraitJoinType == JoinRel.JoinType.UNRECOGNIZED) {
return ValidationResult
- .notOk(s"Unsupported join type of $hashJoinType for substrait:
$substraitJoinType")
+ .failed(s"Unsupported join type of $hashJoinType for substrait:
$substraitJoinType")
}
val relNode = JoinUtils.createJoinRel(
streamedKeyExprs,
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/SampleExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/SampleExecTransformer.scala
index 6f9ef3428..bed59b913 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/SampleExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/SampleExecTransformer.scala
@@ -99,7 +99,7 @@ case class SampleExecTransformer(
override protected def doValidateInternal(): ValidationResult = {
if (withReplacement) {
- return ValidationResult.notOk(
+ return ValidationResult.failed(
"Unsupported sample exec in native with " +
s"withReplacement parameter is $withReplacement")
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
index 44a823834..a05a5e72b 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/ScanTransformerFactory.scala
@@ -95,11 +95,11 @@ object ScanTransformerFactory {
transformer.setPushDownFilters(allPushDownFilters.get)
// Validate again if allPushDownFilters is defined.
val validationResult = transformer.doValidate()
- if (validationResult.isValid) {
+ if (validationResult.ok()) {
transformer
} else {
val newSource = batchScan.copy(runtimeFilters =
transformer.runtimeFilters)
- FallbackTags.add(newSource, validationResult.reason.get)
+ FallbackTags.add(newSource, validationResult.reason())
newSource
}
} else {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/SortExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/SortExecTransformer.scala
index f79dc69e6..b69925d60 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/SortExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/SortExecTransformer.scala
@@ -91,7 +91,7 @@ case class SortExecTransformer(
override protected def doValidateInternal(): ValidationResult = {
if (!BackendsApiManager.getSettings.supportSortExec()) {
- return ValidationResult.notOk("Current backend does not support sort")
+ return ValidationResult.failed("Current backend does not support sort")
}
val substraitContext = new SubstraitContext
val operatorId = substraitContext.nextOperatorId(this.nodeName)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
index f032c4ca0..c96789569 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/SortMergeJoinExecTransformer.scala
@@ -164,7 +164,7 @@ abstract class SortMergeJoinExecTransformerBase(
// Firstly, need to check if the Substrait plan for this operator can be
successfully generated.
if (substraitJoinType == JoinRel.JoinType.UNRECOGNIZED) {
return ValidationResult
- .notOk(s"Found unsupported join type of $joinType for substrait: $substraitJoinType")
+ .failed(s"Found unsupported join type of $joinType for substrait: $substraitJoinType")
}
val relNode = JoinUtils.createJoinRel(
streamedKeys,
@@ -253,7 +253,7 @@ case class SortMergeJoinExecTransformer(
// Firstly, need to check if the Substrait plan for this operator can be
successfully generated.
if (substraitJoinType == JoinRel.JoinType.JOIN_TYPE_OUTER) {
return ValidationResult
- .notOk(s"Found unsupported join type of $joinType for velox smj: $substraitJoinType")
+ .failed(s"Found unsupported join type of $joinType for velox smj: $substraitJoinType")
}
super.doValidateInternal()
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
index 74158d633..b31471e21 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
@@ -67,7 +67,7 @@ case class TakeOrderedAndProjectExecTransformer(
override protected def doValidateInternal(): ValidationResult = {
if (offset != 0) {
- return ValidationResult.notOk(s"Native TopK does not support offset: $offset")
+ return ValidationResult.failed(s"Native TopK does not support offset: $offset")
}
var tagged: ValidationResult = null
@@ -83,14 +83,14 @@ case class TakeOrderedAndProjectExecTransformer(
ColumnarCollapseTransformStages.wrapInputIteratorTransformer(child)
val sortPlan = SortExecTransformer(sortOrder, false, inputTransformer)
val sortValidation = sortPlan.doValidate()
- if (!sortValidation.isValid) {
+ if (!sortValidation.ok()) {
return sortValidation
}
val limitPlan = LimitTransformer(sortPlan, offset, limit)
tagged = limitPlan.doValidate()
}
- if (tagged.isValid) {
+ if (tagged.ok()) {
val projectPlan = ProjectExecTransformer(projectList, child)
tagged = projectPlan.doValidate()
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
index 628c08f29..4902b6c6c 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowExecTransformer.scala
@@ -165,7 +165,7 @@ case class WindowExecTransformer(
override protected def doValidateInternal(): ValidationResult = {
if (!BackendsApiManager.getSettings.supportWindowExec(windowExpression)) {
return ValidationResult
- .notOk(s"Found unsupported window expression: ${windowExpression.mkString(", ")}")
+ .failed(s"Found unsupported window expression: ${windowExpression.mkString(", ")}")
}
val substraitContext = new SubstraitContext
val operatorId = substraitContext.nextOperatorId(this.nodeName)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
index c93d01e7a..6068412fb 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
@@ -145,7 +145,7 @@ case class WindowGroupLimitExecTransformer(
override protected def doValidateInternal(): ValidationResult = {
if
(!BackendsApiManager.getSettings.supportWindowGroupLimitExec(rankLikeFunction))
{
return ValidationResult
- .notOk(s"Found unsupported rank like function: $rankLikeFunction")
+ .failed(s"Found unsupported rank like function: $rankLikeFunction")
}
val substraitContext = new SubstraitContext
val operatorId = substraitContext.nextOperatorId(this.nodeName)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
index 14d58bfa8..d78f21bea 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/execution/WriteFilesExecTransformer.scala
@@ -150,8 +150,8 @@ case class WriteFilesExecTransformer(
finalChildOutput.toStructType.fields,
bucketSpec,
caseInsensitiveOptions)
- if (!validationResult.isValid) {
- return ValidationResult.notOk("Unsupported native write: " + validationResult.reason.get)
+ if (!validationResult.ok()) {
+ return ValidationResult.failed("Unsupported native write: " + validationResult.reason())
}
val substraitContext = new SubstraitContext
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
index 71a76ff63..0c70e1ea7 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/GlutenPlan.scala
@@ -26,28 +26,29 @@ import org.apache.gluten.substrait.plan.PlanBuilder
import org.apache.gluten.substrait.rel.RelNode
import org.apache.gluten.test.TestStats
import org.apache.gluten.utils.LogLevelUtil
-import org.apache.gluten.validate.NativePlanValidationInfo
import org.apache.spark.sql.execution.SparkPlan
import com.google.common.collect.Lists
-import scala.collection.JavaConverters._
-
-case class ValidationResult(isValid: Boolean, reason: Option[String])
+sealed trait ValidationResult {
+ def ok(): Boolean
+ def reason(): String
+}
object ValidationResult {
- def ok: ValidationResult = ValidationResult(isValid = true, None)
- def notOk(reason: String): ValidationResult = ValidationResult(isValid =
false, Option(reason))
- def convertFromValidationInfo(info: NativePlanValidationInfo):
ValidationResult = {
- if (info.isSupported) {
- ok
- } else {
- val fallbackInfo = info.getFallbackInfo.asScala
- .mkString("Native validation failed:\n ", "\n ", "")
- notOk(fallbackInfo)
- }
+ private case object Succeeded extends ValidationResult {
+ override def ok(): Boolean = true
+ override def reason(): String = throw new UnsupportedOperationException(
+ "Succeeded validation doesn't have failure details")
}
+
+ private case class Failed(override val reason: String) extends
ValidationResult {
+ override def ok(): Boolean = false
+ }
+
+ def succeeded: ValidationResult = Succeeded
+ def failed(reason: String): ValidationResult = Failed(reason)
}
/** Every Gluten Operator should extend this trait. */
@@ -66,17 +67,18 @@ trait GlutenPlan extends SparkPlan with
Convention.KnownBatchType with LogLevelU
val schemaVaidationResult = BackendsApiManager.getValidatorApiInstance
.doSchemaValidate(schema)
.map {
- reason => ValidationResult.notOk(s"Found schema check failure for
$schema, due to: $reason")
+ reason =>
+ ValidationResult.failed(s"Found schema check failure for $schema,
due to: $reason")
}
- .getOrElse(ValidationResult.ok)
- if (!schemaVaidationResult.isValid) {
+ .getOrElse(ValidationResult.succeeded)
+ if (!schemaVaidationResult.ok()) {
TestStats.addFallBackClassName(this.getClass.toString)
return schemaVaidationResult
}
try {
TransformerState.enterValidation
val res = doValidateInternal()
- if (!res.isValid) {
+ if (!res.ok()) {
TestStats.addFallBackClassName(this.getClass.toString)
}
res
@@ -90,7 +92,7 @@ trait GlutenPlan extends SparkPlan with
Convention.KnownBatchType with LogLevelU
logValidationMessage(
s"Validation failed with exception for plan: $nodeName, due to:
${e.getMessage}",
e)
- ValidationResult.notOk(e.getMessage)
+ ValidationResult.failed(e.getMessage)
} finally {
TransformerState.finishValidation
}
@@ -109,16 +111,16 @@ trait GlutenPlan extends SparkPlan with
Convention.KnownBatchType with LogLevelU
BackendsApiManager.getSparkPlanExecApiInstance.batchType
}
- protected def doValidateInternal(): ValidationResult = ValidationResult.ok
+ protected def doValidateInternal(): ValidationResult = ValidationResult.succeeded
protected def doNativeValidation(context: SubstraitContext, node: RelNode):
ValidationResult = {
if (node != null && enableNativeValidation) {
val planNode = PlanBuilder.makePlan(context, Lists.newArrayList(node))
val info = BackendsApiManager.getValidatorApiInstance
.doNativeValidateWithFailureReason(planNode)
- ValidationResult.convertFromValidationInfo(info)
+ info.asResult()
} else {
- ValidationResult.ok
+ ValidationResult.succeeded
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
index 25674fd17..bfb926706 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/CollapseProjectExecTransformer.scala
@@ -40,11 +40,11 @@ object CollapseProjectExecTransformer extends
Rule[SparkPlan] {
val collapsedProject = p2.copy(projectList =
CollapseProjectShim.buildCleanedProjectList(p1.projectList,
p2.projectList))
val validationResult = collapsedProject.doValidate()
- if (validationResult.isValid) {
+ if (validationResult.ok()) {
logDebug(s"Collapse project $p1 and $p2.")
collapsedProject
} else {
- logDebug(s"Failed to collapse project, due to
${validationResult.reason.getOrElse("")}")
+ logDebug(s"Failed to collapse project, due to
${validationResult.reason()}")
p1
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
index e334fcfbc..491b54443 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ExpandFallbackPolicy.scala
@@ -243,7 +243,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
originalPlan
.find(_.logicalLink.exists(_.fastEquals(p.logicalLink.get)))
.filterNot(FallbackTags.nonEmpty)
- .foreach(origin => FallbackTags.tag(origin, FallbackTags.getTag(p)))
+ .foreach(origin => FallbackTags.add(origin, FallbackTags.get(p)))
case _ =>
}
@@ -280,7 +280,7 @@ case class ExpandFallbackPolicy(isAdaptiveContext: Boolean,
originalPlan: SparkP
} else {
FallbackTags.addRecursively(
vanillaSparkPlan,
- TRANSFORM_UNSUPPORTED(fallbackInfo.reason, appendReasonIfExists =
false))
+ FallbackTag.Exclusive(fallbackInfo.reason.getOrElse("Unknown
reason")))
FallbackNode(vanillaSparkPlan)
}
} else {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
similarity index 89%
rename from
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
index ddc6870e6..f9eaa4179 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackTagRule.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreeNodeTag
+import org.apache.spark.sql.catalyst.trees.{TreeNode, TreeNodeTag}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec,
QueryStageExec}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
@@ -50,10 +50,42 @@ sealed trait FallbackTag {
if (FallbackTags.DEBUG) {
Some(ExceptionUtils.getStackTrace(new Throwable()))
} else None
+
+ def reason(): String
}
-case class TRANSFORM_UNSUPPORTED(reason: Option[String], appendReasonIfExists:
Boolean = true)
- extends FallbackTag
+object FallbackTag {
+
+ /** A tag that stores one reason text of fall back. */
+ case class Appendable(override val reason: String) extends FallbackTag
+
+ /**
+ * A tag that stores reason text of fall back. Other reasons will be
discarded when this tag is
+ * added to plan.
+ */
+ case class Exclusive(override val reason: String) extends FallbackTag
+
+ trait Converter[T] {
+ def from(obj: T): Option[FallbackTag]
+ }
+
+ object Converter {
+ implicit def asIs[T <: FallbackTag]: Converter[T] = (tag: T) => Some(tag)
+
+ implicit object FromString extends Converter[String] {
+ override def from(reason: String): Option[FallbackTag] =
Some(Appendable(reason))
+ }
+
+ implicit object FromValidationResult extends Converter[ValidationResult] {
+ override def from(result: ValidationResult): Option[FallbackTag] = {
+ if (result.ok()) {
+ return None
+ }
+ Some(Appendable(result.reason()))
+ }
+ }
+ }
+}
object FallbackTags {
val TAG: TreeNodeTag[FallbackTag] =
@@ -70,10 +102,7 @@ object FallbackTags {
* rules are passed.
*/
def nonEmpty(plan: SparkPlan): Boolean = {
- getTagOption(plan) match {
- case Some(TRANSFORM_UNSUPPORTED(_, _)) => true
- case _ => false
- }
+ getOption(plan).nonEmpty
}
/**
@@ -84,72 +113,51 @@ object FallbackTags {
*/
def maybeOffloadable(plan: SparkPlan): Boolean = !nonEmpty(plan)
- def tag(plan: SparkPlan, hint: FallbackTag): Unit = {
- val mergedHint = getTagOption(plan)
- .map {
- case originalHint @ TRANSFORM_UNSUPPORTED(Some(originalReason),
originAppend) =>
- hint match {
- case TRANSFORM_UNSUPPORTED(Some(newReason), append) =>
- if (originAppend && append) {
- TRANSFORM_UNSUPPORTED(Some(originalReason + "; " + newReason))
- } else if (originAppend) {
- TRANSFORM_UNSUPPORTED(Some(originalReason))
- } else if (append) {
- TRANSFORM_UNSUPPORTED(Some(newReason))
- } else {
- TRANSFORM_UNSUPPORTED(Some(originalReason), false)
- }
- case TRANSFORM_UNSUPPORTED(None, _) =>
- originalHint
- case _ =>
- throw new GlutenNotSupportException(
- "Plan was already tagged as non-transformable, " +
- s"cannot mark it as transformable after
that:\n${plan.toString()}")
- }
- case _ =>
- hint
+ def add[T](plan: TreeNode[_], t: T)(implicit converter:
FallbackTag.Converter[T]): Unit = {
+ val tagOption = getOption(plan)
+ val newTagOption = converter.from(t)
+
+ val mergedTagOption: Option[FallbackTag] =
+ (tagOption ++ newTagOption).reduceOption[FallbackTag] {
+ // New tag comes while the plan was already tagged, merge.
+ case (_, exclusive: FallbackTag.Exclusive) =>
+ exclusive
+ case (exclusive: FallbackTag.Exclusive, _) =>
+ exclusive
+ case (l: FallbackTag.Appendable, r: FallbackTag.Appendable) =>
+ FallbackTag.Appendable(s"${l.reason}; ${r.reason}")
}
- .getOrElse(hint)
- plan.setTagValue(TAG, mergedHint)
- }
-
- def untag(plan: SparkPlan): Unit = {
- plan.unsetTagValue(TAG)
+ mergedTagOption
+ .foreach(mergedTag => plan.setTagValue(TAG, mergedTag))
}
- def add(plan: SparkPlan, validationResult: ValidationResult): Unit = {
- if (!validationResult.isValid) {
- tag(plan, TRANSFORM_UNSUPPORTED(validationResult.reason))
- }
- }
-
- def add(plan: SparkPlan, reason: String): Unit = {
- tag(plan, TRANSFORM_UNSUPPORTED(Some(reason)))
- }
-
- def addRecursively(plan: SparkPlan, hint: TRANSFORM_UNSUPPORTED): Unit = {
+ def addRecursively[T](plan: TreeNode[_], t: T)(implicit
+ converter: FallbackTag.Converter[T]): Unit = {
plan.foreach {
case _: GlutenPlan => // ignore
- case other => tag(other, hint)
+ case other: TreeNode[_] => add(other, t)
}
}
- def getTag(plan: SparkPlan): FallbackTag = {
- getTagOption(plan).getOrElse(
+ def untag(plan: TreeNode[_]): Unit = {
+ plan.unsetTagValue(TAG)
+ }
+
+ def get(plan: TreeNode[_]): FallbackTag = {
+ getOption(plan).getOrElse(
throw new IllegalStateException("Transform hint tag not set in plan: " +
plan.toString()))
}
- def getTagOption(plan: SparkPlan): Option[FallbackTag] = {
+ def getOption(plan: TreeNode[_]): Option[FallbackTag] = {
plan.getTagValue(TAG)
}
- implicit class EncodeFallbackTagImplicits(validationResult:
ValidationResult) {
- def tagOnFallback(plan: SparkPlan): Unit = {
- if (validationResult.isValid) {
+ implicit class EncodeFallbackTagImplicits(result: ValidationResult) {
+ def tagOnFallback(plan: TreeNode[_]): Unit = {
+ if (result.ok()) {
return
}
- val newTag = TRANSFORM_UNSUPPORTED(validationResult.reason)
- tag(plan, newTag)
+ add(plan, result)
}
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 742c35341..26c70293c 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -379,7 +379,7 @@ case class OffloadFilter() extends OffloadSingleNode with
LogLevelUtil {
val newScan =
FilterHandler.pushFilterToScan(filter.condition, scan)
newScan match {
- case ts: TransformSupport if ts.doValidate().isValid => ts
+ case ts: TransformSupport if ts.doValidate().ok() => ts
case _ => scan
}
} else scan
@@ -550,12 +550,12 @@ object OffloadOthers {
case plan: FileSourceScanExec =>
val transformer =
ScanTransformerFactory.createFileSourceScanTransformer(plan)
val validationResult = transformer.doValidate()
- if (validationResult.isValid) {
+ if (validationResult.ok()) {
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
transformer
} else {
logDebug(s"Columnar Processing for ${plan.getClass} is currently
unsupported.")
- FallbackTags.add(plan, validationResult.reason.get)
+ FallbackTags.add(plan, validationResult.reason())
plan
}
case plan: BatchScanExec =>
@@ -565,12 +565,12 @@ object OffloadOthers {
val hiveTableScanExecTransformer =
BackendsApiManager.getSparkPlanExecApiInstance.genHiveTableScanExecTransformer(plan)
val validateResult = hiveTableScanExecTransformer.doValidate()
- if (validateResult.isValid) {
+ if (validateResult.ok()) {
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
return hiveTableScanExecTransformer
}
logDebug(s"Columnar Processing for ${plan.getClass} is currently
unsupported.")
- FallbackTags.add(plan, validateResult.reason.get)
+ FallbackTags.add(plan, validateResult.reason())
plan
case other =>
throw new GlutenNotSupportException(s"${other.getClass.toString} is
not supported.")
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
index d32de32eb..ac35ac83b 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RemoveNativeWriteFilesSortAndProject.scala
@@ -71,7 +71,7 @@ object NativeWriteFilesWithSkippingSortAndProject extends
Logging {
}
val transformer = ProjectExecTransformer(newProjectList, p.child)
val validationResult = transformer.doValidate()
- if (validationResult.isValid) {
+ if (validationResult.ok()) {
Some(transformer)
} else {
// If we can not transform the project, then we fallback to origin
plan which means
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
index 611d6db0b..4070a0a58 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/PushFilterToScan.scala
@@ -37,7 +37,7 @@ class PushFilterToScan(validator: Validator) extends
RasRule[SparkPlan] {
val newScan =
FilterHandler.pushFilterToScan(filter.condition, scan)
newScan match {
- case ts: TransformSupport if ts.doValidate().isValid =>
+ case ts: TransformSupport if ts.doValidate().ok() =>
List(filter.withNewChildren(List(ts)))
case _ =>
List.empty
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
index 8091127da..43f52a9e4 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala
@@ -106,7 +106,7 @@ object RasOffload {
case Validator.Passed =>
val offloaded = base.offload(from)
val offloadedNodes = offloaded.collect[GlutenPlan] { case t:
GlutenPlan => t }
- if (offloadedNodes.exists(!_.doValidate().isValid)) {
+ if (offloadedNodes.exists(!_.doValidate().ok())) {
// 4. If native validation fails on the offloaded node, return
the
// original one.
from
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala
index 2abd4d7d4..e005a3dc8 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala
@@ -74,7 +74,7 @@ class RewriteSparkPlanRulesManager private (rewriteRules:
Seq[RewriteSingleNode]
case p if !p.isInstanceOf[ProjectExec] &&
!p.isInstanceOf[RewrittenNodeWall] => p
}
assert(target.size == 1)
- FallbackTags.getTagOption(target.head)
+ FallbackTags.getOption(target.head)
}
private def applyRewriteRules(origin: SparkPlan): (SparkPlan,
Option[String]) = {
@@ -112,10 +112,10 @@ class RewriteSparkPlanRulesManager private (rewriteRules:
Seq[RewriteSingleNode]
origin
} else {
addHint.apply(rewrittenPlan)
- val hint = getFallbackTagBack(rewrittenPlan)
- if (hint.isDefined) {
+ val tag = getFallbackTagBack(rewrittenPlan)
+ if (tag.isDefined) {
// If the rewritten plan is still not transformable, return the
original plan.
- FallbackTags.tag(origin, hint.get)
+ FallbackTags.add(origin, tag.get)
origin
} else {
rewrittenPlan.transformUp {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index b6236ae9a..903723ccb 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -19,7 +19,7 @@ package org.apache.gluten.extension.columnar.validator
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, BackendSettingsApi}
import org.apache.gluten.expression.ExpressionUtils
-import org.apache.gluten.extension.columnar.{FallbackTags,
TRANSFORM_UNSUPPORTED}
+import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark.sql.execution._
@@ -109,8 +109,8 @@ object Validators {
private object FallbackByHint extends Validator {
override def validate(plan: SparkPlan): Validator.OutCome = {
if (FallbackTags.nonEmpty(plan)) {
- val hint =
FallbackTags.getTag(plan).asInstanceOf[TRANSFORM_UNSUPPORTED]
- return fail(hint.reason.getOrElse("Reason not recorded"))
+ val tag = FallbackTags.get(plan)
+ return fail(tag.reason())
}
pass()
}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index 85a4dd387..31175a43f 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -18,8 +18,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.extension.{GlutenPlan, ValidationResult}
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.spark._
@@ -119,10 +118,10 @@ case class ColumnarShuffleExchangeExec(
.doColumnarShuffleExchangeExecValidate(outputPartitioning, child)
.map {
reason =>
- ValidationResult.notOk(
+ ValidationResult.failed(
s"Found schema check failure for schema ${child.schema} due to:
$reason")
}
- .getOrElse(ValidationResult.ok)
+ .getOrElse(ValidationResult.succeeded)
}
override def nodeName: String = "ColumnarExchange"
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
index 781dc6b6f..338136b6d 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenExplainUtils.scala
@@ -18,12 +18,12 @@ package org.apache.spark.sql.execution
import org.apache.gluten.execution.WholeStageTransformer
import org.apache.gluten.extension.GlutenPlan
+import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.utils.PlanUtil
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import
org.apache.spark.sql.execution.GlutenFallbackReporter.FALLBACK_REASON_TAG
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AdaptiveSparkPlanHelper, AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{DataWritingCommandExec,
ExecutedCommandExec}
@@ -59,8 +59,8 @@ object GlutenExplainUtils extends AdaptiveSparkPlanHelper {
p: SparkPlan,
fallbackNodeToReason: mutable.HashMap[String, String]
): Unit = {
- p.logicalLink.flatMap(_.getTagValue(FALLBACK_REASON_TAG)) match {
- case Some(reason) => addFallbackNodeWithReason(p, reason,
fallbackNodeToReason)
+ p.logicalLink.flatMap(FallbackTags.getOption) match {
+ case Some(tag) => addFallbackNodeWithReason(p, tag.reason(),
fallbackNodeToReason)
case _ =>
// If the SparkPlan does not have fallback reason, then there are two
options:
// 1. Gluten ignore that plan and it's a kind of fallback
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
index d41dce882..67ecf81b9 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala
@@ -19,14 +19,12 @@ package org.apache.spark.sql.execution
import org.apache.gluten.GlutenConfig
import org.apache.gluten.events.GlutenPlanFallbackEvent
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackTags,
TRANSFORM_UNSUPPORTED}
+import org.apache.gluten.extension.columnar.FallbackTags
import org.apache.gluten.utils.LogLevelUtil
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
-import
org.apache.spark.sql.execution.GlutenFallbackReporter.FALLBACK_REASON_TAG
import org.apache.spark.sql.execution.ui.GlutenEventUtils
/**
@@ -58,41 +56,14 @@ case class GlutenFallbackReporter(glutenConfig:
GlutenConfig, spark: SparkSessio
plan.foreachUp {
case _: GlutenPlan => // ignore
case p: SparkPlan if FallbackTags.nonEmpty(p) =>
- FallbackTags.getTag(p) match {
- case TRANSFORM_UNSUPPORTED(Some(reason), append) =>
- logFallbackReason(validationLogLevel, p.nodeName, reason)
- // With in next round stage in AQE, the physical plan would be a
new instance that
- // can not preserve the tag, so we need to set the fallback reason
to logical plan.
- // Then we can be aware of the fallback reason for the whole plan.
- // If a logical plan mapping to several physical plan, we add all
reason into
- // that logical plan to make sure we do not lose any fallback
reason.
- p.logicalLink.foreach {
- logicalPlan =>
- val newReason = logicalPlan
- .getTagValue(FALLBACK_REASON_TAG)
- .map {
- lastReason =>
- if (!append) {
- lastReason
- } else if (lastReason.contains(reason)) {
- // use the last reason, as the reason is redundant
- lastReason
- } else if (reason.contains(lastReason)) {
- // overwrite the reason
- reason
- } else {
- // add the new reason
- lastReason + "; " + reason
- }
- }
- .getOrElse(reason)
- logicalPlan.setTagValue(FALLBACK_REASON_TAG, newReason)
- }
- case TRANSFORM_UNSUPPORTED(_, _) =>
- logFallbackReason(validationLogLevel, p.nodeName, "unknown reason")
- case _ =>
- throw new IllegalStateException("Unreachable code")
- }
+ val tag = FallbackTags.get(p)
+ logFallbackReason(validationLogLevel, p.nodeName, tag.reason())
+ // With in next round stage in AQE, the physical plan would be a new
instance that
+ // can not preserve the tag, so we need to set the fallback reason to
logical plan.
+ // Then we can be aware of the fallback reason for the whole plan.
+ // If a logical plan mapping to several physical plan, we add all
reason into
+ // that logical plan to make sure we do not lose any fallback reason.
+ p.logicalLink.foreach(logicalPlan => FallbackTags.add(logicalPlan,
tag))
case _ =>
}
}
@@ -119,7 +90,4 @@ case class GlutenFallbackReporter(glutenConfig:
GlutenConfig, spark: SparkSessio
}
}
-object GlutenFallbackReporter {
- // A tag used to inject to logical plan to preserve the fallback reason
- val FALLBACK_REASON_TAG = new TreeNodeTag[String]("GLUTEN_FALLBACK_REASON")
-}
+object GlutenFallbackReporter {}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecTransformer.scala
index ecedc1bae..6a9da0a9c 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExecTransformer.scala
@@ -62,7 +62,7 @@ case class EvalPythonExecTransformer(
// All udfs should be scalar python udf
for (udf <- udfs) {
if (!PythonUDF.isScalarPythonUDF(udf)) {
- return ValidationResult.notOk(s"$udf is not scalar python udf")
+ return ValidationResult.failed(s"$udf is not scalar python udf")
}
}
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 95793e5dc..2a3ba79eb 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -202,7 +202,7 @@ object HiveTableScanExecTransformer {
hiveTableScan.relation,
hiveTableScan.partitionPruningPred)(hiveTableScan.session)
hiveTableScanTransformer.doValidate()
- case _ => ValidationResult.notOk("Is not a Hive scan")
+ case _ => ValidationResult.failed("Is not a Hive scan")
}
}
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index 1cd735cf7..31e6c6940 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -57,7 +57,7 @@ case class DeltaScanTransformer(
_.name == "__delta_internal_is_row_deleted") ||
requiredSchema.fields.exists(
_.name == "__delta_internal_row_index")
) {
- return ValidationResult.notOk(s"Deletion vector is not supported in
native.")
+ return ValidationResult.failed(s"Deletion vector is not supported in
native.")
}
super.doValidateInternal()
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index fe37da206..2ee1573ea 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -35,7 +35,7 @@ case class CustomerColumnarPreRules(session: SparkSession)
extends Rule[SparkPla
fileSourceScan.tableIdentifier,
fileSourceScan.disableBucketedScan
)
- if (transformer.doValidate().isValid) {
+ if (transformer.doValidate().ok()) {
transformer
} else {
plan
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index b9c9d8a27..54d7596b6 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
FallbackTags, TRANSFORM_UNSUPPORTED}
+import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
FallbackTags}
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.extension.columnar.transition.InsertTransitions
import org.apache.gluten.utils.QueryPlanSelector
@@ -124,17 +124,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait
{
testGluten("Tag not transformable more than once") {
val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true))
- FallbackTags.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason")))
+ FallbackTags.add(originalPlan, "fake reason")
val rule = FallbackEmptySchemaRelation()
val newPlan = rule.apply(originalPlan)
- val reason =
FallbackTags.getTag(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason
- assert(reason.isDefined)
+ val reason = FallbackTags.get(newPlan).reason()
if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
assert(
- reason.get.contains("fake reason") &&
- reason.get.contains("at least one of its children has empty output"))
+ reason.contains("fake reason") &&
+ reason.contains("at least one of its children has empty output"))
} else {
- assert(reason.get.contains("fake reason"))
+ assert(reason.contains("fake reason"))
}
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index fe37da206..2ee1573ea 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -35,7 +35,7 @@ case class CustomerColumnarPreRules(session: SparkSession)
extends Rule[SparkPla
fileSourceScan.tableIdentifier,
fileSourceScan.disableBucketedScan
)
- if (transformer.doValidate().isValid) {
+ if (transformer.doValidate().ok()) {
transformer
} else {
plan
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 8ce0af8df..5150a4768 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
FallbackTags, TRANSFORM_UNSUPPORTED}
+import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
FallbackTags}
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.extension.columnar.transition.InsertTransitions
import org.apache.gluten.utils.QueryPlanSelector
@@ -125,17 +125,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait
{
testGluten("Tag not transformable more than once") {
val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true))
- FallbackTags.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason")))
+ FallbackTags.add(originalPlan, "fake reason")
val rule = FallbackEmptySchemaRelation()
val newPlan = rule.apply(originalPlan)
- val reason =
FallbackTags.getTag(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason
- assert(reason.isDefined)
+ val reason = FallbackTags.get(newPlan).reason()
if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
assert(
- reason.get.contains("fake reason") &&
- reason.get.contains("at least one of its children has empty output"))
+ reason.contains("fake reason") &&
+ reason.contains("at least one of its children has empty output"))
} else {
- assert(reason.get.contains("fake reason"))
+ assert(reason.contains("fake reason"))
}
}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index fe37da206..2ee1573ea 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -35,7 +35,7 @@ case class CustomerColumnarPreRules(session: SparkSession)
extends Rule[SparkPla
fileSourceScan.tableIdentifier,
fileSourceScan.disableBucketedScan
)
- if (transformer.doValidate().isValid) {
+ if (transformer.doValidate().ok()) {
transformer
} else {
plan
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 8ce0af8df..5150a4768 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
FallbackTags, TRANSFORM_UNSUPPORTED}
+import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
FallbackTags}
import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.extension.columnar.transition.InsertTransitions
import org.apache.gluten.utils.QueryPlanSelector
@@ -125,17 +125,16 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait
{
testGluten("Tag not transformable more than once") {
val originalPlan = UnaryOp1(LeafOp(supportsColumnar = true))
- FallbackTags.tag(originalPlan, TRANSFORM_UNSUPPORTED(Some("fake reason")))
+ FallbackTags.add(originalPlan, "fake reason")
val rule = FallbackEmptySchemaRelation()
val newPlan = rule.apply(originalPlan)
- val reason =
FallbackTags.getTag(newPlan).asInstanceOf[TRANSFORM_UNSUPPORTED].reason
- assert(reason.isDefined)
+ val reason = FallbackTags.get(newPlan).reason()
if (BackendsApiManager.getSettings.fallbackOnEmptySchema(newPlan)) {
assert(
- reason.get.contains("fake reason") &&
- reason.get.contains("at least one of its children has empty output"))
+ reason.contains("fake reason") &&
+ reason.contains("at least one of its children has empty output"))
} else {
- assert(reason.get.contains("fake reason"))
+ assert(reason.contains("fake reason"))
}
}
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
index fe37da206..2ee1573ea 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/CustomerColumnarPreRules.scala
@@ -35,7 +35,7 @@ case class CustomerColumnarPreRules(session: SparkSession)
extends Rule[SparkPla
fileSourceScan.tableIdentifier,
fileSourceScan.disableBucketedScan
)
- if (transformer.doValidate().isValid) {
+ if (transformer.doValidate().ok()) {
transformer
} else {
plan
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]