[ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17772729#comment-17772729
 ] 

XiDuo You commented on SPARK-45443:
-----------------------------------

hi [~erenavsarogullari] , it seems that, it depends on the behavior of rdd 
cache. Say, what happens if we materialize a cached rdd twice at the same time 
? There are some race condition in block manager per rdd partition so it makes 
things slow. BTW, what's the behavior before we have TableCacheQueryStage ? 
Does not it have this issue ?

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation 
> materialization
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-45443
>                 URL: https://issues.apache.org/jira/browse/SPARK-45443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Eren Avsarogullari
>            Priority: Major
>         Attachments: IMR Materialization - Stage 2.png, IMR Materialization - 
> Stage 3.png
>
>
> TableCacheQueryStage is created per InMemoryTableScanExec by 
> AdaptiveSparkPlanExec and it materializes InMemoryTableScanExec output 
> (cached RDD) to provide runtime stats in order to apply AQE  optimizations 
> into remaining physical plan stages. TableCacheQueryStage materializes 
> InMemoryTableScanExec eagerly by submitting job per TableCacheQueryStage 
> instance. For example, if there are 2 TableCacheQueryStage instances 
> referencing same IMR instance (cached RDD) and first InMemoryTableScanExec' s 
> materialization takes longer, following logic will return false 
> (inMemoryTableScan.isMaterialized => false) and this may cause replicated IMR 
> materialization. This behavior can be more visible when cached RDD size is 
> high.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> Would like to get community feedback. Thanks in advance.
> cc [~ulysses] [~cloud_fan]
> *Sample Query to simulate the problem:*
> // Both join legs uses same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i => {
>     val index = i % 5
>     (index, s"Employee_$index", s"Department_$index")
>   }
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, 
> rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :              +- InMemoryRelation (2)
>          :                    +- AdaptiveSparkPlan (7)
>          :                       +- HashAggregate (6)
>          :                          +- Exchange (5)
>          :                             +- HashAggregate (4)
>          :                                +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, 
> rowCount=5)
>                   +- InMemoryTableScan (11)
>                         +- InMemoryRelation (12)
>                               +- AdaptiveSparkPlan (15)
>                                  +- HashAggregate (14)
>                                     +- Exchange (13)
>                                        +- HashAggregate (4)
>                                           +- LocalTableScan (3) {code}
> *Stages DAGs materializing the same IMR instance:*
> !IMR Materialization - Stage 2.png|width=303,height=134!
> !IMR Materialization - Stage 3.png|width=303,height=134!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to