This is an automated email from the ASF dual-hosted git repository.

dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b294189a1098 [SPARK-56794][SQL] Move `WindowSubstitution` Pipe Sql 
resolution logic to `AstBuilder`
b294189a1098 is described below

commit b294189a1098ef25f8e7e4ef5c3c63d1744c5262
Author: Mihailo Aleksic <[email protected]>
AuthorDate: Fri May 8 13:54:09 2026 -0700

    [SPARK-56794][SQL] Move `WindowSubstitution` Pipe Sql resolution logic to 
`AstBuilder`
    
    ### What changes were proposed in this pull request?
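    Move the pipe SQL window-reference resolution logic from the
    `WindowSubstitution` analyzer rule to `AstBuilder`, and remove the
    now-unused `forPipeSQL` flag from `WithWindowDefinition`.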
    
    ### Why are the changes needed?
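    To simplify the analysis code. Pipe SQL window references are now
    substituted at parse time, so the analyzer rule no longer needs a separate
    pipe-SQL-specific branch.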
    
    ### Does this PR introduce _any_ user-facing change?
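    No, except for the exception type: in pipe SQL, referencing an undefined
    window now raises `ParseException` instead of `AnalysisException`. The
    error class (`MISSING_WINDOW_SPECIFICATION`) is unchanged.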
    
    ### How was this patch tested?
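    Existing tests, with `PlanParserSuite`, `SparkSqlParserSuite`, and the
    `pipe-operators.sql` golden files updated to match.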
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes.
    
    Closes #55760 from mihailoale-db/windowdefparserpipe.
    
    Authored-by: Mihailo Aleksic <[email protected]>
    Signed-off-by: Daniel Tenedorio <[email protected]>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala  |  9 ++-------
 .../apache/spark/sql/catalyst/parser/AstBuilder.scala  | 18 ++++++++++++++++--
 .../catalyst/plans/logical/basicLogicalOperators.scala |  3 +--
 .../spark/sql/catalyst/parser/PlanParserSuite.scala    |  6 +++---
 .../sql-tests/analyzer-results/pipe-operators.sql.out  |  4 ++--
 .../resources/sql-tests/results/pipe-operators.sql.out |  4 ++--
 .../spark/sql/execution/SparkSqlParserSuite.scala      |  5 ++---
 7 files changed, 28 insertions(+), 21 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f000c8fbb671..95b5c54f782f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -648,18 +648,13 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsUpWithPruning(
       _.containsAnyPattern(WITH_WINDOW_DEFINITION, 
UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
       // Lookup WindowSpecDefinitions. This rule works with unresolved 
children.
-      case WithWindowDefinition(windowDefinitions, child, forPipeSQL) =>
-        val resolveWindowExpression: PartialFunction[Expression, Expression] = 
{
+      case WithWindowDefinition(windowDefinitions, child) =>
+        child.resolveExpressions {
           case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) 
=>
             val windowSpecDefinition = windowDefinitions.getOrElse(windowName,
               throw 
QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
             WindowExpression(c, windowSpecDefinition)
         }
-        if (forPipeSQL) {
-          child.transformExpressions(resolveWindowExpression)
-        } else {
-          child.resolveExpressions(resolveWindowExpression)
-        }
     }
   }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 9db80f894da7..12446b7cc9d7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1974,7 +1974,8 @@ class AstBuilder extends DataTypeAstBuilder
   }
 
   /**
-   * Add a [[WithWindowDefinition]] operator to a logical plan.
+   * Add a [[WithWindowDefinition]] operator to a logical plan, or for pipe 
SQL, substitute
+   * [[UnresolvedWindowExpression]]s with [[WindowExpression]]s directly.
    */
   private def withWindowClause(
       ctx: WindowClauseContext,
@@ -2012,7 +2013,20 @@ class AstBuilder extends DataTypeAstBuilder
 
     // Note that mapValues creates a view instead of materialized map. We 
force materialization by
     // mapping over identity.
-    WithWindowDefinition(windowMapView.map(identity), query, forPipeSQL)
+    val windowDefinitions = windowMapView.map(identity)
+    if (forPipeSQL) {
+      // For pipe SQL, substitute window references directly. Each pipe 
operator's WINDOW clause
+      // is scoped to that operator only, and the definitions and expressions 
are in the same
+      // grammar rule, so we can resolve them here without a 
WithWindowDefinition wrapper.
+      query.transformExpressions {
+        case UnresolvedWindowExpression(child, 
WindowSpecReference(windowName)) =>
+          val spec = windowDefinitions.getOrElse(windowName,
+            throw 
QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
+          WindowExpression(child, spec)
+      }
+    } else {
+      WithWindowDefinition(windowDefinitions, query)
+    }
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 8e9f264698ca..4b9186c9bf4a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -941,8 +941,7 @@ object View {
 
 case class WithWindowDefinition(
     windowDefinitions: Map[String, WindowSpecDefinition],
-    child: LogicalPlan,
-    forPipeSQL: Boolean) extends UnaryNode {
+    child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   final override val nodePatterns: Seq[TreePattern] = 
Seq(WITH_WINDOW_DEFINITION)
   override protected def withNewChildInternal(newChild: LogicalPlan): 
WithWindowDefinition =
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 6124c69fbedd..58a3b0de8a6f 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -376,9 +376,9 @@ class PlanParserSuite extends AnalysisTest {
     val limitWindowClauses = Seq(
       ("", (p: LogicalPlan) => p),
       (" limit 10", (p: LogicalPlan) => p.limit(10)),
-      (" window w1 as ()", (p: LogicalPlan) => WithWindowDefinition(ws, p, 
forPipeSQL = false)),
+      (" window w1 as ()", (p: LogicalPlan) => WithWindowDefinition(ws, p)),
       (" window w1 as () limit 10", (p: LogicalPlan) =>
-        WithWindowDefinition(ws, p, forPipeSQL = false).limit(10))
+        WithWindowDefinition(ws, p).limit(10))
     )
 
     val orderSortDistrClusterClauses = Seq(
@@ -529,7 +529,7 @@ class PlanParserSuite extends AnalysisTest {
          |window w1 as (partition by a, b order by c rows between 1 preceding 
and 1 following),
          |       w2 as w1,
          |       w3 as w1""".stripMargin,
-      WithWindowDefinition(ws1, plan, forPipeSQL = false))
+      WithWindowDefinition(ws1, plan))
   }
 
   test("lateral view") {
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 84ec13334ffd..09fb8996633c 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -3834,7 +3834,7 @@ table windowTestData
 |> select cate, val, sum(val) over w, first_value(val) over w
    window w1 as (partition by cate order by val)
 -- !query analysis
-org.apache.spark.sql.AnalysisException
+org.apache.spark.sql.catalyst.parser.ParseException
 {
   "errorClass" : "MISSING_WINDOW_SPECIFICATION",
   "sqlState" : "42P20",
@@ -3923,7 +3923,7 @@ table windowTestData
    window w1 as (partition by cate)
    window w2 as (order by val)
 -- !query analysis
-org.apache.spark.sql.AnalysisException
+org.apache.spark.sql.catalyst.parser.ParseException
 {
   "errorClass" : "MISSING_WINDOW_SPECIFICATION",
   "sqlState" : "42P20",
diff --git 
a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out 
b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 2a0041e45e2d..858ccb7c4272 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -3480,7 +3480,7 @@ table windowTestData
 -- !query schema
 struct<>
 -- !query output
-org.apache.spark.sql.AnalysisException
+org.apache.spark.sql.catalyst.parser.ParseException
 {
   "errorClass" : "MISSING_WINDOW_SPECIFICATION",
   "sqlState" : "42P20",
@@ -3564,7 +3564,7 @@ table windowTestData
 -- !query schema
 struct<>
 -- !query output
-org.apache.spark.sql.AnalysisException
+org.apache.spark.sql.catalyst.parser.ParseException
 {
   "errorClass" : "MISSING_WINDOW_SPECIFICATION",
   "sqlState" : "42P20",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index f1acab77fbc2..5528587d2e8f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -691,8 +691,7 @@ class SparkSqlParserSuite extends AnalysisTest with 
SharedSparkSession {
                   UnresolvedFunction("max", Seq(UnresolvedAttribute("c")), 
isDistinct = false),
                   WindowSpecReference("w")), None)
             ),
-            UnresolvedRelation(TableIdentifier("testData"))),
-          forPipeSQL = false
+            UnresolvedRelation(TableIdentifier("testData")))
         ),
         ioSchema))
 
@@ -1035,7 +1034,7 @@ class SparkSqlParserSuite extends AnalysisTest with 
SharedSparkSession {
     checkAggregate("SELECT a, b FROM t |> AGGREGATE GROUP BY b")
     checkAggregate("SELECT a, b FROM t |> AGGREGATE COUNT(*) AS result GROUP 
BY b")
     // Window
-    def checkWindow(query: String): Unit = check(query, 
Seq(WITH_WINDOW_DEFINITION))
+    def checkWindow(query: String): Unit = check(query, Seq(WINDOW_EXPRESSION))
     checkWindow(
       """
         |TABLE windowTestData


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to