This is an automated email from the ASF dual-hosted git repository.
dtenedor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b294189a1098 [SPARK-56794][SQL] Move `WindowSubstitution` Pipe Sql
resolution logic to `AstBuilder`
b294189a1098 is described below
commit b294189a1098ef25f8e7e4ef5c3c63d1744c5262
Author: Mihailo Aleksic <[email protected]>
AuthorDate: Fri May 8 13:54:09 2026 -0700
[SPARK-56794][SQL] Move `WindowSubstitution` Pipe Sql resolution logic to
`AstBuilder`
### What changes were proposed in this pull request?
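Move the pipe SQL window-reference resolution logic out of the `WindowSubstitution` analyzer rule and into `AstBuilder`. For pipe SQL, `AstBuilder.withWindowClause` now replaces each `UnresolvedWindowExpression` that references a named window with a `WindowExpression` directly, instead of wrapping the plan in a `WithWindowDefinition` node. As a result, the `forPipeSQL` field is removed from `WithWindowDefinition`, and `WindowSubstitution` always uses `resolveExpressions`.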
### Why are the changes needed?
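To simplify the analyzer. `WindowSubstitution` no longer needs a separate code path for pipe SQL, and `WithWindowDefinition` no longer carries a flag that only pipe SQL uses.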
### Does this PR introduce _any_ user-facing change?
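No, apart from one detail. For pipe SQL queries that reference an undefined window, the `MISSING_WINDOW_SPECIFICATION` error (SQL state `42P20`) is now raised during parsing as a `ParseException`, rather than during analysis as an `AnalysisException`. The error class and SQL state are unchanged.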
### How was this patch tested?
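Existing tests, with expected outputs updated:
- `PlanParserSuite` and `SparkSqlParserSuite` now construct `WithWindowDefinition` without the removed `forPipeSQL` argument.
- `SparkSqlParserSuite` now checks pipe SQL window queries for `WINDOW_EXPRESSION` instead of `WITH_WINDOW_DEFINITION`.
- The `pipe-operators.sql.out` golden files now expect a `ParseException`.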
### Was this patch authored or co-authored using generative AI tooling?
Yes.
Closes #55760 from mihailoale-db/windowdefparserpipe.
Authored-by: Mihailo Aleksic <[email protected]>
Signed-off-by: Daniel Tenedorio <[email protected]>
---
.../apache/spark/sql/catalyst/analysis/Analyzer.scala | 9 ++-------
.../apache/spark/sql/catalyst/parser/AstBuilder.scala | 18 ++++++++++++++++--
.../catalyst/plans/logical/basicLogicalOperators.scala | 3 +--
.../spark/sql/catalyst/parser/PlanParserSuite.scala | 6 +++---
.../sql-tests/analyzer-results/pipe-operators.sql.out | 4 ++--
.../resources/sql-tests/results/pipe-operators.sql.out | 4 ++--
.../spark/sql/execution/SparkSqlParserSuite.scala | 5 ++---
7 files changed, 28 insertions(+), 21 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f000c8fbb671..95b5c54f782f 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -648,18 +648,13 @@ class Analyzer(
def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(WITH_WINDOW_DEFINITION,
UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
// Lookup WindowSpecDefinitions. This rule works with unresolved
children.
- case WithWindowDefinition(windowDefinitions, child, forPipeSQL) =>
- val resolveWindowExpression: PartialFunction[Expression, Expression] =
{
+ case WithWindowDefinition(windowDefinitions, child) =>
+ child.resolveExpressions {
case UnresolvedWindowExpression(c, WindowSpecReference(windowName))
=>
val windowSpecDefinition = windowDefinitions.getOrElse(windowName,
throw
QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
WindowExpression(c, windowSpecDefinition)
}
- if (forPipeSQL) {
- child.transformExpressions(resolveWindowExpression)
- } else {
- child.resolveExpressions(resolveWindowExpression)
- }
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 9db80f894da7..12446b7cc9d7 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1974,7 +1974,8 @@ class AstBuilder extends DataTypeAstBuilder
}
/**
- * Add a [[WithWindowDefinition]] operator to a logical plan.
+ * Add a [[WithWindowDefinition]] operator to a logical plan, or for pipe
SQL, substitute
+ * [[UnresolvedWindowExpression]]s with [[WindowExpression]]s directly.
*/
private def withWindowClause(
ctx: WindowClauseContext,
@@ -2012,7 +2013,20 @@ class AstBuilder extends DataTypeAstBuilder
// Note that mapValues creates a view instead of materialized map. We
force materialization by
// mapping over identity.
- WithWindowDefinition(windowMapView.map(identity), query, forPipeSQL)
+ val windowDefinitions = windowMapView.map(identity)
+ if (forPipeSQL) {
+ // For pipe SQL, substitute window references directly. Each pipe
operator's WINDOW clause
+ // is scoped to that operator only, and the definitions and expressions
are in the same
+ // grammar rule, so we can resolve them here without a
WithWindowDefinition wrapper.
+ query.transformExpressions {
+ case UnresolvedWindowExpression(child,
WindowSpecReference(windowName)) =>
+ val spec = windowDefinitions.getOrElse(windowName,
+ throw
QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
+ WindowExpression(child, spec)
+ }
+ } else {
+ WithWindowDefinition(windowDefinitions, query)
+ }
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 8e9f264698ca..4b9186c9bf4a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -941,8 +941,7 @@ object View {
case class WithWindowDefinition(
windowDefinitions: Map[String, WindowSpecDefinition],
- child: LogicalPlan,
- forPipeSQL: Boolean) extends UnaryNode {
+ child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
final override val nodePatterns: Seq[TreePattern] =
Seq(WITH_WINDOW_DEFINITION)
override protected def withNewChildInternal(newChild: LogicalPlan):
WithWindowDefinition =
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 6124c69fbedd..58a3b0de8a6f 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -376,9 +376,9 @@ class PlanParserSuite extends AnalysisTest {
val limitWindowClauses = Seq(
("", (p: LogicalPlan) => p),
(" limit 10", (p: LogicalPlan) => p.limit(10)),
- (" window w1 as ()", (p: LogicalPlan) => WithWindowDefinition(ws, p,
forPipeSQL = false)),
+ (" window w1 as ()", (p: LogicalPlan) => WithWindowDefinition(ws, p)),
(" window w1 as () limit 10", (p: LogicalPlan) =>
- WithWindowDefinition(ws, p, forPipeSQL = false).limit(10))
+ WithWindowDefinition(ws, p).limit(10))
)
val orderSortDistrClusterClauses = Seq(
@@ -529,7 +529,7 @@ class PlanParserSuite extends AnalysisTest {
|window w1 as (partition by a, b order by c rows between 1 preceding
and 1 following),
| w2 as w1,
| w3 as w1""".stripMargin,
- WithWindowDefinition(ws1, plan, forPipeSQL = false))
+ WithWindowDefinition(ws1, plan))
}
test("lateral view") {
diff --git
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
index 84ec13334ffd..09fb8996633c 100644
---
a/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
+++
b/sql/core/src/test/resources/sql-tests/analyzer-results/pipe-operators.sql.out
@@ -3834,7 +3834,7 @@ table windowTestData
|> select cate, val, sum(val) over w, first_value(val) over w
window w1 as (partition by cate order by val)
-- !query analysis
-org.apache.spark.sql.AnalysisException
+org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "MISSING_WINDOW_SPECIFICATION",
"sqlState" : "42P20",
@@ -3923,7 +3923,7 @@ table windowTestData
window w1 as (partition by cate)
window w2 as (order by val)
-- !query analysis
-org.apache.spark.sql.AnalysisException
+org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "MISSING_WINDOW_SPECIFICATION",
"sqlState" : "42P20",
diff --git
a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
index 2a0041e45e2d..858ccb7c4272 100644
--- a/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/pipe-operators.sql.out
@@ -3480,7 +3480,7 @@ table windowTestData
-- !query schema
struct<>
-- !query output
-org.apache.spark.sql.AnalysisException
+org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "MISSING_WINDOW_SPECIFICATION",
"sqlState" : "42P20",
@@ -3564,7 +3564,7 @@ table windowTestData
-- !query schema
struct<>
-- !query output
-org.apache.spark.sql.AnalysisException
+org.apache.spark.sql.catalyst.parser.ParseException
{
"errorClass" : "MISSING_WINDOW_SPECIFICATION",
"sqlState" : "42P20",
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index f1acab77fbc2..5528587d2e8f 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -691,8 +691,7 @@ class SparkSqlParserSuite extends AnalysisTest with
SharedSparkSession {
UnresolvedFunction("max", Seq(UnresolvedAttribute("c")),
isDistinct = false),
WindowSpecReference("w")), None)
),
- UnresolvedRelation(TableIdentifier("testData"))),
- forPipeSQL = false
+ UnresolvedRelation(TableIdentifier("testData")))
),
ioSchema))
@@ -1035,7 +1034,7 @@ class SparkSqlParserSuite extends AnalysisTest with
SharedSparkSession {
checkAggregate("SELECT a, b FROM t |> AGGREGATE GROUP BY b")
checkAggregate("SELECT a, b FROM t |> AGGREGATE COUNT(*) AS result GROUP
BY b")
// Window
- def checkWindow(query: String): Unit = check(query,
Seq(WITH_WINDOW_DEFINITION))
+ def checkWindow(query: String): Unit = check(query, Seq(WINDOW_EXPRESSION))
checkWindow(
"""
|TABLE windowTestData
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]