This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new d8cf6ea15f [CORE][CH] Remove ValidatorApi.doSparkPlanValidate (#7668)
d8cf6ea15f is described below
commit d8cf6ea15ffef59fb6ac7d344e5b3e30f63103b3
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Oct 24 14:02:03 2024 +0800
[CORE][CH] Remove ValidatorApi.doSparkPlanValidate (#7668)
---
.../gluten/backendsapi/clickhouse/CHRuleApi.scala | 72 +++++++++++++++++++---
.../backendsapi/clickhouse/CHValidatorApi.scala | 31 +---------
.../backendsapi/velox/VeloxValidatorApi.scala | 2 -
.../apache/gluten/backendsapi/ValidatorApi.scala | 3 -
.../apache/gluten/utils/QueryPlanSelector.scala | 13 +---
5 files changed, 67 insertions(+), 54 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 2cf1f4fcc4..9a1ead1e6c 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -29,7 +29,11 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.PhysicalPlanSelector
import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule,
EqualToRewrite}
-import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.delta.DeltaLogFileIndex
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, CommandResultExec, FileSourceScanExec, GlutenFallbackReporter, RDDScanExec, SparkPlan}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
import org.apache.spark.util.SparkPlanRules
class CHRuleApi extends RuleApi {
@@ -64,6 +68,7 @@ private object CHRuleApi {
}
def injectLegacy(injector: LegacyInjector): Unit = {
+
// Gluten columnar: Transform rules.
injector.injectTransform(_ => RemoveTransitions)
injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
@@ -72,11 +77,11 @@ private object CHRuleApi {
injector.injectTransform(_ => RewriteSubqueryBroadcast())
injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session))
injector.injectTransform(c => MergeTwoPhasesHashBaseAggregate.apply(c.session))
- injector.injectTransform(_ => RewriteSparkPlanRulesManager())
- injector.injectTransform(_ => AddFallbackTagRule())
- injector.injectTransform(_ => TransformPreOverrides())
+ injector.injectTransform(_ => intercept(RewriteSparkPlanRulesManager()))
+ injector.injectTransform(_ => intercept(AddFallbackTagRule()))
+ injector.injectTransform(_ => intercept(TransformPreOverrides()))
injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
- injector.injectTransform(c => RewriteTransformer.apply(c.session))
+ injector.injectTransform(c => intercept(RewriteTransformer.apply(c.session)))
injector.injectTransform(_ => PushDownFilterToScan)
injector.injectTransform(_ => PushDownInputFileExpression.PostOffload)
injector.injectTransform(_ => EnsureLocalSortRequirements)
@@ -85,7 +90,9 @@ private object CHRuleApi {
injector.injectTransform(c =>
RewriteSortMergeJoinToHashJoinRule.apply(c.session))
injector.injectTransform(c =>
PushdownAggregatePreProjectionAheadExpand.apply(c.session))
injector.injectTransform(
- c =>
SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session))
+ c =>
+ intercept(
+
SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session)))
injector.injectTransform(c => InsertTransitions(c.outputsColumnar))
// Gluten columnar: Fallback policies.
@@ -96,10 +103,11 @@ private object CHRuleApi {
injector.injectPost(c => RemoveTopmostColumnarToRow(c.session,
c.ac.isAdaptiveContext()))
SparkShimLoader.getSparkShims
.getExtendedColumnarPostRules()
- .foreach(each => injector.injectPost(c => each(c.session)))
+ .foreach(each => injector.injectPost(c => intercept(each(c.session))))
injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
injector.injectTransform(
- c =>
SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session))
+ c =>
+
intercept(SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session)))
// Gluten columnar: Final rules.
injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
@@ -116,4 +124,52 @@ private object CHRuleApi {
"Clickhouse backend doesn't yet have RAS support, please try disabling RAS and" +
" rerunning the application"))
}
+
+ /**
+ * Since https://github.com/apache/incubator-gluten/pull/883.
+ *
+ * TODO: Remove this since tricky to maintain.
+ */
+ private class CHSparkRuleInterceptor(delegate: Rule[SparkPlan])
+ extends Rule[SparkPlan]
+ with AdaptiveSparkPlanHelper {
+ override val ruleName: String = delegate.ruleName
+
+ override def apply(plan: SparkPlan): SparkPlan = {
+ if (skipOn(plan)) {
+ return plan
+ }
+ delegate(plan)
+ }
+
+ private def skipOn(plan: SparkPlan): Boolean = {
+ // TODO: Currently there are some fallback issues on CH backend when SparkPlan is
+ // TODO: SerializeFromObjectExec, ObjectHashAggregateExec and V2CommandExec.
+ // For example:
+ // val tookTimeArr = Array(12, 23, 56, 100, 500, 20)
+ // import spark.implicits._
+ // val df = spark.sparkContext.parallelize(tookTimeArr.toSeq, 1).toDF("time")
+ // df.summary().show(100, false)
+
+ def includedDeltaOperator(scanExec: FileSourceScanExec): Boolean = {
+ scanExec.relation.location.isInstanceOf[DeltaLogFileIndex]
+ }
+
+ val includedUnsupportedPlans = collect(plan) {
+ // case s: SerializeFromObjectExec => true
+ // case d: DeserializeToObjectExec => true
+ // case o: ObjectHashAggregateExec => true
+ case rddScanExec: RDDScanExec if rddScanExec.nodeName.contains("Delta Table State") => true
+ case f: FileSourceScanExec if includedDeltaOperator(f) => true
+ case v2CommandExec: V2CommandExec => true
+ case commandResultExec: CommandResultExec => true
+ }
+
+ includedUnsupportedPlans.contains(true)
+ }
+ }
+
+ private def intercept(delegate: Rule[SparkPlan]): Rule[SparkPlan] = {
+ new CHSparkRuleInterceptor(delegate)
+ }
}
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
index 21b4116cdc..6081b87e1d 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
@@ -30,10 +30,8 @@ import org.apache.spark.internal.Logging
import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning,
Partitioning, RangePartitioning}
-import org.apache.spark.sql.delta.DeltaLogFileIndex
-import org.apache.spark.sql.execution.{CommandResultExec, FileSourceScanExec, RDDScanExec, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logging {
@@ -67,33 +65,6 @@ class CHValidatorApi extends ValidatorApi with
AdaptiveSparkPlanHelper with Logg
true
}
- /** Validate against a whole Spark plan, before being interpreted by Gluten.
*/
- override def doSparkPlanValidate(plan: SparkPlan): Boolean = {
- // TODO: Currently there are some fallback issues on CH backend when SparkPlan is
- // TODO: SerializeFromObjectExec, ObjectHashAggregateExec and V2CommandExec.
- // For example:
- // val tookTimeArr = Array(12, 23, 56, 100, 500, 20)
- // import spark.implicits._
- // val df = spark.sparkContext.parallelize(tookTimeArr.toSeq, 1).toDF("time")
- // df.summary().show(100, false)
-
- def includedDeltaOperator(scanExec: FileSourceScanExec): Boolean = {
- scanExec.relation.location.isInstanceOf[DeltaLogFileIndex]
- }
-
- val includedUnsupportedPlans = collect(plan) {
- // case s: SerializeFromObjectExec => true
- // case d: DeserializeToObjectExec => true
- // case o: ObjectHashAggregateExec => true
- case rddScanExec: RDDScanExec if rddScanExec.nodeName.contains("Delta Table State") => true
- case f: FileSourceScanExec if includedDeltaOperator(f) => true
- case v2CommandExec: V2CommandExec => true
- case commandResultExec: CommandResultExec => true
- }
-
- !includedUnsupportedPlans.contains(true)
- }
-
/** Validate whether the compression method support splittable at clickhouse backend. */
override def doCompressionSplittableValidate(compressionMethod: String): Boolean = {
false
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
index 7d681d5c6a..00a8f8cb0e 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
@@ -43,8 +43,6 @@ class VeloxValidatorApi extends ValidatorApi {
}
}
- override def doSparkPlanValidate(plan: SparkPlan): Boolean = true
-
private def asValidationResult(info: NativePlanValidationInfo):
ValidationResult = {
if (info.isSupported == 1) {
return ValidationResult.succeeded
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
index dadb5580e9..90f132d78b 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
@@ -40,9 +40,6 @@ trait ValidatorApi {
*/
def doExprValidate(substraitExprName: String, expr: Expression): Boolean = true
- /** Validate against a whole Spark plan, before being interpreted by Gluten. */
- def doSparkPlanValidate(plan: SparkPlan): Boolean
-
/** Validate against Substrait plan node in native backend. */
def doNativeValidateWithFailureReason(plan: PlanNode): ValidationResult
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
index cd063ce31a..2d8d7b29e4 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.utils
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar.ColumnarRuleApplier.SkipCondition
import org.apache.spark.internal.Logging
@@ -29,15 +28,9 @@ import org.apache.spark.sql.execution.SparkPlan
object PhysicalPlanSelector extends QueryPlanSelector[SparkPlan] {
val skipCond: SkipCondition = (session: SparkSession, plan: SparkPlan) =>
!shouldUseGluten(session, plan)
-
- override protected def validate(plan: SparkPlan): Boolean = {
- BackendsApiManager.getValidatorApiInstance.doSparkPlanValidate(plan)
- }
}
-object LogicalPlanSelector extends QueryPlanSelector[LogicalPlan] {
- override protected def validate(plan: LogicalPlan): Boolean = true
-}
+object LogicalPlanSelector extends QueryPlanSelector[LogicalPlan] {}
/** Select to decide whether a Spark plan can be accepted by Gluten for further execution. */
abstract class QueryPlanSelector[T <: QueryPlan[_]] extends Logging {
@@ -57,8 +50,6 @@ abstract class QueryPlanSelector[T <: QueryPlan[_]] extends
Logging {
}
}
- protected def validate(plan: T): Boolean
-
def shouldUseGluten(session: SparkSession, plan: T): Boolean = {
val glutenEnabled = session.conf
.get(GlutenConfig.GLUTEN_ENABLE_KEY,
GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString)
@@ -71,7 +62,7 @@ abstract class QueryPlanSelector[T <: QueryPlan[_]] extends
Logging {
s"plan:\n${plan.treeString}\n" +
"=========================")
}
- glutenEnabled & validate(plan)
+ glutenEnabled
}
def maybe(session: SparkSession, plan: T)(func: => T): T = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]