This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 86c1be50376 [SPARK-45014][CONNECT] Clean up fileserver when cleaning 
up files, jars and archives in SparkContext
86c1be50376 is described below

commit 86c1be503768dd1275f4d000fedd62ad6e9e30db
Author: Hyukjin Kwon <[email protected]>
AuthorDate: Thu Aug 31 15:21:19 2023 +0900

    [SPARK-45014][CONNECT] Clean up fileserver when cleaning up files, jars and 
archives in SparkContext
    
    This PR proposes to clean up the files, jars and archives added via Spark 
Connect sessions.
    
    In [SPARK-44348](https://issues.apache.org/jira/browse/SPARK-44348), we 
clean up Spark Context's added files but we don't clean up the ones in 
fileserver.
    
    Yes, it will avoid slowly growing memory within the file server.
    
    Manually tested. Also existing tests should not be broken.
    
    No.
    
    Closes #42731 from HyukjinKwon/SPARK-45014.
    
    Authored-by: Hyukjin Kwon <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
    (cherry picked from commit 9a023c479c6a91a602f96ccabba398223c04b3d1)
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 .../sql/connect/artifact/SparkConnectArtifactManager.scala  | 10 ++++++----
 core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala       | 13 +++++++++++++
 .../org/apache/spark/rpc/netty/NettyStreamManager.scala     |  4 ++++
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
index a2df11eeb58..fee99532bd5 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala
@@ -208,12 +208,14 @@ class SparkConnectArtifactManager(sessionHolder: 
SessionHolder) extends Logging
         s"sessionId: ${sessionHolder.sessionId}")
 
     // Clean up added files
-    sessionHolder.session.sparkContext.addedFiles.remove(state.uuid)
-    sessionHolder.session.sparkContext.addedArchives.remove(state.uuid)
-    sessionHolder.session.sparkContext.addedJars.remove(state.uuid)
+    val fileserver = SparkEnv.get.rpcEnv.fileServer
+    val sparkContext = sessionHolder.session.sparkContext
+    
sparkContext.addedFiles.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+    
sparkContext.addedArchives.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeFile))
+    
sparkContext.addedJars.remove(state.uuid).foreach(_.keys.foreach(fileserver.removeJar))
 
     // Clean up cached relations
-    val blockManager = sessionHolder.session.sparkContext.env.blockManager
+    val blockManager = sparkContext.env.blockManager
     blockManager.removeCache(sessionHolder.userId, sessionHolder.sessionId)
 
     // Clean up artifacts folder
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala 
b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 2fce2889c09..2575cffdeb3 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -206,6 +206,19 @@ private[spark] trait RpcEnvFileServer {
     fixedBaseUri
   }
 
+  /**
+   * Removes a file from this RpcEnv.
+   *
+   * @param key Local file to remove.
+   */
+  def removeFile(key: String): Unit
+
+  /**
+   * Removes a jar from this RpcEnv.
+   *
+   * @param key Local jar to remove.
+   */
+  def removeJar(key: String): Unit
 }
 
 private[spark] case class RpcEnvConfig(
diff --git 
a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala 
b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index 57243133aba..9ac14f34836 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -43,6 +43,10 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
   private val jars = new ConcurrentHashMap[String, File]()
   private val dirs = new ConcurrentHashMap[String, File]()
 
+  override def removeFile(key: String): Unit = files.remove(key)
+
+  override def removeJar(key: String): Unit = jars.remove(key)
+
   override def getChunk(streamId: Long, chunkIndex: Int): ManagedBuffer = {
     throw new UnsupportedOperationException()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to