This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 665a428cb6cd [SPARK-53905][SQL] Refactor RelationResolution to enable 
code reuse
665a428cb6cd is described below

commit 665a428cb6cd14a4bd5bc895797180749d4ed09d
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Oct 29 11:34:25 2025 -0700

    [SPARK-53905][SQL] Refactor RelationResolution to enable code reuse
    
    ### What changes were proposed in this pull request?
    
    This PR refactors RelationResolution to enable code reuse.
    
    ### Why are the changes needed?
    
    These changes are needed to simplify subsequent PRs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests in `PlanResolutionSuite` already cover plan ID cloning and 
cache hits.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52781 from aokolnychyi/spark-53905.
    
    Authored-by: Anton Okolnychyi <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../sql/catalyst/analysis/RelationResolution.scala | 54 +++++++++++++---------
 1 file changed, 31 insertions(+), 23 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
index 9e9ad63b3a44..ea5836b8ec2d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.collection.mutable
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
@@ -49,8 +51,13 @@ class RelationResolution(override val catalogManager: 
CatalogManager)
     with Logging
     with LookupCatalog
     with SQLConfHelper {
+
+  type CacheKey = (Seq[String], Option[TimeTravelSpec])
+
   val v1SessionCatalog = catalogManager.v1SessionCatalog
 
+  private def relationCache: mutable.Map[CacheKey, LogicalPlan] = 
AnalysisContext.get.relationCache
+
   /**
    * If we are resolving database objects (relations, functions, etc.) inside 
views, we may need to
    * expand single or multi-part identifiers with the current catalog and 
namespace of when the
@@ -109,12 +116,9 @@ class RelationResolution(override val catalogManager: 
CatalogManager)
     }.orElse {
       expandIdentifier(u.multipartIdentifier) match {
         case CatalogAndIdentifier(catalog, ident) =>
-          val key =
-            (
-              (catalog.name +: ident.namespace :+ 
ident.name).toImmutableArraySeq,
-              finalTimeTravelSpec
-            )
-          AnalysisContext.get.relationCache
+          val key = toCacheKey(catalog, ident, finalTimeTravelSpec)
+          val planId = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
+          relationCache
             .get(key)
             .map { cache =>
               val cachedRelation = cache.transform {
@@ -123,13 +127,7 @@ class RelationResolution(override val catalogManager: 
CatalogManager)
                   newRelation.copyTagsFrom(multi)
                   newRelation
               }
-              u.getTagValue(LogicalPlan.PLAN_ID_TAG)
-                .map { planId =>
-                  val cachedConnectRelation = cachedRelation.clone()
-                  cachedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG, 
planId)
-                  cachedConnectRelation
-                }
-                .getOrElse(cachedRelation)
+              cloneWithPlanId(cachedRelation, planId)
             }
             .orElse {
               val writePrivilegesString =
@@ -144,16 +142,8 @@ class RelationResolution(override val catalogManager: 
CatalogManager)
                 u.isStreaming,
                 finalTimeTravelSpec
               )
-              loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
-              u.getTagValue(LogicalPlan.PLAN_ID_TAG)
-                .map { planId =>
-                  loaded.map { loadedRelation =>
-                    val loadedConnectRelation = loadedRelation.clone()
-                    loadedConnectRelation.setTagValue(LogicalPlan.PLAN_ID_TAG, 
planId)
-                    loadedConnectRelation
-                  }
-                }
-                .getOrElse(loaded)
+              loaded.foreach(relationCache.update(key, _))
+              loaded.map(cloneWithPlanId(_, planId))
             }
         case _ => None
       }
@@ -263,4 +253,22 @@ class RelationResolution(override val catalogManager: 
CatalogManager)
       }
     } else None
   }
+
+  private def toCacheKey(
+      catalog: CatalogPlugin,
+      ident: Identifier,
+      timeTravelSpec: Option[TimeTravelSpec] = None): CacheKey = {
+    ((catalog.name +: ident.namespace :+ ident.name).toImmutableArraySeq, 
timeTravelSpec)
+  }
+
+  private def cloneWithPlanId(plan: LogicalPlan, planId: Option[Long]): 
LogicalPlan = {
+    planId match {
+      case Some(id) =>
+        val clone = plan.clone()
+        clone.setTagValue(LogicalPlan.PLAN_ID_TAG, id)
+        clone
+      case None =>
+        plan
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to