Ziqi Liu created SPARK-47177:
--------------------------------
Summary: Cached SQL plan does not display final AQE plan in explain string
Key: SPARK-47177
URL: https://issues.apache.org/jira/browse/SPARK-47177
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 3.5.1, 3.5.0, 3.4.2, 4.0.0, 3.5.2
Reporter: Ziqi Liu
An AQE plan is expected to display the final plan after execution. This is not
true for a cached SQL plan: it shows the initial plan instead. This behavior
change was introduced in [https://github.com/apache/spark/pull/40812], which
tried to fix a concurrency issue with cached plans. I don't have a clear idea
how to fix it yet. Maybe we can check whether the AQE plan is finalized (making
the final flag atomic first, of course): if it is not, we return the cloned
plan; otherwise it is thread-safe to return the final plan, since it is
immutable.
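A rough sketch of that idea follows, written in Python to match the repro rather than Spark's Scala. Everything named here (`AdaptivePlanSketch`, `finalize`, `plan_for_reuse`) is hypothetical and does not correspond to Spark's actual classes; a lock stands in for the atomic flag, a list stands in for a mutable in-progress plan, and a tuple stands in for an immutable final plan.

```python
import copy
import threading


class AdaptivePlanSketch:
    """Hypothetical stand-in for an AQE plan node; not Spark's real class."""

    def __init__(self, initial_plan):
        self._lock = threading.Lock()
        self._plan = initial_plan   # mutable while execution is in progress
        self._is_final = False      # the "final flag" from the description

    def finalize(self, final_plan):
        # Publish the final plan and the flag together under the lock, so a
        # reader never sees the flag set without the final plan in place.
        with self._lock:
            self._plan = final_plan
            self._is_final = True

    def plan_for_reuse(self):
        # Return (plan, is_final). A finalized plan is assumed immutable and
        # is shared as-is; an unfinalized plan is cloned to avoid races.
        with self._lock:
            if self._is_final:
                return self._plan, True
            return copy.deepcopy(self._plan), False


node = AdaptivePlanSketch(["HashAggregate", "Exchange", "HashAggregate"])
before, before_final = node.plan_for_reuse()   # a clone of the initial plan
node.finalize(("HashAggregate", "AQEShuffleRead", "ShuffleQueryStage"))
after, after_final = node.plan_for_reuse()     # the shared final plan
```

With this shape, a caller that asks for the plan after execution would get the final plan itself, so an explain string built from it could show the final plan. Whether this maps cleanly onto the cached-plan code path in Spark is an open question.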
A simple repro:
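{code:python}
from pyspark.sql.functions import expr

# Aggregate, then cache the result so later queries read from the cache.
d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
cached_d2 = d1.cache()
# Build a second aggregation on top of the cached plan and execute it.
df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
df.collect()
{code}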
{code:java}
Row(key2=7, count(key2)=10), Row(key2=3, count(key2)=10), Row(key2=1, count(key2)=10), Row(key2=8, count(key2)=10)]
>>> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- AQEShuffleRead coalesced
      +- ShuffleQueryStage 1
         +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
            +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
               +- *(1) Project [(key#27L % 10) AS key2#36L]
                  +- TableCacheQueryStage 0
                     +- InMemoryTableScan [key#27L]
                        +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                           +- AdaptiveSparkPlan isFinalPlan=false
                              +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                                 +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                                    +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                                       +- Project [(id#2L % 100) AS key#4L]
                                          +- Range (0, 1000, step=1, splits=10)
+- == Initial Plan ==
   HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
   +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
      +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
         +- Project [(key#27L % 10) AS key2#36L]
            +- InMemoryTableScan [key#27L]
               +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- AdaptiveSparkPlan isFinalPlan=false
                     +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
                        +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
                           +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
                              +- Project [(id#2L % 100) AS key#4L]
                                 +- Range (0, 1000, step=1, splits=10)
{code}
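To check for the symptom without reading the plan by eye, a small helper along these lines might be enough. It only scans explain text for `AdaptiveSparkPlan` nodes that still report `isFinalPlan=false`; the function name `non_final_aqe_lines` and the shortened sample text are made up for illustration.

```python
def non_final_aqe_lines(explain_text):
    """Return 0-based line indexes of AQE nodes that report isFinalPlan=false."""
    marker = "AdaptiveSparkPlan isFinalPlan=false"
    return [
        index
        for index, line in enumerate(explain_text.splitlines())
        if marker in line
    ]


# Shortened, hand-written sample in the shape of the output above.
sample = "\n".join([
    "== Physical Plan ==",
    "AdaptiveSparkPlan isFinalPlan=true",
    "+- == Final Plan ==",
    "   +- InMemoryTableScan [key#27L]",
    "      +- InMemoryRelation [key#27L, count(key)#33L]",
    "         +- AdaptiveSparkPlan isFinalPlan=false",
])
stale_lines = non_final_aqe_lines(sample)
```

In a PySpark session the text could probably be captured by wrapping `df.explain()` in `contextlib.redirect_stdout`, assuming `explain()` prints from the Python side. A non-empty result after `df.collect()` would indicate the behavior described in this issue.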