This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 9a023c479c6 [SPARK-45014][CONNECT] Clean up fileserver when cleaning up files, jars and archives in SparkContext
9a023c479c6 is described below
commit 9a023c479c6a91a602f96ccabba398223c04b3d1
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Aug 31 15:21:19 2023 +0900
[SPARK-45014][CONNECT] Clean up fileserver when cleaning up files, jars and archives in SparkContext
### What changes were proposed in this pull request?
This PR proposes to clean up the fileserver entries for files, jars and archives added via Spark Connect sessions, in addition to removing them from SparkContext's bookkeeping.
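In outline, the change follows a remove-and-forward pattern: removing a session's entry from SparkContext's per-session artifact maps yields the artifact keys, which are then forwarded to the fileserver for removal. A minimal standalone sketch of that pattern, using illustrative stand-in names rather than the real Spark internals:

```scala
import scala.collection.concurrent.TrieMap

object CleanupSketch {
  // Stand-in for SparkContext.addedFiles: session UUID -> (artifact key -> timestamp).
  private val addedFiles = TrieMap[String, Map[String, Long]]()

  // Stand-in for the fileserver's removal hook.
  private def removeFile(key: String): Unit = println(s"fileserver: removed $key")

  def main(args: Array[String]): Unit = {
    addedFiles.put("session-uuid", Map("/tmp/artifacts/dep.zip" -> 0L))
    // remove() returns an Option, so the fileserver is only touched when
    // the session actually registered artifacts.
    addedFiles.remove("session-uuid").foreach(_.keys.foreach(removeFile))
  }
}
```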
### Why are the changes needed?
In [SPARK-44348](https://issues.apache.org/jira/browse/SPARK-44348), we clean up SparkContext's added files, but we do not clean up the corresponding entries in the fileserver, so they accumulate for the lifetime of the driver.
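Concretely, the fileserver's registries had an add path but no remove path, so entries outlived the Spark Connect sessions that created them. A simplified, standalone illustration of that shape (hypothetical class; the real registry lives in NettyStreamManager, shown in the diff below):

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

// Before this patch, a registry like this could only grow: files were
// registered per session but never dropped when the session expired.
class GrowOnlyRegistrySketch {
  private val files = new ConcurrentHashMap[String, File]()

  def addFile(file: File): String = {
    files.put(file.getName, file)
    file.getName
  }
  // No remove path: each expired session leaves its entries behind.
}
```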
### Does this PR introduce _any_ user-facing change?
Yes, it avoids memory slowly growing within the fileserver.
### How was this patch tested?
Manually tested. Existing tests should not be affected.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #42731 from HyukjinKwon/SPARK-45014.
Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
.../sql/connect/artifact/SparkConnectArtifactManager.scala | 10 ++++++----
core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala | 13 +++++++++++++
.../org/apache/spark/rpc/netty/NettyStreamManager.scala | 4 ++++
3 files changed, 23 insertions(+), 4 deletions(-)
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
index a2df11eeb58..fee99532bd5 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
@@ -208,12 +208,14 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
       s"sessionId: ${sessionHolder.sessionId}")
 
     // Clean up added files
-    sessionHolder.session.sparkContext.addedFiles.remove(state.uuid)
-    sessionHolder.session.sparkContext.addedArchives.remove(state.uuid)
-    sessionHolder.session.sparkContext.addedJars.remove(state.uuid)
+    val fileserver = SparkEnv.get.rpcEnv.fileServer
+    val sparkContext = sessionHolder.session.sparkContext
+    sparkContext.addedFiles.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+    sparkContext.addedArchives.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+    sparkContext.addedJars.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeJar))
     // Clean up cached relations
-    val blockManager = sessionHolder.session.sparkContext.env.blockManager
+    val blockManager = sparkContext.env.blockManager
     blockManager.removeCache(sessionHolder.userId, sessionHolder.sessionId)
 
     // Clean up artifacts folder
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 2fce2889c09..2575cffdeb3 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -206,6 +206,19 @@ private[spark] trait RpcEnvFileServer {
     fixedBaseUri
   }
 
+  /**
+   * Removes a file from this RpcEnv.
+   *
+   * @param key Local file to remove.
+   */
+  def removeFile(key: String): Unit
+
+  /**
+   * Removes a jar from this RpcEnv.
+   *
+   * @param key Local jar to remove.
+   */
+  def removeJar(key: String): Unit
 }
 
 private[spark] case class RpcEnvConfig(
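With this change, every RpcEnvFileServer implementation must supply the two removal hooks. A toy sketch of what an in-memory implementation of the contract might look like (a hypothetical class for illustration; the real implementation is NettyStreamManager, below):

```scala
import java.io.File
import java.util.concurrent.ConcurrentHashMap

// Illustrative only: a real file server would also serve the bytes over RPC.
class InMemoryFileServerSketch {
  private val files = new ConcurrentHashMap[String, File]()
  private val jars = new ConcurrentHashMap[String, File]()

  def addFile(file: File): String = { files.put(file.getName, file); file.getName }
  def addJar(file: File): String = { jars.put(file.getName, file); file.getName }

  // The new removal hooks: drop the mapping so the entry no longer
  // pins the File reference in memory.
  def removeFile(key: String): Unit = files.remove(key)
  def removeJar(key: String): Unit = jars.remove(key)
}
```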
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 57243133aba..9ac14f34836 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -43,6 +43,10 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   private val jars = new ConcurrentHashMap[String, File]()
   private val dirs = new ConcurrentHashMap[String, File]()
 
+  override def removeFile(key: String): Unit = files.remove(key)
+
+  override def removeJar(key: String): Unit = jars.remove(key)
+
   override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
     throw new UnsupportedOperationException()
   }
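Worth noting about the implementation above: ConcurrentHashMap.remove is a no-op for an absent key, so cleaning up the same session key twice is harmless. A standalone sketch of that property:

```scala
import java.util.concurrent.ConcurrentHashMap

object IdempotentRemoveSketch {
  def main(args: Array[String]): Unit = {
    val files = new ConcurrentHashMap[String, String]()
    files.put("example.txt", "/tmp/example.txt")
    files.remove("example.txt") // removes the mapping
    files.remove("example.txt") // absent key: no-op, returns null
  }
}
```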