This is an automated email from the ASF dual-hosted git repository.

ptoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8342d7438114 [SPARK-52974][SQL] Don't broadcast dynamicpruning InSubqueries
8342d7438114 is described below

commit 8342d7438114b7f1bffed39a7ff52900a9014348
Author: Peter Toth <peter.t...@gmail.com>
AuthorDate: Mon Aug 4 18:35:01 2025 +0200

    [SPARK-52974][SQL] Don't broadcast dynamicpruning InSubqueries
    
    ### What changes were proposed in this pull request?
    
    Currently, when AQE is off, DPP `InSubquery`s are planned by the
    `PlanSubqueries` rule as regular `InSubqueryExec(isDynamicPruning = false)`
    nodes, which causes their results to be broadcast. But those subquery
    results are only ever consumed at the driver, so there is no need to
    broadcast them.
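    
    For illustration, a minimal repro sketch of a query whose dimension-side
    filter becomes a DPP subquery when AQE is disabled (the table, column, and
    data shapes here are illustrative assumptions, not part of this patch):
    
        import org.apache.spark.sql.SparkSession
    
        val spark = SparkSession.builder()
          .master("local[*]")
          .config("spark.sql.adaptive.enabled", "false")
          .getOrCreate()
    
        // A partitioned fact table and a small, filtered dimension table.
        spark.range(100).selectExpr("id", "id % 10 AS part")
          .write.partitionBy("part").saveAsTable("fact")
        spark.range(10).selectExpr("id AS part", "id AS flag")
          .write.saveAsTable("dim")
    
        // The equi-join on the partition column plus the selective filter on
        // `dim` makes the dimension side a DynamicPruningSubquery candidate.
        // Its values are collected at the driver to prune `fact` partitions,
        // so broadcasting them to the executors buys nothing.
        spark.sql(
          """SELECT f.id FROM fact f JOIN dim d ON f.part = d.part
            |WHERE d.flag < 5""".stripMargin).explain()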
    
    ### Why are the changes needed?
    
    Performance improvement when AQE is off, and it brings
    `PlanDynamicPruningFilters` on par with its AQE counterpart,
    `PlanAdaptiveDynamicPruningFilters`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs with the revised test case.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51685 from peter-toth/SPARK-52974-fix-dpp-insubqueries-broadcast.
    
    Authored-by: Peter Toth <peter.t...@gmail.com>
    Signed-off-by: Peter Toth <peter.t...@gmail.com>
---
 .../execution/dynamicpruning/PlanDynamicPruningFilters.scala | 12 ++++++------
 .../src/test/scala/org/apache/spark/sql/ExplainSuite.scala   |  4 ++--
 2 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
index 059729d86bfa..fbd341b6e7b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/PlanDynamicPruningFilters.scala
@@ -17,15 +17,14 @@
 
 package org.apache.spark.sql.execution.dynamicpruning
 
-import org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, ListQuery, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeSeq, BindReferences, DynamicPruningExpression, DynamicPruningSubquery, Expression, Literal}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.Aggregate
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.DYNAMIC_PRUNING_SUBQUERY
 import org.apache.spark.sql.classic.SparkSession
-import org.apache.spark.sql.execution.{InSubqueryExec, QueryExecution, SparkPlan, SubqueryBroadcastExec}
+import org.apache.spark.sql.execution.{InSubqueryExec, QueryExecution, SparkPlan, SubqueryBroadcastExec, SubqueryExec}
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
@@ -56,6 +55,7 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
       case DynamicPruningSubquery(
           value, buildPlan, buildKeys, broadcastKeyIndices, onlyInBroadcast, exprId, _) =>
        val sparkPlan = QueryExecution.createSparkPlan(sparkSession.sessionState.planner, buildPlan)
+        val name = s"dynamicpruning#${exprId.id}"
        // Using `sparkPlan` is a little hacky as it is based on the assumption that this rule is
        // the first to be applied (apart from `InsertAdaptiveSparkPlan`).
        val canReuseExchange = conf.exchangeReuseEnabled && buildKeys.nonEmpty &&
@@ -72,7 +72,6 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
           val mode = broadcastMode(buildKeys, executedPlan.output)
           // plan a broadcast exchange of the build side of the join
           val exchange = BroadcastExchangeExec(mode, executedPlan)
-          val name = s"dynamicpruning#${exprId.id}"
          // place the broadcast adaptor for reusing the broadcast results on the probe side
           val broadcastValues =
            SubqueryBroadcastExec(name, broadcastKeyIndices, buildKeys, exchange)
@@ -85,8 +84,9 @@ case class PlanDynamicPruningFilters(sparkSession: SparkSession) extends Rule[Sp
           val aliases = broadcastKeyIndices.map(idx =>
             Alias(buildKeys(idx), buildKeys(idx).toString)())
           val aggregate = Aggregate(aliases, aliases, buildPlan)
-          DynamicPruningExpression(expressions.InSubquery(
-            Seq(value), ListQuery(aggregate, numCols = aggregate.output.length)))
+          val sparkPlan = QueryExecution.prepareExecutedPlan(sparkSession, aggregate)
+          val values = SubqueryExec(name, sparkPlan)
+          DynamicPruningExpression(InSubqueryExec(value, values, exprId))
         }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 9c90e0105a42..b27122a8de2b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -310,10 +310,10 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
               |""".stripMargin
 
           val expected_pattern1 =
-            "Subquery:1 Hosting operator id = 1 Hosting Expression = k#xL IN 
subquery#x"
+            "Subquery:1 Hosting operator id = 1 Hosting Expression = k#xL IN 
dynamicpruning#x"
           val expected_pattern2 =
             "PartitionFilters: \\[isnotnull\\(k#xL\\), 
dynamicpruningexpression\\(k#xL " +
-              "IN subquery#x\\)\\]"
+              "IN dynamicpruning#x\\)\\]"
           val expected_pattern3 =
             "Location: InMemoryFileIndex 
\\[\\S*org.apache.spark.sql.ExplainSuite" +
               "/df2/\\S*, ... 99 entries\\]"

