This is an automated email from the ASF dual-hosted git repository.

ashrigondekar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b9848ac61a71 [SPARK-53561][SS] Catch Interruption Exception in TransformWithStateInPySparkStateServer during outputStream.flush to avoid the worker crash
b9848ac61a71 is described below

commit b9848ac61a71161730828e69e410402025269473
Author: huanliwang-db <huanli.w...@databricks.com>
AuthorDate: Thu Sep 11 17:19:39 2025 -0700

[SPARK-53561][SS] Catch Interruption Exception in TransformWithStateInPySparkStateServer during outputStream.flush to avoid the worker crash

When `query.stop` is invoked, we first close the state store running in the query main thread and then close the TWS state server thread as part of the [task completion listener event](https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala#L227-L231). So it is possible that the query main thread has already closed the state store while the TWS [...]

```
ERROR TransformWithStateInPySparkStateServer: Error reading message: [STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to access state store out of order. This is a bug, please retry. error_msg=Cannot update after ABORTED SQLSTATE: XXKST
org.apache.spark.sql.execution.streaming.state.StateStoreOperationOutOfOrder: [STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to access state store out of order. This is a bug, please retry. error_msg=Cannot update after ABORTED SQLSTATE: XXKST
```

This exception is caught [here](https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L180-L185).
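The core of the fix is to treat interrupt-family exceptions raised while flushing the error response as a normal shutdown signal rather than a fatal error. A minimal standalone sketch of that pattern, in isolation from Spark (the `SafeFlush` object and `safeFlush` helper are hypothetical names for illustration, not part of the actual patch):

```scala
import java.io.{InterruptedIOException, OutputStream}
import java.nio.channels.ClosedByInterruptException

object SafeFlush {
  // Attempt to flush `out`. Returns true on success; returns false (instead of
  // crashing the calling thread) when the flush fails because the thread is
  // being interrupted during shutdown. Any other failure still propagates.
  def safeFlush(out: OutputStream): Boolean = {
    try {
      out.flush()
      true
    } catch {
      // InterruptedIOException - thrown when an I/O operation is interrupted.
      // ClosedByInterruptException - thrown when a channel I/O operation is
      // interrupted.
      case _: InterruptedException | _: InterruptedIOException |
           _: ClosedByInterruptException =>
        // Preserve the interrupt status so callers can still observe it.
        Thread.currentThread().interrupt()
        false
      // Anything else falls through and propagates to the caller.
    }
  }
}
```

A server loop can call this in its error path and proceed to mark its handle closed whether or not the flush succeeded, which is exactly the isolation property the patch is after.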
At this time, the TWS client in the Python worker may have already closed, so [outputStream.flush()](https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/T [...]

### What changes were proposed in this pull request?

Handle the interruption exception properly in TransformWithStateInPySparkStateServer to avoid crashing the worker.

### Why are the changes needed?

The worker crashes when `query.stop` is invoked for a TWS Python query. This is not a desirable state for isolation.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Built an image, ran the test manually, and confirmed that `query.stop` does not crash the worker.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52318 from huanliwang-db/huanliwang-db/catch-interrupted-exception.

Authored-by: huanliwang-db <huanli.w...@databricks.com>
Signed-off-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
---
 .../TransformWithStateInPySparkStateServer.scala   | 27 ++++++++++++++++++----
 1 file changed, 22 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
index e2633dbbb131..f5fec2f85dff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.execution.python.streaming
 
-import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, EOFException}
-import java.nio.channels.{Channels, ServerSocketChannel}
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, EOFException, InterruptedIOException}
+import java.nio.channels.{Channels, ClosedByInterruptException, ServerSocketChannel}
 import java.time.Duration
 
 import scala.collection.mutable
@@ -172,15 +172,32 @@ class TransformWithStateInPySparkStateServer(
         logWarning(log"No more data to read from the socket")
         statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
         return
-      case _: InterruptedException =>
+      case _: InterruptedException | _: InterruptedIOException |
+           _: ClosedByInterruptException =>
+        // InterruptedIOException - thrown when an I/O operation is interrupted
+        // ClosedByInterruptException - thrown when an I/O operation upon a channel is interrupted
         logInfo(log"Thread interrupted, shutting down state server")
         Thread.currentThread().interrupt()
         statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
         return
       case e: Exception =>
         logError(log"Error reading message: ${MDC(LogKeys.ERROR, e.getMessage)}", e)
-        sendResponse(1, e.getMessage)
-        outputStream.flush()
+        try {
+          sendResponse(1, e.getMessage)
+          outputStream.flush()
+        } catch {
+          // InterruptedIOException - thrown when an I/O operation is interrupted
+          // ClosedByInterruptException - thrown when an I/O operation upon a
+          // channel is interrupted
+          case _: InterruptedException | _: InterruptedIOException |
+               _: ClosedByInterruptException =>
+            logInfo(log"Thread is interrupted during flushing error response, " +
+              log"shutting down state server")
+          case e: Throwable =>
+            logError(log"Failed to flush with errorMsg=" +
+              log"${MDC(LogKeys.ERROR, e.getMessage)}", e)
+            throw e
+        }
         statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
         return
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org