This is an automated email from the ASF dual-hosted git repository.

HeartSaVioR pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 18e2939f6263 [SPARK-55731][SS] Assign error class for multiple event 
time columns
18e2939f6263 is described below

commit 18e2939f62631f7e0aba34b452f30df067ed2078
Author: Adithya Ajith <[email protected]>
AuthorDate: Thu May 14 07:36:31 2026 -0700

    [SPARK-55731][SS] Assign error class for multiple event time columns
    
    ### What changes were proposed in this pull request?
    This PR replaces `_LEGACY_ERROR_TEMP_3077` with a proper error class, 
`MULTIPLE_EVENT_TIME_COLUMNS` ([SPARK-55731](https://issues.apache.org/jira/browse/SPARK-55731)).
    
    The error is thrown when a stateful streaming operator receives an input 
DataFrame with more than one distinct event-time column while multiple 
event-time columns are not allowed.
    
    This PR:
    - Adds `MULTIPLE_EVENT_TIME_COLUMNS` to `error-conditions.json`
    - Assigns SQLSTATE `42K09`, matching nearby event-time analysis errors
    - Updates both throw sites in `WatermarkSupport`
    - Updates the existing `EventTimeWatermarkSuite` test to verify the new 
error condition
    
    ### Why are the changes needed?
    `_LEGACY_ERROR_TEMP_3077` is a temporary legacy error class and should be 
replaced with a stable, descriptive error class.
    
    The query behavior is already correct. This change improves the structured 
error reporting by assigning a real error condition for the multiple event-time 
columns case.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    For this error case, the error condition changes from 
`_LEGACY_ERROR_TEMP_3077` to `MULTIPLE_EVENT_TIME_COLUMNS`.
    The error message remains the same, and the query behavior is unchanged. 
The error now also has SQLSTATE `42K09`.
    
    ### How was this patch tested?
    Tested by running:
    `build/sbt 'sql / Test / testOnly 
org.apache.spark.sql.streaming.EventTimeWatermarkSuite -- -z "multiple event 
time columns in an input DataFrame for stateful operator is not allowed"'`
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes. Generated-by: OpenAI GPT-5.5 Codex
    
    Closes #55745 from XdithyX/SPARK-55731.
    
    Authored-by: Adithya Ajith <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 common/utils/src/main/resources/error/error-conditions.json   | 11 ++++++-----
 .../streaming/operators/stateful/statefulOperators.scala      |  6 ++----
 .../apache/spark/sql/streaming/EventTimeWatermarkSuite.scala  |  1 +
 3 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 889ecf9f7b08..bbe5c3ced6f5 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -5269,6 +5269,12 @@
     ],
     "sqlState" : "0A000"
   },
+  "MULTIPLE_EVENT_TIME_COLUMNS" : {
+    "message" : [
+      "More than one event time columns are available. Please ensure there is 
at most one event time column per stream. event time columns: <eventTimeCols>"
+    ],
+    "sqlState" : "42K09"
+  },
   "MULTIPLE_PRIMARY_KEYS" : {
     "message" : [
       "Multiple primary keys are defined: <columns>. Please ensure that only 
one primary key is defined for the table."
@@ -10917,11 +10923,6 @@
       "Redefining watermark is disallowed. You can set the config '<config>' 
to 'false' to restore the previous behavior. Note that multiple stateful 
operators will be disallowed."
     ]
   },
-  "_LEGACY_ERROR_TEMP_3077" : {
-    "message" : [
-      "More than one event time columns are available. Please ensure there is 
at most one event time column per stream. event time columns: <eventTimeCols>"
-    ]
-  },
   "_LEGACY_ERROR_TEMP_3079" : {
     "message" : [
       "Dynamic partition cannot be the parent of a static partition."
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
index 76b395d22504..9fcc0a506570 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala
@@ -670,8 +670,7 @@ object WatermarkSupport {
       val eventTimeColsSet = eventTimeCols.map(_.exprId).toSet
       if (eventTimeColsSet.size > 1) {
         throw new AnalysisException(
-          // TODO: [SPARK-55731] Assign error class for _LEGACY_ERROR_TEMP_3077
-          errorClass = "_LEGACY_ERROR_TEMP_3077",
+          errorClass = "MULTIPLE_EVENT_TIME_COLUMNS",
           messageParameters = Map("eventTimeCols" -> 
eventTimeCols.mkString("(", ",", ")")))
       }
 
@@ -707,8 +706,7 @@ object WatermarkSupport {
       val eventTimeColsSet = eventTimeCols.map(_._1.exprId).toSet
       if (eventTimeColsSet.size > 1) {
         throw new AnalysisException(
-          // TODO: [SPARK-55731] Assign error class for _LEGACY_ERROR_TEMP_3077
-          errorClass = "_LEGACY_ERROR_TEMP_3077",
+          errorClass = "MULTIPLE_EVENT_TIME_COLUMNS",
           messageParameters = Map("eventTimeCols" -> 
eventTimeCols.mkString("(", ",", ")")))
       }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index d26908d11477..59ac06ccadc7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -696,6 +696,7 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
           (input2, Seq(("A", 190L), ("C", 350L)))
         ),
         ExpectFailure[AnalysisException](assertFailure = ex => {
+          assert(ex.asInstanceOf[AnalysisException].getCondition === 
"MULTIPLE_EVENT_TIME_COLUMNS")
           assert(ex.getMessage.contains("More than one event time columns are 
available."))
           assert(ex.getMessage.contains(
             "Please ensure there is at most one event time column per 
stream."))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to