This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ac42e2fe122 [SPARK-50534][SPARK-50535][TEST][CONNECT] Fix sporadic test failures
5ac42e2fe122 is described below

commit 5ac42e2fe122c972cd3a2ecc87b11b1a70758f72
Author: changgyoopark-db <[email protected]>
AuthorDate: Mon Dec 23 09:03:47 2024 +0900

    [SPARK-50534][SPARK-50535][TEST][CONNECT] Fix sporadic test failures
    
    ### What changes were proposed in this pull request?
    
    Fix sporadic Spark Connect test failures.
    1. SPARK-50534: VerifyEvents.this.listener.executeHolder was not declared "volatile", causing the thread to repeatedly read a potentially outdated value. The data structure is only used by the test suite.
    2. SPARK-50535: org.apache.spark.sql.connect.service.SparkConnectSessionManager.invalidateAllSessions is susceptible to system time synchronization (e.g., NTP), which can leave stale sessions behind. invalidateAllSessions is only used by test suites.
    
    ### Why are the changes needed?
    
    Fix sporadic test failures.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Repeatedly ran testOnly on org.apache.spark.sql.connect.planner.SparkConnectServiceSuite and org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #49253 from changgyoopark-db/SPARK-50534.
    
    Authored-by: changgyoopark-db <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../spark/sql/connect/service/SparkConnectSessionManager.scala      | 6 ++++--
 .../apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala | 3 ++-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
index a306856efa33..b0b74a36e187 100644
--- 
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
+++ 
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
@@ -289,8 +289,10 @@ class SparkConnectSessionManager extends Logging {
    * Used for testing
    */
   private[connect] def invalidateAllSessions(): Unit = {
-    periodicMaintenance(defaultInactiveTimeoutMs = 0L, ignoreCustomTimeout = 
true)
-    assert(sessionStore.isEmpty)
+    sessionStore.forEach((key, sessionHolder) => {
+      removeSessionHolder(key)
+      shutdownSessionHolder(sessionHolder)
+    })
     closedSessionsCache.invalidateAll()
   }
 
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index d6d137e6d91a..5e8872569165 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -919,7 +919,8 @@ class SparkConnectServiceSuite
   }
   class MockSparkListener() extends SparkListener {
     val semaphoreStarted = new Semaphore(0)
-    var executeHolder = Option.empty[ExecuteHolder]
+    // Accessed by multiple threads in parallel.
+    @volatile var executeHolder = Option.empty[ExecuteHolder]
     override def onOtherEvent(event: SparkListenerEvent): Unit = {
       event match {
         case e: SparkListenerConnectOperationStarted =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to