[ https://issues.apache.org/jira/browse/SPARK-45443?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eren Avsarogullari updated SPARK-45443:
---------------------------------------
    Description: 
AdaptiveSparkPlanExec creates a TableCacheQueryStage per InMemoryTableScanExec, 
and each stage materializes the InMemoryTableScanExec output (the cached RDD) to 
provide runtime stats for applying AQE optimizations to the remaining physical 
plan stages. TableCacheQueryStage materializes InMemoryTableScanExec eagerly by 
submitting a job per TableCacheQueryStage instance. For example, if there are 2 
TableCacheQueryStage instances referencing the same IMR instance (cached RDD) and 
the first InMemoryTableScanExec's materialization takes longer, the following 
check returns false (inMemoryTableScan.isMaterialized => false) for the second 
instance, which can cause replicated IMR materialization. This behavior becomes 
more visible when the cached RDD is large.
[https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L281]
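For illustration, here is a self-contained Scala sketch (not the actual Spark source; SharedCachedRelation and DuplicateMaterializationDemo are hypothetical names) that models the race: two stages sharing one cached relation each check isMaterialized before either materialization job has finished, so typically both submit a job.
{code:java}
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the InMemoryTableScanExec shared by both stages.
class SharedCachedRelation {
  private val materialized = new AtomicBoolean(false)
  def isMaterialized: Boolean = materialized.get()

  // Stand-in for the Spark job that materializes the cached RDD.
  def materialize(stageId: Int): Unit = {
    println(s"TableCacheQueryStage $stageId submits a materialization job")
    Thread.sleep(500) // the first materialization is still running here
    materialized.set(true)
  }
}

object DuplicateMaterializationDemo extends App {
  val shared = new SharedCachedRelation

  // Both stages are materialized eagerly and independently: each sees
  // isMaterialized == false and submits its own job, so the cached RDD
  // gets computed twice instead of once.
  val stages = Seq(1, 2).map { id =>
    Future {
      if (!shared.isMaterialized) shared.materialize(id)
      else println(s"TableCacheQueryStage $id reuses the materialized cache")
    }
  }
  Await.result(Future.sequence(stages), 10.seconds)
}
{code}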

Would like to get community feedback. Thanks in advance.
cc [~ulysses] [~cloud_fan]

*Sample Query to simulate the problem:*
// Both join legs use the same IMR instance
{code:java}
import spark.implicits._

val arr = (1 to 12).map { i =>
  val index = i % 5
  (index, s"Employee_$index", s"Department_$index")
}
val df = arr.toDF("id", "name", "department")
  .filter('id >= 0)
  .sort("id")
  .groupBy('id, 'name, 'department)
  .count().as("count")
df.persist()

val df2 = df.sort("count").filter('count <= 2)
val df3 = df.sort("count").filter('count >= 3)
val df4 = df2.join(df3, Seq("id", "name", "department"), "fullouter")

df4.show() {code}
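Note: the TableCacheQueryStage nodes in the plan below only appear when AQE is allowed to re-plan cached relations. If they are missing, the following settings may need to be enabled explicitly (a sketch only; config names and defaults should be verified against the Spark version in use):
{code:java}
// Assumption: these settings control whether AdaptiveSparkPlanExec wraps
// InMemoryTableScanExec into a TableCacheQueryStage; defaults vary by version.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.optimizer.canChangeCachedPlanOutputPartitioning", "true")
{code}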
*Physical Plan:*
{code:java}
== Physical Plan ==
AdaptiveSparkPlan (31)
+- == Final Plan ==
   CollectLimit (21)
   +- * Project (20)
      +- * SortMergeJoin FullOuter (19)
         :- * Sort (10)
         :  +- * Filter (9)
         :     +- TableCacheQueryStage (8), Statistics(sizeInBytes=210.0 B, rowCount=5)
         :        +- InMemoryTableScan (1)
         :              +- InMemoryRelation (2)
         :                    +- AdaptiveSparkPlan (7)
         :                       +- HashAggregate (6)
         :                          +- Exchange (5)
         :                             +- HashAggregate (4)
         :                                +- LocalTableScan (3)
         +- * Sort (18)
            +- * Filter (17)
               +- TableCacheQueryStage (16), Statistics(sizeInBytes=210.0 B, rowCount=5)
                  +- InMemoryTableScan (11)
                        +- InMemoryRelation (12)
                              +- AdaptiveSparkPlan (15)
                                 +- HashAggregate (14)
                                    +- Exchange (13)
                                       +- HashAggregate (4)
                                          +- LocalTableScan (3) {code}
*Stage DAGs materializing the same IMR instance:*
!IMR Materialization - Stage 2.png|width=303,height=134!
!IMR Materialization - Stage 3.png|width=303,height=134!

> Revisit TableCacheQueryStage to avoid replicated InMemoryRelation materialization
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-45443
>                 URL: https://issues.apache.org/jira/browse/SPARK-45443
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Eren Avsarogullari
>            Priority: Major
>         Attachments: IMR Materialization - Stage 2.png, IMR Materialization - Stage 3.png
>


