This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 34ab617083c1 [SPARK-55492][SQL][SS] Validate that eventTime in 
withWatermark is top-level column
34ab617083c1 is described below

commit 34ab617083c17c0ccdff720554e8eaad0a663061
Author: Dmytro Fedoriaka <[email protected]>
AuthorDate: Fri Feb 20 11:15:07 2026 +0900

    [SPARK-55492][SQL][SS] Validate that eventTime in withWatermark is 
top-level column
    
    ### What changes were proposed in this pull request?
    
    Validate that eventTime in withWatermark is a top-level column.
    
    The validation is behind a config flag 
"spark.sql.streaming.validateEventTimeWatermarkColumn", which is true by 
default. It can be set to "false" to disable this validation.
    
    ### Why are the changes needed?
    
    Currently, passing a nested field to withWatermark results in an internal 
error.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: claude-4.6-opus-high
    
    Closes #54254 from fedimser/treenode-fix.
    
    Lead-authored-by: Dmytro Fedoriaka <[email protected]>
    Co-authored-by: Jungtaek Lim <[email protected]>
    Co-authored-by: Dima Fedoriaka <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  6 +++
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 35 ++++++++++++++++
 .../org/apache/spark/sql/internal/SQLConf.scala    |  7 ++++
 .../streaming/ClientStreamingQuerySuite.scala      | 14 +++++++
 .../sql/streaming/EventTimeWatermarkSuite.scala    | 46 +++++++++++++++++++++-
 5 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 24a12248ce72..188106ee3ddb 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1675,6 +1675,12 @@
     ],
     "sqlState" : "42K09"
   },
+  "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN" : {
+    "message" : [
+      "The event time column <eventExpr> must be a top-level column in the 
schema."
+    ],
+    "sqlState" : "42K09"
+  },
   "EXCEED_LIMIT_LENGTH" : {
     "message" : [
       "Exceeds char/varchar type length limitation: <limit>."
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 979bb6f9cff2..0cddb847c349 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -450,6 +450,7 @@ class Analyzer(
       DeduplicateRelations ::
       ResolveCollationName ::
       ResolveMergeIntoSchemaEvolution ::
+      ValidateEventTimeWatermarkColumn ::
       new ResolveReferences(catalogManager) ::
       // Please do not insert any other rules in between. See the TODO 
comments in rule
       // ResolveLateralColumnAliasReference for more details.
@@ -4019,6 +4020,40 @@ object CleanupAliases extends Rule[LogicalPlan] with 
AliasHelper {
   }
 }
 
+/**
+ * Validates that the event time column in EventTimeWatermark is a top-level 
column reference
+ * (e.g. a single name), not a nested field (e.g. "struct_col.field").
+ *
+ * Multi-part names are allowed when they resolve to a top-level attribute via 
a table alias
+ * (e.g. "alias.column"), but rejected when they resolve to a nested struct 
field extraction.
+ */
+object ValidateEventTimeWatermarkColumn extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    if (!conf.getConf(SQLConf.STREAMING_VALIDATE_EVENT_TIME_WATERMARK_COLUMN)) 
{
+      return plan
+    }
+    plan.resolveOperatorsWithPruning(
+      _.containsPattern(EVENT_TIME_WATERMARK)) {
+      case etw: EventTimeWatermark =>
+        etw.eventTime match {
+          case u: UnresolvedAttribute if u.nameParts.length > 1 =>
+            // Try to resolve the multi-part name against the child output.
+            // An alias-qualified column (e.g. "a.eventTime") resolves to an 
Attribute,
+            // while a nested struct field (e.g. "struct_col.field") resolves 
to an
+            // Alias(ExtractValue(...)) which is not an Attribute.
+            etw.child.resolve(u.nameParts, conf.resolver) match {
+              case Some(_: Attribute) => etw
+              case _ =>
+                etw.failAnalysis(
+                  errorClass = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
+                  messageParameters = Map("eventExpr" -> u.sql))
+            }
+          case _ => etw
+        }
+    }
+  }
+}
+
 /**
  * Ignore event time watermark in batch query, which is only supported in 
Structured Streaming.
  */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f17199547665..495ece401949 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3460,6 +3460,13 @@ object SQLConf {
           "Valid values are 'min' and 'max'")
       .createWithDefault("min") // must be same as 
