This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d824e9ee659c [SPARK-46625] CTE with Identifier clause as reference
d824e9ee659c is described below

commit d824e9ee659c82b61e54c6a08bc0dcbdf0eb9888
Author: Nebojsa Savic <[email protected]>
AuthorDate: Tue Jul 9 22:31:05 2024 +0800

    [SPARK-46625] CTE with Identifier clause as reference
    
    ### What changes were proposed in this pull request?
    DECLARE agg = 'max';
    DECLARE col = 'c1';
    DECLARE tab = 'T';
    
    WITH S(c1, c2) AS (VALUES(1, 2), (2, 3)),
          T(c1, c2) AS (VALUES ('a', 'b'), ('c', 'd'))
    SELECT IDENTIFIER(agg)(IDENTIFIER(col)) FROM IDENTIFIER(tab);
    
    -- OR
    
    WITH S(c1, c2) AS (VALUES(1, 2), (2, 3)),
          T(c1, c2) AS (VALUES ('a', 'b'), ('c', 'd'))
    SELECT IDENTIFIER('max')(IDENTIFIER('c1')) FROM IDENTIFIER('T');
    
    Currently we don't support Identifier clause as part of CTE reference.
    
    ### Why are the changes needed?
    Adding support for Identifier clause as part of CTE reference for both 
constant string expressions and session variables.
    
    ### Does this PR introduce _any_ user-facing change?
    It contains user facing changes in sense that identifier clause as cte 
reference will now be supported.
    
    ### How was this patch tested?
    Added tests as part of this PR.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47180 from nebojsa-db/SPARK-46625.
    
    Authored-by: Nebojsa Savic <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 +
 .../sql/catalyst/analysis/CTESubstitution.scala    | 13 +++-
 .../analysis/ResolveIdentifierClause.scala         | 15 ++++-
 .../spark/sql/catalyst/analysis/unresolved.scala   | 13 +++-
 .../spark/sql/catalyst/trees/TreePatterns.scala    |  1 +
 .../analyzer-results/identifier-clause.sql.out     | 73 ++++++++++++++++++++++
 .../sql-tests/inputs/identifier-clause.sql         | 16 +++++
 .../sql-tests/results/identifier-clause.sql.out    | 53 ++++++++++++++++
 8 files changed, 181 insertions(+), 6 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ba6764444bdf..95e2ddd40af1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1796,6 +1796,9 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       case s: Sort if !s.resolved || s.missingInput.nonEmpty =>
         resolveReferencesInSort(s)
 
+      case u: UnresolvedWithCTERelations =>
+        UnresolvedWithCTERelations(this.apply(u.unresolvedPlan), 
u.cteRelations)
+
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve 
${q.simpleString(conf.maxToStringFields)}")
         q.mapExpressions(resolveExpressionByPlanChildren(_, q, 
includeLastResort = true))
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
index 2982d8477fcc..ff0dbcd7ef15 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CTESubstitution.scala
@@ -23,7 +23,7 @@ import 
org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Command, CTEInChildren, 
CTERelationDef, CTERelationRef, InsertIntoDir, LogicalPlan, ParsedStatement, 
SubqueryAlias, UnresolvedWith, WithCTE}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.TypeUtils._
+import org.apache.spark.sql.errors.DataTypeErrors.toSQLId
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.internal.SQLConf.LEGACY_CTE_PRECEDENCE_POLICY
@@ -272,7 +272,8 @@ object CTESubstitution extends Rule[LogicalPlan] {
       alwaysInline: Boolean,
       cteRelations: Seq[(String, CTERelationDef)]): LogicalPlan = {
     plan.resolveOperatorsUpWithPruning(
-        _.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, 
PLAN_EXPRESSION)) {
+        _.containsAnyPattern(RELATION_TIME_TRAVEL, UNRESOLVED_RELATION, 
PLAN_EXPRESSION,
+          UNRESOLVED_IDENTIFIER)) {
       case RelationTimeTravel(UnresolvedRelation(Seq(table), _, _), _, _)
         if cteRelations.exists(r => plan.conf.resolver(r._1, table)) =>
         throw QueryCompilationErrors.timeTravelUnsupportedError(toSQLId(table))
@@ -287,6 +288,14 @@ object CTESubstitution extends Rule[LogicalPlan] {
           }
         }.getOrElse(u)
 
+      case p: PlanWithUnresolvedIdentifier =>
+        // We must look up CTE relations first when resolving 
`UnresolvedRelation`s,
+        // but we can't do it here as `PlanWithUnresolvedIdentifier` is a leaf 
node
+        // and may produce `UnresolvedRelation` later.
+        // Here we wrap it with `UnresolvedWithCTERelations` so that we can
+        // delay the CTE relations lookup after `PlanWithUnresolvedIdentifier` 
is resolved.
+        UnresolvedWithCTERelations(p, cteRelations)
+
       case other =>
         // This cannot be done in ResolveSubquery because ResolveSubquery does 
not know the CTE.
         
