This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3036f86a3716 [SPARK-55492][SQL][SS][FOLLOWUP] Clarify error message;
do not use non-local return
3036f86a3716 is described below
commit 3036f86a3716151230c05ee8921a78f735a195a4
Author: Dmytro Fedoriaka <[email protected]>
AuthorDate: Tue Mar 3 13:40:28 2026 +0900
[SPARK-55492][SQL][SS][FOLLOWUP] Clarify error message; do not use
non-local return
### What changes were proposed in this pull request?
1. Changed the error message to make it clearer and to suggest a resolution.
2. Do not use non-local return in a closure.
This is a follow-up on https://github.com/apache/spark/pull/54254.
### Why are the changes needed?
To make the error message more informative and avoid a bug-prone pattern.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54583 from fedimser/fix-10.
Authored-by: Dmytro Fedoriaka <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 2 +-
.../spark/sql/catalyst/analysis/Analyzer.scala | 43 +++++++++++-----------
2 files changed, 23 insertions(+), 22 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 9012dc43d9c0..63f57f0315c0 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1683,7 +1683,7 @@
},
"EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN" : {
"message" : [
- "The event time column <eventExpr> must be a top-level column in the
schema."
+ "The event time in withWatermark must be a top-level column, but
'<eventExpr>' is a nested field. To use it, alias it to a top-level column in a
select before withWatermark."
],
"sqlState" : "42K09"
},
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 80cf2bb548b7..dd86c6c52cb9 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -4070,27 +4070,28 @@ object CleanupAliases extends Rule[LogicalPlan] with
AliasHelper {
*/
object ValidateEventTimeWatermarkColumn extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan = {
- if (!conf.getConf(SQLConf.STREAMING_VALIDATE_EVENT_TIME_WATERMARK_COLUMN))
{
- return plan
- }
- plan.resolveOperatorsWithPruning(
- _.containsPattern(EVENT_TIME_WATERMARK)) {
- case etw: EventTimeWatermark =>
- etw.eventTime match {
- case u: UnresolvedAttribute if u.nameParts.length > 1 =>
- // Try to resolve the multi-part name against the child output.
- // An alias-qualified column (e.g. "a.eventTime") resolves to an
Attribute,
- // while a nested struct field (e.g. "struct_col.field") resolves
to an
- // Alias(ExtractValue(...)) which is not an Attribute.
- etw.child.resolve(u.nameParts, conf.resolver) match {
- case Some(_: Attribute) => etw
- case _ =>
- etw.failAnalysis(
- errorClass = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
- messageParameters = Map("eventExpr" -> u.sql))
- }
- case _ => etw
- }
+ if (conf.getConf(SQLConf.STREAMING_VALIDATE_EVENT_TIME_WATERMARK_COLUMN)) {
+
plan.resolveOperatorsWithPruning(_.containsPattern(EVENT_TIME_WATERMARK)) {
+ case etw: EventTimeWatermark =>
+ etw.eventTime match {
+ case u: UnresolvedAttribute if u.nameParts.length > 1 =>
+ // Try to resolve the multi-part name against the child output.
+ // An alias-qualified column (e.g. "a.eventTime") resolves to an
Attribute,
+ // while a nested struct field (e.g. "struct_col.field")
resolves to an
+ // Alias(ExtractValue(...)) which is not an Attribute.
+ etw.child.resolve(u.nameParts, conf.resolver) match {
+ case Some(_: Attribute) => etw
+ case _ =>
+ etw.failAnalysis(
+ errorClass = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
+ messageParameters = Map("eventExpr" -> u.sql)
+ )
+ }
+ case _ => etw
+ }
+ }
+ } else {
+ plan
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]