This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 8a85f22ebb80 [SPARK-49454][SQL] Avoid double normalization in the
cache process
8a85f22ebb80 is described below
commit 8a85f22ebb8066be52dda420ef09861aa27a7421
Author: Xinyi Yu <[email protected]>
AuthorDate: Thu Aug 29 14:43:31 2024 -0700
[SPARK-49454][SQL] Avoid double normalization in the cache process
### What changes were proposed in this pull request?
This PR fixes the issue introduced in
https://github.com/apache/spark/pull/46465, which is that normalization is
applied twice during the cache process. Some normalization rules may not be
idempotent, so applying them repeatedly may break the plan shape and cause an
unexpected cache miss.
### Why are the changes needed?
Fix a bug as stated above.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Trivial fix; verified by running the existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #47923 from anchovYu/avoid-double-normalization.
Authored-by: Xinyi Yu <[email protected]>
Signed-off-by: Gengliang Wang <[email protected]>
---
.../apache/spark/sql/execution/CacheManager.scala | 25 +++++++++++++++-------
1 file changed, 17 insertions(+), 8 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b96f257e6b5b..aae424afcb8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -94,7 +94,13 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
query: Dataset[_],
tableName: Option[String],
storageLevel: StorageLevel): Unit = {
- cacheQueryInternal(query.sparkSession, query.queryExecution.normalized,
tableName, storageLevel)
+ cacheQueryInternal(
+ query.sparkSession,
+ query.queryExecution.analyzed,
+ query.queryExecution.normalized,
+ tableName,
+ storageLevel
+ )
}
/**
@@ -107,23 +113,25 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
tableName: Option[String],
storageLevel: StorageLevel): Unit = {
val normalized = QueryExecution.normalize(spark, planToCache)
- cacheQueryInternal(spark, normalized, tableName, storageLevel)
+ cacheQueryInternal(spark, planToCache, normalized, tableName, storageLevel)
}
- // The `planToCache` should have been normalized.
+ // The `normalizedPlan` should have been normalized. It is the cache key.
private def cacheQueryInternal(
spark: SparkSession,
- planToCache: LogicalPlan,
+ unnormalizedPlan: LogicalPlan,
+ normalizedPlan: LogicalPlan,
tableName: Option[String],
storageLevel: StorageLevel): Unit = {
if (storageLevel == StorageLevel.NONE) {
// Do nothing for StorageLevel.NONE since it will not actually cache any
data.
- } else if (lookupCachedDataInternal(planToCache).nonEmpty) {
+ } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
logWarning("Asked to cache already cached data.")
} else {
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
val inMemoryRelation = sessionWithConfigsOff.withActive {
- val qe = sessionWithConfigsOff.sessionState.executePlan(planToCache)
+      // It creates the query execution from the unnormalized plan to avoid
multiple normalizations.
+ val qe =
sessionWithConfigsOff.sessionState.executePlan(unnormalizedPlan)
InMemoryRelation(
storageLevel,
qe,
@@ -131,10 +139,11 @@ class CacheManager extends Logging with
AdaptiveSparkPlanHelper {
}
this.synchronized {
- if (lookupCachedDataInternal(planToCache).nonEmpty) {
+ if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
logWarning("Data has already been cached.")
} else {
- val cd = CachedData(planToCache, inMemoryRelation)
+ // the cache key is the normalized plan
+ val cd = CachedData(normalizedPlan, inMemoryRelation)
cachedData = cd +: cachedData
CacheManager.logCacheOperation(log"Added Dataframe cache entry:" +
log"${MDC(DATAFRAME_CACHE_ENTRY, cd)}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]