This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b9848ac61a71 [SPARK-53561][SS] Catch Interruption Exception in TransformWithStateInPySparkStateServer during outputStream.flush to avoid the worker crash
b9848ac61a71 is described below

commit b9848ac61a71161730828e69e410402025269473
Author: huanliwang-db <huanli.w...@databricks.com>
AuthorDate: Thu Sep 11 17:19:39 2025 -0700

    [SPARK-53561][SS] Catch Interruption Exception in TransformWithStateInPySparkStateServer during outputStream.flush to avoid the worker crash
    
    When `query.stop` is invoked, we first close the state store running in the query main thread and then close the TWS state server thread as part of the [task completion listener event](https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala#L227-L231). So it's possible that the query main thread has already closed the state store while in TWS [...]
    ```
    ERROR TransformWithStateInPySparkStateServer: Error reading message: [STATE_STORE_OPERATION_OUT_OF_ORDER]
    Streaming stateful operator attempted to access state store out of order. This is a bug, please retry. error_msg=Cannot update after ABORTED SQLSTATE: XXKST
    org.apache.spark.sql.execution.streaming.state.StateStoreOperationOutOfOrder:
    [STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to access state store out of order.
    This is a bug, please retry. error_msg=Cannot update after ABORTED SQLSTATE: XXKST
    ```
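    
    To make the ordering concrete, here is a minimal, self-contained Scala sketch (stand-in classes only, not Spark's real `StateStore` or server thread) of how aborting the store before interrupting the server thread opens this window:
    
    ```
    import java.util.concurrent.atomic.AtomicBoolean
    
    object ShutdownOrderSketch {
      // Stand-in for a state store that rejects updates once aborted.
      class FakeStateStore {
        private val aborted = new AtomicBoolean(false)
        def abort(): Unit = aborted.set(true)
        def update(): Unit =
          if (aborted.get) throw new IllegalStateException("Cannot update after ABORTED")
      }
    
      def main(args: Array[String]): Unit = {
        val store = new FakeStateStore
        // Stand-in for the TWS state server thread serving state requests.
        val server = new Thread(() => {
          try { while (true) { store.update(); Thread.sleep(1) } }
          catch {
            case e: IllegalStateException => println(s"server saw: ${e.getMessage}")
            case _: InterruptedException => println("server interrupted")
          }
        })
        server.start()
        Thread.sleep(10)
        // Completion-listener order: abort the store first ...
        store.abort()
        // ... then shut down the server thread, leaving a window in between
        // where the server can still touch the aborted store.
        server.interrupt()
        server.join()
      }
    }
    ```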
    
    This exception is caught [here](https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L180-L185). At this point, the TWS client in the Python worker may have already been closed, and then [outputStream.flush()](https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/T [...]
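    
    The channel-level behavior can be reproduced outside Spark. In this minimal, hypothetical sketch (loopback sockets, not the real state server), flushing through an interruptible NIO channel while the thread's interrupt status is set closes the channel and throws `ClosedByInterruptException` rather than a plain `IOException`:
    
    ```
    import java.io.{BufferedOutputStream, DataOutputStream}
    import java.net.InetSocketAddress
    import java.nio.channels.{Channels, ClosedByInterruptException, ServerSocketChannel, SocketChannel}
    
    object FlushInterruptSketch {
      def main(args: Array[String]): Unit = {
        // Loopback server/client pair standing in for the state server's socket.
        val server = ServerSocketChannel.open().bind(new InetSocketAddress("localhost", 0))
        val client = SocketChannel.open(new InetSocketAddress("localhost", server.socket().getLocalPort))
        val accepted = server.accept()
        val outputStream = new DataOutputStream(new BufferedOutputStream(Channels.newOutputStream(accepted)))
        try {
          outputStream.writeInt(1)           // buffered only, no channel I/O yet
          Thread.currentThread().interrupt() // simulate the task being cancelled
          outputStream.flush()               // the channel write sees the interrupt status
        } catch {
          case e: ClosedByInterruptException => println(s"flush failed with: $e")
        } finally {
          Thread.interrupted()               // clear the flag before cleanup
          client.close(); accepted.close(); server.close()
        }
      }
    }
    ```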
    
    ### What changes were proposed in this pull request?
    
    Handle interruption exceptions properly in TransformWithStateInPySparkStateServer to avoid crashing the worker.
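    
    Condensed, the pattern guards the error-response path so that interrupt-related exceptions during shutdown are logged and swallowed while anything else still propagates; a hypothetical rendering (the actual change is in the diff below):
    
    ```
    import java.io.{InterruptedIOException, OutputStream}
    import java.nio.channels.ClosedByInterruptException
    
    object SafeErrorResponse {
      // Hypothetical helper, not the real Spark method.
      def flushErrorResponseSafely(outputStream: OutputStream, send: () => Unit): Unit = {
        try {
          send()
          outputStream.flush()
        } catch {
          case _: InterruptedException | _: InterruptedIOException |
               _: ClosedByInterruptException =>
            // Shutdown in progress: log and fall through so the handle can be CLOSED.
            println("Thread interrupted while flushing error response")
          case e: Throwable =>
            println(s"Failed to flush: ${e.getMessage}")
            throw e // unrelated failures still propagate
        }
      }
    }
    ```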
    
    ### Why are the changes needed?
    
    The worker crashes when `query.stop` is invoked for a TWS Python query. This is not a desired state for isolation.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Built an image, ran the test manually, and confirmed that `query.stop` does not crash the worker.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #52318 from huanliwang-db/huanliwang-db/catch-interrupted-exception.
    
    Authored-by: huanliwang-db <huanli.w...@databricks.com>
    Signed-off-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
---
 .../TransformWithStateInPySparkStateServer.scala   | 27 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
index e2633dbbb131..f5fec2f85dff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.execution.python.streaming
 
-import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, EOFException}
-import java.nio.channels.{Channels, ServerSocketChannel}
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, EOFException, InterruptedIOException}
+import java.nio.channels.{Channels, ClosedByInterruptException, ServerSocketChannel}
 import java.time.Duration
 
 import scala.collection.mutable
@@ -172,15 +172,32 @@ class TransformWithStateInPySparkStateServer(
           logWarning(log"No more data to read from the socket")
           statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
           return
-        case _: InterruptedException =>
+        case _: InterruptedException | _: InterruptedIOException |
+             _: ClosedByInterruptException =>
+          // InterruptedIOException - thrown when an I/O operation is interrupted
+          // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
           logInfo(log"Thread interrupted, shutting down state server")
           Thread.currentThread().interrupt()
           statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
           return
         case e: Exception =>
           logError(log"Error reading message: ${MDC(LogKeys.ERROR, e.getMessage)}", e)
-          sendResponse(1, e.getMessage)
-          outputStream.flush()
+          try {
+            sendResponse(1, e.getMessage)
+            outputStream.flush()
+          } catch {
+            // InterruptedIOException - thrown when an I/O operation is interrupted
+            // ClosedByInterruptException - thrown when an I/O operation upon a
+            //                              channel is interrupted
+            case _: InterruptedException | _: InterruptedIOException |
+                 _: ClosedByInterruptException =>
+              logInfo(log"Thread is interrupted during flushing error response, " +
+                log"shutting down state server")
+            case e: Throwable =>
+              logError(log"Failed to flush with errorMsg=" +
+                log"${MDC(LogKeys.ERROR, e.getMessage)}", e)
+              throw e
+          }
           statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
           return
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
