This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 632784dec8a [SPARK-38959][SQL][FOLLOWUP] Do not optimize subqueries twice 632784dec8a is described below commit 632784dec8a0c682a85a888c66e83f709e402e18 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Mon Nov 14 16:06:26 2022 +0800 [SPARK-38959][SQL][FOLLOWUP] Do not optimize subqueries twice ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/38557 . We found that some optimizer rules can't be applied twice (those in the `Once` batch), but running the rule `OptimizeSubqueries` twice breaks them, as it optimizes subqueries twice. This PR partially reverts https://github.com/apache/spark/pull/38557 to still invoke `OptimizeSubqueries` in `RowLevelOperationRuntimeGroupFiltering`. We don't fully revert https://github.com/apache/spark/pull/38557 because it's still beneficial to use IN subquery directly instead of using the DPP framework as there is no join. ### Why are the changes needed? Fix the optimizer. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? N/A Closes #38626 from cloud-fan/follow. 
Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala | 6 ++++-- .../sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala | 2 +- .../sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala | 2 +- .../dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala | 6 ++++-- 4 files changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 9624bf1fa9f..c61fd9ce10f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -51,8 +51,10 @@ class SparkOptimizer( Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ Batch("PartitionPruning", Once, PartitionPruning, - RowLevelOperationRuntimeGroupFiltering, - OptimizeSubqueries) :+ + // We can't run `OptimizeSubqueries` in this batch, as it will optimize the subqueries + // twice which may break some optimizer rules that can only be applied once. The rule below + // only invokes `OptimizeSubqueries` to optimize newly added subqueries. 
+ new RowLevelOperationRuntimeGroupFiltering(OptimizeSubqueries)) :+ Batch("InjectRuntimeFilter", FixedPoint(1), InjectRuntimeFilter) :+ Batch("MergeScalarSubqueries", Once, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala index 21bc55110fe..9a780c11eef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, HashedRelati case class PlanAdaptiveDynamicPruningFilters( rootPlan: AdaptiveSparkPlanExec) extends Rule[SparkPlan] with AdaptiveSparkPlanHelper { def apply(plan: SparkPlan): SparkPlan = { - if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) { + if (!conf.dynamicPartitionPruningEnabled) { return plan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala index df5e3ea1365..c9ff28eb045 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala @@ -45,7 +45,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp } override def apply(plan: SparkPlan): SparkPlan = { - if (!conf.dynamicPartitionPruningEnabled && !conf.runtimeRowLevelOperationGroupFilterEnabled) { + if (!conf.dynamicPartitionPruningEnabled) { return plan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala index bb5edc94fa5..f2b513e630b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/RowLevelOperationRuntimeGroupFiltering.scala @@ -37,7 +37,8 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat * * Note this rule only applies to group-based row-level operations. */ -object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper { +class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan]) + extends Rule[LogicalPlan] with PredicateHelper { import DataSourceV2Implicits._ @@ -64,7 +65,8 @@ object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with Pre Filter(dynamicPruningCond, r) } - replaceData.copy(query = newQuery) + // optimize subqueries to rewrite them as joins and trigger job planning + replaceData.copy(query = optimizeSubqueries(newQuery)) } private def buildMatchingRowsPlan( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org