This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5ac42e2fe122 [SPARK-50534][SPARK-50535][TEST][CONNECT] Fix sporadic
test failures
5ac42e2fe122 is described below
commit 5ac42e2fe122c972cd3a2ecc87b11b1a70758f72
Author: changgyoopark-db <[email protected]>
AuthorDate: Mon Dec 23 09:03:47 2024 +0900
[SPARK-50534][SPARK-50535][TEST][CONNECT] Fix sporadic test failures
### What changes were proposed in this pull request?
Fix sporadic Spark Connect test failures.
1. SPARK-50534: VerifyEvents.this.listener.executeHolder was not declared
"volatile", causing the thread to repeatedly read potentially outdated value.
The data structure is only used by the test suite.
2. SPARK-50535:
org.apache.spark.sql.connect.service.SparkConnectSessionManager.invalidateAllSessions
is susceptible to system time synchronization (e.g., NTP), leaving stale
sessions. invalidateAllSessions is only used by test suites.
### Why are the changes needed?
Fix sporadic test failures.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Repeatedly ran testOnly
org.apache.spark.sql.connect.planner.SparkConnectServiceSuite and
org.apache.spark.sql.connect.service.SparkConnectServiceE2ESuite.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #49253 from changgyoopark-db/SPARK-50534.
Authored-by: changgyoopark-db <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../spark/sql/connect/service/SparkConnectSessionManager.scala | 6 ++++--
.../apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala | 3 ++-
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
index a306856efa33..b0b74a36e187 100644
---
a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
+++
b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectSessionManager.scala
@@ -289,8 +289,10 @@ class SparkConnectSessionManager extends Logging {
* Used for testing
*/
private[connect] def invalidateAllSessions(): Unit = {
- periodicMaintenance(defaultInactiveTimeoutMs = 0L, ignoreCustomTimeout =
true)
- assert(sessionStore.isEmpty)
+ sessionStore.forEach((key, sessionHolder) => {
+ removeSessionHolder(key)
+ shutdownSessionHolder(sessionHolder)
+ })
closedSessionsCache.invalidateAll()
}
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index d6d137e6d91a..5e8872569165 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -919,7 +919,8 @@ class SparkConnectServiceSuite
}
class MockSparkListener() extends SparkListener {
val semaphoreStarted = new Semaphore(0)
- var executeHolder = Option.empty[ExecuteHolder]
+ // Accessed by multiple threads in parallel.
+ @volatile var executeHolder = Option.empty[ExecuteHolder]
override def onOtherEvent(event: SparkListenerEvent): Unit = {
event match {
case e: SparkListenerConnectOperationStarted =>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]