This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 72cb18c7377a [SPARK-48163][CONNECT][TESTS][4.0] Fix flaky tests in `SparkConnectServiceSuite` caused by an undefined `executeHolder`
72cb18c7377a is described below

commit 72cb18c7377aa391c964091be6870abaf5ab22a1
Author: Kousuke Saruta <saru...@amazon.co.jp>
AuthorDate: Tue Sep 9 08:48:16 2025 -0700

    [SPARK-48163][CONNECT][TESTS][4.0] Fix flaky tests in `SparkConnectServiceSuite` caused by an undefined `executeHolder`
    
    ### What changes were proposed in this pull request?
    This PR backports #52264 to `branch-4.0`.
    
    This PR aims to fix flaky tests in `SparkConnectServiceSuite` that are caused by `executeHolder` being [undefined](https://github.com/apache/spark/blob/82351703526b71078ccd6599e34063d05e292461/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala#L880).
    The conditions to reproduce this issue are:
    
    (1) The operation finishes before its `executeHolder` is set in 
[MockSparkListener#onOtherEvent](https://github.com/apache/spark/blob/82351703526b71078ccd6599e34063d05e292461/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala#L929).
    (2) `executeHolder` is accessed by calling `verifyEvents.onCompleted` after the operation finishes.
    
    `SparkListenerConnectOperationStarted` is posted asynchronously with respect to the corresponding operation, so condition (1) can be met. After an operation finishes, `executeHolder` is [removed from a map](https://github.com/apache/spark/blob/82351703526b71078ccd6599e34063d05e292461/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala#L153), so if condition (1) is met, `executeHolder` is never set because `SparkConnectService.executionM [...]
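    The race described above can be sketched independently of Spark Connect. In the hypothetical sketch below, `RaceSketch`, `executions`, and the operation methods are illustrative names only (not the real Spark Connect API): an entry is removed from a shared map when the operation finishes, so an asynchronously delivered started-event that looks up the holder afterwards finds nothing.

    ```scala
    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical sketch of the race; names do not match the real
    // Spark Connect classes.
    object RaceSketch {
      // Stands in for the execution manager's internal holder map.
      private val executions = new ConcurrentHashMap[String, String]()

      def startOperation(key: String): Unit =
        executions.put(key, s"holder-$key")

      // Finishing the operation removes its holder from the map.
      def finishOperation(key: String): Unit =
        executions.remove(key)

      // What the listener does when the started-event arrives; the event
      // is delivered asynchronously, possibly after finishOperation has
      // already run, in which case the lookup returns None and the
      // test's holder reference is never set.
      def onStarted(key: String): Option[String] =
        Option(executions.get(key))
    }
    ```

    If `finishOperation` wins the race, `onStarted` returns `None`, which is exactly the situation the `Thread.sleep(1000)` reproduction below forces.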
    
    One example of a test affected by this issue is `SPARK-43923: commands send events - get_resources_command`.
    You can easily reproduce this issue by inserting a sleep into `MockSparkListener#onOtherEvent` as follows.
    
    ```
       val executeKey =
         ExecuteKey(sessionHolder.userId, sessionHolder.sessionId, e.operationId)
    +  Thread.sleep(1000)
       executeHolder = SparkConnectService.executionManager.getExecuteHolder(executeKey)
    ```
    
    Then run the test:
    ```
    $ build/sbt 'connect/testOnly org.apache.spark.sql.connect.planner.SparkConnectServiceSuite -- -z "get_resources_command"'
    ```
    To resolve this issue, this PR proposes:
    
    * Change `VerifyEvents#onCompleted` to only assert `executeHolder.eventsManager.getProducedRowCount == producedRowCount`
    * Call `VerifyEvents#onCompleted` from `StreamObserver#onCompleted`
    * Add `VerifyEvents#assertClosed` to check that the status is `Closed`
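    The new `assertClosed` relies on ScalaTest's `Eventually` to tolerate the asynchronous close. The retry-until-deadline idea behind that pattern can be sketched as follows; this is a simplified stand-in with hypothetical names (`EventuallySketch`, `timeoutMs`, `intervalMs`), not ScalaTest's actual implementation.

    ```scala
    // Simplified stand-in for ScalaTest's Eventually.eventually: retry a
    // failing assertion until it passes or the deadline expires, then
    // rethrow the last failure.
    object EventuallySketch {
      def eventually[T](timeoutMs: Long, intervalMs: Long = 10)(assertion: => T): T = {
        val deadline = System.currentTimeMillis() + timeoutMs
        while (true) {
          try return assertion
          catch {
            case e: Throwable =>
              if (System.currentTimeMillis() >= deadline) throw e
              Thread.sleep(intervalMs)
          }
        }
        sys.error("unreachable")
      }
    }
    ```

    With this shape, a check like `assert(status == Closed)` can be polled for up to `EVENT_WAIT_TIMEOUT` instead of failing on the first attempt, which is why `assertClosed` is safe to call right after `onCompleted` returns.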
    
    ### Why are the changes needed?
    For test stability.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    By inserting `Thread.sleep(1000)` as mentioned above and then running `SparkConnectServiceSuite`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52281 from sarutak/followup-SPARK-48163.
    
    Authored-by: Kousuke Saruta <saru...@amazon.co.jp>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/connect/planner/SparkConnectServiceSuite.scala    | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index a73da34d3647..c6daa92e9735 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -193,10 +193,11 @@ class SparkConnectServiceSuite
           }
 
           override def onCompleted(): Unit = {
+            verifyEvents.onCompleted(Some(100))
             done = true
           }
         })
-      verifyEvents.onCompleted(Some(100))
+      verifyEvents.assertClosed()
       // The current implementation is expected to be blocking. This is here 
to make sure it is.
       assert(done)
 
@@ -294,10 +295,11 @@ class SparkConnectServiceSuite
           }
 
           override def onCompleted(): Unit = {
+            verifyEvents.onCompleted(Some(6))
             done = true
           }
         })
-      verifyEvents.onCompleted(Some(6))
+      verifyEvents.assertClosed()
       // The current implementation is expected to be blocking. This is here 
to make sure it is.
       assert(done)
 
@@ -530,10 +532,11 @@ class SparkConnectServiceSuite
           }
 
           override def onCompleted(): Unit = {
+            verifyEvents.onCompleted(producedNumRows)
             done = true
           }
         })
-      verifyEvents.onCompleted(producedNumRows)
+      verifyEvents.assertClosed()
       // The current implementation is expected to be blocking.
       // This is here to make sure it is.
       assert(done)
@@ -621,7 +624,7 @@ class SparkConnectServiceSuite
           }
         })
       thread.join()
-      verifyEvents.onCompleted()
+      verifyEvents.assertClosed()
     }
   }
 
@@ -684,7 +687,7 @@ class SparkConnectServiceSuite
           }
         })
       assert(failures.isEmpty, s"this should have no failures but got 
$failures")
-      verifyEvents.onCompleted()
+      verifyEvents.assertClosed()
     }
   }
 
@@ -893,6 +896,8 @@ class SparkConnectServiceSuite
     }
     def onCompleted(producedRowCount: Option[Long] = None): Unit = {
       assert(executeHolder.eventsManager.getProducedRowCount == 
producedRowCount)
+    }
+    def assertClosed(): Unit = {
       // The eventsManager is closed asynchronously
       Eventually.eventually(EVENT_WAIT_TIMEOUT) {
         assert(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
