This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new d8cf6ea15f [CORE][CH] Remove ValidatorApi.doSparkPlanValidate (#7668)
d8cf6ea15f is described below

commit d8cf6ea15ffef59fb6ac7d344e5b3e30f63103b3
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Oct 24 14:02:03 2024 +0800

    [CORE][CH] Remove ValidatorApi.doSparkPlanValidate (#7668)
---
 .../gluten/backendsapi/clickhouse/CHRuleApi.scala  | 72 +++++++++++++++++++---
 .../backendsapi/clickhouse/CHValidatorApi.scala    | 31 +---------
 .../backendsapi/velox/VeloxValidatorApi.scala      |  2 -
 .../apache/gluten/backendsapi/ValidatorApi.scala   |  3 -
 .../apache/gluten/utils/QueryPlanSelector.scala    | 13 +---
 5 files changed, 67 insertions(+), 54 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
index 2cf1f4fcc4..9a1ead1e6c 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala
@@ -29,7 +29,11 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.utils.PhysicalPlanSelector
 
 import org.apache.spark.sql.catalyst.{CHAggregateFunctionRewriteRule, 
EqualToRewrite}
-import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
GlutenFallbackReporter}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.delta.DeltaLogFileIndex
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
CommandResultExec, FileSourceScanExec, GlutenFallbackReporter, RDDScanExec, 
SparkPlan}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 import org.apache.spark.util.SparkPlanRules
 
 class CHRuleApi extends RuleApi {
@@ -64,6 +68,7 @@ private object CHRuleApi {
   }
 
   def injectLegacy(injector: LegacyInjector): Unit = {
+
     // Gluten columnar: Transform rules.
     injector.injectTransform(_ => RemoveTransitions)
     injector.injectTransform(_ => PushDownInputFileExpression.PreOffload)
@@ -72,11 +77,11 @@ private object CHRuleApi {
     injector.injectTransform(_ => RewriteSubqueryBroadcast())
     injector.injectTransform(c => FallbackBroadcastHashJoin.apply(c.session))
     injector.injectTransform(c => 
MergeTwoPhasesHashBaseAggregate.apply(c.session))
-    injector.injectTransform(_ => RewriteSparkPlanRulesManager())
-    injector.injectTransform(_ => AddFallbackTagRule())
-    injector.injectTransform(_ => TransformPreOverrides())
+    injector.injectTransform(_ => intercept(RewriteSparkPlanRulesManager()))
+    injector.injectTransform(_ => intercept(AddFallbackTagRule()))
+    injector.injectTransform(_ => intercept(TransformPreOverrides()))
     injector.injectTransform(_ => RemoveNativeWriteFilesSortAndProject())
-    injector.injectTransform(c => RewriteTransformer.apply(c.session))
+    injector.injectTransform(c => 
intercept(RewriteTransformer.apply(c.session)))
     injector.injectTransform(_ => PushDownFilterToScan)
     injector.injectTransform(_ => PushDownInputFileExpression.PostOffload)
     injector.injectTransform(_ => EnsureLocalSortRequirements)
@@ -85,7 +90,9 @@ private object CHRuleApi {
     injector.injectTransform(c => 
RewriteSortMergeJoinToHashJoinRule.apply(c.session))
     injector.injectTransform(c => 
PushdownAggregatePreProjectionAheadExpand.apply(c.session))
     injector.injectTransform(
-      c => 
SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session))
+      c =>
+        intercept(
+          
SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session)))
     injector.injectTransform(c => InsertTransitions(c.outputsColumnar))
 
     // Gluten columnar: Fallback policies.
@@ -96,10 +103,11 @@ private object CHRuleApi {
     injector.injectPost(c => RemoveTopmostColumnarToRow(c.session, 
c.ac.isAdaptiveContext()))
     SparkShimLoader.getSparkShims
       .getExtendedColumnarPostRules()
-      .foreach(each => injector.injectPost(c => each(c.session)))
+      .foreach(each => injector.injectPost(c => intercept(each(c.session))))
     injector.injectPost(c => ColumnarCollapseTransformStages(c.conf))
     injector.injectTransform(
-      c => 
SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session))
+      c =>
+        
intercept(SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session)))
 
     // Gluten columnar: Final rules.
     injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session))
@@ -116,4 +124,52 @@ private object CHRuleApi {
           "Clickhouse backend doesn't yet have RAS support, please try 
disabling RAS and" +
             " rerunning the application"))
   }
+
+  /**
+   * Since https://github.com/apache/incubator-gluten/pull/883.
+   *
+   * TODO: Remove this since tricky to maintain.
+   */
+  private class CHSparkRuleInterceptor(delegate: Rule[SparkPlan])
+    extends Rule[SparkPlan]
+    with AdaptiveSparkPlanHelper {
+    override val ruleName: String = delegate.ruleName
+
+    override def apply(plan: SparkPlan): SparkPlan = {
+      if (skipOn(plan)) {
+        return plan
+      }
+      delegate(plan)
+    }
+
+    private def skipOn(plan: SparkPlan): Boolean = {
+      // TODO: Currently there are some fallback issues on CH backend when 
SparkPlan is
+      // TODO: SerializeFromObjectExec, ObjectHashAggregateExec and 
V2CommandExec.
+      // For example:
+      //   val tookTimeArr = Array(12, 23, 56, 100, 500, 20)
+      //   import spark.implicits._
+      //   val df = spark.sparkContext.parallelize(tookTimeArr.toSeq, 
1).toDF("time")
+      //   df.summary().show(100, false)
+
+      def includedDeltaOperator(scanExec: FileSourceScanExec): Boolean = {
+        scanExec.relation.location.isInstanceOf[DeltaLogFileIndex]
+      }
+
+      val includedUnsupportedPlans = collect(plan) {
+        // case s: SerializeFromObjectExec => true
+        // case d: DeserializeToObjectExec => true
+        // case o: ObjectHashAggregateExec => true
+        case rddScanExec: RDDScanExec if rddScanExec.nodeName.contains("Delta 
Table State") => true
+        case f: FileSourceScanExec if includedDeltaOperator(f) => true
+        case v2CommandExec: V2CommandExec => true
+        case commandResultExec: CommandResultExec => true
+      }
+
+      includedUnsupportedPlans.contains(true)
+    }
+  }
+
+  private def intercept(delegate: Rule[SparkPlan]): Rule[SparkPlan] = {
+    new CHSparkRuleInterceptor(delegate)
+  }
 }
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
index 21b4116cdc..6081b87e1d 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala
@@ -30,10 +30,8 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, 
Partitioning, RangePartitioning}
-import org.apache.spark.sql.delta.DeltaLogFileIndex
-import org.apache.spark.sql.execution.{CommandResultExec, FileSourceScanExec, 
RDDScanExec, SparkPlan}
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
 
 class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with 
