This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9677b44e7de8 [SPARK-52727][SQL] Refactor Window resolution in order to
reuse it in single-pass analyzer
9677b44e7de8 is described below
commit 9677b44e7de81e642c1b6fe0655503e9556fc3be
Author: Nikola Jovićević <[email protected]>
AuthorDate: Fri Jul 11 20:30:55 2025 +0800
[SPARK-52727][SQL] Refactor Window resolution in order to reuse it in
single-pass analyzer
### What changes were proposed in this pull request?
Refactor the existing logic of the `ResolveWindowFrame` and `ResolveWindowOrder`
rules by extracting their implementations into a separate `WindowResolution`
object.
### Why are the changes needed?
The same logic is required in both the fixed-point and single-pass analyzer
implementations, so extracting it into a shared object keeps it reusable and
modular.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51433 from nikola-jovicevic-db/SPARK-52727.
Authored-by: Nikola Jovićević <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 24 +-----
.../sql/catalyst/analysis/WindowResolution.scala | 95 ++++++++++++++++++++++
2 files changed, 97 insertions(+), 22 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 78ae9c8afe46..964a9d2ef0b4 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3632,23 +3632,7 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
object ResolveWindowFrame extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveExpressionsWithPruning(
_.containsPattern(WINDOW_EXPRESSION), ruleId) {
- case WindowExpression(wf: FrameLessOffsetWindowFunction,
- WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)) if wf.frame != f
=>
- throw
QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName)
- case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f:
SpecifiedWindowFrame))
- if wf.frame != UnspecifiedFrame && wf.frame != f =>
- throw QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f,
wf.frame)
- case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _,
UnspecifiedFrame))
- if wf.frame != UnspecifiedFrame =>
- WindowExpression(wf, s.copy(frameSpecification = wf.frame))
- case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o,
UnspecifiedFrame))
- if e.resolved =>
- val frame = if (o.nonEmpty) {
- SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
- } else {
- SpecifiedWindowFrame(RowFrame, UnboundedPreceding,
UnboundedFollowing)
- }
- we.copy(windowSpec = s.copy(frameSpecification = frame))
+ case we => WindowResolution.resolveFrame(we)
}
}
@@ -3658,11 +3642,7 @@ class Analyzer(override val catalogManager:
CatalogManager) extends RuleExecutor
object ResolveWindowOrder extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveExpressionsWithPruning(
_.containsPattern(WINDOW_EXPRESSION), ruleId) {
- case WindowExpression(wf: WindowFunction, spec) if
spec.orderSpec.isEmpty =>
- throw
QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf)
- case WindowExpression(rank: RankLike, spec) if spec.resolved =>
- val order = spec.orderSpec.map(_.child)
- WindowExpression(rank.withOrder(order), spec)
+ case we => WindowResolution.resolveOrder(we)
}
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
new file mode 100644
index 000000000000..9a48c24d709c
--- /dev/null
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/WindowResolution.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.expressions.{
+ CurrentRow,
+ Expression,
+ FrameLessOffsetWindowFunction,
+ RangeFrame,
+ RankLike,
+ RowFrame,
+ SpecifiedWindowFrame,
+ UnboundedFollowing,
+ UnboundedPreceding,
+ UnspecifiedFrame,
+ WindowExpression,
+ WindowFunction,
+ WindowSpecDefinition
+}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * Utility object for resolving [[WindowExpression]].
+ *
+ * It ensures that window frame definitions and order specs are consistent
between the
+ * [[WindowFunction]] and [[WindowSpecDefinition]], throwing errors if
configurations are
+ * incompatible or missing.
+ */
+object WindowResolution {
+
+ /**
+ * Validates the window frame of a [[WindowExpression]].
+ *
+ * It enforces that the frame in [[WindowExpression.windowFunction]] matches
the frame
+ * in [[WindowExpression.windowSpec]], alternatively it provides a default
frame when it
+ * is unspecified.
+ */
+ def resolveFrame(expression: Expression): Expression = expression match {
+ case WindowExpression(
+ wf: FrameLessOffsetWindowFunction,
+ WindowSpecDefinition(_, _, f: SpecifiedWindowFrame)
+ ) if wf.frame != f =>
+ throw QueryCompilationErrors.cannotSpecifyWindowFrameError(wf.prettyName)
+
+ case WindowExpression(wf: WindowFunction, WindowSpecDefinition(_, _, f:
SpecifiedWindowFrame))
+ if wf.frame != UnspecifiedFrame && wf.frame != f =>
+ throw QueryCompilationErrors.windowFrameNotMatchRequiredFrameError(f,
wf.frame)
+
+ case WindowExpression(wf: WindowFunction, s @ WindowSpecDefinition(_, _,
UnspecifiedFrame))
+ if wf.frame != UnspecifiedFrame =>
+ WindowExpression(wf, s.copy(frameSpecification = wf.frame))
+
+ case we @ WindowExpression(e, s @ WindowSpecDefinition(_, o,
UnspecifiedFrame)) if e.resolved =>
+ val frame = if (o.nonEmpty) {
+ SpecifiedWindowFrame(RangeFrame, UnboundedPreceding, CurrentRow)
+ } else {
+ SpecifiedWindowFrame(RowFrame, UnboundedPreceding, UnboundedFollowing)
+ }
+ we.copy(windowSpec = s.copy(frameSpecification = frame))
+
+ case e => e
+ }
+
+ /**
+ * Ensures that [[WindowExpression.windowSpec.orderSpec]] is not missing.
+ *
+ * In case of [[RankLike]] window functions, it attaches the resolved order
to the
+ * function to finalize it.
+ */
+ def resolveOrder(expression: Expression): Expression = expression match {
+ case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty
=>
+ throw
QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf)
+
+ case WindowExpression(rank: RankLike, spec) if spec.resolved =>
+ val order = spec.orderSpec.map(_.child)
+ WindowExpression(rank.withOrder(order), spec)
+
+ case e => e
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]