This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5b5ff6d732ce [SPARK-54753][SQL] fix memory leak of ArtifactManager
5b5ff6d732ce is described below
commit 5b5ff6d732ce4a866f6e6e30821d5c257f903ecc
Author: xihuan_mstr <[email protected]>
AuthorDate: Wed Dec 31 10:57:04 2025 +0900
[SPARK-54753][SQL] fix memory leak of ArtifactManager
### What changes were proposed in this pull request?
As stated in the `java.lang.ref.Cleaner` Javadoc,
https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/lang/ref/Cleaner.html
**The cleaning action could be a lambda but all too easily will capture the
object reference, by referring to fields of the object being cleaned,
preventing the object from becoming phantom reachable. Using a static nested
class, as above, will avoid accidentally retaining the object reference.**
For more details, including the test program and the analysis, see
https://issues.apache.org/jira/browse/SPARK-54753
(Screenshot attached to the PR: https://github.com/user-attachments/assets/83de9e8e-8f63-41fe-8318-b1cea6a1de9c)
When running with Spark 4.0.1, the ArtifactManager is leaked, and so are the
SessionState/SparkSession it references.
### Why are the changes needed?
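Use a separate class to hold the cleanup state. The previous cleanup action was
a lambda that referred to `cleanUpStateForGlobalResources`, a member of
`ArtifactManager`, so it captured `this`. The `Cleaner` keeps each registered
action strongly reachable until it runs, so the `ArtifactManager` (and the
`SessionState`/`SparkSession` it references) could never become phantom
reachable and was never released. `StateCleanupRunner` receives only the
`ArtifactStateForCleanup` value, so it no longer retains the `ArtifactManager`.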
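Cleaner retention is hard to observe directly. A hypothetical sketch can still show the capture difference without involving GC. A lambda that refers to a member reads it through `this` when it runs, while a constructor argument is evaluated once and only the resulting value is stored. The names `Manager`, `StateReader`, and `state` are illustrative, not Spark code.

```scala
// Hypothetical stand-ins: `Manager` plays ArtifactManager, `state` plays
// cleanUpStateForGlobalResources, and `StateReader` plays StateCleanupRunner.
final class StateReader(captured: String) {
  def read(): String = captured
}

final class Manager {
  var state: String = "initial"

  // Reads the `state` field when invoked, so the closure must hold `this`.
  val viaLambda: () => String = () => state

  // Evaluates `state` now and hands the reader only that value.
  val viaReader: StateReader = new StateReader(state)
}

val manager = new Manager
manager.state = "changed"
println(manager.viaLambda())      // changed
println(manager.viaReader.read()) // initial
```

In the patch, `new StateCleanupRunner(cleanUpStateForGlobalResources)` follows the second shape. The argument is evaluated at registration time, so the registered action holds the cleanup state but not the `ArtifactManager`.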
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with the test program in https://issues.apache.org/jira/browse/SPARK-54753,
using VisualVM to monitor memory usage.
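The JIRA test program is not reproduced in this message. The sketch below is a hypothetical, self-contained stand-in, not that program. It allocates managers in both shapes, drops every reference to them, and then requests GC. The names `Harness`, `LeakyManager`, `FixedManager`, and `CountingAction` are illustrative. Timing depends on the JVM and GC settings, since `System.gc()` is only a request.

```scala
import java.lang.ref.Cleaner
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical harness comparing the pre-fix and post-fix shapes.
object Harness {
  val cleaner: Cleaner = Cleaner.create()
  val leakyCleaned = new AtomicInteger(0)
  val fixedCleaned = new AtomicInteger(0)

  // Cleanup action that only knows which counter to bump.
  final class CountingAction(target: AtomicInteger) extends Runnable {
    override def run(): Unit = target.incrementAndGet()
  }

  // Pre-fix shape: the lambda reads the `action` field, so it captures `this`,
  // and the Cleaner keeps the manager strongly reachable.
  final class LeakyManager {
    private val action = new CountingAction(leakyCleaned)
    cleaner.register(this, () => action.run())
  }

  // Post-fix shape: the registered Runnable does not reference the manager.
  final class FixedManager {
    cleaner.register(this, new CountingAction(fixedCleaned))
  }
}

// Allocate managers and immediately drop every reference to them.
def allocate(n: Int): Unit =
  for (_ <- 1 to n) {
    new Harness.LeakyManager()
    new Harness.FixedManager()
  }

// System.gc() is only a request, so poll until the fixed managers are cleaned
// or the timeout expires.
def awaitCleanup(expected: Int, timeoutMillis: Long): Unit = {
  val deadline = System.currentTimeMillis() + timeoutMillis
  while (Harness.fixedCleaned.get() < expected && System.currentTimeMillis() < deadline) {
    System.gc()
    Thread.sleep(50)
  }
}

allocate(1000)
awaitCleanup(expected = 1000, timeoutMillis = 10000L)
println(s"leaky cleaned: ${Harness.leakyCleaned.get()}, fixed cleaned: ${Harness.fixedCleaned.get()}")
```

The leaky count stays at 0 because each registered lambda keeps its manager reachable from the Cleaner. The fixed managers can become phantom reachable and be cleaned.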
### Was this patch authored or co-authored using generative AI tooling?
No
cc dongjoon-hyun pranavdev022 hvanhovell vicennial HyukjinKwon
Closes #53591 from scottme/master.
Lead-authored-by: xihuan_mstr <[email protected]>
Co-authored-by: scottme <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/connect/client/CheckConnectJvmClientCompatibility.scala | 2 ++
.../scala/org/apache/spark/sql/artifact/ArtifactManager.scala | 9 +++++++--
2 files changed, 9 insertions(+), 2 deletions(-)
diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 92adc8eb9346..1a6fd7926daf 100644
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -234,6 +234,8 @@ object CheckConnectJvmClientCompatibility {
"org.apache.spark.sql.artifact.ArtifactManager$"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.artifact.ArtifactManager$SparkContextResourceType$"),
+ ProblemFilters.exclude[MissingClassProblem](
+ "org.apache.spark.sql.artifact.ArtifactManager$StateCleanupRunner"),
ProblemFilters.exclude[MissingClassProblem](
"org.apache.spark.sql.artifact.RefCountedCacheId"),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
index 346cdb832c3f..e11f0f99bf2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/artifact/ArtifactManager.scala
@@ -422,8 +422,7 @@ class ArtifactManager(session: SparkSession) extends AutoCloseable with Logging
artifactPath)
// Ensure that no reference to `this` is captured/help by the cleanup lambda
private def getCleanable: Cleaner.Cleanable = cleaner.register(
- this,
- () => ArtifactManager.cleanUpGlobalResources(cleanUpStateForGlobalResources)
+ this, new StateCleanupRunner(cleanUpStateForGlobalResources)
)
private var cleanable = getCleanable
@@ -529,6 +528,12 @@ object ArtifactManager extends Logging {
val JAR, FILE, ARCHIVE = Value
}
+ private class StateCleanupRunner(cleanupState: ArtifactStateForCleanup) extends Runnable {
+ override def run(): Unit = {
+ ArtifactManager.cleanUpGlobalResources(cleanupState)
+ }
+ }
+
// Shared cleaner instance
private val cleaner: Cleaner = Cleaner.create()