[ 
https://issues.apache.org/jira/browse/SPARK-47609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Asif updated SPARK-47609:
-------------------------
    Affects Version/s: 4.0.0

> CacheManager Lookup can miss picking InMemoryRelation corresponding to subplan
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-47609
>                 URL: https://issues.apache.org/jira/browse/SPARK-47609
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0, 3.5.1
>            Reporter: Asif
>            Priority: Major
>              Labels: pull-request-available
>
> This issue became apparent while bringing my PR
> [https://github.com/apache/spark/pull/43854]
> in sync with the latest master.
> That PR is meant to collapse projects early, in the analyzer phase itself, so
> that the tree size is kept to a minimum as projects keep getting added.
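The project-collapsing idea can be sketched with a toy plan tree. All names below (RangeScan, Project, Col, Lit, Add, collapse) are hypothetical stand-ins, not Catalyst classes. The sketch assumes each withColumn call adds one Project. Collapsing inlines the inner Project's expressions into the outer one, so the tree stays one level deep no matter how many projects are stacked.

```scala
// Hypothetical toy plan nodes; these are NOT Spark's Catalyst classes.
sealed trait Expr
case class Col(name: String) extends Expr
case class Lit(value: Long) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

sealed trait Plan
case class RangeScan(n: Long) extends Plan // produces a single column "a"
case class Project(cols: Seq[(String, Expr)], child: Plan) extends Plan

// Inline the expressions bound by an inner Project into an outer expression.
def substitute(e: Expr, bindings: Map[String, Expr]): Expr = e match {
  case Col(n)    => bindings.getOrElse(n, Col(n))
  case Lit(v)    => Lit(v)
  case Add(l, r) => Add(substitute(l, bindings), substitute(r, bindings))
}

// Bottom-up: collapse the child first, then merge this Project into a child Project.
def collapse(plan: Plan): Plan = plan match {
  case Project(outer, child) =>
    collapse(child) match {
      case Project(inner, grandChild) =>
        val bindings = inner.toMap
        Project(outer.map { case (n, e) => (n, substitute(e, bindings)) }, grandChild)
      case other => Project(outer, other)
    }
  case other => other
}

// Number of nodes on the path from the root down to the leaf.
def depth(plan: Plan): Int = plan match {
  case Project(_, child) => 1 + depth(child)
  case RangeScan(_)      => 1
}

// Toy analogue of df.withColumn("b", $"a" + 1).withColumn("c", $"b" + 1).
val df = RangeScan(5)
val withB = Project(Seq("a" -> Col("a"), "b" -> Add(Col("a"), Lit(1))), df)
val withC = Project(Seq("a" -> Col("a"), "b" -> Col("b"), "c" -> Add(Col("b"), Lit(1))), withB)
val collapsed = collapse(withC)
```

Here `collapsed` is a single Project over RangeScan(5) in which `c` becomes `(a + 1) + 1`. A real rule would also have to avoid duplicating non-deterministic or expensive expressions when inlining, which this sketch ignores.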
> As part of that work, the CacheManager lookup also needed to be modified.
> One of the newly added tests in master failed. Analysis of the failure shows
> that the CacheManager is not picking up the cached InMemoryRelation for a
> subplan.
> This shows up in the following existing test in
> org.apache.spark.sql.DatasetCacheSuite:
> {quote}test("SPARK-26708 Cache data and cached plan should stay consistent") {
>   val df = spark.range(0, 5).toDF("a")
>   val df1 = df.withColumn("b", $"a" + 1)
>   val df2 = df.filter($"a" > 1)
>   df.cache()
>   // Add df1 to the CacheManager; the buffer is currently empty.
>   df1.cache()
>   {color:#4c9aff}// After calling collect(), df1's buffer has been loaded.{color}
>   df1.collect()
>   // Add df2 to the CacheManager; the buffer is currently empty.
>   df2.cache()
>   // Verify that df1 is an InMemoryRelation plan with dependency on another cached plan.
>   assertCacheDependency(df1)
>   val df1InnerPlan = df1.queryExecution.withCachedData
>     .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
>   // Verify that df2 is an InMemoryRelation plan with dependency on another cached plan.
>   assertCacheDependency(df2)
>   df.unpersist(blocking = true)
>   {color:#00875a}// Verify that df1's cache has stayed the same, since df1's cache already has data{color}
>   // before df.unpersist().
>   val df1Limit = df1.limit(2)
>   val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
>     case i: InMemoryRelation => i.cacheBuilder.cachedPlan
>   }
>   assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
>   // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
>   // on df, since df2's cache had not been loaded before df.unpersist().
>   val df2Limit = df2.limit(2)
>   val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
>     case i: InMemoryRelation => i.cacheBuilder.cachedPlan
>   }
> {quote}
> {quote}*{color:#de350b}// This assertion is not right{color}*
>   assert(df2LimitInnerPlan.isDefined &&
>     !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
> }
> {quote}
>  
> Since df1 exists in the cache as an InMemoryRelation, and
> {quote}val df = spark.range(0, 5).toDF("a")
> val df1 = df.withColumn("b", $"a" + 1)
> val df2 = df.filter($"a" > 1){quote}
> df2 is derivable from the cached df1: it is df1 filtered on a > 1 and projected
> back to column a.
> So when df2Limit = df2.limit(2) is created, it should use the cached df1.
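The derivation argued for above can be illustrated with a toy rewrite. Everything here (RangeScan, Project, Filter, Limit, CachedScan, useCache) is hypothetical and only models the idea; it is not how Spark's CacheManager is implemented. The assumption is that a Filter over some child can be answered from a cached Project over the same child, provided that Project forwards every child column unchanged. The rewrite filters the cached rows and then projects back to the child's columns.

```scala
// Hypothetical toy plan nodes and lookup; NOT Spark's Catalyst or CacheManager API.
sealed trait Expr
case class Col(name: String) extends Expr
case class Lit(value: Long) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class Gt(left: Expr, right: Expr) extends Expr

sealed trait Plan { def output: Seq[String] }
case class RangeScan(n: Long) extends Plan { def output: Seq[String] = Seq("a") }
case class Project(cols: Seq[(String, Expr)], child: Plan) extends Plan {
  def output: Seq[String] = cols.map(_._1)
}
case class Filter(cond: Expr, child: Plan) extends Plan { def output: Seq[String] = child.output }
case class Limit(n: Int, child: Plan) extends Plan { def output: Seq[String] = child.output }
// Stands in for a scan of cached data (an InMemoryRelation in Spark).
case class CachedScan(cached: Plan) extends Plan { def output: Seq[String] = cached.output }

// True if the Project forwards column `name` unchanged.
def passesThrough(cols: Seq[(String, Expr)], name: String): Boolean =
  cols.exists { case (n, e) => n == name && e == Col(name) }

// Replace subtrees with cached data: exact matches first; otherwise a Filter over `child`
// is derived from a cached Project over the same `child` that keeps all of its columns.
def useCache(plan: Plan, cache: Seq[Plan]): Plan =
  if (cache.contains(plan)) CachedScan(plan)
  else plan match {
    case Filter(cond, child) =>
      cache.collectFirst {
        case c @ Project(cols, `child`) if child.output.forall(passesThrough(cols, _)) =>
          // Filter the cached rows, then drop the extra columns the cached Project added.
          Project(child.output.map(n => n -> Col(n)), Filter(cond, CachedScan(c)))
      }.getOrElse(Filter(cond, useCache(child, cache)))
    case Project(cols, child) => Project(cols, useCache(child, cache))
    case Limit(n, child)      => Limit(n, useCache(child, cache))
    case other                => other
  }

val df = RangeScan(5)
val df1 = Project(Seq("a" -> Col("a"), "b" -> Add(Col("a"), Lit(1))), df)
val df2 = Filter(Gt(Col("a"), Lit(1)), df)
val df2Limit = Limit(2, df2)

// Only df1's entry is in the toy cache.
val rewritten = useCache(df2Limit, Seq(df1))
```

With df1's entry in the toy cache, `rewritten` becomes Limit(2, Project(a, Filter(a > 1, CachedScan(df1)))). With an empty cache, the plan comes back unchanged. Filtering after df1's Project is safe here because that Project is deterministic and maps each input row to exactly one output row.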
>  
> The pull request for this issue is
> [https://github.com/apache/spark/pull/43854]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
