This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new a852dbc [SPARK-31324][SS] Include stream ID in the termination timeout error message
a852dbc is described below
commit a852dbc3adbf1017a5b673b16f8b74704858cb26
Author: Mukul Murthy <[email protected]>
AuthorDate: Thu Apr 2 12:37:58 2020 +0900
[SPARK-31324][SS] Include stream ID in the termination timeout error message
### What changes were proposed in this pull request?
This PR (SPARK-31324) includes the stream ID in the error thrown when a
stream's stop() call times out.
### Why are the changes needed?
https://github.com/apache/spark/pull/26771/ added a conf that sets a timeout
for stopping a stream; once it elapses, the stop() method throws. In a
production use case with several streams running, the error message is more
useful when it says which stream failed to stop.
### Does this PR introduce any user-facing change?
If a stream times out when terminating, the error message now includes the
stream ID.
Before:
`Stream Execution thread failed to stop within 2000 milliseconds (specified
by spark.sql.streaming.stopTimeout). See the cause on what was being executed
in the streaming query thread.`
After:
`Stream Execution thread for stream [id =
8513769d-b9d2-4902-9b36-3668bd022245, runId =
21ed8c35-9bfe-423f-853d-c022d91818bc] failed to stop within 2000 milliseconds
(specified by spark.sql.streaming.stopTimeout). See the cause on what was being
executed in the streaming query thread.`
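Since the new message embeds the IDs in a fixed `[id = ..., runId = ...]` shape, log tooling can pull them back out. The following is a minimal sketch of that idea, not part of this patch: the object `StopTimeoutMessage` and the method `extractIds` are hypothetical names, and the pattern assumes the message layout shown in the "After" example above.

```scala
import java.util.UUID
import scala.util.Try

// Hypothetical helper (not part of Spark): recovers the stream id and runId
// from a stop-timeout error message in the "After" format shown above.
object StopTimeoutMessage {
  // Matches "[id = <uuid>, runId = <uuid>]"; \s* tolerates line wrapping in logs.
  private val IdPattern =
    """\[id\s*=\s*([0-9a-fA-F-]{36}),\s*runId\s*=\s*([0-9a-fA-F-]{36})\]""".r

  /** Returns Some((id, runId)) when both IDs are present and well-formed,
    * and None otherwise (for example, for the pre-change message). */
  def extractIds(message: String): Option[(UUID, UUID)] =
    IdPattern.findFirstMatchIn(message).flatMap { m =>
      // UUID.fromString throws on malformed input, so wrap it in Try.
      Try((UUID.fromString(m.group(1)), UUID.fromString(m.group(2)))).toOption
    }
}
```

A caller that catches the `TimeoutException` from `stop()` could pass `e.getMessage` to `StopTimeoutMessage.extractIds` and route the alert by stream ID; messages in the old format simply yield `None`.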
### How was this patch tested?
Updated the existing unit test to assert that the error message contains the stream ID.
Closes #28095 from mukulmurthy/31324-id.
Authored-by: Mukul Murthy <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 34abbb677dd3867e23279ff45d411ef3010adbe7)
Signed-off-by: HyukjinKwon <[email protected]>
---
.../org/apache/spark/sql/execution/streaming/StreamExecution.scala | 6 +++---
.../src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 3 ++-
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8006437..9b1951a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -451,9 +451,9 @@ abstract class StreamExecution(
val stackTraceException = new SparkException("The stream thread was last executing:")
stackTraceException.setStackTrace(queryExecutionThread.getStackTrace)
val timeoutException = new TimeoutException(
- s"Stream Execution thread failed to stop within $timeout milliseconds (specified by " +
- s"${SQLConf.STREAMING_STOP_TIMEOUT.key}). See the cause on what was " +
- "being executed in the streaming query thread.")
+ s"Stream Execution thread for stream $prettyIdString failed to stop within $timeout " +
+ s"milliseconds (specified by ${SQLConf.STREAMING_STOP_TIMEOUT.key}). See the cause on " +
+ s"what was being executed in the streaming query thread.")
timeoutException.initCause(stackTraceException)
throw timeoutException
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b661882..5bcc4e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1244,9 +1244,10 @@ class StreamSuite extends StreamTest {
failAfter(60.seconds) {
val startTime = System.nanoTime()
withSQLConf(SQLConf.STREAMING_STOP_TIMEOUT.key -> "2000") {
- intercept[TimeoutException] {
+ val ex = intercept[TimeoutException] {
sq.stop()
}
+ assert(ex.getMessage.contains(sq.id.toString))
}
val duration = (System.nanoTime() - startTime) / 1e6
assert(duration >= 2000,