This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new b8c6854d7867 [SPARK-50889][CONNECT][TESTS] Fix Flaky Test: `SparkSessionE2ESuite.interrupt operation` (Hang)
b8c6854d7867 is described below

commit b8c6854d7867a65c3edbda8dc4fd437a8820a4a0
Author: Kousuke Saruta <saru...@amazon.co.jp>
AuthorDate: Fri Jul 25 08:23:44 2025 +0900

[SPARK-50889][CONNECT][TESTS] Fix Flaky Test: `SparkSessionE2ESuite.interrupt operation` (Hang)

### What changes were proposed in this pull request?

This PR fixes an issue where `SparkSessionE2ESuite.interrupt operation` occasionally hangs.

The hang happens if the execution thread for an operation id cleans up the corresponding `ExecuteHolder` as the result of an interruption [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala#L175) before the response sender thread consumes a response [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/m [...]

In this case, the cleanup eventually calls `ExecuteResponseObserver.removeAll()`, which discards all buffered responses, and the response sender thread can never escape [this loop](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala#L245) because neither `gotResponse` nor `streamFinished` ever becomes true.

The fix proposed in this PR changes the definition of `streamFinished` in `ExecuteGrpcResponseSender` so that the stream is also regarded as finished when the `ExecuteResponseObserver` is marked as completed and all of its responses have been discarded. `ExecuteResponseObserver.removeAll` is called when the corresponding `ExecuteHolder` is closed or cleaned up by an interruption, so this is a reasonable signal that the stream is done.

### Why are the changes needed?

To fix a potential hang.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested manually. Without this change, you can easily reproduce the issue by inserting a sleep into the test as follows:

```
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
@@ -331,6 +331,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
     // cancel
     val operationId = result.operationId
     val canceledId = spark.interruptOperation(operationId)
+    Thread.sleep(1000)
     assert(canceledId == Seq(operationId))
     // and check that it got canceled
     val e = intercept[SparkException] {
```

With this change applied, the test above no longer hangs.

### Was this patch authored or co-authored using generative AI tooling?

No.
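To make the race concrete, here is a minimal, single-threaded Scala sketch of the sender's wait condition. `ToyObserver` and every name in it are illustrative stand-ins, not the real Spark Connect classes, and locking and thread wake-ups are elided; it only models why the old `streamFinished` predicate can never become true once `removeAll()` has discarded responses the sender still needs, and why the new predicate lets the sender exit.

```scala
// Hypothetical toy model of the hang; not the actual Spark Connect code.
object StreamFinishedSketch {

  final class ToyObserver {
    // Buffered responses by index, and the index of the last produced response.
    private var responses = Map.empty[Long, String]
    private var finalProducedIndex: Option[Long] = None

    def produce(i: Long, r: String): Unit = responses += i -> r
    def complete(last: Long): Unit = finalProducedIndex = Some(last)
    // Cleanup after interruption: every buffered response is discarded.
    def removeAll(): Unit = responses = Map.empty

    def get(i: Long): Option[String] = responses.get(i)
    def getLastResponseIndex: Option[Long] = finalProducedIndex
    def completed: Boolean = finalProducedIndex.isDefined
    // The new predicate from this commit: completed with nothing left to send.
    def isCleaned: Boolean = completed && responses.isEmpty
  }

  def main(args: Array[String]): Unit = {
    val obs = new ToyObserver
    (1L to 5L).foreach(i => obs.produce(i, s"resp-$i"))
    obs.complete(5L)

    val nextIndex = 3L // the sender has sent 1 and 2 and still needs 3..5
    obs.removeAll()    // interruption cleanup wins the race

    val gotResponse = obs.get(nextIndex).nonEmpty                          // false: discarded
    val oldStreamFinished = obs.getLastResponseIndex.exists(nextIndex > _) // 3 > 5: false
    val newStreamFinished = oldStreamFinished || obs.isCleaned             // true

    println(s"gotResponse=$gotResponse old=$oldStreamFinished new=$newStreamFinished")
  }
}
```

Running it prints `gotResponse=false old=false new=true`: under the old condition the sender keeps waiting for a response that no longer exists, while under the new one the cleaned-up observer counts as a finished stream.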
Closes #51638 from sarutak/SPARK-50889.

Authored-by: Kousuke Saruta <saru...@amazon.co.jp>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/connect/execution/ExecuteGrpcResponseSender.scala      | 3 ++-
 .../apache/spark/sql/connect/execution/ExecuteResponseObserver.scala | 5 +++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index ff59789980ce..3a707495ff3f 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -232,7 +232,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       // 2. has a response to send
       def gotResponse = response.nonEmpty
       // 3. sent everything from the stream and the stream is finished
-      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _)
+      def streamFinished = executionObserver.getLastResponseIndex().exists(nextIndex > _) ||
+        executionObserver.isCleaned()
       // 4. time deadline or size limit reached
       def deadlineLimitReached =
         sentResponsesSize > maximumResponseSize || deadlineTimeNs < System.nanoTime()
diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 9d0cc2128dd4..bcb665eb01ef 100644
--- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -260,6 +260,11 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
     finalProducedIndex.isDefined
   }
 
+  // Returns if this observer has already been cleaned
+  def isCleaned(): Boolean = responseLock.synchronized {
+    completed() && responses.isEmpty
+  }
+
   // For testing.
   private[connect] def undoCompletion(): Unit = responseLock.synchronized {
     finalProducedIndex = None

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org