This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a85f22ebb80 [SPARK-49454][SQL] Avoid double normalization in the 
cache process
8a85f22ebb80 is described below

commit 8a85f22ebb8066be52dda420ef09861aa27a7421
Author: Xinyi Yu <[email protected]>
AuthorDate: Thu Aug 29 14:43:31 2024 -0700

    [SPARK-49454][SQL] Avoid double normalization in the cache process
    
    ### What changes were proposed in this pull request?
    This PR fixes the issue introduced in 
https://github.com/apache/spark/pull/46465, which is that normalization is 
applied twice during the cache process. Some normalization rules may not be 
idempotent, so applying them repeatedly may break the plan shape and cause an 
unexpected cache miss.
    
    ### Why are the changes needed?
    Fix a bug as stated above.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Trivial fix; verified by running the existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47923 from anchovYu/avoid-double-normalization.
    
    Authored-by: Xinyi Yu <[email protected]>
    Signed-off-by: Gengliang Wang <[email protected]>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 25 +++++++++++++++-------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b96f257e6b5b..aae424afcb8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -94,7 +94,13 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
       query: Dataset[_],
       tableName: Option[String],
       storageLevel: StorageLevel): Unit = {
-    cacheQueryInternal(query.sparkSession, query.queryExecution.normalized, 
tableName, storageLevel)
+    cacheQueryInternal(
+      query.sparkSession,
+      query.queryExecution.analyzed,
+      query.queryExecution.normalized,
+      tableName,
+      storageLevel
+    )
   }
 
   /**
@@ -107,23 +113,25 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
       tableName: Option[String],
       storageLevel: StorageLevel): Unit = {
     val normalized = QueryExecution.normalize(spark, planToCache)
-    cacheQueryInternal(spark, normalized, tableName, storageLevel)
+    cacheQueryInternal(spark, planToCache, normalized, tableName, storageLevel)
   }
 
-  // The `planToCache` should have been normalized.
+  // The `normalizedPlan` should have been normalized. It is the cache key.
   private def cacheQueryInternal(
       spark: SparkSession,
-      planToCache: LogicalPlan,
+      unnormalizedPlan: LogicalPlan,
+      normalizedPlan: LogicalPlan,
       tableName: Option[String],
       storageLevel: StorageLevel): Unit = {
     if (storageLevel == StorageLevel.NONE) {
       // Do nothing for StorageLevel.NONE since it will not actually cache any 
data.
-    } else if (lookupCachedDataInternal(planToCache).nonEmpty) {
+    } else if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
       logWarning("Asked to cache already cached data.")
     } else {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
       val inMemoryRelation = sessionWithConfigsOff.withActive {
-        val qe = sessionWithConfigsOff.sessionState.executePlan(planToCache)
+        // It creates the query execution from the unnormalized plan to avoid 
multiple normalizations.
+        val qe = 
sessionWithConfigsOff.sessionState.executePlan(unnormalizedPlan)
         InMemoryRelation(
           storageLevel,
           qe,
@@ -131,10 +139,11 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
       }
 
       this.synchronized {
-        if (lookupCachedDataInternal(planToCache).nonEmpty) {
+        if (lookupCachedDataInternal(normalizedPlan).nonEmpty) {
           logWarning("Data has already been cached.")
         } else {
-          val cd = CachedData(planToCache, inMemoryRelation)
+          // The cache key is the normalized plan.
+          val cd = CachedData(normalizedPlan, inMemoryRelation)
           cachedData = cd +: cachedData
           CacheManager.logCacheOperation(log"Added Dataframe cache entry:" +
             log"${MDC(DATAFRAME_CACHE_ENTRY, cd)}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to