Ziqi Liu created SPARK-47177:
--------------------------------

             Summary: Cached SQL plan do not display final AQE plan in explain 
string
                 Key: SPARK-47177
                 URL: https://issues.apache.org/jira/browse/SPARK-47177
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.5.1, 3.5.0, 3.4.2, 4.0.0, 3.5.2
            Reporter: Ziqi Liu


AQE plan is expected to display final plan after execution. This is not true 
for cached SQL plan: it will show the initial plan instead. This behavior 
change is introduced in [https://github.com/apache/spark/pull/40812] it tried 
to fix the concurrency issue with cached plan.  I don't have a clear idea how 
yet, maybe we can check whether the AQE plan is finalized(make the final flag 
atomic first, of course), if not we can return the cloned one, otherwise it's 
thread-safe to return the final one, since it's immutable.

 

A simple repro:
{code:java}
d1 = spark.range(1000).withColumn("key", expr("id % 
100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
df = cached_d2.withColumn("key2", expr("key % 
10")).groupBy("key2").agg({"key2": "count"})
df.collect() {code}
{code:java}
Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, 
count(key2)=10), Row(key2=8, count(key2)=10)]
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], 
functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                           +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                                 +- AdaptiveSparkPlan isFinalPlan=false
                                    +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                                       +- Exchange hashpartitioning(key#4L, 
200), ENSURE_REQUIREMENTS, [plan_id=33]
                                          +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                             +- Project [(id#2L % 100) AS 
key#4L]
                                                +- Range (0, 1000, step=1, 
splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, 
[plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
                  +- InMemoryRelation [key#27L, count(key)#33L], 
StorageLevel(disk, memory, deserialized, 1 replicas)
                        +- AdaptiveSparkPlan isFinalPlan=false
                           +- HashAggregate(keys=[key#4L], 
functions=[count(key#4L)])
                              +- Exchange hashpartitioning(key#4L, 200), 
ENSURE_REQUIREMENTS, [plan_id=33]
                                 +- HashAggregate(keys=[key#4L], 
functions=[partial_count(key#4L)])
                                    +- Project [(id#2L % 100) AS key#4L]
                                       +- Range (0, 1000, step=1, splits=10) 
{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to