[
https://issues.apache.org/jira/browse/SPARK-41214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Eren Avsarogullari updated SPARK-41214:
---------------------------------------
Summary: SQL metrics are missing from Spark UI when AQE for Cached
DataFrame is enabled (was: SubPlan metrics are missed when AQE is enabled
under InMemoryRelation)
> SQL metrics are missing from Spark UI when AQE for Cached DataFrame is enabled
> ------------------------------------------------------------------------------
>
> Key: SPARK-41214
> URL: https://issues.apache.org/jira/browse/SPARK-41214
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.4.0
> Reporter: Eren Avsarogullari
> Priority: Major
> Attachments: DAG when AQE=ON and AQECachedDFSupport=ON with fix.png,
> DAG when AQE=ON and AQECachedDFSupport=ON without fix.png
>
>
> *spark.sql.optimizer.canChangeCachedPlanOutputPartitioning* enables AQE
> optimizations under InMemoryRelation(IMR) nodes. Following sample query has
> IMR node on both BroadcastHashJoin legs. However,
> when spark.sql.optimizer.canChangeCachedPlanOutputPartitioning = true,
> following datas are missed due to lack of final sub-plans (under IMR)
> submissions (into UI).
> {code:java}
> - Physical operators' SQLMetrics (before AdaptiveSparkPlan) are missed such
> as Exchange and HashAggregate on left BHJ leg and HashAggregate on right BHJ
> leg,
> - WSCG blocks are missed on left BHJ leg,
> - AQEShuffleRead node is missed on left BHJ leg. {code}
> *Sample to reproduce:*
> {code:java}
> val spark = SparkSession
> .builder()
> .config("spark.sql.adaptive.enabled", "true")
> .config("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning",
> "true")
> .master("local[*]")
> .getOrCreate()
> import spark.implicits._
> // Create 1th DF
> val arr = Seq(
> (1, "Employee_1", "Department_1"),
> (2, "Employee_2", "Department_2"))
> val df = arr.toDF("id", "name", "department")
> .filter('id < 3)
> .groupBy('name)
> .count()
> df.cache()
> // Create 2th DF
> val arr2 = Seq((1, "Employee_1", "Department_1"))
> val df2 = arr2.toDF("id", "name", "department")
> .filter('id > 0)
> .groupBy('name)
> .count()
> df2.cache()
> // Trigger query execution
> val df3 = df.join(df2, "name")
> df3.show() {code}
> *DAG before fix:*
> *!DAG when AQE=ON and AQECachedDFSupport=ON without
> fix.png|width=33,height=86!*
> *DAG after fix:*
> *!DAG when AQE=ON and AQECachedDFSupport=ON with fix.png|width=33,height=82!*
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]