other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
index f04b7799e35e..e0142c445ae8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveIdentifierClause.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.expressions.{AliasHelper, EvalHelper, 
Expression}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{CTERelationRef, 
LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
-import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_IDENTIFIER
+import org.apache.spark.sql.catalyst.trees.TreePattern.{UNRESOLVED_IDENTIFIER, 
UNRESOLVED_IDENTIFIER_WITH_CTE}
 import org.apache.spark.sql.types.StringType
 
 /**
@@ -35,9 +35,18 @@ class ResolveIdentifierClause(earlyBatches: 
Seq[RuleExecutor[LogicalPlan]#Batch]
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
-    _.containsAnyPattern(UNRESOLVED_IDENTIFIER)) {
+    _.containsAnyPattern(UNRESOLVED_IDENTIFIER, 
UNRESOLVED_IDENTIFIER_WITH_CTE)) {
     case p: PlanWithUnresolvedIdentifier if p.identifierExpr.resolved =>
       
executor.execute(p.planBuilder.apply(evalIdentifierExpr(p.identifierExpr)))
+    case u @ UnresolvedWithCTERelations(p, cteRelations) =>
+      this.apply(p) match {
+        case u @ UnresolvedRelation(Seq(table), _, _) =>
+          cteRelations.find(r => plan.conf.resolver(r._1, table)).map { case 
(_, d) =>
+            // Add a `SubqueryAlias` for hint-resolving rules to match 
relation names.
+            SubqueryAlias(table, CTERelationRef(d.id, d.resolved, d.output, 
d.isStreaming))
+          }.getOrElse(u)
+        case other => other
+      }
     case other =>
       
other.transformExpressionsWithPruning(_.containsAnyPattern(UNRESOLVED_IDENTIFIER))
 {
         case e: ExpressionWithUnresolvedIdentifier if 
e.identifierExpr.resolved =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index a2cab60b392b..abb7e7956f18 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.{FunctionIdentifier, 
InternalRow, TableIden
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, 
ExprCode}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, 
UnaryNode}
+import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LeafNode, 
LogicalPlan, UnaryNode}
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
@@ -65,6 +65,17 @@ case class PlanWithUnresolvedIdentifier(
   final override val nodePatterns: Seq[TreePattern] = 
Seq(UNRESOLVED_IDENTIFIER)
 }
 
+/**
+ * A logical plan placeholder which delays CTE resolution
+ * to moment when PlanWithUnresolvedIdentifier gets resolved
+ */
+case class UnresolvedWithCTERelations(
+   unresolvedPlan: LogicalPlan,
+   cteRelations: Seq[(String, CTERelationDef)])
+  extends UnresolvedLeafNode {
+  final override val nodePatterns: Seq[TreePattern] = 
Seq(UNRESOLVED_IDENTIFIER_WITH_CTE)
+}
+
 /**
  * An expression placeholder that holds the identifier clause string 
expression. It will be
  * replaced by the actual expression with the evaluated identifier string.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
index c5cc1eaf8f05..6258bd615b44 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala
@@ -151,6 +151,7 @@ object TreePattern extends Enumeration  {
   val UNRESOLVED_FUNCTION: Value = Value
   val UNRESOLVED_HINT: Value = Value
   val UNRESOLVED_WINDOW_EXPRESSION: Value = Value
+  val UNRESOLVED_IDENTIFIER_WITH_CTE: Value = Value
 
   // Unresolved Plan patterns (Alphabetically ordered)
   val UNRESOLVED_FUNC: Value = Value
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
index b3e2cd5ada95..f0bf8b883dd8 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/identifier-clause.sql.out
@@ -985,6 +985,79 @@ DropTable false, false
 +- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.t2
 
 
+-- !query
+DECLARE agg = 'max'
+-- !query analysis
+CreateVariable defaultvalueexpression(max, 'max'), false
++- ResolvedIdentifier 
org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.agg
+
+
+-- !query
+DECLARE col = 'c1'
+-- !query analysis
+CreateVariable defaultvalueexpression(c1, 'c1'), false
++- ResolvedIdentifier 
org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.col
+
+
+-- !query
+DECLARE tab = 'T'
+-- !query analysis
+CreateVariable defaultvalueexpression(T, 'T'), false
++- ResolvedIdentifier 
org.apache.spark.sql.catalyst.analysis.FakeSystemCatalog$@xxxxxxxx, session.tab
+
+
+-- !query
+WITH S(c1, c2) AS (VALUES(1, 2), (2, 3)),
+     T(c1, c2) AS (VALUES ('a', 'b'), ('c', 'd'))
+SELECT IDENTIFIER(agg)(IDENTIFIER(col)) FROM IDENTIFIER(tab)
+-- !query analysis
+WithCTE
+:- CTERelationDef xxxx, false
+:  +- SubqueryAlias S
+:     +- Project [col1#x AS c1#x, col2#x AS c2#x]
+:        +- LocalRelation [col1#x, col2#x]
+:- CTERelationDef xxxx, false
+:  +- SubqueryAlias T
+:     +- Project [col1#x AS c1#x, col2#x AS c2#x]
+:        +- LocalRelation [col1#x, col2#x]
++- Aggregate [max(c1#x) AS max(c1)#x]
+   +- SubqueryAlias T
+      +- CTERelationRef xxxx, true, [c1#x, c2#x], false
+
+
+-- !query
+WITH S(c1, c2) AS (VALUES(1, 2), (2, 3)),
+     T(c1, c2) AS (VALUES ('a', 'b'), ('c', 'd'))
+SELECT IDENTIFIER('max')(IDENTIFIER('c1')) FROM IDENTIFIER('T')
+-- !query analysis
+WithCTE
+:- CTERelationDef xxxx, false
+:  +- SubqueryAlias S
+:     +- Project [col1#x AS c1#x, col2#x AS c2#x]
+:        +- LocalRelation [col1#x, col2#x]
+:- CTERelationDef xxxx, false
+:  +- SubqueryAlias T
+:     +- Project [col1#x AS c1#x, col2#x AS c2#x]
+:        +- LocalRelation [col1#x, col2#x]
++- Aggregate [max(c1#x) AS max(c1)#x]
+   +- SubqueryAlias T
+      +- CTERelationRef xxxx, true, [c1#x, c2#x], false
+
+
+-- !query
+WITH ABC(c1, c2) AS (VALUES(1, 2), (2, 3))
+SELECT IDENTIFIER('max')(IDENTIFIER('c1')) FROM IDENTIFIER('A' || 'BC')
+-- !query analysis
+WithCTE
+:- CTERelationDef xxxx, false
+:  +- SubqueryAlias ABC
+:     +- Project [col1#x AS c1#x, col2#x AS c2#x]
+:        +- LocalRelation [col1#x, col2#x]
++- Aggregate [max(c1#x) AS max(c1)#x]
+   +- SubqueryAlias ABC
+      +- CTERelationRef xxxx, true, [c1#x, c2#x], false
+
+
 -- !query
 SELECT row_number() OVER IDENTIFIER('x.win') FROM VALUES(1) AS T(c1) WINDOW 
win AS (ORDER BY c1)
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql 
b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
index 46461dcd048e..4aa8019097fd 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/identifier-clause.sql
@@ -141,6 +141,22 @@ drop view v1;
 drop table t1;
 drop table t2;
 
+-- SPARK-46625: CTE reference with identifier clause and session variables
+DECLARE agg = 'max';
+DECLARE col = 'c1';
+DECLARE tab = 'T';
+
+WITH S(c1, c2) AS (VALUES(1, 2), (2, 3)),
+     T(c1, c2) AS (VALUES ('a', 'b'), ('c', 'd'))
+SELECT IDENTIFIER(agg)(IDENTIFIER(col)) FROM IDENTIFIER(tab);
+
+WITH S(c1, c2) AS (VALUES(1, 2), (2, 3)),
+     T(c1, c2) AS (VALUES ('a', 'b'), ('c', 'd'))
+SELECT IDENTIFIER('max')(IDENTIFIER('c1')) FROM IDENTIFIER('T');
+
+WITH ABC(c1, c2) AS (VALUES(1, 2), (2, 3))
+SELECT IDENTIFIER('max')(IDENTIFIER('c1')) FROM IDENTIFIER('A' || 'BC');
+
 -- Not supported
 SELECT row_number() OVER IDENTIFIER('x.win') FROM VALUES(1) AS T(c1) WINDOW 
win AS (ORDER BY c1);
 SELECT T1.c1 FROM VALUES(1) AS T1(c1) JOIN VALUES(1) AS T2(c1) USING 
(IDENTIFIER('c1'));
diff --git 
a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out 
b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
index 2aa809324a76..952fb8fdc2bd 100644
--- a/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/identifier-clause.sql.out
@@ -1115,6 +1115,59 @@ struct<>
 
 
 
+-- !query
+DECLARE agg = 'max'
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DECLARE col = 'c1'
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DECLARE tab = 'T'
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+WITH S(c1, c2) AS (VALUES(1, 2), (2, 3)),
+     T(c1, c2) AS (VALUES ('a', 'b'), ('c', 'd'))
+SELECT IDENTIFIER(agg)(IDENTIFIER(col)) FROM IDENTIFIER(tab)
+-- !query schema
+struct<max(c1):string>
+-- !query output
+c
+
+
+-- !query
+WITH S(c1, c2) AS (VALUES(1, 2), (2, 3)),
+     T(c1, c2) AS (VALUES ('a', 'b'), ('c', 'd'))
+SELECT IDENTIFIER('max')(IDENTIFIER('c1')) FROM IDENTIFIER('T')
+-- !query schema
+struct<max(c1):string>
+-- !query output
+c
+
+
+-- !query
+WITH ABC(c1, c2) AS (VALUES(1, 2), (2, 3))
+SELECT IDENTIFIER('max')(IDENTIFIER('c1')) FROM IDENTIFIER('A' || 'BC')
+-- !query schema
+struct<max(c1):int>
+-- !query output
+2
+
+
 -- !query
 SELECT row_number() OVER IDENTIFIER('x.win') FROM VALUES(1) AS T(c1) WINDOW 
win AS (ORDER BY c1)
 -- !query schema


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to