This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new c35ca7cab62 [SPARK-45539][SS] Add assert and log to indicate watermark definition is required for streaming aggregation queries in append mode
c35ca7cab62 is described below
commit c35ca7cab6267c3ea0b74631afac6203059207ae
Author: Anish Shrigondekar <[email protected]>
AuthorDate: Mon Oct 16 14:34:12 2023 +0900
[SPARK-45539][SS] Add assert and log to indicate watermark definition is required for streaming aggregation queries in append mode
### What changes were proposed in this pull request?
Add assert and log to indicate that a watermark definition is required for streaming aggregation queries in append mode.
### Why are the changes needed?
We already check, in UnsupportedOperationChecker, that watermark attributes are specified in append mode. However, in some cases we received reports of users hitting this stack trace:
```
org.apache.spark.SparkException: Exception thrown in awaitResult: Job aborted due to stage failure: Task 3 in stage 32.0 failed 4 times, most recent failure: Lost task 3.3 in stage 32.0 (TID 606) (10.5.71.29 executor 0): java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$9(statefulOperators.scala:472)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:708)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs(statefulOperators.scala:145)
	at org.apache.spark.sql.execution.streaming.StateStoreWriter.timeTakenMs$(statefulOperators.scala:145)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:414)
	at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.$anonfun$doExecute$5(statefulOperators.scala:470)
	at org.apache.spark.sql.execution.streaming.state.package$StateStoreOps.$anonfun$mapPartitionsWithStateStore$1(package.scala:63)
	at org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:127)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
```
In this case, the reason for the failure is not immediately clear. Hence, this change adds asserts whose messages indicate why the query failed on the executor.
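To make the failure mode concrete, here is a minimal plain-Scala sketch (no Spark dependency) contrasting a bare `.get` on an undefined `Option` with asserting first. The names `WatermarkAssertDemo`, `Predicate`, `beforeFix` and `afterFix` are hypothetical, and the predicate type is simplified to `Long => Boolean`; only the assertion message is taken from this patch.

```scala
object WatermarkAssertDemo {
  // Hypothetical, simplified stand-in for a watermark predicate.
  type Predicate = Long => Boolean

  // Message copied from the patch.
  val Message = "Watermark needs to be defined for streaming aggregation query in append mode"

  // Before the patch: .get on None throws NoSuchElementException("None.get"),
  // which says nothing about watermarks.
  def beforeFix(predicate: Option[Predicate]): Predicate = predicate.get

  // After the patch: assert first, so the error explains what is missing.
  def afterFix(predicate: Option[Predicate]): Predicate = {
    assert(predicate.isDefined, Message)
    predicate.get
  }

  def main(args: Array[String]): Unit = {
    try { beforeFix(None); () }
    catch { case e: NoSuchElementException => println(s"before: ${e.getMessage}") }

    try { afterFix(None); () }
    catch { case e: AssertionError => println(s"after: ${e.getMessage}") }
  }
}
```

Scala's `assert(cond, msg)` throws `java.lang.AssertionError` with the message prefixed by "assertion failed: ", so the executor-side error names the missing watermark instead of just "None.get".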
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #43370 from anishshri-db/task/SPARK-45539.
Authored-by: Anish Shrigondekar <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../apache/spark/sql/execution/streaming/statefulOperators.scala | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 77645378f22..cb01fa9ff6d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -520,6 +520,12 @@ case class StateStoreSaveExec(
// Update and output only rows being evicted from the StateStore
// Assumption: watermark predicates must be non-empty if append mode is allowed
case Some(Append) =>
+ assert(watermarkPredicateForDataForLateEvents.isDefined,
+ "Watermark needs to be defined for streaming aggregation query in append mode")
+
+ assert(watermarkPredicateForKeysForEviction.isDefined,
+ "Watermark needs to be defined for streaming aggregation query in append mode")
+
allUpdatesTimeMs += timeTakenMs {
val filteredIter = applyRemovingRowsOlderThanWatermark(iter,
watermarkPredicateForDataForLateEvents.get)
@@ -777,6 +783,9 @@ case class SessionWindowStateStoreSaveExec(
// Update and output only rows being evicted from the StateStore
// Assumption: watermark predicates must be non-empty if append mode is allowed
case Some(Append) =>
+ assert(watermarkPredicateForDataForEviction.isDefined,
+ "Watermark needs to be defined for session window query in append mode")
+
allUpdatesTimeMs += timeTakenMs {
putToStore(iter, store)
}
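The diff comment notes that append mode outputs only the rows being evicted from the StateStore. The following is a hypothetical toy model of that eviction step in plain Scala, assuming a window's aggregate is final once its window end is at or below the watermark (time units are arbitrary). `AppendModeEvictionSketch`, `WindowCount` and `evict` are illustrative names, not Spark APIs, and the model covers only key eviction, not the late-event filtering guarded by the first assert.

```scala
object AppendModeEvictionSketch {
  // Hypothetical output row: one finalized window and its aggregated count.
  final case class WindowCount(windowEnd: Long, count: Long)

  // Toy model, not Spark's StateStoreSaveExec: `state` maps window end -> running count.
  // Returns the rows emitted in append mode and the state left after eviction.
  def evict(
      state: Map[Long, Long],
      evictionPredicate: Option[Long => Boolean]): (Seq[WindowCount], Map[Long, Long]) = {
    // Same style of guard as the patch: fail with a clear message instead of "None.get".
    assert(evictionPredicate.isDefined,
      "Watermark needs to be defined for streaming aggregation query in append mode")
    val shouldEvict = evictionPredicate.get
    // Keys matching the predicate leave the state and are emitted exactly once.
    val (evicted, kept) = state.partition { case (windowEnd, _) => shouldEvict(windowEnd) }
    val emitted = evicted.toSeq.sortBy(_._1).map { case (end, c) => WindowCount(end, c) }
    (emitted, kept)
  }

  def main(args: Array[String]): Unit = {
    val watermark = 20L // arbitrary time units
    val state = Map(10L -> 3L, 20L -> 5L, 30L -> 1L)
    val (emitted, remaining) = evict(state, Some((windowEnd: Long) => windowEnd <= watermark))
    println(s"emitted: $emitted")
    println(s"remaining: $remaining")
  }
}
```

Rows for windows still above the watermark stay in state, which is why append mode cannot emit anything meaningful without a watermark-based eviction predicate.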