This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 72e892dece26 [SPARK-55133][CONNECT] Fix race condition in IsolatedSessionState lifecycle management
72e892dece26 is described below

commit 72e892dece2634d9fbc7cfd0f073f092886a45ac
Author: Wenchen Fan <[email protected]>
AuthorDate: Fri Jan 30 22:24:07 2026 +0800

    [SPARK-55133][CONNECT] Fix race condition in IsolatedSessionState lifecycle management
    
    This PR fixes a race condition in `IsolatedSessionState` lifecycle management
    that could cause failures when `spark.executor.isolatedSessionCache.size` is
    set to a small value.
    
    Key changes:
    - Introduced `IsolatedSessionState.sessions` as the authoritative store for
      all isolated sessions, ensuring only one session exists per UUID at any time
    - Changed `refCount` and `evicted` from lock-free to synchronized access via a
      shared lock object to prevent race conditions between `acquire()`,
      `release()`, and `markEvicted()`
    - Added `acquire()` return value to indicate if the session was successfully
      acquired (returns false if already evicted)
    - Added `tryUnEvict()` method to allow reusing a deferred session that was
      evicted but still in use
    - Updated the cache loader to check the authoritative sessions map first and
      reuse existing sessions when possible
    
    When the isolated session cache is full and sessions are evicted, there's a
    race condition between:
    1. A task acquiring a session from the cache
    2. Another task triggering eviction of that session
    3. The evicted session being cleaned up (classloader closed, files deleted)
    
    This could cause:
    - `RemoteClassLoaderError` when trying to load classes with a closed classloader
    - `NoSuchFileException` when session files are deleted while still in use
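
    The interleaving can be replayed deterministically on one thread. The
    sketch below is a simplified stand-in for the previous lock-free design (an
    `AtomicInteger` reference count and a volatile `evicted` flag, where
    `acquire()` always succeeds). `LockFreeSession` and `closed` are
    hypothetical names, and `closed = true` stands in for closing the
    classloader and deleting the session files.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Simplified model of the old lock-free bookkeeping (illustration only).
final class LockFreeSession {
  private val refCount = new AtomicInteger(0)
  @volatile private var evicted = false
  @volatile var closed = false // stands in for "classloader closed, files deleted"

  def acquire(): Unit = { refCount.incrementAndGet(); () }

  def release(): Unit = {
    if (refCount.decrementAndGet() == 0 && evicted) closed = true
  }

  def markEvicted(): Unit = {
    evicted = true
    if (refCount.get() == 0) closed = true
  }
}

object LockFreeRaceReplay {
  def main(args: Array[String]): Unit = {
    // Step 1: task A has looked the session up in the cache but not pinned it yet.
    val session = new LockFreeSession
    // Step 2: task B inserts another session, the cache evicts this one,
    // and since refCount is still 0 the cleanup runs immediately.
    session.markEvicted()
    // Step 3: task A pins the session, but nothing tells it the pin came too late.
    session.acquire()
    assert(session.closed) // task A now holds a session whose resources are gone
  }
}
```

    With two real threads the same ordering only shows up occasionally, which
    is consistent with the failures appearing mainly when the cache is small.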
    
    The fix ensures that:
    - Sessions are tracked in an authoritative map from creation until cleanup completes
    - Evicted sessions can be reused if still in use by other tasks
    - A task cannot acquire a session that's being cleaned up
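
    A minimal sketch of how the loader and the retry loop could fit together,
    assuming a plain `ConcurrentHashMap` as a stand-in for the Guava cache (no
    size limit or LRU policy; eviction is triggered by hand through a
    hypothetical `evict` helper). `ObtainSketch`, `Session`, `load`, `obtain`,
    and `evict` are illustrative names, not Spark APIs.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

object ObtainSketch {
  // Authoritative map: one Session per UUID from creation until cleanup.
  val sessions = new ConcurrentHashMap[String, Session]()
  // Hypothetical stand-in for the Guava cache (assumption: no LRU here).
  val cache = new ConcurrentHashMap[String, Session]()

  final class Session(val uuid: String) {
    private val lock = new Object
    private var refCount = 0    // guarded by lock
    private var evicted = false // guarded by lock

    def acquire(): Boolean = lock.synchronized {
      if (evicted) false else { refCount += 1; true }
    }
    def tryUnEvict(): Boolean = lock.synchronized {
      if (evicted && refCount > 0) { evicted = false; true } else false
    }
    def release(): Unit = lock.synchronized {
      refCount -= 1
      if (refCount == 0 && evicted) cleanup()
    }
    def markEvicted(): Unit = lock.synchronized {
      evicted = true
      if (refCount == 0) cleanup()
    }
    // Real cleanup would also close the classloader and delete session files.
    private def cleanup(): Unit = { sessions.remove(uuid); () }
  }

  // Loader: reuse a deferred session if possible, else create and register one.
  private def load(uuid: String): Session = {
    val existing = sessions.get(uuid)
    if (existing != null && existing.tryUnEvict()) existing
    else {
      val fresh = new Session(uuid)
      sessions.put(uuid, fresh)
      fresh
    }
  }

  private val loader = new JFunction[String, Session] {
    override def apply(uuid: String): Session = load(uuid)
  }

  def obtain(uuid: String): Session = {
    var session: Session = null
    var acquired = false
    while (!acquired) {
      session = cache.computeIfAbsent(uuid, loader)
      acquired = session.acquire() // false: evicted in between, so retry
    }
    session
  }

  // Manual eviction: drop from the cache first, then mark evicted.
  def evict(uuid: String): Unit = {
    val s = cache.remove(uuid)
    if (s != null) s.markEvicted()
  }

  def main(args: Array[String]): Unit = {
    val a = obtain("u1")                // task 1 creates and pins the session
    evict("u1")                         // evicted while in use: cleanup deferred
    val b = obtain("u1")                // loader un-evicts and reuses it
    assert(a eq b)
    a.release(); b.release()
    evict("u1")                         // no users left: cleanup removes it
    assert(!sessions.containsKey("u1"))
    assert(obtain("u1") ne a)           // a fresh instance is created
  }
}
```

    The sketch drops the entry from the cache before calling `markEvicted()`,
    so a failed `acquire()` always leads to a fresh loader call on the retry.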
    
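    User-facing change: No.
    
    Testing: Existing tests and a new test suite (`ExecutorSideSessionManagementSuite`).
    
    Authored with generative AI tooling: Yes, cursor 2.3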
    
    Closes #53914 from cloud-fan/session.
    
    Lead-authored-by: Wenchen Fan <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit 1da0e5357ad95f174f5e8b637d1948a488e017ac)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../scala/org/apache/spark/executor/Executor.scala | 201 ++++++++++++++++--
 .../ExecutorSideSessionManagementSuite.scala       | 233 +++++++++++++++++++++
 .../sql/connect/test/RemoteSparkSession.scala      |   2 +-
 3 files changed, 414 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index edab354b9607..c8843ac3427e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,7 +24,7 @@ import java.net.{URI, URL, URLClassLoader}
 import java.nio.ByteBuffer
 import java.util.{Locale, Properties}
 import java.util.concurrent._
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 import javax.annotation.concurrent.GuardedBy
 
@@ -58,6 +58,91 @@ import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
 import org.apache.spark.util._
 import org.apache.spark.util.ArrayImplicits._
 
+private[spark] object IsolatedSessionState {
+  // Authoritative store for all isolated sessions. Sessions are put here when created
+  // and removed when cleanup runs. The Guava cache just tracks which sessions are
+  // "active" for LRU eviction policy, but this map is the source of truth.
+  // This ensures there's only ONE IsolatedSessionState per UUID at any time.
+  val sessions = new ConcurrentHashMap[String, IsolatedSessionState]()
+}
+
+/**
+ * Represents an isolated session state on the executor side, containing session-specific
+ * classloaders, files, jars, and archives. This class manages the lifecycle of these resources
+ * and prevents race conditions between concurrent task execution and cache eviction.
+ *
+ * == Architecture ==
+ *
+ * Sessions are managed through two mechanisms:
+ *  1. A Guava LRU cache (`isolatedSessionCache`) for active session lookup with size limits
+ *  2. An authoritative map (`IsolatedSessionState.sessions`) tracking all sessions until cleanup
+ *
+ * The Guava cache handles LRU eviction, while the authoritative map ensures there's only one
+ * IsolatedSessionState instance per UUID at any time and tracks sessions that are evicted but
+ * still in use.
+ *
+ * == State Machine ==
+ *
+ * Each session has two state variables protected by a synchronized lock:
+ *  - `refCount`: Number of tasks currently using this session
+ *  - `evicted`: Whether the session has been evicted from the Guava cache
+ *
+ * Valid state transitions:
+ * {{{
+ *   Common workflow (no contention):
+ *   [Created] --> acquire() --> [Active: refCount > 0]
+ *                                        |
+ *                                   release() (all tasks done)
+ *                                        |
+ *                                        v
+ *                               [Idle: refCount = 0]
+ *                                        |
+ *                               markEvicted() (cache eviction)
+ *                                        |
+ *                                        v
+ *                                    [Cleanup]
+ *
+ *   Contention case (eviction while tasks running):
+ *   [Active: refCount > 0] --> markEvicted() --> [Deferred: evicted = true]
+ *                                                       |
+ *             +---------------------------------------------+
+ *             |                                             |
+ *             v                                             v
+ *   release() (last task)                          tryUnEvict()
+ *             |                                             |
+ *             v                                             v
+ *         [Cleanup]                              [Active] (back in cache)
+ * }}}
+ *
+ * == Cleanup ==
+ *
+ * Cleanup happens when both conditions are met: `refCount == 0` AND `evicted == true`.
+ * This can occur either:
+ *  - Immediately when `markEvicted()` is called and no tasks are using the session
+ *  - Deferred when the last task calls `release()` after the session was evicted
+ *
+ * Cleanup closes the classloader, deletes session files, and removes the session
+ * from the authoritative map.
+ *
+ * == Concurrency Guarantees ==
+ *
+ * The key insight is that as long as a session is still in use (refCount > 0), it
+ * remains in the authoritative map and we can get its instance. When a new task needs
+ * a session that was evicted from the LRU cache but is still in use:
+ *
+ *  - If cleanup has NOT started (refCount > 0): we can cancel the pending cleanup
+ *    via `tryUnEvict()`, put the instance back into the LRU cache, and safely reuse it.
+ *
+ *  - If cleanup HAS started (refCount became 0): cleanup runs synchronously under the
+ *    lock, so it must complete before any new task can proceed. Once cleanup finishes,
+ *    the session is removed from the authoritative map, and a fresh instance is created.
+ *
+ * This design ensures there is never a race where a task uses a session that is being
+ * or has been cleaned up. The `acquire()` and `tryUnEvict()` methods are intentionally
+ * separate: `tryUnEvict()` is only called from the cache loader to guarantee the
+ * session is put back into the LRU cache, maintaining the invariant that a non-evicted
+ * session is always in the cache.
+ */
 private[spark] class IsolatedSessionState(
     val sessionUUID: String,
     var urlClassLoader: MutableURLClassLoader,
@@ -68,29 +153,62 @@ private[spark] class IsolatedSessionState(
     val replClassDirUri: Option[String]) extends Logging {
 
   // Reference count for the number of running tasks using this session.
-  private val refCount: AtomicInteger = new AtomicInteger(0)
+  // Access is synchronized via `lock`.
+  private var refCount: Int = 0
 
   // Whether this session has been evicted from the cache.
-  @volatile private var evicted: Boolean = false
+  // Access is synchronized via `lock`.
+  private var evicted: Boolean = false
 
-  /** Increment the reference count, indicating a task is using this session. */
-  def acquire(): Unit = refCount.incrementAndGet()
+  // Lock to synchronize all state changes.
+  private val lock = new Object
+
+  /**
+   * Increment the reference count, indicating a task is using this session.
+   * @return true if the session was successfully acquired, false if it was already evicted
+   */
+  def acquire(): Boolean = lock.synchronized {
+    if (evicted) {
+      false
+    } else {
+      refCount += 1
+      true
+    }
+  }
+
+  /**
+   * Try to un-evict this session so it can be reused.
+   * This is called from the cache loader to reuse a deferred session.
+   * The caller should call acquire() separately after the session is in cache.
+   * @return true if successfully un-evicted, false if already cleaned up or refCount is 0
+   */
+  def tryUnEvict(): Boolean = lock.synchronized {
+    if (evicted && refCount > 0) {
+      evicted = false
+      logInfo(log"Session ${MDC(SESSION_ID, sessionUUID)} un-evicted, " +
+        log"still in use by ${MDC(LogKeys.COUNT, refCount)} task(s)")
+      true
+    } else {
+      false
+    }
+  }
 
   /** Decrement the reference count. If evicted and no more tasks, clean up. */
-  def release(): Unit = {
-    if (refCount.decrementAndGet() == 0 && evicted) {
+  def release(): Unit = lock.synchronized {
+    refCount -= 1
+    if (refCount == 0 && evicted) {
       cleanup()
     }
   }
 
-  /** Mark this session as evicted. If no tasks are using it, clean up immediately. */
-  def markEvicted(): Unit = {
+  /** Mark this session as evicted. Cleans up immediately if refCount is 0. */
+  def markEvicted(): Unit = lock.synchronized {
     evicted = true
-    if (refCount.get() == 0) {
+    if (refCount == 0) {
       cleanup()
     } else {
       logInfo(log"Session ${MDC(SESSION_ID, sessionUUID)} evicted but still in use by " +
-        log"${MDC(LogKeys.COUNT, refCount.get())} task(s), deferring cleanup")
+        log"${MDC(LogKeys.COUNT, refCount)} task(s), deferring cleanup")
     }
   }
 
@@ -108,11 +226,15 @@ private[spark] class IsolatedSessionState(
         logWarning(log"Failed to close urlClassLoader for session " +
           log"${MDC(SESSION_ID, sessionUUID)}", e)
     }
+
     // Delete session files.
     val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), sessionUUID)
     if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) {
       Utils.deleteRecursively(sessionBasedRoot)
     }
+
+    // Remove from authoritative sessions map after cleanup
+    IsolatedSessionState.sessions.remove(sessionUUID)
     logInfo(log"Session cleaned up: ${MDC(SESSION_ID, sessionUUID)}")
   }
 }
@@ -241,13 +363,17 @@ private[spark] class Executor(
         isDefaultState(jobArtifactState.uuid))
     val replClassLoader = addReplClassLoaderIfNeeded(
       urlClassLoader, jobArtifactState.replClassDirUri, jobArtifactState.uuid)
-    new IsolatedSessionState(
+    val state = new IsolatedSessionState(
       jobArtifactState.uuid, urlClassLoader, replClassLoader,
       currentFiles,
       currentJars,
       currentArchives,
       jobArtifactState.replClassDirUri
     )
+    // Store in the authoritative sessions map immediately.
+    // This ensures there's only one session per UUID at any time.
+    IsolatedSessionState.sessions.put(jobArtifactState.uuid, state)
+    state
   }
 
   private def isStubbingEnabledForState(name: String) = {
@@ -258,7 +384,7 @@ private[spark] class Executor(
   private def isDefaultState(name: String) = name == "default"
 
   // Classloader isolation
-  // The default isolation group
+  // The default isolation group. Not in the cache, never evicted.
   val defaultSessionState: IsolatedSessionState = newSessionState(JobArtifactState("default", None))
 
   val isolatedSessionCache: Cache[String, IsolatedSessionState] = CacheBuilder.newBuilder()
@@ -270,8 +396,8 @@ private[spark] class Executor(
         val state = notification.getValue
         // Cache is always used for isolated sessions.
         assert(!isDefaultState(state.sessionUUID))
-        // Mark evicted - cleanup will happen immediately if no tasks are using it,
-        // or when the last task releases it.
+        // Mark evicted. The session stays in the authoritative sessions map until cleanup.
+        // If refCount > 0, cleanup is deferred until all tasks release.
         state.markEvicted()
       }
     })
@@ -625,18 +751,48 @@ private[spark] class Executor(
       (accums, accUpdates)
     }
 
+    /**
+     * Obtains an IsolatedSessionState for the given job artifact state.
+     * Gets or creates a session from the cache, then acquires it. We need to retry the cache
+     * lookup if the session was evicted between get() and acquire(). This can happen when the
+     * cache is full and another task triggers eviction.
+     */
+    private def obtainSession(jobArtifactState: JobArtifactState): IsolatedSessionState = {
+      var session: IsolatedSessionState = null
+      var acquired = false
+      while (!acquired) {
+        // Get or create session. The loader uses sessions map as the authoritative store.
+        // This ensures there's only one IsolatedSessionState per UUID at any time.
+        session = isolatedSessionCache.get(jobArtifactState.uuid, () => {
+          // Check the authoritative sessions map first. tryUnEvict() will block if
+          // cleanup is in progress, so when it returns false, the session is already
+          // removed from the map and it's safe to create a new one.
+          val existingSession = IsolatedSessionState.sessions.get(jobArtifactState.uuid)
+          if (existingSession != null && existingSession.tryUnEvict()) {
+            existingSession
+          } else {
+            newSessionState(jobArtifactState)
+          }
+        })
+        // acquire() can return false if session was evicted between get() and now.
+        // In that case, retry - the session is already removed from cache.
+        acquired = session.acquire()
+      }
+      session
+    }
+
     override def run(): Unit = {
 
       // Classloader isolation
       val isolatedSession = taskDescription.artifacts.state match {
         case Some(jobArtifactState) =>
-          isolatedSessionCache.get(jobArtifactState.uuid, () => newSessionState(jobArtifactState))
-        case _ => defaultSessionState
+          obtainSession(jobArtifactState)
+        case _ =>
+          // The default session is never in the cache and never evicted,
+          // so no need to acquire/release.
+          defaultSessionState
       }
 
-      // Pin the session to prevent its class loader from being closed while this task is running.
-      isolatedSession.acquire()
-
       setMDCForTask(taskName, mdcProperties)
       threadId = Thread.currentThread.getId
       Thread.currentThread.setName(threadName)
@@ -943,7 +1099,10 @@ private[spark] class Executor(
           metricsPoller.onTaskCompletion(taskId, task.stageId, task.stageAttemptId)
         }
         // Release the session reference. If evicted and this was the last task, cleanup happens.
-        isolatedSession.release()
+        // Skip for defaultSessionState since it's never evicted.
+        if (isolatedSession ne defaultSessionState) {
+          isolatedSession.release()
+        }
       }
     }
 
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSideSessionManagementSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSideSessionManagementSuite.scala
new file mode 100644
index 000000000000..f127951054e7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSideSessionManagementSuite.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.io.File
+
+import scala.collection.mutable.HashMap
+
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfterEach
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.util.{MutableURLClassLoader, Utils}
+
+/**
+ * Unit tests for IsolatedSessionState lifecycle management.
+ * These tests verify the fix for race conditions in session acquire/release/eviction.
+ */
+class ExecutorSideSessionManagementSuite
+    extends SparkFunSuite
+    with BeforeAndAfterEach
+    with MockitoSugar {
+
+  private var testSessionCounter = 0
+  private var tempDir: File = _
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    // Clear the sessions map before each test
+    IsolatedSessionState.sessions.clear()
+    testSessionCounter = 0
+
+    // Set up a mock SparkEnv so that cleanup() can access SparkFiles.getRootDirectory()
+    tempDir = Utils.createTempDir()
+    val mockEnv = mock[SparkEnv]
+    val conf = new SparkConf(false)
+    when(mockEnv.conf).thenReturn(conf)
+    when(mockEnv.driverTmpDir).thenReturn(Some(tempDir.getAbsolutePath))
+    SparkEnv.set(mockEnv)
+  }
+
+  override def afterEach(): Unit = {
+    // Clear the sessions map after each test
+    IsolatedSessionState.sessions.clear()
+    SparkEnv.set(null)
+    if (tempDir != null && tempDir.exists()) {
+      Utils.deleteRecursively(tempDir)
+      tempDir = null
+    }
+    super.afterEach()
+  }
+
+  /**
+   * Creates a test IsolatedSessionState with a mock classloader and unique UUID.
+   */
+  private def createTestSession(uuid: String): IsolatedSessionState = {
+    val classLoader = new MutableURLClassLoader(
+      Array.empty,
+      Thread.currentThread().getContextClassLoader
+    )
+    val session = new IsolatedSessionState(
+      sessionUUID = uuid,
+      urlClassLoader = classLoader,
+      replClassLoader = classLoader,
+      currentFiles = new HashMap[String, Long](),
+      currentJars = new HashMap[String, Long](),
+      currentArchives = new HashMap[String, Long](),
+      replClassDirUri = None
+    )
+    // Register in authoritative sessions map as would happen in production
+    IsolatedSessionState.sessions.put(uuid, session)
+    session
+  }
+
+  private def nextUniqueUuid(): String = {
+    testSessionCounter += 1
+    s"test-uuid-$testSessionCounter"
+  }
+
+  test("acquire returns true for new session") {
+    val session = createTestSession(nextUniqueUuid())
+    assert(session.acquire())
+  }
+
+  test("acquire returns true for session acquired multiple times") {
+    val session = createTestSession(nextUniqueUuid())
+    assert(session.acquire())
+    assert(session.acquire())
+    assert(session.acquire())
+  }
+
+  test("acquire returns false after session is evicted with no references") {
+    val session = createTestSession(nextUniqueUuid())
+    session.markEvicted()
+    // Session should be cleaned up immediately since refCount is 0
+    assert(!IsolatedSessionState.sessions.containsKey(session.sessionUUID))
+    // Cannot acquire an evicted session.
+    assert(!session.acquire())
+  }
+
+  test("acquire returns false after session is evicted even with existing references") {
+    val uuid = nextUniqueUuid()
+    val session = createTestSession(uuid)
+
+    // First task acquires the session
+    assert(session.acquire())
+
+    // Session gets evicted (e.g., due to cache pressure)
+    session.markEvicted()
+    // Session should still be in the map because refCount > 0 (deferred cleanup)
+    assert(IsolatedSessionState.sessions.containsKey(uuid))
+
+    // A new task tries to acquire the same session - should fail because it's evicted
+    assert(!session.acquire())
+
+    // The original task releases - now cleanup happens
+    session.release()
+    assert(!IsolatedSessionState.sessions.containsKey(uuid))
+  }
+
+  test("deferred cleanup with multiple references") {
+    val uuid = nextUniqueUuid()
+    val session = createTestSession(uuid)
+
+    // Acquire the session multiple times (simulating multiple tasks)
+    assert(session.acquire())
+    assert(session.acquire())
+    assert(session.acquire())
+
+    // Evict the session - cleanup should be deferred
+    session.markEvicted()
+    assert(IsolatedSessionState.sessions.containsKey(uuid))
+
+    // Release twice - cleanup should still be deferred
+    session.release()
+    assert(IsolatedSessionState.sessions.containsKey(uuid))
+    session.release()
+    assert(IsolatedSessionState.sessions.containsKey(uuid))
+
+    // Release the last reference - cleanup should happen now
+    session.release()
+    assert(!IsolatedSessionState.sessions.containsKey(uuid))
+  }
+
+  test("tryUnEvict succeeds when session is evicted but still has references") {
+    val session = createTestSession(nextUniqueUuid())
+
+    // Acquire the session
+    assert(session.acquire())
+
+    // Evict the session
+    session.markEvicted()
+
+    // Try to un-evict - should succeed because refCount > 0
+    assert(session.tryUnEvict())
+
+    // Now acquire should succeed again
+    assert(session.acquire())
+  }
+
+  test("tryUnEvict fails when session is not evicted") {
+    val session = createTestSession(nextUniqueUuid())
+
+    // Acquire without eviction
+    assert(session.acquire())
+
+    // Try to un-evict - should fail because session is not evicted
+    assert(!session.tryUnEvict())
+  }
+
+  test("tryUnEvict and acquire fail when session has no references") {
+    val uuid = nextUniqueUuid()
+    val session = createTestSession(uuid)
+
+    // Evict with no references - triggers immediate cleanup
+    session.markEvicted()
+    assert(!IsolatedSessionState.sessions.containsKey(uuid))
+
+    // tryUnEvict should fail because refCount is 0 and session is already cleaned up
+    assert(!session.tryUnEvict())
+
+    // acquire should also fail
+    assert(!session.acquire())
+  }
+
+  test("session reuse via tryUnEvict keeps session in map when not evicted") {
+    // Note: This test verifies `IsolatedSessionState.sessions` behavior in isolation.
+    // In production, after tryUnEvict(), the session is put back into the Guava cache.
+    // When the cache eventually evicts it again (due to LRU policy), markEvicted() will be called,
+    // and cleanup will happen if refCount is 0. So there's no resource leak in practice.
+    val uuid = nextUniqueUuid()
+    val session = createTestSession(uuid)
+
+    // Simulate task 1 acquiring the session
+    assert(session.acquire())
+
+    // Session gets evicted (e.g., due to cache pressure)
+    session.markEvicted()
+    assert(IsolatedSessionState.sessions.containsKey(uuid)) // Deferred cleanup
+
+    // Simulate cache loader trying to reuse the session via tryUnEvict
+    assert(session.tryUnEvict())
+
+    // Now a new task can acquire the session
+    assert(session.acquire())
+
+    // Task 1 releases
+    session.release()
+    assert(IsolatedSessionState.sessions.containsKey(uuid)) // Still has 1 reference
+
+    // Task 2 releases - session stays in map because it's not evicted
+    session.release()
+    // Session stays in map because it's not evicted anymore (was un-evicted).
+    // In production, the Guava cache would eventually evict it again, triggering cleanup.
+    assert(IsolatedSessionState.sessions.containsKey(uuid))
+  }
+}
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
index 535c0d2180d4..8bd6c5cf0168 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
@@ -134,7 +134,7 @@ object SparkConnectServerUtils {
       // Testing SPARK-49673, setting maxBatchSize to 10MiB
       s"spark.connect.grpc.arrow.maxBatchSize=${10 * 1024 * 1024}",
       // Cache less sessions to save memory.
-      "spark.executor.isolatedSessionCache.size=5",
+      "spark.executor.isolatedSessionCache.size=10",
       // Disable UI
       "spark.ui.enabled=false").flatMap(v => "--conf" :: v :: Nil)
   }

