This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 2128df848723 [SPARK-54528][CONNECT] Close URLClassLoader eagerly to 
avoid OOM
2128df848723 is described below

commit 2128df84872367d4c4816e3d93243bd194b206b6
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Nov 26 11:59:13 2025 -0800

    [SPARK-54528][CONNECT] Close URLClassLoader eagerly to avoid OOM
    
    ### What changes were proposed in this pull request?
    
    In Spark Connect, every client session has its own class loader in the 
server side (both driver and executors), for isolation purpose. However, when 
closing the session, we don't close the class loader. We rely on GC to close 
the class loader and release the resource, but it's unpredicatable and not 
efficient. When sessions are being created frequently, OOM may happen.
    
    This PR fixes the issue by closing URLClassLoader eagerly when closing a 
session. It also adds a config so that we can tune the session cache size in 
the executor side to save memory. With this patch, OOM is gone.
    
    ### Why are the changes needed?
    
    avoid OOM
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Our internal CI service has limited memory and 
`SparkConnectJdbcDataTypeSuite` keeps failing due to OOM. This test suite 
creates and closes sessions back to back frequently, and GC can't catch up to 
release the class loaders.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Generated-by: Cursor 2.1.36
    
    Closes #53233 from cloud-fan/mem.
    
    Lead-authored-by: Wenchen Fan <[email protected]>
    Co-authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit e2649651d78d7d5817ceca729b136d5bf9a00ff9)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../scala/org/apache/spark/executor/Executor.scala     | 18 ++++++++++++++++--
 .../org/apache/spark/internal/config/package.scala     |  9 +++++++++
 .../spark/sql/connect/test/RemoteSparkSession.scala    |  2 ++
 .../apache/spark/sql/artifact/ArtifactManager.scala    | 15 ++++++++++++++-
 4 files changed, 41 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fc22107e008b..04e966294336 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -20,7 +20,7 @@ package org.apache.spark.executor
 import java.io.{File, NotSerializableException}
 import java.lang.Thread.UncaughtExceptionHandler
 import java.lang.management.ManagementFactory
-import java.net.{URI, URL}
+import java.net.{URI, URL, URLClassLoader}
 import java.nio.ByteBuffer
 import java.util.{Locale, Properties}
 import java.util.concurrent._
@@ -212,7 +212,7 @@ private[spark] class Executor(
   val defaultSessionState: IsolatedSessionState = 
newSessionState(JobArtifactState("default", None))
 
   val isolatedSessionCache: Cache[String, IsolatedSessionState] = 
CacheBuilder.newBuilder()
-    .maximumSize(100)
+    .maximumSize(conf.get(EXECUTOR_ISOLATED_SESSION_CACHE_SIZE))
     .expireAfterAccess(30, TimeUnit.MINUTES)
     .removalListener(new RemovalListener[String, IsolatedSessionState]() {
       override def onRemoval(
@@ -220,6 +220,20 @@ private[spark] class Executor(
         val state = notification.getValue
         // Cache is always used for isolated sessions.
         assert(!isDefaultState(state.sessionUUID))
+        // Close the urlClassLoader to release resources.
+        try {
+          state.urlClassLoader match {
+            case urlClassLoader: URLClassLoader =>
+              urlClassLoader.close()
+              logInfo(log"Closed urlClassLoader (URLClassLoader) for evicted 
session " +
+                log"${MDC(SESSION_ID, state.sessionUUID)}")
+            case _ =>
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning(log"Failed to close urlClassLoader for session " +
+              log"${MDC(SESSION_ID, state.sessionUUID)}", e)
+        }
         val sessionBasedRoot = new File(SparkFiles.getRootDirectory(), 
state.sessionUUID)
         if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) {
           Utils.deleteRecursively(sessionBasedRoot)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 9876848f654a..8b32c18aa3b6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -358,6 +358,15 @@ package object config {
       .intConf
       .createWithDefault(60)
 
+  private[spark] val EXECUTOR_ISOLATED_SESSION_CACHE_SIZE =
+    ConfigBuilder("spark.executor.isolatedSessionCache.size")
+      .doc("Maximum number of isolated sessions to cache in the executor. Each 
cached session " +
+        "maintains its own classloader for artifact isolation.")
+      .version("4.1.0")
+      .intConf
+      .checkValue(_ > 0, "The cache size must be positive.")
+      .createWithDefault(100)
+
   private[spark] val EXECUTOR_PROCESS_TREE_METRICS_ENABLED =
     ConfigBuilder("spark.executor.processTreeMetrics.enabled")
       .doc("Whether to collect process tree metrics (from the /proc 
filesystem) when collecting " +
diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
index 6d8d2edcf082..efac3bc7561f 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
@@ -135,6 +135,8 @@ object SparkConnectServerUtils {
       "spark.connect.execute.reattachable.senderMaxStreamSize=123",
       // Testing SPARK-49673, setting maxBatchSize to 10MiB
       s"spark.connect.grpc.arrow.maxBatchSize=${10 * 1024 * 1024}",
+      // Cache less sessions to save memory.
+      "spark.executor.isolatedSessionCache.size=5",
       // Disable UI
       "spark.ui.enabled=false").flatMap(v => "--conf" :: v :: Nil)
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index 5889fe581d4e..346cdb832c3f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -450,7 +450,20 @@ class ArtifactManager(session: SparkSession) extends 
AutoCloseable with Logging
     pythonIncludeList.clear()
     sparkContextRelativePaths.clear()
 
-    // Removed cached classloader
+    // Close and remove cached classloader
+    cachedClassLoader.foreach {
+      case urlClassLoader: URLClassLoader =>
+        try {
+          urlClassLoader.close()
+          logDebug(log"Closed URLClassLoader for session " +
+            log"${MDC(LogKeys.SESSION_ID, session.sessionUUID)}")
+        } catch {
+          case e: IOException =>
+            logWarning(log"Failed to close URLClassLoader for session " +
+              log"${MDC(LogKeys.SESSION_ID, session.sessionUUID)}", e)
+        }
+      case _ =>
+    }
     cachedClassLoader = None
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to