MultipleWatermarkPolicy.DEFAULT_POLICY_NAME
 
+  val STREAMING_VALIDATE_EVENT_TIME_WATERMARK_COLUMN =
+    buildConf("spark.sql.streaming.validateEventTimeWatermarkColumn")
+      .doc("When true, check that eventTime in withWatermark is a top-level 
column.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val OBJECT_AGG_SORT_BASED_FALLBACK_THRESHOLD =
     buildConf("spark.sql.objectHashAggregate.sortBased.fallbackThreshold")
       .internal()
diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
index ac6a35a9db58..c8a25652dacb 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/streaming/ClientStreamingQuerySuite.scala
@@ -846,6 +846,20 @@ class ClientStreamingQuerySuite extends QueryTest with 
RemoteSparkSession with L
       }
     }
   }
+
+  test("withWatermark fails for nested column") {
+    val df = spark.sql(
+      "SELECT 1 as id, struct(to_timestamp('2024-01-01 10:00:00') as 
timestamp, 'val1' as value) " +
+        "as nested_struct")
+    val e = intercept[AnalysisException] {
+      df.withWatermark("nested_struct.timestamp", "0 seconds").schema
+    }
+    checkError(
+      e,
+      condition = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
+      parameters = Map("eventExpr" -> ".*nested_struct.*timestamp.*"),
+      matchPVals = true)
+  }
 }
 
 class TestForeachWriter[T] extends ForeachWriter[T] {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index db28ef4fc35a..d26908d11477 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.catalyst.util.DateTimeTestUtils.UTC
 import 
org.apache.spark.sql.execution.streaming.operators.stateful.{EventTimeStats, 
StateStoreSaveExec}
 import org.apache.spark.sql.execution.streaming.runtime._
 import org.apache.spark.sql.execution.streaming.sources.MemorySink
-import org.apache.spark.sql.functions.{count, expr, timestamp_seconds, window}
+import org.apache.spark.sql.functions.{count, expr, struct, timestamp_seconds, 
to_timestamp, window}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.OutputMode._
 import org.apache.spark.tags.SlowSQLTest
@@ -131,6 +131,50 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
     assert(e.getMessage contains "int")
   }
 
+  test("error on nested column") {
+    // Cannot pass a nested column as `eventTime` to `withWatermark`.
+    val e = intercept[AnalysisException] {
+      Seq((1, ("2024-01-01 10:00:00", "val1")))
+        .toDF("id", "data")
+        .select(
+          $"id",
+          struct(
+            to_timestamp($"data._1").as("timestamp"),
+            $"data._2".as("value")
+          ).as("nested_struct")
+        )
+        .withWatermark("nested_struct.timestamp", "0 seconds")
+        .schema
+    }
+    checkError(
+      e,
+      condition = "EVENT_TIME_MUST_BE_TOP_LEVEL_COLUMN",
+      parameters = Map("eventExpr" -> ".*nested_struct.*timestamp.*"),
+      matchPVals = true)
+  }
+
+  test("withWatermark should work with alias-qualified column name") {
+    // When a DataFrame has an alias, referencing the event time column via
+    // "alias.columnName" should be allowed because it still refers to a 
top-level column.
+    val inputData = MemoryStream[Int]
+    val df = inputData.toDF()
+      .withColumn("eventTime", timestamp_seconds($"value"))
+      .alias("a")
+      .withWatermark("a.eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as Symbol("window"))
+      .agg(count("*") as Symbol("count"))
+      .select($"window".getField("start").cast("long").as[Long], 
$"count".as[Long])
+
+    testStream(df)(
+      AddData(inputData, 15),
+      CheckAnswer(),
+      AddData(inputData, 10, 12, 14),
+      CheckAnswer(),
+      AddData(inputData, 25),
+      CheckAnswer((10, 3))
+    )
+  }
+
   test("event time and watermark metrics") {
     // No event time metrics when there is no watermarking
     val inputData1 = MemoryStream[Int]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to