This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7529622911aa [SPARK-55052][SQL] Add AQEShuffleRead properties to 
Physical Plan Tree
7529622911aa is described below

commit 7529622911aa29aca89aaf55ccf4880b00fa2560
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Tue Feb 24 23:53:45 2026 +0800

    [SPARK-55052][SQL] Add AQEShuffleRead properties to Physical Plan Tree
    
    ### What changes were proposed in this pull request?
    
    `AQEShuffleRead` can have `local` / `coalesced` / `skewed` / `coalesced and 
skewed` properties when reading shuffle files. When the Physical Plan Tree is 
complex, it is hard to track this info by correlating it with the 
AQEShuffleRead details, such as which AQEShuffleRead has a local read or 
skewed partition info. For example, in the following skewed SortMergeJoin 
case, this helps to understand which SMJ leg has an AQEShuffleRead with skew. 
This addition aims to address this kind of use case at the physical  [...]
    
    **Current Physical Plan Tree:**
    ```
    == Physical Plan ==
    AdaptiveSparkPlan (24)
    +- == Final Plan ==
       ResultQueryStage (17), Statistics(sizeInBytes=8.0 EiB)
       +- * Project (16)
          +- * SortMergeJoin(skew=true) Inner (15)
             :- * Sort (7)
             :  +- AQEShuffleRead (6)
             :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=15.6 KiB, 
rowCount=1.00E+3)
             :        +- Exchange (4)
             :           +- * Project (3)
             :              +- * Filter (2)
             :                 +- * Range (1)
             +- * Sort (14)
                +- AQEShuffleRead (13)
                   +- ShuffleQueryStage (12), Statistics(sizeInBytes=3.1 KiB, 
rowCount=200)
                      +- Exchange (11)
                         +- * Project (10)
                            +- * Filter (9)
                               +- * Range (8)
    ```
    
    **New Physical Plan Tree:**
    ```
    == Physical Plan ==
    AdaptiveSparkPlan (24)
    +- == Final Plan ==
       ResultQueryStage (17), Statistics(sizeInBytes=8.0 EiB)
       +- * Project (16)
          +- * SortMergeJoin(skew=true) Inner (15)
             :- * Sort (7)
             :  +- AQEShuffleRead (6), coalesced
             :     +- ShuffleQueryStage (5), Statistics(sizeInBytes=15.6 KiB, 
rowCount=1.00E+3)
             :        +- Exchange (4)
             :           +- * Project (3)
             :              +- * Filter (2)
             :                 +- * Range (1)
             +- * Sort (14)
                +- AQEShuffleRead (13), coalesced and skewed
                   +- ShuffleQueryStage (12), Statistics(sizeInBytes=3.1 KiB, 
rowCount=200)
                      +- Exchange (11)
                         +- * Project (10)
                            +- * Filter (9)
                               +- * Range (8)
    ```
    
    ### Why are the changes needed?
    
    When the physical plan tree is complex (e.g., composed of 1000+ physical 
nodes), it is hard to correlate this information with the `AQEShuffleRead` 
details.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, when the user investigates the physical plan, the new AQEShuffleRead 
properties will be shown in the Physical Plan Tree.
    
    ### How was this patch tested?
    
    Added two new unit tests (UTs) to `ExplainSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #53817 from erenavsarogullari/SPARK-55052.
    
    Authored-by: Eren Avsarogullari <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/adaptive/AQEShuffleReadExec.scala    | 10 ++++
 .../scala/org/apache/spark/sql/ExplainSuite.scala  | 56 +++++++++++++++++++++-
 2 files changed, 65 insertions(+), 1 deletion(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
index 2a600b31cc29..eba0346a94bd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEShuffleReadExec.scala
@@ -278,4 +278,14 @@ case class AQEShuffleReadExec private(
 
   override protected def withNewChildInternal(newChild: SparkPlan): 
AQEShuffleReadExec =
     copy(child = newChild)
+
+  override def simpleStringWithNodeId(): String = {
+    val args = stringArgs.mkString(", ")
+    if (args.nonEmpty) {
+      super.simpleStringWithNodeId() + ", " + args
+    } else {
+      super.simpleStringWithNodeId()
+    }
+  }
+
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index b27122a8de2b..e2a01775b89d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import org.apache.spark.sql.catalyst.plans.Inner
 import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, 
EnableAdaptiveExecutionSuite}
+import org.apache.spark.sql.execution.adaptive.{AQEPropagateEmptyRelation, 
DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
 import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.SortMergeJoinExec
@@ -941,6 +941,60 @@ class ExplainSuiteAE extends ExplainSuiteHelper with 
EnableAdaptiveExecutionSuit
     results = results.replaceAll("#\\d+", "#x").replaceAll("plan_id=\\d+", 
"plan_id=x")
     assert(results == expectedTree)
   }
+
+  test("SPARK-55052: Verify exposed AQEShuffleRead properties (coalesced and 
coalesced-skewed) " +
+    "in Physical Plan Tree") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+      SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "100",
+      SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "100",
+      SQLConf.SHUFFLE_PARTITIONS.key -> "10") {
+      withTempView("view1", "skewDataView2") {
+        spark
+          .range(0, 1000, 1, 10)
+          .selectExpr("id % 10 as key1")
+          .createOrReplaceTempView("view1")
+        spark
+          .range(0, 200, 1, 10)
+          .selectExpr("id % 1 as key2")
+          .createOrReplaceTempView("skewDataView2")
+
+        val df = spark.sql("SELECT key1 FROM view1 JOIN skewDataView2 ON key1 
= key2")
+        df.collect()
+
+        // Verify expected FinalPlan substring including AQEShuffleRead 
properties
+        checkKeywordsExistsInExplain(
+          df = df,
+          mode = ExplainMode.fromString("FORMATTED"),
+          keywords = "AQEShuffleRead (6), coalesced", "AQEShuffleRead (13), 
coalesced and skewed")
+      }
+    }
+  }
+
+  test("SPARK-55052: Verify exposed AQEShuffleRead properties (local) in 
Physical Plan Tree") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.COALESCE_PARTITIONS_ENABLED.key -> "true",
+      SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> 
AQEPropagateEmptyRelation.ruleName,
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1"
+    ) {
+      val df1 = spark.range(10).withColumn("a", $"id")
+      val df2 = spark.range(10).withColumn("b", $"id")
+
+      val joinedDF = df1.where($"a" > 10)
+        .join(df2.where($"b" > 10), Seq("id"), "left_outer")
+      checkAnswer(joinedDF, Seq())
+      joinedDF.collect()
+
+      // Verify expected FinalPlan substring including AQEShuffleRead 
properties
+      checkKeywordsExistsInExplain(
+        df = joinedDF,
+        mode = ExplainMode.fromString("FORMATTED"),
+        keywords = "AQEShuffleRead (6), local", "AQEShuffleRead (9), local")
+    }
+  }
+
 }
 
 case class ExplainSingleData(id: Int)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to