This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new a852dbc  [SPARK-31324][SS] Include stream ID in the termination timeout error message
a852dbc is described below

commit a852dbc3adbf1017a5b673b16f8b74704858cb26
Author: Mukul Murthy <[email protected]>
AuthorDate: Thu Apr 2 12:37:58 2020 +0900

    [SPARK-31324][SS] Include stream ID in the termination timeout error message
    
    ### What changes were proposed in this pull request?
    
    This PR (SPARK-31324) includes the stream ID in the error thrown when a stream's stop() call does not complete in time.
    
    ### Why are the changes needed?
    
    https://github.com/apache/spark/pull/26771/ added a conf (spark.sql.streaming.stopTimeout) to set a requested timeout for stopping a stream, after which the stop() method throws a TimeoutException. Having seen this in a production use case with several streams running, it is helpful for the error message to identify which stream failed to stop.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. If a stream times out when terminating, the error message now includes the stream ID and run ID.
    
    Before:
    `Stream Execution thread failed to stop within 2000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.`
    
    After:
    `Stream Execution thread for stream [id = 8513769d-b9d2-4902-9b36-3668bd022245, runId = 21ed8c35-9bfe-423f-853d-c022d91818bc] failed to stop within 2000 milliseconds (specified by spark.sql.streaming.stopTimeout). See the cause on what was being executed in the streaming query thread.`
    
    ### How was this patch tested?
    
    Updated the existing unit test in StreamSuite to assert that the timeout error message contains the stream ID.
    
    Closes #28095 from mukulmurthy/31324-id.
    
    Authored-by: Mukul Murthy <[email protected]>
    Signed-off-by: HyukjinKwon <[email protected]>
    (cherry picked from commit 34abbb677dd3867e23279ff45d411ef3010adbe7)
    Signed-off-by: HyukjinKwon <[email protected]>
---
 .../org/apache/spark/sql/execution/streaming/StreamExecution.scala  | 6 +++---
 .../src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala | 3 ++-
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 8006437..9b1951a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -451,9 +451,9 @@ abstract class StreamExecution(
       val stackTraceException = new SparkException("The stream thread was last 
executing:")
       stackTraceException.setStackTrace(queryExecutionThread.getStackTrace)
       val timeoutException = new TimeoutException(
-        s"Stream Execution thread failed to stop within $timeout milliseconds 
(specified by " +
-        s"${SQLConf.STREAMING_STOP_TIMEOUT.key}). See the cause on what was " +
-        "being executed in the streaming query thread.")
+        s"Stream Execution thread for stream $prettyIdString failed to stop 
within $timeout " +
+        s"milliseconds (specified by ${SQLConf.STREAMING_STOP_TIMEOUT.key}). 
See the cause on " +
+        s"what was being executed in the streaming query thread.")
       timeoutException.initCause(stackTraceException)
       throw timeoutException
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index b661882..5bcc4e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1244,9 +1244,10 @@ class StreamSuite extends StreamTest {
       failAfter(60.seconds) {
         val startTime = System.nanoTime()
         withSQLConf(SQLConf.STREAMING_STOP_TIMEOUT.key -> "2000") {
-          intercept[TimeoutException] {
+          val ex = intercept[TimeoutException] {
             sq.stop()
           }
+          assert(ex.getMessage.contains(sq.id.toString))
         }
         val duration = (System.nanoTime() - startTime) / 1e6
         assert(duration >= 2000,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to