This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d3f638f3f8f4 [SPARK-48163][CONNECT][TESTS] Fix flaky tests in `SparkConnectServiceSuite` which are caused by `executeHolder` undefined
d3f638f3f8f4 is described below

commit d3f638f3f8f43c7eb223628fe05cbb2f4d0a296e
Author: Kousuke Saruta <saru...@amazon.co.jp>
AuthorDate: Mon Sep 8 08:51:05 2025 -0700

[SPARK-48163][CONNECT][TESTS] Fix flaky tests in `SparkConnectServiceSuite` which are caused by `executeHolder` undefined

### What changes were proposed in this pull request?

This PR aims to fix flaky tests in `SparkConnectServiceSuite` which are caused by `executeHolder` being [undefined](https://github.com/apache/spark/blob/ab9a63626018156b3e0f267f14409c30031692b7/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala#L908).

The conditions to reproduce this issue are:

(1) The operation finishes before its `executeHolder` is set in [MockSparkListener#onOtherEvent](https://github.com/apache/spark/blob/ab9a63626018156b3e0f267f14409c30031692b7/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala#L961).
(2) `executeHolder` is accessed by calling `verifyEvents.onCompleted` after the operation finishes.

`SparkListenerConnectOperationStarted` is posted asynchronously with respect to the corresponding operation, so condition (1) can be met. After an operation finishes, its `executeHolder` is [removed from a map](https://github.com/apache/spark/blob/af16aa8e11c223642f928b0b9893854a851d70bb/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala#L153), so if condition (1) is met, `executeHolder` is never set because `SparkConnectService.executionM [...]

One example of a test affected by this issue is `SPARK-43923: commands send events - get_resources_command`. You can easily reproduce the issue by inserting a sleep into `MockSparkListener#onOtherEvent` as follows:

```
         val executeKey = ExecuteKey(sessionHolder.userId, sessionHolder.sessionId, e.operationId)
+        Thread.sleep(1000)
         executeHolder = SparkConnectService.executionManager.getExecuteHolder(executeKey)
```

Then run the test:

```
$ build/sbt 'connect/testOnly org.apache.spark.sql.connect.planner.SparkConnectServiceSuite -- -z "get_resources_command"'
```

To resolve this issue, this PR proposes to:

* Change `VerifyEvents#onCompleted` to just assert that `executeHolder.eventsManager.getProducedRowCount == producedRowCount`
* Call `VerifyEvents#onCompleted` from `StreamObserver#onCompleted`
* Add `VerifyEvents#assertClosed` to check whether the status is `Closed`

### Why are the changes needed?

For test stability.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Inserted `Thread.sleep(1000)` as mentioned above and then ran `SparkConnectServiceSuite`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #52264 from sarutak/SPARK-48163.
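To make the race concrete, here is a minimal, self-contained Scala sketch of the timing described above. It deliberately does not use the suite's real classes: `FakeExecutionManager`, the single-thread "listener bus", and the string-valued holder are hypothetical stand-ins for `SparkConnectService.executionManager`, the asynchronous delivery of `SparkListenerConnectOperationStarted`, and `ExecuteHolder`.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical stand-in for SparkConnectService.executionManager: holders are
// registered while an operation runs and removed as soon as it finishes.
object FakeExecutionManager {
  private val holders = new ConcurrentHashMap[String, String]()
  def register(key: String): Unit = holders.put(key, s"holder-for-$key")
  def getExecuteHolder(key: String): Option[String] = Option(holders.get(key))
  def remove(key: String): Unit = holders.remove(key)
}

object ListenerRaceSketch {
  // Plays the role of MockSparkListener's executeHolder field in the suite.
  @volatile private var executeHolder: Option[String] = None

  def main(args: Array[String]): Unit = {
    // Single-thread executor standing in for the asynchronous listener bus
    // that delivers SparkListenerConnectOperationStarted.
    val listenerBus = Executors.newSingleThreadExecutor()
    val key = "op-1"
    FakeExecutionManager.register(key)

    // The "operation started" event is handled asynchronously; here it is
    // artificially delayed, which corresponds to condition (1) above.
    listenerBus.execute(new Runnable {
      def run(): Unit = {
        Thread.sleep(100)
        executeHolder = FakeExecutionManager.getExecuteHolder(key)
      }
    })

    // Meanwhile the operation finishes and its holder is removed from the map,
    // so the late lookup in the listener can only ever see None.
    FakeExecutionManager.remove(key)

    listenerBus.shutdown()
    listenerBus.awaitTermination(5, TimeUnit.SECONDS)
    // Prints "executeHolder = None": the state that made the tests flaky when
    // they dereferenced executeHolder right after the blocking call returned.
    println(s"executeHolder = $executeHolder")
  }
}
```

The fix in the diff below follows the same reasoning: the row-count assertion moves into `StreamObserver#onCompleted`, where the holder is known to exist, and the new `assertClosed` polls with `Eventually` because the events manager is closed asynchronously.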
Authored-by: Kousuke Saruta <saru...@amazon.co.jp>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../sql/connect/planner/SparkConnectServiceSuite.scala | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index 3dcd025855bd..b02507f79648 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -192,10 +192,11 @@ class SparkConnectServiceSuite
         }
         override def onCompleted(): Unit = {
+          verifyEvents.onCompleted(Some(100))
           done = true
         }
       })
-      verifyEvents.onCompleted(Some(100))
+      verifyEvents.assertClosed()
       // The current implementation is expected to be blocking. This is here to make sure it is.
       assert(done)
@@ -293,10 +294,11 @@ class SparkConnectServiceSuite
         }
         override def onCompleted(): Unit = {
+          verifyEvents.onCompleted(Some(6))
           done = true
         }
       })
-      verifyEvents.onCompleted(Some(6))
+      verifyEvents.assertClosed()
       // The current implementation is expected to be blocking. This is here to make sure it is.
       assert(done)
@@ -529,10 +531,11 @@ class SparkConnectServiceSuite
         }
         override def onCompleted(): Unit = {
+          verifyEvents.onCompleted(producedNumRows)
           done = true
         }
       })
-      verifyEvents.onCompleted(producedNumRows)
+      verifyEvents.assertClosed()
       // The current implementation is expected to be blocking.
       // This is here to make sure it is.
       assert(done)
@@ -620,7 +623,7 @@ class SparkConnectServiceSuite
         }
       })
       thread.join()
-      verifyEvents.onCompleted()
+      verifyEvents.assertClosed()
     }
   }
@@ -683,7 +686,7 @@ class SparkConnectServiceSuite
       }
     })
     assert(failures.isEmpty, s"this should have no failures but got $failures")
-    verifyEvents.onCompleted()
+    verifyEvents.assertClosed()
   }
 }
@@ -925,6 +928,8 @@ class SparkConnectServiceSuite
     }
     def onCompleted(producedRowCount: Option[Long] = None): Unit = {
       assert(executeHolder.eventsManager.getProducedRowCount == producedRowCount)
+    }
+    def assertClosed(): Unit = {
       // The eventsManager is closed asynchronously
       Eventually.eventually(EVENT_WAIT_TIMEOUT) {
         assert(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org