Asif created SPARK-47609:
----------------------------
Summary: CacheManager Lookup can miss picking InMemoryRelation corresponding to subplan
Key: SPARK-47609
URL: https://issues.apache.org/jira/browse/SPARK-47609
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.5.1
Reporter: Asif
This issue became apparent while bringing my PR
[https://github.com/apache/spark/pull/43854]
in sync with the latest master.
That PR is meant to collapse Projects early, in the analyzer phase itself, so
that the plan tree stays as small as possible while Projects keep getting added.
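Collapsing Projects means rewriting an outer Project's expressions in terms of the inner Project's expressions, so that a chain of `withColumn` calls turns into a single node. Spark's optimizer already does something similar in its `CollapseProject` rule. What follows is a minimal sketch on a hypothetical toy model. `Expr`, `Plan`, `collapse` and the example plans are illustrative names only. They are not Catalyst classes and not the code in the PR.

```scala
// Hypothetical toy model of Project collapsing; not Spark's Catalyst classes.
object ProjectCollapseSketch {
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  sealed trait Plan
  final case class Scan(columns: Seq[String]) extends Plan
  // Each output column is a (name, expression over the child's columns) pair.
  final case class Project(outputs: Seq[(String, Expr)], child: Plan) extends Plan

  // Replace column references with the expressions that define them one level down.
  private def substitute(e: Expr, defs: Map[String, Expr]): Expr = e match {
    case Col(n)    => defs.getOrElse(n, e)
    case Lit(_)    => e
    case Add(l, r) => Add(substitute(l, defs), substitute(r, defs))
  }

  // Bottom-up: collapse the child first, then merge this Project into a Project child.
  def collapse(plan: Plan): Plan = plan match {
    case Project(outer, child) =>
      collapse(child) match {
        case Project(inner, grandChild) =>
          val defs = inner.toMap
          Project(outer.map { case (n, e) => n -> substitute(e, defs) }, grandChild)
        case other => Project(outer, other)
      }
    case other => other
  }

  // Two stacked Projects, like two chained withColumn calls.
  val base = Scan(Seq("a"))
  val p1 = Project(Seq("a" -> Col("a"), "b" -> Add(Col("a"), Lit(1))), base)
  val p2 = Project(Seq("a" -> Col("a"), "b" -> Col("b"), "c" -> Add(Col("b"), Lit(1))), p1)
  val collapsed = collapse(p2)
}
```

In this sketch `collapsed` ends up as a single Project directly over `base`, and `c` is expanded to `(a + 1) + 1`.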
As part of that PR, the CacheManager lookup also needed to be modified.
One of the tests newly added in master failed. Analysis of the failure showed
that the CacheManager does not pick the cached InMemoryRelation for a subplan.
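The kind of lookup involved can be pictured with a toy model. Walk the plan top-down, and replace the first fragment that matches a cached plan with a cache marker. This is a hypothetical sketch. `Plan`, `Cached`, `useCachedData` and the example plans are illustrative stand-ins. Spark's CacheManager also differs in how it matches fragments: it uses `sameResult` rather than `==`. The sketch shows that a match-only lookup serves `df1.limit(2)` from the cache but leaves `df2.limit(2)` untouched.

```scala
// Hypothetical toy model of a cache lookup over plan fragments; not Spark's API.
object SubplanLookupSketch {
  sealed trait Plan
  final case class RangeScan(column: String, start: Int, end: Int) extends Plan
  final case class Project(columns: Seq[String], child: Plan) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class Limit(n: Int, child: Plan) extends Plan
  // Stand-in for InMemoryRelation: this fragment is served from the cache.
  final case class Cached(plan: Plan) extends Plan

  // Rebuild a unary node with a transformed child; leaves come back unchanged.
  private def withChild(p: Plan, f: Plan => Plan): Plan = p match {
    case Project(cs, c)  => Project(cs, f(c))
    case Filter(cond, c) => Filter(cond, f(c))
    case Limit(n, c)     => Limit(n, f(c))
    case leaf            => leaf
  }

  // Top-down: a fragment equal to a cached plan is replaced by Cached(...);
  // otherwise the lookup descends into the child.
  def useCachedData(plan: Plan, cache: Seq[Plan]): Plan =
    if (cache.contains(plan)) Cached(plan)
    else withChild(plan, useCachedData(_, cache))

  // Plans loosely mirroring the test; expressions are reduced to names/strings.
  val df  = RangeScan("a", 0, 5)
  val df1 = Project(Seq("a", "b"), df) // b stands for a + 1
  val df2 = Filter("a > 1", df)
  val df1Limit = useCachedData(Limit(2, df1), Seq(df1)) // exact match: hit
  val df2Limit = useCachedData(Limit(2, df2), Seq(df1)) // only derivable: miss
}
```

Here `df1Limit` becomes `Limit(2, Cached(df1))`. `df2Limit` is returned unchanged, because none of its fragments equals `df1`.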
The problem shows up in the following existing test:
{quote}test("SPARK-26708 Cache data and cached plan should stay consistent") {
val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)
df.cache()
// Add df1 to the CacheManager; the buffer is currently empty.
df1.cache()
{color:#4c9aff}// After calling collect(), df1's buffer has been loaded.{color}
df1.collect()
// Add df2 to the CacheManager; the buffer is currently empty.
df2.cache()
// Verify that df1 is an InMemoryRelation plan with dependency on another cached plan.
assertCacheDependency(df1)
val df1InnerPlan = df1.queryExecution.withCachedData
.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
// Verify that df2 is an InMemoryRelation plan with dependency on another cached plan.
assertCacheDependency(df2)
df.unpersist(blocking = true)
{color:#00875a}// Verify that df1's cache has stayed the same, since df1's cache already had data{color}
// before df.unpersist().
val df1Limit = df1.limit(2)
val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
case i: InMemoryRelation => i.cacheBuilder.cachedPlan
}
assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
// Verify that df2 has been re-cached, with a new physical plan free of the dependency
// on df, since df2's cache had not been loaded before df.unpersist().
val df2Limit = df2.limit(2)
val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
case i: InMemoryRelation => i.cacheBuilder.cachedPlan
}{quote}
{quote}*{color:#de350b}// This assertion is not right{color}*
assert(df2LimitInnerPlan.isDefined &&
!df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}{quote}
df1 exists in the cache as an InMemoryRelation. Given
{quote}val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1){quote}
df2 is derivable from the cached df1. df1 keeps every row and column of df and only adds column b, so df2 is equivalent to filtering df1 on a > 1 and dropping b.
So when df2Limit = df2.limit(2) is created, it should use the cached df1.
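The lookup the issue asks for can be pictured with another sketch on a hypothetical toy model. The classes, `deriveFrom` and `useCachedData` are illustrative assumptions, not Spark's CacheManager API and not the fix eventually adopted. Besides exact matches, this lookup lets a Filter over some plan be answered from a cached Project over the same plan, as long as that Project keeps all of the plan's columns. The rewrite filters the cached rows and then projects away the extra column (b in the test).

```scala
// Hypothetical toy model of a derivation-aware cache lookup; not Spark's API.
object DerivedLookupSketch {
  sealed trait Plan { def output: Seq[String] }
  final case class RangeScan(column: String, start: Int, end: Int) extends Plan {
    def output: Seq[String] = Seq(column)
  }
  // Only output column names are tracked; expressions are elided in this toy.
  final case class Project(columns: Seq[String], child: Plan) extends Plan {
    def output: Seq[String] = columns
  }
  // Keeps rows where `column > threshold`.
  final case class Filter(column: String, threshold: Int, child: Plan) extends Plan {
    def output: Seq[String] = child.output
  }
  final case class Limit(n: Int, child: Plan) extends Plan {
    def output: Seq[String] = child.output
  }
  // Stand-in for InMemoryRelation over a cached plan.
  final case class Cached(plan: Plan) extends Plan {
    def output: Seq[String] = plan.output
  }

  // Try to answer `fragment` from one cached plan: an exact match, or a Filter
  // over the same child that a cached column-adding Project already covers.
  private def deriveFrom(fragment: Plan, cached: Plan): Option[Plan] =
    (fragment, cached) match {
      case (f, c) if f == c => Some(Cached(c))
      case (Filter(col, t, child), Project(cols, cachedChild))
          if child == cachedChild && child.output.forall(c => cols.contains(c)) =>
        // Filter the cached rows, then drop the extra columns again.
        Some(Project(child.output, Filter(col, t, Cached(cached))))
      case _ => None
    }

  // Rebuild a unary node with a transformed child; leaves come back unchanged.
  private def withChild(p: Plan, f: Plan => Plan): Plan = p match {
    case Project(cs, c)    => Project(cs, f(c))
    case Filter(col, t, c) => Filter(col, t, f(c))
    case Limit(n, c)       => Limit(n, f(c))
    case leaf              => leaf
  }

  // Top-down: the first cached plan that can answer a fragment wins.
  def useCachedData(plan: Plan, cache: Seq[Plan]): Plan =
    cache.iterator
      .map(c => deriveFrom(plan, c))
      .collectFirst { case Some(p) => p }
      .getOrElse(withChild(plan, useCachedData(_, cache)))

  val df  = RangeScan("a", 0, 5)
  val df1 = Project(Seq("a", "b"), df) // b stands for a + 1
  val df2 = Filter("a", 1, df)         // a > 1
  val df1Limit = useCachedData(Limit(2, df1), Seq(df1))
  val df2Limit = useCachedData(Limit(2, df2), Seq(df1))
}
```

With only `df1` cached, `df2Limit` becomes `Limit(2, Project(Seq("a"), Filter("a", 1, Cached(df1))))`, so df2.limit(2) reads from df1's cache. This toy rule ignores arbitrary expressions, attribute IDs and other operators. A real lookup would have to handle all of them.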