[ 
https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17773480#comment-17773480
 ] 

Eren Avsarogullari commented on SPARK-45443:
--------------------------------------------

> But this idea only work for one query
Could you please provide more info on this? For example, the same IMR instance 
can be used by multiple queries. Let's say there are two queries, Q0 and Q1, 
and both use the same IMR instance. Q0 materializes the IMR instance through 
TableCacheQueryStage, and the IMR materialization has to finish before Q0 
completes. Q1 can still introduce a TableCacheQueryStage instance into its 
physical plan for the same IMR instance; however, this TableCacheQueryStage 
instance will not submit an IMR materialization job because the IMR is already 
materialized at that point, right?
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281
Are there any other use cases that are not covered?
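
To make the Q0/Q1 scenario concrete, below is a minimal toy model in plain 
Scala. It is not Spark code: CachedRelation, CacheStage and StageOrderingDemo 
are hypothetical names, and the materialized flag only stands in for the 
inMemoryTableScan.isMaterialized check linked above. It sketches that a later 
stage skips the job once the earlier materialization has finished, and that two 
jobs are submitted only when both stages check before the first one finishes.

```scala
// Toy model only; none of these classes exist in Spark.
final class CachedRelation {
  var materialized: Boolean = false // stands in for isMaterialized
  var jobsSubmitted: Int = 0        // number of materialization jobs submitted
}

final class CacheStage(relation: CachedRelation) {
  // Check step: a job is needed only if the relation is not materialized yet.
  def needsJob: Boolean = !relation.materialized
  // Act step: submitting the job does not finish it immediately.
  def submitJob(): Unit = relation.jobsSubmitted += 1
  // Completion step: the cached data becomes visible to later checks.
  def finishJob(): Unit = relation.materialized = true
}

object StageOrderingDemo {
  // Q0 finishes its materialization before Q1's stage performs its check.
  def sequential(): Int = {
    val imr = new CachedRelation
    val q0 = new CacheStage(imr)
    if (q0.needsJob) { q0.submitJob(); q0.finishJob() }
    val q1 = new CacheStage(imr)
    if (q1.needsJob) { q1.submitJob(); q1.finishJob() }
    imr.jobsSubmitted
  }

  // Both stages perform their check before the first job has finished.
  def overlapping(): Int = {
    val imr = new CachedRelation
    val s0 = new CacheStage(imr)
    val s1 = new CacheStage(imr)
    if (s0.needsJob) s0.submitJob()
    if (s1.needsJob) s1.submitJob() // first job still running, so check passes
    s0.finishJob()
    s1.finishJob()
    imr.jobsSubmitted
  }

  def main(args: Array[String]): Unit = {
    println(s"sequential jobs: ${sequential()}")
    println(s"overlapping jobs: ${overlapping()}")
  }
}
```

In this model sequential() submits 1 job and overlapping() submits 2, which is 
the ordering difference the question above is about.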

Also, I am thinking about the following use cases: 
*1- Queries using AQE under the IMR feature (a.k.a. 
spark.sql.optimizer.canChangeCachedPlanOutputPartitioning):* 
TableCacheQueryStage materializes the IMR by submitting a Spark job per 
TableCacheQueryStage/InMemoryTableScanExec instance. 

*2- Queries not using AQE under the IMR feature:* 
The IMR is materialized by 
InMemoryTableScanExec.doExecute/doExecuteColumnar(). Could an 
InMemoryTableScanExec-based solution (to avoid replicated InMemoryRelation 
materialization) be more inclusive by covering all use cases? 
spark.sql.optimizer.canChangeCachedPlanOutputPartitioning is enabled by default 
in Spark 3.5, but the second case still applies to queries running on 
Spark < 3.5, or when the feature needs to be disabled on Spark >= 3.5.

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation 
> materialization
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-45443
>                 URL: https://issues.apache.org/jira/browse/SPARK-45443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Eren Avsarogullari
>            Priority: Major
>         Attachments: IMR Materialization - Stage 2.png, IMR Materialization - 
> Stage 3.png
>
>
> TableCacheQueryStage is created per InMemoryTableScanExec by 
> AdaptiveSparkPlanExec, and it materializes the InMemoryTableScanExec output 
> (cached RDD) to provide runtime stats so that AQE optimizations can be 
> applied to the remaining physical plan stages. TableCacheQueryStage 
> materializes InMemoryTableScanExec eagerly by submitting a job per 
> TableCacheQueryStage instance. For example, if there are 2 
> TableCacheQueryStage instances referencing the same IMR instance (cached RDD) 
> and the first InMemoryTableScanExec's materialization takes longer, the 
> following logic will return false (inMemoryTableScan.isMaterialized => false), 
> and this may cause replicated IMR materialization. This behavior can be more 
> visible when the cached RDD size is large.
> [https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
> Would like to get community feedback. Thanks in advance.
> cc [~ulysses] [~cloud_fan]
> *Sample Query to simulate the problem:*
> // Both join legs use the same IMR instance
> {code:java}
> import spark.implicits._
> val arr = (1 to 12).map { i => {
>     val index = i % 5
>     (index, s"Employee_$index", s"Department_$index")
>   }
> }
> val df = arr.toDF("id", "name", "department")
>   .filter('id >= 0)
>   .sort("id")
>   .groupBy('id, 'name, 'department)
>   .count().as("count")
> df.persist()
> val df2 = df.sort("count").filter('count <= 2)
> val df3 = df.sort("count").filter('count >= 3)
> val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")
> df4.show() {code}
> *Physical Plan:*
> {code:java}
> == Physical Plan ==
> AdaptiveSparkPlan (31)
> +- == Final Plan ==
>    CollectLimit (21)
>    +- * Project (20)
>       +- * SortMergeJoin FullOuter (19)
>          :- * Sort (10)
>          :  +- * Filter (9)
>          :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
>          :        +- InMemoryTableScan (1)
>          :              +- InMemoryRelation (2)
>          :                    +- AdaptiveSparkPlan (7)
>          :                       +- HashAggregate (6)
>          :                          +- Exchange (5)
>          :                             +- HashAggregate (4)
>          :                                +- LocalTableScan (3)
>          +- * Sort (18)
>             +- * Filter (17)
>                +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
>                   +- InMemoryTableScan (11)
>                         +- InMemoryRelation (12)
>                               +- AdaptiveSparkPlan (15)
>                                  +- HashAggregate (14)
>                                     +- Exchange (13)
>                                        +- HashAggregate (4)
>                                           +- LocalTableScan (3) {code}
> *Stages DAGs materializing the same IMR instance:*
> !IMR Materialization - Stage 2.png|width=303,height=134!
> !IMR Materialization - Stage 3.png|width=303,height=134!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
