This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c02727093e59 [SPARK-50193][SS] Fix exception handling for validating time modes
c02727093e59 is described below

commit c02727093e59601ca9510bd4322c33887d2be330
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Fri Nov 1 11:41:40 2024 +0900

    [SPARK-50193][SS] Fix exception handling for validating time modes
    
    ### What changes were proposed in this pull request?
    Fix exception handling for validating time modes
    
    ### Why are the changes needed?
    The error built by `StateStoreErrors.missingTimeValues` was constructed but never thrown, so a failed time mode validation was silently ignored.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests. Test fails without the change
    
    ```
    [info] Run completed in 6 seconds, 548 milliseconds.
    [info] Total number of tests run: 3
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #48726 from anishshri-db/task/SPARK-50193.
    
    Authored-by: Anish Shrigondekar <[email protected]>
    Signed-off-by: Jungtaek Lim <[email protected]>
---
 .../streaming/TransformWithStateExec.scala         |  8 ++------
 .../TransformWithStateVariableUtils.scala          |  7 +++++++
 .../sql/streaming/TransformWithStateSuite.scala    | 22 +++++++++++++++++++++-
 3 files changed, 30 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index 42cd429587f3..4f7a10f88245 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -678,14 +678,10 @@ case class TransformWithStateExec(
   private def validateTimeMode(): Unit = {
     timeMode match {
       case ProcessingTime =>
-        if (batchTimestampMs.isEmpty) {
-          StateStoreErrors.missingTimeValues(timeMode.toString)
-        }
+        TransformWithStateVariableUtils.validateTimeMode(timeMode, 
batchTimestampMs)
 
       case EventTime =>
-        if (eventTimeWatermarkForEviction.isEmpty) {
-          StateStoreErrors.missingTimeValues(timeMode.toString)
-        }
+        TransformWithStateVariableUtils.validateTimeMode(timeMode, 
eventTimeWatermarkForEviction)
 
       case _ =>
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala
index 4a192b3e51c7..bc67cee57fef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala
@@ -25,6 +25,7 @@ import org.json4s.jackson.JsonMethods.{compact, render}
 import org.apache.spark.internal.Logging
 import 
org.apache.spark.sql.execution.streaming.StateVariableType.StateVariableType
 import org.apache.spark.sql.execution.streaming.state.StateStoreErrors
+import org.apache.spark.sql.streaming.TimeMode
 
 /**
  * This file contains utility classes and functions for managing state 
variables in
@@ -47,6 +48,12 @@ object TransformWithStateVariableUtils {
   def getTimerState(stateName: String): TransformWithStateVariableInfo = {
     TransformWithStateVariableInfo(stateName, StateVariableType.TimerState, 
ttlEnabled = false)
   }
+
+  def validateTimeMode(timeMode: TimeMode, timestampOpt: Option[Long]): Unit = 
{
+    if (timeMode != TimeMode.None() && timestampOpt.isEmpty) {
+      throw StateStoreErrors.missingTimeValues(timeMode.toString)
+    }
+  }
 }
 
 // Enum of possible State Variable types
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 1a7970302e5b..7d61c6fc7084 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.matchers.must.Matchers.be
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 import org.scalatest.time.{Seconds, Span}
 
-import org.apache.spark.{SparkRuntimeException, 
SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkRuntimeException, 
SparkUnsupportedOperationException}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, Encoders, Row}
 import org.apache.spark.sql.catalyst.util.stringToFile
@@ -1949,4 +1949,24 @@ class TransformWithStateValidationSuite extends StateStoreMetricsTest {
       }
     )
   }
+
+  test("transformWithState - validate timeModes") {
+    // validation tests should pass for TimeMode.None
+    TransformWithStateVariableUtils.validateTimeMode(TimeMode.None(), None)
+    TransformWithStateVariableUtils.validateTimeMode(TimeMode.None(), 
Some(10L))
+
+    // validation tests should fail for TimeMode.ProcessingTime and 
TimeMode.EventTime
+    // when time values are not provided
+    val ex = intercept[SparkException] {
+      
TransformWithStateVariableUtils.validateTimeMode(TimeMode.ProcessingTime(), 
None)
+    }
+    assert(ex.getMessage.contains("Failed to find time values"))
+    
TransformWithStateVariableUtils.validateTimeMode(TimeMode.ProcessingTime(), 
Some(10L))
+
+    val ex1 = intercept[SparkException] {
+      TransformWithStateVariableUtils.validateTimeMode(TimeMode.EventTime(), 
None)
+    }
+    assert(ex1.getMessage.contains("Failed to find time values"))
+    TransformWithStateVariableUtils.validateTimeMode(TimeMode.EventTime(), 
Some(10L))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to