Logging {
 
@@ -67,33 +65,6 @@ class CHValidatorApi extends ValidatorApi with 
AdaptiveSparkPlanHelper with Logg
     true
   }
 
-  /** Validate against a whole Spark plan, before being interpreted by Gluten. 
*/
-  override def doSparkPlanValidate(plan: SparkPlan): Boolean = {
-    // TODO: Currently there are some fallback issues on CH backend when 
SparkPlan is
-    // TODO: SerializeFromObjectExec, ObjectHashAggregateExec and 
V2CommandExec.
-    // For example:
-    //   val tookTimeArr = Array(12, 23, 56, 100, 500, 20)
-    //   import spark.implicits._
-    //   val df = spark.sparkContext.parallelize(tookTimeArr.toSeq, 
1).toDF("time")
-    //   df.summary().show(100, false)
-
-    def includedDeltaOperator(scanExec: FileSourceScanExec): Boolean = {
-      scanExec.relation.location.isInstanceOf[DeltaLogFileIndex]
-    }
-
-    val includedUnsupportedPlans = collect(plan) {
-      // case s: SerializeFromObjectExec => true
-      // case d: DeserializeToObjectExec => true
-      // case o: ObjectHashAggregateExec => true
-      case rddScanExec: RDDScanExec if rddScanExec.nodeName.contains("Delta 
Table State") => true
-      case f: FileSourceScanExec if includedDeltaOperator(f) => true
-      case v2CommandExec: V2CommandExec => true
-      case commandResultExec: CommandResultExec => true
-    }
-
-    !includedUnsupportedPlans.contains(true)
-  }
-
   /** Validate whether the compression method support splittable at clickhouse 
backend. */
   override def doCompressionSplittableValidate(compressionMethod: String): 
Boolean = {
     false
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
index 7d681d5c6a..00a8f8cb0e 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala
@@ -43,8 +43,6 @@ class VeloxValidatorApi extends ValidatorApi {
     }
   }
 
-  override def doSparkPlanValidate(plan: SparkPlan): Boolean = true
-
   private def asValidationResult(info: NativePlanValidationInfo): 
ValidationResult = {
     if (info.isSupported == 1) {
       return ValidationResult.succeeded
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
index dadb5580e9..90f132d78b 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala
@@ -40,9 +40,6 @@ trait ValidatorApi {
    */
   def doExprValidate(substraitExprName: String, expr: Expression): Boolean = 
true
 
-  /** Validate against a whole Spark plan, before being interpreted by Gluten. 
*/
-  def doSparkPlanValidate(plan: SparkPlan): Boolean
-
   /** Validate against Substrait plan node in native backend. */
   def doNativeValidateWithFailureReason(plan: PlanNode): ValidationResult
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
index cd063ce31a..2d8d7b29e4 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/utils/QueryPlanSelector.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.utils
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar.ColumnarRuleApplier.SkipCondition
 
 import org.apache.spark.internal.Logging
@@ -29,15 +28,9 @@ import org.apache.spark.sql.execution.SparkPlan
 object PhysicalPlanSelector extends QueryPlanSelector[SparkPlan] {
   val skipCond: SkipCondition = (session: SparkSession, plan: SparkPlan) =>
     !shouldUseGluten(session, plan)
-
-  override protected def validate(plan: SparkPlan): Boolean = {
-    BackendsApiManager.getValidatorApiInstance.doSparkPlanValidate(plan)
-  }
 }
 
-object LogicalPlanSelector extends QueryPlanSelector[LogicalPlan] {
-  override protected def validate(plan: LogicalPlan): Boolean = true
-}
+object LogicalPlanSelector extends QueryPlanSelector[LogicalPlan] {}
 
 /** Select to decide whether a Spark plan can be accepted by Gluten for 
further execution. */
 abstract class QueryPlanSelector[T <: QueryPlan[_]] extends Logging {
@@ -57,8 +50,6 @@ abstract class QueryPlanSelector[T <: QueryPlan[_]] extends 
Logging {
     }
   }
 
-  protected def validate(plan: T): Boolean
-
   def shouldUseGluten(session: SparkSession, plan: T): Boolean = {
     val glutenEnabled = session.conf
       .get(GlutenConfig.GLUTEN_ENABLE_KEY, 
GlutenConfig.GLUTEN_ENABLE_BY_DEFAULT.toString)
@@ -71,7 +62,7 @@ abstract class QueryPlanSelector[T <: QueryPlan[_]] extends 
Logging {
           s"plan:\n${plan.treeString}\n" +
           "=========================")
     }
-    glutenEnabled & validate(plan)
+    glutenEnabled
   }
 
   def maybe(session: SparkSession, plan: T)(func: => T): T = {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to