This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new daa21cd2bd46 [SPARK-54873][SQL] Simplify V2TableReference resolution 
as only temp view may contain it
daa21cd2bd46 is described below

commit daa21cd2bd46c857cf07e8a70c162448c94aa408
Author: Wenchen Fan <[email protected]>
AuthorDate: Tue Jan 20 08:39:09 2026 +0800

    [SPARK-54873][SQL] Simplify V2TableReference resolution as only temp view 
may contain it
    
    ### What changes were proposed in this pull request?
    
    Currently we resolve `V2TableReference` in multiple places in the rule 
`ResolveRelations`. However, only a temp view that stores a resolved plan (not 
a SQL view) may contain `V2TableReference`, so we only need to resolve 
`V2TableReference` where we return the view plan.
    
    `V2TableReference` is marked as resolved, but it must be replaced with 
`DataSourceV2Relation` during analysis. This PR also adds a check in 
`CheckAnalysis` that throws an internal error if a `V2TableReference` is still 
present in the plan.
    
    ### Why are the changes needed?
    
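    To simplify the code: `V2TableReference` is now resolved only where a temp 
view's plan is returned, instead of at several separate places.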
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    cursor 2.2.44
    
    Closes #53646 from cloud-fan/refresh.
    
    Lead-authored-by: Wenchen Fan <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 35 +++++++++++-----------
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  4 +++
 .../sql/catalyst/analysis/RelationResolution.scala |  3 +-
 .../apache/spark/sql/execution/CacheManager.scala  |  4 ---
 4 files changed, 23 insertions(+), 23 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 4e313933c453..6e899e958f15 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -997,27 +997,30 @@ class Analyzer(
       case view: View if !view.child.resolved =>
         ViewResolution
           .resolve(view, options, resolveChild = executeSameContext, 
checkAnalysis = checkAnalysis)
+      // V2TableReference is a placeholder for DSv2 tables that needs to be 
resolved to
+      // DataSourceV2Relation on each view access. Only dataframe temp view 
may contain it
+      // as it stores resolved plans directly.
+      case view: View if view.isTempViewStoringAnalyzedPlan =>
+        view.copy(child = resolveTableReferences(view.child))
       case p @ SubqueryAlias(_, view: View) =>
         p.copy(child = resolveViews(view, options))
       case _ => plan
     }
 
+    // Unwrap temp views storing analyzed plans and resolve V2TableReference 
nodes in the child.
     private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
       EliminateSubqueryAliases(plan) match {
-        case v: View if v.isTempViewStoringAnalyzedPlan => v.child
+        case v: View if v.isTempViewStoringAnalyzedPlan => 
resolveTableReferences(v.child)
         case other => other
       }
     }
 
-    private def resolveAsV2Relation(plan: LogicalPlan): 
Option[DataSourceV2Relation] = {
-      plan match {
-        case ref: V2TableReference =>
-          EliminateSubqueryAliases(relationResolution.resolveReference(ref)) 
match {
-            case r: DataSourceV2Relation => Some(r)
-            case _ => None
-          }
-        case r: DataSourceV2Relation => Some(r)
-        case _ => None
+    // Resolve V2TableReference nodes in a plan. V2TableReference is only 
created for temp views
+    // (via V2TableReference.createForTempView), so we only need to resolve it 
when returning
+    // the plan of temp views (in resolveViews and unwrapRelationPlan).
+    private def resolveTableReferences(plan: LogicalPlan): LogicalPlan = {
+      plan.resolveOperatorsUp {
+        case r: V2TableReference => relationResolution.resolveReference(r)
       }
     }
 
@@ -1027,14 +1030,13 @@ class Analyzer(
         val relation = table match {
           case u: UnresolvedRelation if !u.isStreaming =>
             resolveRelation(u).getOrElse(u)
-          case r: V2TableReference =>
-            relationResolution.resolveReference(r)
           case other => other
         }
 
         // Inserting into a file-based temporary view is allowed.
         // (e.g., spark.read.parquet("path").createOrReplaceTempView("t").
         // Thus, we need to look at the raw plan if `relation` is a temporary 
view.
+        // unwrapRelationPlan also resolves V2TableReference nodes in temp 
view plans.
         unwrapRelationPlan(relation) match {
           case v: View =>
             throw 
QueryCompilationErrors.insertIntoViewNotAllowedError(v.desc.identifier, table)
@@ -1051,11 +1053,10 @@ class Analyzer(
               case u: UnresolvedCatalogRelation =>
                 throw QueryCompilationErrors.writeIntoV1TableNotAllowedError(
                   u.tableMeta.identifier, write)
-              case plan =>
-                resolveAsV2Relation(plan).map(write.withNewTable).getOrElse {
-                  throw 
QueryCompilationErrors.writeIntoTempViewNotAllowedError(
-                    u.multipartIdentifier.quoted)
-                }
+              case r: DataSourceV2Relation => write.withNewTable(r)
+              case _ =>
+                throw QueryCompilationErrors.writeIntoTempViewNotAllowedError(
+                  u.multipartIdentifier.quoted)
             }.getOrElse(write)
           case _ => write
         }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 5e251ceb222b..7e134d7f23eb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -315,6 +315,10 @@ trait CheckAnalysis extends LookupCatalog with 
QueryErrorsBase with PlanToString
           context = u.origin.getQueryContext,
           summary = u.origin.context.summary)
 
+      case r: V2TableReference =>
+        throw SparkException.internalError(
+          s"V2TableReference should be resolved during analysis: ${r.name}")
+
       case u: UnresolvedInlineTable if 
unresolvedInlineTableContainsScalarSubquery(u) =>
         throw QueryCompilationErrors.inlineTableContainsScalarSubquery(u)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
index 15d5e4874dbb..6fe386c3c677 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala
@@ -274,8 +274,7 @@ class RelationResolution(
   private def loadRelation(ref: V2TableReference): LogicalPlan = {
     val table = ref.catalog.loadTable(ref.identifier)
     V2TableReferenceUtils.validateLoadedTable(table, ref)
-    val tableName = ref.identifier.toQualifiedNameParts(ref.catalog)
-    SubqueryAlias(tableName, ref.toRelation(table))
+    ref.toRelation(table)
   }
 
   private def adaptCachedRelation(cached: LogicalPlan, ref: V2TableReference): 
LogicalPlan = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 34e47084f656..cb6f7e588321 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -25,7 +25,6 @@ import org.apache.spark.internal.{Logging, MessageWithContext}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.analysis.Resolver
-import org.apache.spark.sql.catalyst.analysis.V2TableReference
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
@@ -261,9 +260,6 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
         val nameInCache = v2Ident.toQualifiedNameParts(catalog)
         isSameName(name, nameInCache, resolver) && (includeTimeTravel || 
timeTravelSpec.isEmpty)
 
-      case r: V2TableReference =>
-        isSameName(name, r.identifier.toQualifiedNameParts(r.catalog), 
resolver)
-
       case v: View =>
         isSameName(name, v.desc.identifier.nameParts, resolver)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to