This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b8c6854d7867 [SPARK-50889][CONNECT][TESTS] Fix Flaky Test: 
`SparkSessionE2ESuite.interrupt operation` (Hang)
b8c6854d7867 is described below

commit b8c6854d7867a65c3edbda8dc4fd437a8820a4a0
Author: Kousuke Saruta <saru...@amazon.co.jp>
AuthorDate: Fri Jul 25 08:23:44 2025 +0900

    [SPARK-50889][CONNECT][TESTS] Fix Flaky Test: 
`SparkSessionE2ESuite.interrupt operation` (Hang)
    
    ### What changes were proposed in this pull request?
    This PR fixes an issue where `SparkSessionE2ESuite.interrupt operation` occasionally hangs.
    This issue happens if the execution thread for an operation id cleans up the corresponding `ExecuteHolder` as a result of interruption [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala#L175) before the response sender thread consumes a response [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/m [...]
    In this case, the cleanup eventually calls `ExecuteResponseObserver.removeAll()`, all the responses are discarded, and the response sender thread can't escape [this loop](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala#L245) because neither `gotResponse` nor `streamFinished` ever becomes true.
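    The stuck state can be modeled with a toy stand-in for the observer (hypothetical, simplified names; this is not the real `ExecuteResponseObserver` API). Once `removeAll()` wins the race, both loop-exit predicates stay false forever:

    ```scala
    import scala.collection.mutable

    // Toy model of the state involved in the hang. Names are hypothetical,
    // simplified stand-ins for ExecuteResponseObserver; not the real API.
    class ToyObserver {
      val responses = mutable.Map[Long, String]()   // buffered responses, keyed by index
      var finalProducedIndex: Option[Long] = None   // set once the stream completes

      def getLastResponseIndex(): Option[Long] = finalProducedIndex
      def removeAll(): Unit = responses.clear()     // what interruption cleanup does
    }

    object HangDemo {
      def main(args: Array[String]): Unit = {
        val obs = new ToyObserver
        obs.responses(1L) = "partial result"
        obs.finalProducedIndex = Some(1L)   // stream completed, last index is 1
        obs.removeAll()                     // cleanup wins the race with the sender

        val nextIndex = 1L                  // the sender has consumed nothing yet
        // The two loop-exit predicates checked by the sender:
        val gotResponse = obs.responses.get(nextIndex).nonEmpty               // false: discarded
        val streamFinished = obs.getLastResponseIndex().exists(nextIndex > _) // false: 1 > 1
        println(s"gotResponse=$gotResponse, streamFinished=$streamFinished")
      }
    }
    ```

    With both predicates permanently false, a wait loop keyed on them can never terminate.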
    
    The solution this PR proposes is to change the definition of `streamFinished` in `ExecuteGrpcResponseSender` so that the stream is regarded as finished when the `ExecuteResponseObserver` is marked as completed and all of its responses have been discarded. `ExecuteResponseObserver.removeAll` is called when the corresponding `ExecuteHolder` is closed or cleaned up by interruption, so treating this state as end-of-stream is reasonable.
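    The fixed exit condition can be sketched as a pure predicate (`ObserverState` is a hypothetical, immutable stand-in for the observer's fields; only `isCleaned()` mirrors the predicate this commit adds):

    ```scala
    // Sketch of the fixed exit condition, under simplified assumptions.
    object FixedPredicate {
      case class ObserverState(responses: Map[Long, String], finalProducedIndex: Option[Long]) {
        def completed(): Boolean = finalProducedIndex.isDefined
        // The new predicate: completed, and every buffered response discarded.
        def isCleaned(): Boolean = completed() && responses.isEmpty
      }

      // New definition of streamFinished: the old index check OR isCleaned().
      def streamFinished(state: ObserverState, nextIndex: Long): Boolean =
        state.finalProducedIndex.exists(nextIndex > _) || state.isCleaned()

      def main(args: Array[String]): Unit = {
        // After an interrupt: completed with last index 3, but responses wiped.
        val cleaned = ObserverState(Map.empty, Some(3L))
        // The first clause alone is false (1 > 3 fails), but isCleaned() now
        // lets the sender escape the loop.
        println(streamFinished(cleaned, nextIndex = 1L))  // prints true
      }
    }
    ```

    A state that is not yet completed still keeps the sender waiting, so normal streaming behavior is unchanged.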
    
    ### Why are the changes needed?
    To fix a flaky test that occasionally hangs.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Tested manually.
    You can easily reproduce this issue without this change by inserting a sleep into the test as follows.
    ```
    --- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
    +++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
     @@ -331,6 +331,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
             // cancel
             val operationId = result.operationId
             val canceledId = spark.interruptOperation(operationId)
    +        Thread.sleep(1000)
             assert(canceledId == Seq(operationId))
             // and check that it got canceled
             val e = intercept[SparkException] {
    ```
    
    After this change is applied, the test above doesn't hang.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #51638 from sarutak/SPARK-50889.
    
    Authored-by: Kousuke Saruta <saru...@amazon.co.jp>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/connect/execution/ExecuteGrpcResponseSender.scala      | 3 ++-
 .../apache/spark/sql/connect/execution/ExecuteResponseObserver.scala | 5 +++++
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index ff59789980ce..3a707495ff3f 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -232,7 +232,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: 
Message](
       // 2. has a response to send
       def gotResponse = response.nonEmpty
       // 3. sent everything from the stream and the stream is finished
-      def streamFinished = 
executionObserver.getLastResponseIndex().exists(nextIndex > _)
+      def streamFinished = 
executionObserver.getLastResponseIndex().exists(nextIndex > _) ||
+        executionObserver.isCleaned()
       // 4. time deadline or size limit reached
       def deadlineLimitReached =
         sentResponsesSize > maximumResponseSize || deadlineTimeNs < 
System.nanoTime()
diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index 9d0cc2128dd4..bcb665eb01ef 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -260,6 +260,11 @@ private[connect] class ExecuteResponseObserver[T <: 
Message](val executeHolder:
     finalProducedIndex.isDefined
   }
 
+  // Returns if this observer has already been cleaned
+  def isCleaned(): Boolean = responseLock.synchronized {
+    completed() && responses.isEmpty
+  }
+
   // For testing.
   private[connect] def undoCompletion(): Unit = responseLock.synchronized {
     finalProducedIndex = None


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
