This is an automated email from the ASF dual-hosted git repository.

ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8342d7438114 [SPARK-52974][SQL] Don't broadcast dynamicpruning InSubqueries
8342d7438114 is described below

commit 8342d7438114b7f1bffed39a7ff52900a9014348
Author: Peter Toth <peter.t...@gmail.com>
AuthorDate: Mon Aug 4 18:35:01 2025 +0200

    [SPARK-52974][SQL] Don't broadcast dynamicpruning InSubqueries

    ### What changes were proposed in this pull request?
    Currently, when AQE is off, DPP `InSubquery`s are planned by the `PlanSubqueries` rule as regular `InSubqueryExec(isDynamicPruning = false)` nodes, which causes their results to be broadcast. But those subquery results are only ever used on the driver, so there is no need to broadcast them.

    ### Why are the changes needed?
    It is a performance improvement when AQE is off, and it brings `PlanDynamicPruningFilters` on par with `PlanAdaptiveDynamicPruningFilters`.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Passed the CIs with the revised test case.

    ### Was this patch authored or co-authored using generative AI tooling?
    No.

    Closes #51685 from peter-toth/SPARK-52974-fix-dpp-insubqueries-broadcast.

    Authored-by: Peter Toth <peter.t...@gmail.com>
    Signed-off-by: Peter Toth <peter.t...@gmail.com>
---
 .../execution/dynamicpruning/PlanDynamicPruningFilters.scala | 12 ++++++------
 .../src/test/scala/org/apache/spark/sql/ExplainSuite.scala   |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)
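To make the "driver-only" argument above concrete, here is a minimal, self-contained Scala sketch. Every name in it is a hypothetical stand-in, not one of Spark's actual classes: the pruning values' only consumer is driver-side partition-filter evaluation, so a broadcast of those values has no reader.

```scala
// Toy model of the reasoning above; all names are hypothetical stand-ins,
// not Spark's actual classes.
object DriverOnlyPruningSketch {

  // Stand-in for a planned pruning subquery: executing it yields the
  // distinct join-key values, collected to the driver.
  trait PruningSubquery {
    def executeCollect(): Seq[Long]
  }

  // New planning: just run the subquery plan and collect on the driver.
  final class PlainSubquery(run: () => Seq[Long]) extends PruningSubquery {
    def executeCollect(): Seq[Long] = run()
  }

  // Old planning: same values, but with an extra broadcast step that no
  // executor ever reads on this code path.
  final class BroadcastSubquery(run: () => Seq[Long]) extends PruningSubquery {
    def executeCollect(): Seq[Long] = {
      val values = run()
      // imagine sparkContext.broadcast(values) here: cost with no consumer
      values
    }
  }

  // The only consumer of the values is driver-side partition pruning: keep
  // only partitions whose key appears in the subquery result.
  def prunePartitions(partitionKeys: Seq[Long], sub: PruningSubquery): Seq[Long] = {
    val keep = sub.executeCollect().toSet
    partitionKeys.filter(keep)
  }

  def main(args: Array[String]): Unit = {
    val keys = Seq(0L, 1L, 2L, 3L, 4L)
    val subqueryResult = () => Seq(1L, 3L)
    // Identical pruning result either way; only the broadcast cost differs.
    assert(prunePartitions(keys, new PlainSubquery(subqueryResult)) == Seq(1L, 3L))
    assert(prunePartitions(keys, new BroadcastSubquery(subqueryResult)) == Seq(1L, 3L))
  }
}
```

The diff below applies exactly this idea: the fallback branch now wraps the aggregated build plan in a plain `SubqueryExec` and feeds it to `InSubqueryExec` directly, instead of planning an `InSubquery` whose result gets broadcast.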
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index 059729d86bfa..fbd341b6e7b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -17,15 +17,14 @@
 
 package org.apache.spark.sql.execution.dynamicpruning
 
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, Literal}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.DYNAMIC_PRUNING_SUBQUERY
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.execution.{InSubqueryExec, QueryExecution, SparkPlan, SubqueryBroadcastExec}
+import org.apache.spark.sql.execution.{InSubqueryExec, QueryExecution, SparkPlan, SubqueryBroadcastExec, SubqueryExec}
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
@@ -56,6 +55,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
       case DynamicPruningSubquery(
           value, buildPlan, buildKeys, broadcastKeyIndices, onlyInBroadcast, exprId, _) =>
         val sparkPlan = QueryExecution.createSparkPlan(sparkSession.sessionState.planner, buildPlan)
+        val name = s"dynamicpruning#${exprId.id}"
         // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
         // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
         val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
@@ -72,7 +72,6 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
           val mode = broadcastMode(buildKeys, executedPlan.output)
           // plan a broadcast exchange of the build side of the join
           val exchange = BroadcastExchangeExec(mode, executedPlan)
-          val name = s"dynamicpruning#${exprId.id}"
           // place the broadcast adaptor for reusing the broadcast results on the probe side
           val broadcastValues =
             SubqueryBroadcastExec(name, broadcastKeyIndices, buildKeys, exchange)
@@ -85,8 +84,9 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
           val aliases = broadcastKeyIndices.map(idx =>
             Alias(buildKeys(idx), buildKeys(idx).toString)())
           val aggregate = Aggregate(aliases, aliases, buildPlan)
-          DynamicPruningExpression(expressions.InSubquery(
-            Seq(value), ListQuery(aggregate, numCols = aggregate.output.length)))
+          val sparkPlan = QueryExecution.prepareExecutedPlan(sparkSession, aggregate)
+          val values = SubqueryExec(name, sparkPlan)
+          DynamicPruningExpression(InSubqueryExec(value, values, exprId))
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 9c90e0105a42..b27122a8de2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -310,10 +310,10 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
           |""".stripMargin
 
       val expected_pattern1 =
-        "Subquery:1 Hosting operator id = 1 Hosting Expression = k#xL IN subquery#x"
+        "Subquery:1 Hosting operator id = 1 Hosting Expression = k#xL IN dynamicpruning#x"
       val expected_pattern2 =
         "PartitionFilters: \\[isnotnull\\(k#xL\\), dynamicpruningexpression\\(k#xL " +
-          "IN subquery#x\\)\\]"
+          "IN dynamicpruning#x\\)\\]"
       val expected_pattern3 =
         "Location: InMemoryFileIndex \\[\\S*org.apache.spark.sql.ExplainSuite" +
           "/df2/\\S*, ... 99 entries\\]"
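To observe the new plan shape end to end, here is an untested repro sketch. The conf keys are real Spark SQL configurations; the table and column names are made up, and whether DPP actually fires also depends on Spark's pruning-benefit heuristics.

```scala
// Untested repro sketch for the scenario this commit changes (SPARK-52974).
// Conf keys are real Spark SQL configurations; table/column names are made up.
import org.apache.spark.sql.SparkSession

object Spark52974Repro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-52974 repro")
      // AQE off, so PlanDynamicPruningFilters (not the adaptive rule) plans DPP.
      .config("spark.sql.adaptive.enabled", "false")
      // No exchange reuse, so the rule cannot piggyback on a broadcast join...
      .config("spark.sql.exchange.reuse", "false")
      // ...and must run the pruning subquery on its own (the changed branch).
      .config("spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly", "false")
      .getOrCreate()
    import spark.implicits._

    // A partitioned fact table and a small, filtered dimension table.
    spark.range(1000).select($"id", ($"id" % 10).as("part"))
      .write.mode("overwrite").partitionBy("part").saveAsTable("fact")
    spark.range(10).select(($"id" % 5).as("part"), $"id".as("k"))
      .write.mode("overwrite").saveAsTable("dim")

    val q = spark.sql(
      "SELECT f.id FROM fact f JOIN dim d ON f.part = d.part WHERE d.k < 2")
    // With this commit, the PartitionFilters entry should read
    // `dynamicpruningexpression(... IN dynamicpruning#N)` rather than a
    // broadcast-backed `... IN subquery#N`.
    q.explain("formatted")
    spark.stop()
  }
}
```

The revised `ExplainSuite` patterns above check the same rename: the hosting expression now prints as `k#xL IN dynamicpruning#x`.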