This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 72cb18c7377a [SPARK-48163][CONNECT][TESTS][4.0] Fix flaky tests in `SparkConnectServiceSuite` which are caused by `executorHolder` undefined
72cb18c7377a is described below

commit 72cb18c7377aa391c964091be6870abaf5ab22a1
Author: Kousuke Saruta <saru...@amazon.co.jp>
AuthorDate: Tue Sep 9 08:48:16 2025 -0700

[SPARK-48163][CONNECT][TESTS][4.0] Fix flaky tests in `SparkConnectServiceSuite` which are caused by `executorHolder` undefined

### What changes were proposed in this pull request?

This PR backports #52264 to `branch-4.0`.

This PR aims to fix flaky tests in `SparkConnectServiceSuite` which are caused by `executeHolder` being [undefined](https://github.com/apache/spark/blob/82351703526b71078ccd6599e34063d05e292461/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala#L880). The conditions to reproduce this issue are:

1. The operation finishes before its `executeHolder` is set in [MockSparkListener#onOtherEvent](https://github.com/apache/spark/blob/82351703526b71078ccd6599e34063d05e292461/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala#L929).
2. `executeHolder` is accessed by calling `verifyEvents.onCompleted` after the operation finishes.

`SparkListenerConnectOperationStarted` is posted asynchronously with respect to the corresponding operation, so condition (1) can be met. After an operation finishes, its `executeHolder` is [removed from a map](https://github.com/apache/spark/blob/82351703526b71078ccd6599e34063d05e292461/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala#L153), so if condition (1) is met, `executeHolder` is never set because `SparkConnectService.executionM [...]

One example of a test affected by this issue is `SPARK-43923: commands send events - get_resources_command`. You can easily reproduce the issue by inserting a sleep into `MockSparkListener#onOtherEvent` as follows:
```
      val executeKey = ExecuteKey(sessionHolder.userId, sessionHolder.sessionId, e.operationId)
+     Thread.sleep(1000)
      executeHolder = SparkConnectService.executionManager.getExecuteHolder(executeKey)
```
Then run the test:
```
$ build/sbt 'connect/testOnly org.apache.spark.sql.connect.planner.SparkConnectServiceSuite -- -z "get_resources_command"'
```

To resolve this issue, this PR proposes to:

* Change `VerifyEvents#onCompleted` to only assert that `executeHolder.eventsManager.getProducedRowCount == producedRowCount`
* Call `VerifyEvents#onCompleted` from `StreamObserver#onCompleted`
* Add `VerifyEvents#assertClosed` to check that the status is `Closed`

(A minimal sketch of this verification pattern is shown after the commit trailer below.)

### Why are the changes needed?

For test stability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

By inserting `Thread.sleep(1000)` as described above and then running `SparkConnectServiceSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52281 from sarutak/followup-SPARK-48163.

Authored-by: Kousuke Saruta <saru...@amazon.co.jp>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
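To make the fixed verification flow concrete, here is a minimal, self-contained sketch of the pattern described above. It is not the suite's actual helper: `FakeEventsManager`, `VerifyEventsSketch`, and the 10-second timeout are hypothetical stand-ins. The idea is that row counts are asserted synchronously from `StreamObserver#onCompleted`, while the asynchronously updated closed state is polled with ScalaTest's `Eventually`.

```scala
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{Seconds, Span}

// Hypothetical stand-in for the state tracked by the real events manager.
class FakeEventsManager {
  @volatile var producedRowCount: Option[Long] = None
  @volatile var closed: Boolean = false
}

// Sketch of the two-step verification the patch adopts: a synchronous assertion
// made while the stream is still open, and a retried assertion for state that
// only becomes true asynchronously.
class VerifyEventsSketch(events: FakeEventsManager) extends Eventually {

  // Called from StreamObserver#onCompleted, i.e. before the blocking RPC call
  // returns, while the executeHolder captured by the listener is still registered.
  def onCompleted(expectedRowCount: Option[Long] = None): Unit =
    assert(events.producedRowCount == expectedRowCount)

  // Called after the RPC returns; the events manager is closed asynchronously,
  // so the assertion is retried until it passes or the timeout expires.
  def assertClosed(): Unit =
    eventually(timeout(Span(10, Seconds))) {
      assert(events.closed)
    }
}
```

In the suite itself, the retried check uses the suite's own `EVENT_WAIT_TIMEOUT` patience value, as the diff below shows.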
---
 .../sql/connect/planner/SparkConnectServiceSuite.scala   | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index a73da34d3647..c6daa92e9735 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -193,10 +193,11 @@ class SparkConnectServiceSuite
           }
 
           override def onCompleted(): Unit = {
+            verifyEvents.onCompleted(Some(100))
             done = true
           }
         })
-      verifyEvents.onCompleted(Some(100))
+      verifyEvents.assertClosed()
       // The current implementation is expected to be blocking. This is here to make sure it is.
       assert(done)
 
@@ -294,10 +295,11 @@ class SparkConnectServiceSuite
           }
 
           override def onCompleted(): Unit = {
+            verifyEvents.onCompleted(Some(6))
             done = true
           }
         })
-      verifyEvents.onCompleted(Some(6))
+      verifyEvents.assertClosed()
       // The current implementation is expected to be blocking. This is here to make sure it is.
       assert(done)
 
@@ -530,10 +532,11 @@ class SparkConnectServiceSuite
           }
 
           override def onCompleted(): Unit = {
+            verifyEvents.onCompleted(producedNumRows)
             done = true
           }
         })
-      verifyEvents.onCompleted(producedNumRows)
+      verifyEvents.assertClosed()
       // The current implementation is expected to be blocking.
       // This is here to make sure it is.
       assert(done)
@@ -621,7 +624,7 @@ class SparkConnectServiceSuite
         }
       })
       thread.join()
-      verifyEvents.onCompleted()
+      verifyEvents.assertClosed()
     }
   }
 
@@ -684,7 +687,7 @@ class SparkConnectServiceSuite
         }
       })
       assert(failures.isEmpty, s"this should have no failures but got $failures")
-      verifyEvents.onCompleted()
+      verifyEvents.assertClosed()
     }
   }
 
@@ -893,6 +896,8 @@ class SparkConnectServiceSuite
     }
     def onCompleted(producedRowCount: Option[Long] = None): Unit = {
       assert(executeHolder.eventsManager.getProducedRowCount == producedRowCount)
+    }
+    def assertClosed(): Unit = {
       // The eventsManager is closed asynchronously
       Eventually.eventually(EVENT_WAIT_TIMEOUT) {
         assert(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org