This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1da0e5357ad9 [SPARK-55133][CONNECT] Fix race condition in IsolatedSessionState lifecycle management
1da0e5357ad9 is described below
commit 1da0e5357ad95f174f5e8b637d1948a488e017ac
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Jan 30 22:24:07 2026 +0800
[SPARK-55133][CONNECT] Fix race condition in IsolatedSessionState lifecycle management
### What changes were proposed in this pull request?
This PR fixes a race condition in `IsolatedSessionState` lifecycle
management that could cause failures when
`spark.executor.isolatedSessionCache.size` is set to a small value.
Key changes:
- Introduced `IsolatedSessionState.sessions` as the authoritative store for
all isolated sessions, ensuring only one session exists per UUID at any time
- Changed `refCount` and `evicted` from lock-free to synchronized access
via a shared lock object to prevent race conditions between `acquire()`,
`release()`, and `markEvicted()`
- Changed `acquire()` to return a Boolean indicating whether the session was
successfully acquired (`false` if it was already evicted)
- Added `tryUnEvict()` method to allow reusing a deferred session that was
evicted but still in use
- Updated the cache loader to check the authoritative sessions map first
and reuse existing sessions when possible
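Here is a minimal sketch of the lock-guarded state machine described in the key changes above. It assumes a stripped-down session with no classloader or files. `SessionModel`, `SessionRegistry`, and the `cleanedUp` flag are hypothetical names chosen for illustration and are not Spark APIs. Cleanup here only sets a flag and drops the registry entry.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for IsolatedSessionState.sessions (the authoritative map).
object SessionRegistry {
  val sessions = new ConcurrentHashMap[String, SessionModel]()
}

// Simplified, hypothetical model of the lifecycle: no classloaders or files,
// just the two lock-guarded state variables and the four transitions.
final class SessionModel(val uuid: String) {
  private val lock = new Object
  private var refCount = 0      // guarded by lock
  private var evicted = false   // guarded by lock
  @volatile var cleanedUp = false

  // Refuses to pin a session once it has been marked evicted.
  def acquire(): Boolean = lock.synchronized {
    if (evicted) false
    else { refCount += 1; true }
  }

  // Only a deferred session (evicted but still referenced) can be revived.
  def tryUnEvict(): Boolean = lock.synchronized {
    if (evicted && refCount > 0) { evicted = false; true }
    else false
  }

  // The last release of an evicted session runs the deferred cleanup.
  def release(): Unit = lock.synchronized {
    refCount -= 1
    if (refCount == 0 && evicted) cleanup()
  }

  // Cleans up immediately when nobody holds a reference; otherwise defers.
  def markEvicted(): Unit = lock.synchronized {
    evicted = true
    if (refCount == 0) cleanup()
  }

  // Always called with `lock` held, so no other transition can interleave with it.
  private def cleanup(): Unit = {
    cleanedUp = true
    SessionRegistry.sessions.remove(uuid)
  }
}
```

All four methods synchronize on the same monitor. As a result, checking `evicted` and updating `refCount` happen as one atomic step. A separate `AtomicInteger` and `@volatile` flag cannot provide that guarantee.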
### Why are the changes needed?
When the isolated session cache is full and sessions are evicted, there's a
race condition between:
1. A task acquiring a session from the cache
2. Another task triggering eviction of that session
3. The evicted session being cleaned up (classloader closed, files deleted)
This could cause:
- `RemoteClassLoaderError` when trying to load classes with a closed
classloader
- `NoSuchFileException` when session files are deleted while still in use
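To make the race concrete, the following single-threaded replay walks through one bad interleaving against a hypothetical model of the pre-fix design. That model has an `AtomicInteger` reference count, a `@volatile` evicted flag, and an `acquire()` that returns `Unit`. `OldSession` and `OldRaceDemo` are illustrative names, not Spark classes. The `cleanedUp` flag stands in for closing the classloader and deleting session files.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical model of the pre-fix design: a lock-free counter plus a
// volatile flag, and an acquire() that cannot refuse an evicted session.
final class OldSession {
  val refCount = new AtomicInteger(0)
  @volatile var evicted = false
  // Stands in for "classloader closed, session files deleted".
  @volatile var cleanedUp = false

  def acquire(): Unit = refCount.incrementAndGet()

  def markEvicted(): Unit = {
    evicted = true
    if (refCount.get() == 0) cleanedUp = true
  }
}

object OldRaceDemo {
  // Replays one bad interleaving on a single thread:
  //   Task A: the cache lookup returns the session (refCount is still 0)
  //   Task B: triggers eviction of that session; refCount == 0, so cleanup runs now
  //   Task A: pins the session, which has already been cleaned up
  def replayOldRace(): OldSession = {
    val session = new OldSession
    val seenByTaskA = session   // step 1: lookup
    session.markEvicted()       // step 2: eviction plus immediate cleanup
    seenByTaskA.acquire()       // step 3: "succeeds" on a dead session
    seenByTaskA
  }

  def main(args: Array[String]): Unit = {
    val s = replayOldRace()
    println(s"refCount=${s.refCount.get()}, cleanedUp=${s.cleanedUp}")
  }
}
```

Running `main` prints `refCount=1, cleanedUp=true`. The task now holds a reference to a session whose resources are already gone, which is the situation that leads to the failures listed above. In the fixed design, `acquire()` returns `false` at step 3 and the caller looks the session up again.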
The fix ensures that:
- Sessions are tracked in an authoritative map from creation until cleanup
completes
- Evicted sessions can be reused if still in use by other tasks
- A task cannot acquire a session that's being cleaned up
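The loader-plus-retry pattern behind these guarantees (`obtainSession` in the diff below) can also be sketched without Guava. This sketch rests on several assumptions:

- `Sess` and `SessionManager` are hypothetical names.
- An access-ordered `java.util.LinkedHashMap` with `removeEldestEntry` replaces the Guava LRU cache and its removal listener.
- Cleanup is reduced to removing the entry from the authoritative map.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical, trimmed-down session with the same lock-guarded transitions.
final class Sess(val uuid: String, registry: ConcurrentHashMap[String, Sess]) {
  private val lock = new Object
  private var refCount = 0
  private var evicted = false

  def acquire(): Boolean = lock.synchronized {
    if (evicted) false else { refCount += 1; true }
  }
  def tryUnEvict(): Boolean = lock.synchronized {
    if (evicted && refCount > 0) { evicted = false; true } else false
  }
  def release(): Unit = lock.synchronized {
    refCount -= 1
    if (refCount == 0 && evicted) cleanup()
  }
  def markEvicted(): Unit = lock.synchronized {
    evicted = true
    if (refCount == 0) cleanup()
  }
  // Two-argument remove only drops the entry if it still maps to this instance
  // (a conservative choice for this sketch).
  private def cleanup(): Unit = registry.remove(uuid, this)
}

// Hypothetical manager: the LinkedHashMap plays the LRU cache, and `sessions`
// plays the role of IsolatedSessionState.sessions.
final class SessionManager(maxSize: Int) {
  val sessions = new ConcurrentHashMap[String, Sess]()
  var created = 0 // number of fresh instances the loader has built

  private val lru = new JLinkedHashMap[String, Sess](16, 0.75f, true) {
    override protected def removeEldestEntry(e: JMap.Entry[String, Sess]): Boolean = {
      val evict = size() > maxSize
      if (evict) e.getValue.markEvicted() // acts as the eviction listener
      evict
    }
  }

  // Cache loader: prefer reviving a deferred session over building a new one.
  private def load(uuid: String): Sess = {
    val existing = sessions.get(uuid)
    if (existing != null && existing.tryUnEvict()) existing
    else {
      created += 1
      val fresh = new Sess(uuid, sessions)
      sessions.put(uuid, fresh)
      fresh
    }
  }

  // Get-or-load, then acquire outside the cache lock. Retry if an eviction slipped
  // in between, which is only possible with concurrent callers.
  def obtain(uuid: String): Sess = {
    var session: Sess = null
    var acquired = false
    while (!acquired) {
      session = lru.synchronized {
        val cached = lru.get(uuid)
        if (cached != null) cached
        else { val s = load(uuid); lru.put(uuid, s); s }
      }
      acquired = session.acquire()
    }
    session
  }
}
```

Consider `maxSize = 1`. Obtaining `a`, then `b`, then `a` again while the first `a` is still held revives the original `a` instance through `tryUnEvict()` instead of building a second one. This matches the "one instance per UUID" property described above.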
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests, plus a new test suite (`ExecutorSideSessionManagementSuite`).
### Was this patch authored or co-authored using generative AI tooling?
Yes, Cursor 2.3.
Closes #53914 from cloud-fan/session.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../scala/org/apache/spark/executor/Executor.scala | 201 ++++++++++++++++--
.../ExecutorSideSessionManagementSuite.scala | 233 +++++++++++++++++++++
.../sql/connect/test/RemoteSparkSession.scala | 2 +-
3 files changed, 414 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 121623f46add..72d2eb87af67 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,7 +24,7 @@ import java.net.{URI, URL, URLClassLoader}
import java.nio.ByteBuffer
import java.util.{Locale, Properties}
import java.util.concurrent._
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import javax.annotation.concurrent.GuardedBy
@@ -58,6 +58,91 @@ import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util._
import org.apache.spark.util.ArrayImplicits._
+private[spark] object IsolatedSessionState {
+ // Authoritative store for all isolated sessions. Sessions are put here when created
+ // and removed when cleanup runs. The Guava cache just tracks which sessions are
+ // "active" for LRU eviction policy, but this map is the source of truth.
+ // This ensures there's only ONE IsolatedSessionState per UUID at any time.
+ val sessions = new ConcurrentHashMap[String, IsolatedSessionState]()
+}
+
+/**
+ * Represents an isolated session state on the executor side, containing session-specific
+ * classloaders, files, jars, and archives. This class manages the lifecycle of these resources
+ * and prevents race conditions between concurrent task execution and cache eviction.
+ *
+ * == Architecture ==
+ *
+ * Sessions are managed through two mechanisms:
+ * 1. A Guava LRU cache (`isolatedSessionCache`) for active session lookup with size limits
+ * 2. An authoritative map (`IsolatedSessionState.sessions`) tracking all sessions until cleanup
+ *
+ * The Guava cache handles LRU eviction, while the authoritative map ensures there's only one
+ * IsolatedSessionState instance per UUID at any time and tracks sessions that are evicted but
+ * still in use.
+ *
+ * == State Machine ==
+ *
+ * Each session has two state variables protected by a synchronized lock:
+ * - `refCount`: Number of tasks currently using this session
+ * - `evicted`: Whether the session has been evicted from the Guava cache
+ *
+ * Valid state transitions:
+ * {{{
+ *   Common workflow (no contention):
+ *     [Created] --> acquire() --> [Active: refCount > 0]
+ *                                           |
+ *                              release() (all tasks done)
+ *                                           |
+ *                                           v
+ *                                 [Idle: refCount = 0]
+ *                                           |
+ *                            markEvicted() (cache eviction)
+ *                                           |
+ *                                           v
+ *                                       [Cleanup]
+ *
+ *   Contention case (eviction while tasks running):
+ *     [Active: refCount > 0] --> markEvicted() --> [Deferred: evicted = true]
+ *                                                              |
+ *               +----------------------------------------------+
+ *               |                                              |
+ *               v                                              v
+ *     release() (last task)                              tryUnEvict()
+ *               |                                              |
+ *               v                                              v
+ *           [Cleanup]                              [Active] (back in cache)
+ * }}}
+ *
+ * == Cleanup ==
+ *
+ * Cleanup happens when both conditions are met: `refCount == 0` AND `evicted == true`.
+ * This can occur either:
+ * - Immediately when `markEvicted()` is called and no tasks are using the session
+ * - Deferred when the last task calls `release()` after the session was evicted
+ *
+ * Cleanup closes the classloader, deletes session files, and removes the session
+ * from the authoritative map.
+ *
+ * == Concurrency Guarantees ==
+ *
+ * The key insight is that as long as a session is still in use (refCount > 0), it
+ * remains in the authoritative map and we can get its instance. When a new task needs
+ * a session that was evicted from the LRU cache but is still in use:
+ *
+ * - If cleanup has NOT started (refCount > 0): we can cancel the pending cleanup
+ * via `tryUnEvict()`, put the instance back into the LRU cache, and safely reuse it.
+ *
+ * - If cleanup HAS started (refCount became 0): cleanup runs synchronously under the
+ * lock, so it must complete before any new task can proceed. Once cleanup finishes,
+ * the session is removed from the authoritative map, and a fresh instance is created.
+ *
+ * This design ensures there is never a race where a task uses a session that is being
+ * or has been cleaned up. The `acquire()` and `tryUnEvict()` methods are intentionally
+ * separate: `tryUnEvict()` is only called from the cache loader to guarantee the
+ * session is put back into the LRU cache, maintaining the invariant that a non-evicted
+ * session is always in the cache.
+ */
private[spark] class IsolatedSessionState(
val sessionUUID: String,
var urlClassLoader: MutableURLClassLoader,
@@ -68,29 +153,62 @@ private[spark] class IsolatedSessionState(
val replClassDirUri: Option[String]) extends Logging {
// Reference count for the number of running tasks using this session.
- private val refCount: AtomicInteger = new AtomicInteger(0)
+ // Access is synchronized via `lock`.
+ private var refCount: Int = 0
// Whether this session has been evicted from the cache.
- @volatile private var evicted: Boolean = false
+ // Access is synchronized via `lock`.
+ private var evicted: Boolean = false
- /** Increment the reference count, indicating a task is using this session. */
- def acquire(): Unit = refCount.incrementAndGet()
+ // Lock to synchronize all state changes.
+ private val lock = new Object
+
+ /**
+ * Increment the reference count, indicating a task is using this session.
+ * @return true if the session was successfully acquired, false if it was already evicted
+ */
+ def acquire(): Boolean = lock.synchronized {
+ if (evicted) {
+ false
+ } else {
+ refCount += 1
+ true
+ }
+ }
+
+ /**
+ * Try to un-evict this session so it can be reused.
+ * This is called from the cache loader to reuse a deferred session.
+ * The caller should call acquire() separately after the session is in cache.
+ * @return true if successfully un-evicted, false if already cleaned up or refCount is 0
+ */
+ def tryUnEvict(): Boolean = lock.synchronized {
+ if (evicted && refCount > 0) {
+ evicted = false
+ logInfo(log"Session ${MDC(SESSION_ID, sessionUUID)} un-evicted, " +
+ log"still in use by ${MDC(LogKeys.COUNT, refCount)} task(s)")
+ true
+ } else {
+ false
+ }
+ }
/** Decrement the reference count. If evicted and no more tasks, clean up. */
- def release(): Unit = {
- if (refCount.decrementAndGet() == 0 && evicted) {
+ def release(): Unit = lock.synchronized {
+ refCount -= 1
+ if (refCount == 0 && evicted) {
cleanup()
}
}
- /** Mark this session as evicted. If no tasks are using it, clean up immediately. */
- def markEvicted(): Unit = {
+ /** Mark this session as evicted. Cleans up immediately if refCount is 0. */
+ def markEvicted(): Unit = lock.synchronized {
evicted = true
- if (refCount.get() == 0) {
+ if (refCount == 0) {
cleanup()
} else {
logInfo(log"Session ${MDC(SESSION_ID, sessionUUID)} evicted but still in use by " +
- log"${MDC(LogKeys.COUNT, refCount.get())} task(s), deferring cleanup")
+ log"${MDC(LogKeys.COUNT, refCount)} task(s), deferring cleanup")
}
}
@@ -108,11 +226,15 @@ private[spark] class IsolatedSessionState(
logWarning(log"Failed to close urlClassLoader for session " +
log"${MDC(SESSION_ID, sessionUUID)}", e)
}
+
// Delete session files.
val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), sessionUUID)
if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) {
Utils.deleteRecursively(sessionBasedRoot)
}
+
+ // Remove from authoritative sessions map after cleanup
+ IsolatedSessionState.sessions.remove(sessionUUID)
logInfo(log"Session cleaned up: ${MDC(SESSION_ID, sessionUUID)}")
}
}
@@ -241,13 +363,17 @@ private[spark] class Executor(
isDefaultState(jobArtifactState.uuid))
val replClassLoader = addReplClassLoaderIfNeeded(
urlClassLoader, jobArtifactState.replClassDirUri, jobArtifactState.uuid)
- new IsolatedSessionState(
+ val state = new IsolatedSessionState(
jobArtifactState.uuid, urlClassLoader, replClassLoader,
currentFiles,
currentJars,
currentArchives,
jobArtifactState.replClassDirUri
)
+ // Store in the authoritative sessions map immediately.
+ // This ensures there's only one session per UUID at any time.
+ IsolatedSessionState.sessions.put(jobArtifactState.uuid, state)
+ state
}
private def isStubbingEnabledForState(name: String) = {
@@ -258,7 +384,7 @@ private[spark] class Executor(
private def isDefaultState(name: String) = name == "default"
// Classloader isolation
- // The default isolation group
+ // The default isolation group. Not in the cache, never evicted.
val defaultSessionState: IsolatedSessionState =
newSessionState(JobArtifactState("default", None))
val isolatedSessionCache: Cache[String, IsolatedSessionState] =
CacheBuilder.newBuilder()
@@ -270,8 +396,8 @@ private[spark] class Executor(
val state = notification.getValue
// Cache is always used for isolated sessions.
assert(!isDefaultState(state.sessionUUID))
- // Mark evicted - cleanup will happen immediately if no tasks are using it,
- // or when the last task releases it.
+ // Mark evicted. The session stays in the authoritative sessions map until cleanup.
+ // If refCount > 0, cleanup is deferred until all tasks release.
state.markEvicted()
}
})
@@ -625,18 +751,48 @@ private[spark] class Executor(
(accums, accUpdates)
}
+ /**
+ * Obtains an IsolatedSessionState for the given job artifact state.
+ * Gets or creates a session from the cache, then acquires it. We need to retry the cache
+ * lookup if the session was evicted between get() and acquire(). This can happen when the
+ * cache is full and another task triggers eviction.
+ */
+ private def obtainSession(jobArtifactState: JobArtifactState): IsolatedSessionState = {
+ var session: IsolatedSessionState = null
+ var acquired = false
+ while (!acquired) {
+ // Get or create session. The loader uses sessions map as the authoritative store.
+ // This ensures there's only one IsolatedSessionState per UUID at any time.
+ session = isolatedSessionCache.get(jobArtifactState.uuid, () => {
+ // Check the authoritative sessions map first. tryUnEvict() will block if
+ // cleanup is in progress, so when it returns false, the session is already
+ // removed from the map and it's safe to create a new one.
+ val existingSession = IsolatedSessionState.sessions.get(jobArtifactState.uuid)
+ if (existingSession != null && existingSession.tryUnEvict()) {
+ existingSession
+ } else {
+ newSessionState(jobArtifactState)
+ }
+ })
+ // acquire() can return false if session was evicted between get() and now.
+ // In that case, retry - the session is already removed from cache.
+ acquired = session.acquire()
+ }
+ session
+ }
+
override def run(): Unit = {
// Classloader isolation
val isolatedSession = taskDescription.artifacts.state match {
case Some(jobArtifactState) =>
- isolatedSessionCache.get(jobArtifactState.uuid, () => newSessionState(jobArtifactState))
- case _ => defaultSessionState
+ obtainSession(jobArtifactState)
+ case _ =>
+ // The default session is never in the cache and never evicted,
+ // so no need to acquire/release.
+ defaultSessionState
}
- // Pin the session to prevent its class loader from being closed while this task is running.
- isolatedSession.acquire()
-
setMDCForTask(taskName, mdcProperties)
threadId = Thread.currentThread.getId
Thread.currentThread.setName(threadName)
@@ -943,7 +1099,10 @@ private[spark] class Executor(
metricsPoller.onTaskCompletion(taskId, task.stageId,
task.stageAttemptId)
}
// Release the session reference. If evicted and this was the last task, cleanup happens.
- isolatedSession.release()
+ // Skip for defaultSessionState since it's never evicted.
+ if (isolatedSession ne defaultSessionState) {
+ isolatedSession.release()
+ }
Thread.currentThread().setName(s"$IDLE_TASK_THREAD_NAME#$threadId" )
}
}
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSideSessionManagementSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSideSessionManagementSuite.scala
new file mode 100644
index 000000000000..f127951054e7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSideSessionManagementSuite.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io.File
+
+import scala.collection.mutable.HashMap
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
+
+/**
+ * Unit tests for IsolatedSessionState lifecycle management.
+ * These tests verify the fix for race conditions in session acquire/release/eviction.
+ */
+class ExecutorSideSessionManagementSuite
+ extends SparkFunSuite
+ with BeforeAndAfterEach
+ with MockitoSugar {
+
+ private var testSessionCounter = 0
+ private var tempDir: File = _
+
+ override def beforeEach(): Unit = {
+ super.beforeEach()
+ // Clear the sessions map before each test
+ IsolatedSessionState.sessions.clear()
+ testSessionCounter = 0
+
+ // Set up a mock SparkEnv so that cleanup() can access SparkFiles.getRootDirectory()
+ tempDir = Utils.createTempDir()
+ val mockEnv = mock[SparkEnv]
+ val conf = new SparkConf(false)
+ when(mockEnv.conf).thenReturn(conf)
+ when(mockEnv.driverTmpDir).thenReturn(Some(tempDir.getAbsolutePath))
+ SparkEnv.set(mockEnv)
+ }
+
+ override def afterEach(): Unit = {
+ // Clear the sessions map after each test
+ IsolatedSessionState.sessions.clear()
+ SparkEnv.set(null)
+ if (tempDir != null && tempDir.exists()) {
+ Utils.deleteRecursively(tempDir)
+ tempDir = null
+ }
+ super.afterEach()
+ }
+
+ /**
+ * Creates a test IsolatedSessionState with a mock classloader and unique UUID.
+ */
+ private def createTestSession(uuid: String): IsolatedSessionState = {
+ val classLoader = new MutableURLClassLoader(
+ Array.empty,
+ Thread.currentThread().getContextClassLoader
+ )
+ val session = new IsolatedSessionState(
+ sessionUUID = uuid,
+ urlClassLoader = classLoader,
+ replClassLoader = classLoader,
+ currentFiles = new HashMap[String, Long](),
+ currentJars = new HashMap[String, Long](),
+ currentArchives = new HashMap[String, Long](),
+ replClassDirUri = None
+ )
+ // Register in authoritative sessions map as would happen in production
+ IsolatedSessionState.sessions.put(uuid, session)
+ session
+ }
+
+ private def nextUniqueUuid(): String = {
+ testSessionCounter += 1
+ s"test-uuid-$testSessionCounter"
+ }
+
+ test("acquire returns true for new session") {
+ val session = createTestSession(nextUniqueUuid())
+ assert(session.acquire())
+ }
+
+ test("acquire returns true for session acquired multiple times") {
+ val session = createTestSession(nextUniqueUuid())
+ assert(session.acquire())
+ assert(session.acquire())
+ assert(session.acquire())
+ }
+
+ test("acquire returns false after session is evicted with no references") {
+ val session = createTestSession(nextUniqueUuid())
+ session.markEvicted()
+ // Session should be cleaned up immediately since refCount is 0
+ assert(!IsolatedSessionState.sessions.containsKey(session.sessionUUID))
+ // Cannot acquire an evicted session.
+ assert(!session.acquire())
+ }
+
+ test("acquire returns false after session is evicted even with existing references") {
+ val uuid = nextUniqueUuid()
+ val session = createTestSession(uuid)
+
+ // First task acquires the session
+ assert(session.acquire())
+
+ // Session gets evicted (e.g., due to cache pressure)
+ session.markEvicted()
+ // Session should still be in the map because refCount > 0 (deferred cleanup)
+ assert(IsolatedSessionState.sessions.containsKey(uuid))
+
+ // A new task tries to acquire the same session - should fail because it's evicted
+ assert(!session.acquire())
+
+ // The original task releases - now cleanup happens
+ session.release()
+ assert(!IsolatedSessionState.sessions.containsKey(uuid))
+ }
+
+ test("deferred cleanup with multiple references") {
+ val uuid = nextUniqueUuid()
+ val session = createTestSession(uuid)
+
+ // Acquire the session multiple times (simulating multiple tasks)
+ assert(session.acquire())
+ assert(session.acquire())
+ assert(session.acquire())
+
+ // Evict the session - cleanup should be deferred
+ session.markEvicted()
+ assert(IsolatedSessionState.sessions.containsKey(uuid))
+
+ // Release twice - cleanup should still be deferred
+ session.release()
+ assert(IsolatedSessionState.sessions.containsKey(uuid))
+ session.release()
+ assert(IsolatedSessionState.sessions.containsKey(uuid))
+
+ // Release the last reference - cleanup should happen now
+ session.release()
+ assert(!IsolatedSessionState.sessions.containsKey(uuid))
+ }
+
+ test("tryUnEvict succeeds when session is evicted but still has references") {
+ val session = createTestSession(nextUniqueUuid())
+
+ // Acquire the session
+ assert(session.acquire())
+
+ // Evict the session
+ session.markEvicted()
+
+ // Try to un-evict - should succeed because refCount > 0
+ assert(session.tryUnEvict())
+
+ // Now acquire should succeed again
+ assert(session.acquire())
+ }
+
+ test("tryUnEvict fails when session is not evicted") {
+ val session = createTestSession(nextUniqueUuid())
+
+ // Acquire without eviction
+ assert(session.acquire())
+
+ // Try to un-evict - should fail because session is not evicted
+ assert(!session.tryUnEvict())
+ }
+
+ test("tryUnEvict and acquire fail when session has no references") {
+ val uuid = nextUniqueUuid()
+ val session = createTestSession(uuid)
+
+ // Evict with no references - triggers immediate cleanup
+ session.markEvicted()
+ assert(!IsolatedSessionState.sessions.containsKey(uuid))
+
+ // tryUnEvict should fail because refCount is 0 and session is already cleaned up
+ assert(!session.tryUnEvict())
+
+ // acquire should also fail
+ assert(!session.acquire())
+ }
+
+ test("session reuse via tryUnEvict keeps session in map when not evicted") {
+ // Note: This test verifies `IsolatedSessionState.sessions` behavior in isolation.
+ // In production, after tryUnEvict(), the session is put back into the Guava cache.
+ // When the cache eventually evicts it again (due to LRU policy), markEvicted() will be called,
+ // and cleanup will happen if refCount is 0. So there's no resource leak in practice.
+ val uuid = nextUniqueUuid()
+ val session = createTestSession(uuid)
+
+ // Simulate task 1 acquiring the session
+ assert(session.acquire())
+
+ // Session gets evicted (e.g., due to cache pressure)
+ session.markEvicted()
+ assert(IsolatedSessionState.sessions.containsKey(uuid)) // Deferred cleanup
+
+ // Simulate cache loader trying to reuse the session via tryUnEvict
+ assert(session.tryUnEvict())
+
+ // Now a new task can acquire the session
+ assert(session.acquire())
+
+ // Task 1 releases
+ session.release()
+ assert(IsolatedSessionState.sessions.containsKey(uuid)) // Still has 1 reference
+
+ // Task 2 releases - session stays in map because it's not evicted
+ session.release()
+ // Session stays in map because it's not evicted anymore (was un-evicted).
+ // In production, the Guava cache would eventually evict it again, triggering cleanup.
+ assert(IsolatedSessionState.sessions.containsKey(uuid))
+ }
+}
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
index 535c0d2180d4..8bd6c5cf0168 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
@@ -134,7 +134,7 @@ object SparkConnectServerUtils {
// Testing SPARK-49673, setting maxBatchSize to 10MiB
s"spark.connect.grpc.arrow.maxBatchSize=${10 * 1024 * 1024}",
// Cache less sessions to save memory.
- "spark.executor.isolatedSessionCache.size=5",
+ "spark.executor.isolatedSessionCache.size=10",
// Disable UI
"spark.ui.enabled=false").flatMap(v => "--conf" :: v :: Nil)
}