This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 2128df848723 [SPARK-54528][CONNECT] Close URLClassLoader eagerly to
avoid OOM
2128df848723 is described below
commit 2128df84872367d4c4816e3d93243bd194b206b6
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Nov 26 11:59:13 2025 -0800
[SPARK-54528][CONNECT] Close URLClassLoader eagerly to avoid OOM
### What changes were proposed in this pull request?
In Spark Connect, every client session has its own class loader on the
server side (both driver and executors), for isolation purposes. However, when
closing the session, we don't close the class loader. We rely on GC to close
the class loader and release the resource, but it's unpredictable and not
efficient. When sessions are being created frequently, OOM may happen.
This PR fixes the issue by closing URLClassLoader eagerly when closing a
session. It also adds a config so that we can tune the session cache size on
the executor side to save memory. With this patch, OOM is gone.
### Why are the changes needed?
avoid OOM
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Our internal CI service has limited memory and
`SparkConnectJdbcDataTypeSuite` keeps failing due to OOM. This test suite
creates and closes sessions back to back frequently, and GC can't catch up to
release the class loaders.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Cursor 2.1.36
Closes #53233 from cloud-fan/mem.
Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit e2649651d78d7d5817ceca729b136d5bf9a00ff9)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../scala/org/apache/spark/executor/Executor.scala | 18 ++++++++++++++++--
.../org/apache/spark/internal/config/package.scala | 9 +++++++++
.../spark/sql/connect/test/RemoteSparkSession.scala | 2 ++
.../apache/spark/sql/artifact/ArtifactManager.scala | 15 ++++++++++++++-
4 files changed, 41 insertions(+), 3 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fc22107e008b..04e966294336 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -20,7 +20,7 @@ package org.apache.spark.executor
import java.io.{File, NotSerializableException}
import java.lang.Thread.UncaughtExceptionHandler
import java.lang.management.ManagementFactory
-import java.net.{URI, URL}
+import java.net.{URI, URL, URLClassLoader}
import java.nio.ByteBuffer
import java.util.{Locale, Properties}
import java.util.concurrent._
@@ -212,7 +212,7 @@ private[spark] class Executor(
val defaultSessionState: IsolatedSessionState =
newSessionState(JobArtifactState("default", None))
val isolatedSessionCache: Cache[String, IsolatedSessionState] =
CacheBuilder.newBuilder()
- .maximumSize(100)
+ .maximumSize(conf.get(EXECUTOR_ISOLATED_SESSION_CACHE_SIZE))
.expireAfterAccess(30, TimeUnit.MINUTES)
.removalListener(new RemovalListener[String, IsolatedSessionState]() {
override def onRemoval(
@@ -220,6 +220,20 @@ private[spark] class Executor(
val state = notification.getValue
// Cache is always used for isolated sessions.
assert(!isDefaultState(state.sessionUUID))
+ // Close the urlClassLoader to release resources.
+ try {
+ state.urlClassLoader match {
+ case urlClassLoader: URLClassLoader =>
+ urlClassLoader.close()
+ logInfo(log"Closed urlClassLoader (URLClassLoader) for evicted
session " +
+ log"${MDC(SESSION_ID, state.sessionUUID)}")
+ case _ =>
+ }
+ } catch {
+ case NonFatal(e) =>
+ logWarning(log"Failed to close urlClassLoader for session " +
+ log"${MDC(SESSION_ID, state.sessionUUID)}", e)
+ }
val sessionBasedRoot = new File(SparkFiles.getRootDirectory(),
state.sessionUUID)
if (sessionBasedRoot.isDirectory && sessionBasedRoot.exists()) {
Utils.deleteRecursively(sessionBasedRoot)
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 9876848f654a..8b32c18aa3b6 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -358,6 +358,15 @@ package object config {
.intConf
.createWithDefault(60)
+ private[spark] val EXECUTOR_ISOLATED_SESSION_CACHE_SIZE =
+ ConfigBuilder("spark.executor.isolatedSessionCache.size")
+ .doc("Maximum number of isolated sessions to cache in the executor. Each
cached session " +
+ "maintains its own classloader for artifact isolation.")
+ .version("4.1.0")
+ .intConf
+ .checkValue(_ > 0, "The cache size must be positive.")
+ .createWithDefault(100)
+
private[spark] val EXECUTOR_PROCESS_TREE_METRICS_ENABLED =
ConfigBuilder("spark.executor.processTreeMetrics.enabled")
.doc("Whether to collect process tree metrics (from the /proc
filesystem) when collecting " +
diff --git
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
index 6d8d2edcf082..efac3bc7561f 100644
---
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
+++
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala
@@ -135,6 +135,8 @@ object SparkConnectServerUtils {
"spark.connect.execute.reattachable.senderMaxStreamSize=123",
// Testing SPARK-49673, setting maxBatchSize to 10MiB
s"spark.connect.grpc.arrow.maxBatchSize=${10 * 1024 * 1024}",
+ // Cache less sessions to save memory.
+ "spark.executor.isolatedSessionCache.size=5",
// Disable UI
"spark.ui.enabled=false").flatMap(v => "--conf" :: v :: Nil)
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index 5889fe581d4e..346cdb832c3f 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -450,7 +450,20 @@ class ArtifactManager(session: SparkSession) extends
AutoCloseable with Logging
pythonIncludeList.clear()
sparkContextRelativePaths.clear()
- // Removed cached classloader
+ // Close and remove cached classloader
+ cachedClassLoader.foreach {
+ case urlClassLoader: URLClassLoader =>
+ try {
+ urlClassLoader.close()
+ logDebug(log"Closed URLClassLoader for session " +
+ log"${MDC(LogKeys.SESSION_ID, session.sessionUUID)}")
+ } catch {
+ case e: IOException =>
+ logWarning(log"Failed to close URLClassLoader for session " +
+ log"${MDC(LogKeys.SESSION_ID, session.sessionUUID)}", e)
+ }
+ case _ =>
+ }
cachedClassLoader = None
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]