This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c02727093e59 [SPARK-50193][SS] Fix exception handling for validating
time modes
c02727093e59 is described below
commit c02727093e59601ca9510bd4322c33887d2be330
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Fri Nov 1 11:41:40 2024 +0900
[SPARK-50193][SS] Fix exception handling for validating time modes
### What changes were proposed in this pull request?
Fix exception handling for validating time modes
### Why are the changes needed?
We were not throwing the exception correctly when time mode validation failed
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added unit tests. The tests fail without the change
```
[info] Run completed in 6 seconds, 548 milliseconds.
[info] Total number of tests run: 3
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 3, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #48726 from anishshri-db/task/SPARK-50193.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../streaming/TransformWithStateExec.scala | 8 ++------
.../TransformWithStateVariableUtils.scala | 7 +++++++
.../sql/streaming/TransformWithStateSuite.scala | 22 +++++++++++++++++++++-
3 files changed, 30 insertions(+), 7 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
index 42cd429587f3..4f7a10f88245 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala
@@ -678,14 +678,10 @@ case class TransformWithStateExec(
private def validateTimeMode(): Unit = {
timeMode match {
case ProcessingTime =>
- if (batchTimestampMs.isEmpty) {
- StateStoreErrors.missingTimeValues(timeMode.toString)
- }
+ TransformWithStateVariableUtils.validateTimeMode(timeMode,
batchTimestampMs)
case EventTime =>
- if (eventTimeWatermarkForEviction.isEmpty) {
- StateStoreErrors.missingTimeValues(timeMode.toString)
- }
+ TransformWithStateVariableUtils.validateTimeMode(timeMode,
eventTimeWatermarkForEviction)
case _ =>
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala
index 4a192b3e51c7..bc67cee57fef 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateVariableUtils.scala
@@ -25,6 +25,7 @@ import org.json4s.jackson.JsonMethods.{compact, render}
import org.apache.spark.internal.Logging
import
org.apache.spark.sql.execution.streaming.StateVariableType.StateVariableType
import org.apache.spark.sql.execution.streaming.state.StateStoreErrors
+import org.apache.spark.sql.streaming.TimeMode
/**
* This file contains utility classes and functions for managing state
variables in
@@ -47,6 +48,12 @@ object TransformWithStateVariableUtils {
def getTimerState(stateName: String): TransformWithStateVariableInfo = {
TransformWithStateVariableInfo(stateName, StateVariableType.TimerState,
ttlEnabled = false)
}
+
+ def validateTimeMode(timeMode: TimeMode, timestampOpt: Option[Long]): Unit =
{
+ if (timeMode != TimeMode.None() && timestampOpt.isEmpty) {
+ throw StateStoreErrors.missingTimeValues(timeMode.toString)
+ }
+ }
}
// Enum of possible State Variable types
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 1a7970302e5b..7d61c6fc7084 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.matchers.must.Matchers.be
import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
import org.scalatest.time.{Seconds, Span}
-import org.apache.spark.{SparkRuntimeException,
SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkRuntimeException,
SparkUnsupportedOperationException}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Dataset, Encoders, Row}
import org.apache.spark.sql.catalyst.util.stringToFile
@@ -1949,4 +1949,24 @@ class TransformWithStateValidationSuite extends
StateStoreMetricsTest {
}
)
}
+
+ test("transformWithState - validate timeModes") {
+ // validation tests should pass for TimeMode.None
+ TransformWithStateVariableUtils.validateTimeMode(TimeMode.None(), None)
+ TransformWithStateVariableUtils.validateTimeMode(TimeMode.None(),
Some(10L))
+
+ // validation tests should fail for TimeMode.ProcessingTime and
TimeMode.EventTime
+ // when time values are not provided
+ val ex = intercept[SparkException] {
+
TransformWithStateVariableUtils.validateTimeMode(TimeMode.ProcessingTime(),
None)
+ }
+ assert(ex.getMessage.contains("Failed to find time values"))
+
TransformWithStateVariableUtils.validateTimeMode(TimeMode.ProcessingTime(),
Some(10L))
+
+ val ex1 = intercept[SparkException] {
+ TransformWithStateVariableUtils.validateTimeMode(TimeMode.EventTime(),
None)
+ }
+ assert(ex1.getMessage.contains("Failed to find time values"))
+ TransformWithStateVariableUtils.validateTimeMode(TimeMode.EventTime(),
Some(10L))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]