This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 34ab617083c1 [SPARK-55492][SQL][SS] Validate that eventTime in
withWatermark is top-level column
34ab617083c1 is described below
commit 34ab617083c17c0ccdff720554e8eaad0a663061
Author: Dmytro Fedoriaka <[email protected]>
AuthorDate: Fri Feb 20 11:15:07 2026 +0900
[SPARK-55492][SQL][SS] Validate that eventTime in withWatermark is
top-level column
### What changes were proposed in this pull request?
Validate that eventTime in withWatermark is a top-level column.
The validation is behind a config flag
"spark.sql.streaming.validateEventTimeWatermarkColumn", which is true by
default. It can be set to "false" to disable this validation.
### Why are the changes needed?
Currently, passing a nested field to withWatermark results in an internal
error.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: claude-4.6-opus-high
Closes #54254 from fedimser/treenode-fix.
Lead-authored-by: Dmytro Fedoriaka <[email protected]>
Co-authored-by: Jungtaek Lim <[email protected]>
Co-authored-by: Dima Fedoriaka <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 +++
.../spark/sql/catalyst/analysis/Analyzer.scala | 35 ++++++++++++++++
.../org/apache/spark/sql/internal/SQLConf.scala | 7 ++++
.../streaming/ClientStreamingQuerySuite.scala | 14 +++++++
.../sql/streaming/EventTimeWatermarkSuite.scala | 46 +++++++++++++++++++++-
5 files changed, 107 insertions(+), 1 deletion(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 24a12248ce72..188106ee3ddb 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1675,6 +1675,12 @@
],
"sqlState" : "42K09"
},
+ "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN" : {
+ "message" : [
+ "The event time column <eventExpr> must be a top-level column in the
schema."
+ ],
+ "sqlState" : "42K09"
+ },
"EXCEED_LIMIT_LENGTH" : {
"message" : [
"Exceeds char/varchar type length limitation: <limit>."
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 979bb6f9cff2..0cddb847c349 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -450,6 +450,7 @@ class Analyzer(
DeduplicateRelations ::
ResolveCollationName ::
ResolveMergeIntoSchemaEvolution ::
+ ValidateEventTimeWatermarkColumn ::
new ResolveReferences(catalogManager) ::
// Please do not insert any other rules in between. See the TODO
comments in rule
// ResolveLateralColumnAliasReference for more details.
@@ -4019,6 +4020,40 @@ object CleanupAliases extends Rule[LogicalPlan] with
AliasHelper {
}
}
+/**
+ * Validates that the event time column in EventTimeWatermark is a top-level
column reference
+ * (e.g. a single name), not a nested field (e.g. "struct_col.field").
+ *
+ * Multi-part names are allowed when they resolve to a top-level attribute via
a table alias
+ * (e.g. "alias.column"), but rejected when they resolve to a nested struct
field extraction.
+ */
+object ValidateEventTimeWatermarkColumn extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ if (!conf.getConf(SQLConf.STREAMING_VALIDATE_EVENT_TIME_WATERMARK_COLUMN))
{
+ return plan
+ }
+ plan.resolveOperatorsWithPruning(
+ _.containsPattern(EVENT_TIME_WATERMARK)) {
+ case etw: EventTimeWatermark =>
+ etw.eventTime match {
+ case u: UnresolvedAttribute if u.nameParts.length > 1 =>
+ // Try to resolve the multi-part name against the child output.
+ // An alias-qualified column (e.g. "a.eventTime") resolves to an
Attribute,
+ // while a nested struct field (e.g. "struct_col.field") resolves
to an
+ // Alias(ExtractValue(...)) which is not an Attribute.
+ etw.child.resolve(u.nameParts, conf.resolver) match {
+ case Some(_: Attribute) => etw
+ case _ =>
+ etw.failAnalysis(
+ errorClass = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
+ messageParameters = Map("eventExpr" -> u.sql))
+ }
+ case _ => etw
+ }
+ }
+ }
+}
+
/**
* Ignore event time watermark in batch query, which is only supported in
Structured Streaming.
*/
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f17199547665..495ece401949 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3460,6 +3460,13 @@ object SQLConf {
"Valid values are 'min' and 'max'")
.createWithDefault("min") // must be same as
MultipleWatermarkPolicy.DEFAULT_POLICY_NAME
+ val STREAMING_VALIDATE_EVENT_TIME_WATERMARK_COLUMN =
+ buildConf("spark.sql.streaming.validateEventTimeWatermarkColumn")
+ .doc("When true, check that eventTime in withWatermark is a top-level
column.")
+ .version("4.2.0")
+ .booleanConf
+ .createWithDefault(true)
+
val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD =
buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold")
.internal()
diff --git
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
index ac6a35a9db58..c8a25652dacb 100644
---
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
+++
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
@@ -846,6 +846,20 @@ class ClientStreamingQuerySuite extends QueryTest with
RemoteSparkSession with L
}
}
}
+
+ test("withWatermark fails for nested column") {
+ val df = spark.sql(
+ "SELECT 1 as id, struct(to_timestamp('2024-01-01 10:00:00') as
timestamp, 'val1' as value) " +
+ "as nested_struct")
+ val e = intercept[AnalysisException] {
+ df.withWatermark("nested_struct.timestamp", "0 seconds").schema
+ }
+ checkError(
+ e,
+ condition = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
+ parameters = Map("eventExpr" -> ".*nested_struct.*timestamp.*"),
+ matchPVals = true)
+ }
}
class TestForeachWriter[T] extends ForeachWriter[T] {
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index db28ef4fc35a..d26908d11477 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -35,7 +35,7 @@ import
org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC
import
org.apache.spark.sql.execution.streaming.operators.stateful.{EventTimeStats,
StateStoreSaveExec}
import org.apache.spark.sql.execution.streaming.runtime._
import org.apache.spark.sql.execution.streaming.sources.MemorySink
-import org.apache.spark.sql.functions.{count, expr, timestamp_seconds, window}
+import org.apache.spark.sql.functions.{count, expr, struct, timestamp_seconds,
to_timestamp, window}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.OutputMode._
import org.apache.spark.tags.SlowSQLTest
@@ -131,6 +131,50 @@ class EventTimeWatermarkSuite extends StreamTest with
BeforeAndAfter with Matche
assert(e.getMessage contains "int")
}
+ test("error on nested column") {
+ // Cannot pass a nested column as `eventTime` to `withWatermark`.
+ val e = intercept[AnalysisException] {
+ Seq((1, ("2024-01-01 10:00:00", "val1")))
+ .toDF("id", "data")
+ .select(
+ $"id",
+ struct(
+ to_timestamp($"data._1").as("timestamp"),
+ $"data._2".as("value")
+ ).as("nested_struct")
+ )
+ .withWatermark("nested_struct.timestamp", "0 seconds")
+ .schema
+ }
+ checkError(
+ e,
+ condition = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
+ parameters = Map("eventExpr" -> ".*nested_struct.*timestamp.*"),
+ matchPVals = true)
+ }
+
+ test("withWatermark should work with alias-qualified column name") {
+ // When a DataFrame has an alias, referencing the event time column via
+ // "alias.columnName" should be allowed because it still refers to a
top-level column.
+ val inputData = MemoryStream[Int]
+ val df = inputData.toDF()
+ .withColumn("eventTime", timestamp_seconds($"value"))
+ .alias("a")
+ .withWatermark("a.eventTime", "10 seconds")
+ .groupBy(window($"eventTime", "5 seconds") as Symbol("window"))
+ .agg(count("*") as Symbol("count"))
+ .select($"window".getField("start").cast("long").as[Long],
$"count".as[Long])
+
+ testStream(df)(
+ AddData(inputData, 15),
+ CheckAnswer(),
+ AddData(inputData, 10, 12, 14),
+ CheckAnswer(),
+ AddData(inputData, 25),
+ CheckAnswer((10, 3))
+ )
+ }
+
test("event time and watermark metrics") {
// No event time metrics when there is no watermarking
val inputData1 = MemoryStream[Int]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]