[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772729#comment-17772729 ]
XiDuo You commented on SPARK-45443:
-----------------------------------

Hi [~erenavsarogullari], it seems that this depends on the behavior of the RDD cache. Say, what happens if we materialize a cached RDD twice at the same time? There are some race conditions in the block manager per RDD partition, which makes things slow.

BTW, what was the behavior before we had TableCacheQueryStage? Didn't it have this issue?

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-45443
>                 URL: https://issues.apache.org/jira/browse/SPARK-45443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Eren Avsarogullari
>            Priority: Major
>         Attachments: IMR Materialization - Stage 2.png, IMR Materialization - Stage 3.png
>
>
> TableCacheQueryStage is created per InMemoryTableScanExec by
> AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output
> (cached RDD) to provide runtime stats so that AQE optimizations can be applied
> to the remaining physical plan stages. TableCacheQueryStage materializes
> InMemoryTableScanExec eagerly by submitting a job per TableCacheQueryStage
> instance. For example, if there are 2 TableCacheQueryStage instances
> referencing the same IMR instance (cached RDD) and the first
> InMemoryTableScanExec's materialization takes longer, the following logic will
> return false (inMemoryTableScan.isMaterialized => false), and this may cause
> replicated IMR materialization. This behavior can be more visible when the
> cached RDD size is large.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> Would like to get community feedback. Thanks in advance.
> cc [~ulysses] [~cloud_fan]
>
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
>
> val arr = (1 to 12).map { i => {
>     val index = i % 5
>     (index, s"Employee_$index", s"Department_$index")
>   }
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
>
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :           +- InMemoryRelation (2)
>          :              +- AdaptiveSparkPlan (7)
>          :                 +- HashAggregate (6)
>          :                    +- Exchange (5)
>          :                       +- HashAggregate (4)
>          :                          +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
>                   +- InMemoryTableScan (11)
>                      +- InMemoryRelation (12)
>                         +- AdaptiveSparkPlan (15)
>                            +- HashAggregate (14)
>                               +- Exchange (13)
>                                  +- HashAggregate (4)
>                                     +- LocalTableScan (3) {code}
>
> *Stages DAGs materializing the same IMR instance:*
> !IMR Materialization - Stage 2.png|width=303,height=134!
> !IMR Materialization - Stage 3.png|width=303,height=134!
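
To make the race described in the issue concrete, here is a minimal, Spark-free sketch in Scala. Every name in it (FakeCachedRelation, materializeNaive, materializeShared, MaterializeOnceSketch) is hypothetical and is not Spark API, and the 200 ms sleep is only an assumed stand-in for the job that fills the cache. It illustrates the check-then-act pattern and one possible way to share a single in-flight materialization; it is not a claim about how TableCacheQueryStage is or should be implemented.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for a cached relation (IMR); not a Spark class.
final class FakeCachedRelation {
  // Counts how many times the expensive "materialization job" actually ran.
  val jobsRun = new AtomicInteger(0)

  @volatile private var done = false
  def isMaterialized: Boolean = done

  // Simulates the job that fills the cache (assumed to take about 200 ms).
  private def runJob(): Unit = {
    Thread.sleep(200)
    jobsRun.incrementAndGet()
    done = true
  }

  // Check-then-act: every caller that still sees isMaterialized == false
  // submits its own job, even if another job is already in flight.
  def materializeNaive(): Future[Unit] =
    if (isMaterialized) Future.successful(()) else Future(runJob())

  // One shared in-flight Future per relation instance.
  // Scala initializes a lazy val at most once, in a thread-safe way.
  lazy val materializeShared: Future[Unit] = Future(runJob())
}

object MaterializeOnceSketch {
  // Two "stages" reference the same relation and request materialization
  // back to back, before the first job has had time to finish.
  def countJobs(shared: Boolean): Int = {
    val rel = new FakeCachedRelation
    def request(): Future[Unit] =
      if (shared) rel.materializeShared else rel.materializeNaive()
    val stages = Seq(request(), request())
    Await.result(Future.sequence(stages), 10.seconds)
    rel.jobsRun.get
  }

  def main(args: Array[String]): Unit = {
    println(s"jobs run (naive):  ${countJobs(shared = false)}")
    println(s"jobs run (shared): ${countJobs(shared = true)}")
  }
}
```

In the naive path the second request arrives while the first job is still sleeping, so the materialized check is still false and a second job is submitted. In the shared path both requests wait on the same Future, so the job runs once.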