This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d2180138888 [SPARK-41048][SQL] Improve output partitioning and ordering with AQE cache
d2180138888 is described below

commit d21801388884ae56c6925a5f11b9684e531de3c0
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Thu Nov 17 11:34:33 2022 +0800

    [SPARK-41048][SQL] Improve output partitioning and ordering with AQE cache
    
    ### What changes were proposed in this pull request?
    
    Do our best to give a stable output partitioning and ordering from `InMemoryTableScanExec` when the current executed plan is the final plan.
    Make `AdaptiveSparkPlanExec` expose the `isFinalPlan` flag.
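    
    The core of the change is the unwrap pattern below, a minimal sketch copied from the `InMemoryTableScanExec` diff in this commit (the surrounding class is elided):
    ```
    // Once AQE has finalized the plan, read plan properties from the
    // executed plan rather than from the opaque AdaptiveSparkPlanExec wrapper.
    private def cachedPlan = relation.cachedPlan match {
      case adaptive: AdaptiveSparkPlanExec if adaptive.isFinalPlan =>
        adaptive.executedPlan // stable: the final plan no longer changes
      case other => other
    }
    ```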
    
    ### Why are the changes needed?
    
    The cached plan in `InMemoryRelation` can be an `AdaptiveSparkPlanExec`; however, `AdaptiveSparkPlanExec` does not specify its output partitioning and ordering. This causes unnecessary shuffles and local sorts for downstream operators.
    
    ```
              ...
               |
      AdaptiveSparkPlanExec
               |
      InMemoryTableScanExec
               |
              ...
    ```
    After this PR, `InMemoryTableScanExec` can preserve the cached plan's output partitioning and ordering.
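    
    For illustration, here is a query in the shape of the test added below in `DataFrameSuite.scala`. It assumes the test's configuration (cached-plan output partitioning changes allowed, broadcast joins disabled) and a spark-shell style session:
    ```
    import org.apache.spark.sql.functions.max
    import spark.implicits._
    
    val df1 = spark.range(10).selectExpr("cast(id as string) c1")
    val df2 = spark.range(10).selectExpr("cast(id as string) c2")
    val cached = df1.join(df2, $"c1" === $"c2").cache()
    cached.count() // materializes the cache, so the AQE plan becomes final
    // The aggregate can now reuse the join's hash partitioning on c1, so the
    // executed plan should contain no extra shuffle or sort.
    cached.groupBy("c1").agg(max($"c2")).explain()
    ```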
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, it only improves performance.
    
    ### How was this patch tested?
    
    Added a test.
    
    Closes #38558 from ulysses-you/aqe-cache.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  6 ++++--
 .../execution/columnar/InMemoryTableScanExec.scala | 12 ++++++++---
 .../org/apache/spark/sql/DataFrameSuite.scala      | 23 ++++++++++++++++++++--
 3 files changed, 34 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 828a129a876..62a75e75345 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -188,7 +188,7 @@ case class AdaptiveSparkPlanExec(
 
   @volatile private var currentPhysicalPlan = initialPlan
 
-  private var isFinalPlan = false
+  @volatile private var _isFinalPlan = false
 
   private var currentStageId = 0
 
@@ -205,6 +205,8 @@ case class AdaptiveSparkPlanExec(
 
   def executedPlan: SparkPlan = currentPhysicalPlan
 
+  def isFinalPlan: Boolean = _isFinalPlan
+
   override def conf: SQLConf = context.session.sessionState.conf
 
   override def output: Seq[Attribute] = inputPlan.output
@@ -329,7 +331,7 @@ case class AdaptiveSparkPlanExec(
         optimizeQueryStage(result.newPlan, isFinalStage = true),
         postStageCreationRules(supportsColumnar),
         Some((planChangeLogger, "AQE Post Stage Creation")))
-      isFinalPlan = true
+      _isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
       currentPhysicalPlan
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
index da9316efdb4..0f00a6a3559 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.columnar.CachedBatch
 import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -111,10 +112,15 @@ case class InMemoryTableScanExec(
 
   override def output: Seq[Attribute] = attributes
 
+  private def cachedPlan = relation.cachedPlan match {
+    case adaptive: AdaptiveSparkPlanExec if adaptive.isFinalPlan => adaptive.executedPlan
+    case other => other
+  }
+
   private def updateAttribute(expr: Expression): Expression = {
     // attributes can be pruned so using relation's output.
    // E.g., relation.output is [id, item] but this scan's output can be [item] only.
-    val attrMap = AttributeMap(relation.cachedPlan.output.zip(relation.output))
+    val attrMap = AttributeMap(cachedPlan.output.zip(relation.output))
     expr.transform {
       case attr: Attribute => attrMap.getOrElse(attr, attr)
     }
@@ -123,7 +129,7 @@ case class InMemoryTableScanExec(
  // The cached version does not change the outputPartitioning of the original SparkPlan.
   // But the cached version could alias output, so we need to replace output.
   override def outputPartitioning: Partitioning = {
-    relation.cachedPlan.outputPartitioning match {
+    cachedPlan.outputPartitioning match {
       case e: Expression => updateAttribute(e).asInstanceOf[Partitioning]
       case other => other
     }
@@ -132,7 +138,7 @@ case class InMemoryTableScanExec(
  // The cached version does not change the outputOrdering of the original SparkPlan.
   // But the cached version could alias output, so we need to replace output.
   override def outputOrdering: Seq[SortOrder] =
-    relation.cachedPlan.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
+    cachedPlan.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])
 
  lazy val enableAccumulatorsForTest: Boolean = conf.inMemoryTableScanStatisticsEnabled
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index fabd0a4e1a9..6be1e424719 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -40,10 +40,10 @@ import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LocalRelation, LogicalPlan, OneRowRelation, Statistics}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.FakeV2Provider
-import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, WholeStageCodegenExec}
+import org.apache.spark.sql.execution.{FilterExec, LogicalRDD, QueryExecution, SortExec, WholeStageCodegenExec}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec, ShuffleExchangeLike}
 import org.apache.spark.sql.expressions.{Aggregator, Window}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -3513,6 +3513,25 @@ class DataFrameSuite extends QueryTest
       assert(df.queryExecution.executedPlan.execute().getNumPartitions == 2)
     }
   }
+
+  test("SPARK-41048: Improve output partitioning and ordering with AQE cache") 
{
+    withSQLConf(
+        SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> "true",
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val df1 = spark.range(10).selectExpr("cast(id as string) c1")
+      val df2 = spark.range(10).selectExpr("cast(id as string) c2")
+      val cached = df1.join(df2, $"c1" === $"c2").cache()
+      cached.count()
+      val executedPlan = cached.groupBy("c1").agg(max($"c2")).queryExecution.executedPlan
+      // before this change the plan contained 2 sorts and 1 shuffle
+      assert(collect(executedPlan) {
+        case s: ShuffleExchangeLike => s
+      }.isEmpty)
+      assert(collect(executedPlan) {
+        case s: SortExec => s
+      }.isEmpty)
+    }
+  }
 }
 
 case class GroupByKey(a: Int, b: Int)

