[ https://issues.apache.org/jira/browse/SPARK-47177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-47177:
-----------------------------------
Labels: pull-request-available (was: )
> Cached SQL plan do not display final AQE plan in explain string
> ---------------------------------------------------------------
>
> Key: SPARK-47177
> URL: https://issues.apache.org/jira/browse/SPARK-47177
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.4.2, 3.5.0, 4.0.0, 3.5.1, 3.5.2
> Reporter: Ziqi Liu
> Priority: Major
> Labels: pull-request-available
>
> An AQE plan is expected to display its final plan after execution. This is
> not true for a cached SQL plan: the AdaptiveSparkPlan nested under
> InMemoryRelation still shows its initial plan (isFinalPlan=false), as in the
> repro below. This behavior change was introduced in
> [https://github.com/apache/spark/pull/40812], which tried to fix a
> concurrency issue with the cached plan. I don't have a clear idea of how to
> fix it yet. Maybe we can check whether the AQE plan is finalized (after
> making the final flag atomic, of course): if it is not, we can return the
> cloned plan; otherwise it is thread-safe to return the final plan, since it
> is immutable.
>
> A simple repro:
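> {code:python}
> from pyspark.sql.functions import expr
>
> # Aggregate 1000 rows into 100 keys and cache the result
> d1 = spark.range(1000).withColumn("key", expr("id % 100")).groupBy("key").agg({"key": "count"})
> cached_d2 = d1.cache()
> # Build a second aggregation on top of the cached data and execute it
> df = cached_d2.withColumn("key2", expr("key % 10")).groupBy("key2").agg({"key2": "count"})
> df.collect()
> {code}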
> {noformat}
> >>> df.explain()
> == Physical Plan ==
> AdaptiveSparkPlan isFinalPlan=true
> +- == Final Plan ==
>    *(2) HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
>    +- AQEShuffleRead coalesced
>       +- ShuffleQueryStage 1
>          +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=83]
>             +- *(1) HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
>                +- *(1) Project [(key#27L % 10) AS key2#36L]
>                   +- TableCacheQueryStage 0
>                      +- InMemoryTableScan [key#27L]
>                            +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                                  +- AdaptiveSparkPlan isFinalPlan=false
>                                     +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                                        +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
>                                           +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                                              +- Project [(id#2L % 100) AS key#4L]
>                                                 +- Range (0, 1000, step=1, splits=10)
> +- == Initial Plan ==
>    HashAggregate(keys=[key2#36L], functions=[count(key2#36L)])
>    +- Exchange hashpartitioning(key2#36L, 200), ENSURE_REQUIREMENTS, [plan_id=30]
>       +- HashAggregate(keys=[key2#36L], functions=[partial_count(key2#36L)])
>          +- Project [(key#27L % 10) AS key2#36L]
>             +- InMemoryTableScan [key#27L]
>                   +- InMemoryRelation [key#27L, count(key)#33L], StorageLevel(disk, memory, deserialized, 1 replicas)
>                         +- AdaptiveSparkPlan isFinalPlan=false
>                            +- HashAggregate(keys=[key#4L], functions=[count(key#4L)])
>                               +- Exchange hashpartitioning(key#4L, 200), ENSURE_REQUIREMENTS, [plan_id=33]
>                                  +- HashAggregate(keys=[key#4L], functions=[partial_count(key#4L)])
>                                     +- Project [(id#2L % 100) AS key#4L]
>                                        +- Range (0, 1000, step=1, splits=10)
> {noformat}
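The fix floated in the description (an atomic "final" flag; hand out a clone while the plan may still be re-optimized, otherwise share the immutable final plan) can be sketched in plain Python. This is a minimal, hypothetical illustration, not Spark's implementation: {{AdaptivePlanHolder}}, {{update}}, {{finalize}} and {{plan_for_explain}} are invented names standing in for the shared cached AQE plan, a list stands in for a physical plan, and a {{threading.Event}} plays the role of the atomic flag.

```python
import copy
import threading


class AdaptivePlanHolder:
    """Hypothetical stand-in for an AQE plan shared through the cache (not a Spark API)."""

    def __init__(self, initial_plan):
        self._lock = threading.Lock()
        self._final = threading.Event()  # thread-safe "isFinalPlan" flag
        self._current_plan = initial_plan

    def update(self, new_plan):
        # A re-optimization step: only allowed while the plan is not final.
        with self._lock:
            if self._final.is_set():
                raise RuntimeError("plan is already final")
            self._current_plan = new_plan

    def finalize(self, final_plan):
        # Publish the final plan, then set the flag, both under the lock.
        with self._lock:
            if self._final.is_set():
                raise RuntimeError("plan is already final")
            self._current_plan = final_plan
            self._final.set()

    def plan_for_explain(self):
        if self._final.is_set():
            # No further update() or finalize() can succeed, so sharing is safe.
            return self._current_plan
        # Still mutable: return a private snapshot instead of the shared plan.
        with self._lock:
            return copy.deepcopy(self._current_plan)
```

Only the not-yet-final path pays for the lock and the copy, so an explain after execution, the case this issue is about, would get the final plan directly. The actual fix in Spark may take a different shape.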
--
This message was sent by Atlassian Jira
(v8.20.10#820010)