This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e9d64295aa9 [SPARK-52995][YARN] Use Buffered I/O for creating Spark jar archive
2e9d64295aa9 is described below

commit 2e9d64295aa996347167654408fead0a8230866a
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Jul 29 08:13:06 2025 -0700

    [SPARK-52995][YARN] Use Buffered I/O for creating Spark jar archive
    
    ### What changes were proposed in this pull request?
    
    Use BufferedInputStream/BufferedOutputStream when packing the jars under SPARK_HOME/jars into a zip archive.
    
    ### Why are the changes needed?
    
    Buffered I/O improves efficiency and reduces system-call overhead, especially when `spark-submit` is invoked on busy gateway nodes.
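
    Concretely, `Files.copy` streams through an internal 8 KiB buffer, so on an
    unbuffered `FileOutputStream` every 8 KiB chunk is its own write(2) system
    call; a 64 KiB `BufferedOutputStream` coalesces roughly eight of them into
    one. A minimal, self-contained sketch of the effect (not from the patch; the
    `CountingStream` helper is invented for illustration):

    ```scala
    import java.io.{BufferedOutputStream, ByteArrayOutputStream, OutputStream}

    // Hypothetical helper: tallies the write() calls that reach the wrapped
    // stream. On a real FileOutputStream each call would be one write(2).
    class CountingStream(sink: OutputStream) extends OutputStream {
      var writes = 0
      override def write(b: Int): Unit = { writes += 1; sink.write(b) }
      override def write(b: Array[Byte], off: Int, len: Int): Unit = {
        writes += 1
        sink.write(b, off, len)
      }
    }

    val chunk = new Array[Byte](8192) // the chunk size Files.copy uses

    val direct = new CountingStream(new ByteArrayOutputStream())
    (1 to 128).foreach(_ => direct.write(chunk, 0, chunk.length))
    println(s"unbuffered: ${direct.writes} writes reach the sink") // 128

    val counted = new CountingStream(new ByteArrayOutputStream())
    val buffered = new BufferedOutputStream(counted, 65536)
    (1 to 128).foreach(_ => buffered.write(chunk, 0, chunk.length))
    buffered.flush()
    println(s"buffered:   ${counted.writes} writes reach the sink") // 16
    ```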
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    
    Tested locally with code snippets that are logically equivalent to the patched code.
    
    #### Before
    
    ```scala
    scala> import java.io.{File, FileOutputStream}
         | import java.nio.file.Files
         | import java.util.UUID
         | import java.util.zip.{ZipEntry, ZipOutputStream}
         |
         |
         | def timeTakenMs[T](body: => T): Unit = {
         |   val startTime = System.currentTimeMillis()
         |   body
         |   val endTime = System.currentTimeMillis()
         |   // scalastyle:off println
         |   println(s"Time taken: ${endTime - startTime} ms")
         | }
         |
         | timeTakenMs {
         |   val jarsDir = new File("/Users/hzyaoqin/spark/dist/jars")
         |   val jarsArchive = File.createTempFile(UUID.randomUUID().toString, ".zip",
         |     new File("/Users/hzyaoqin/spark/dist/logs"))
         |   val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
         |   jarsStream.setLevel(0)
         |   jarsDir.listFiles().foreach { f =>
         |     if (f.isFile) {
         |       jarsStream.putNextEntry(new ZipEntry(f.getName))
         |       Files.copy(f.toPath, jarsStream)
         |       jarsStream.closeEntry()
         |     }
         |   }
         | }
    ```
    
    - Time taken: 2218 ms
    - Time taken: 2253 ms
    - Time taken: 2211 ms
    - Time taken: 2318 ms
    
    #### After
    ```scala
    scala>
         | import java.io.{BufferedInputStream, BufferedOutputStream, File, FileInputStream, FileOutputStream}
         | import java.nio.file.Files
         | import java.util.UUID
         | import java.util.zip.{ZipEntry, ZipOutputStream}
         |
         | import scala.util.Using
         |
         | def timeTakenMs[T](body: => T): Unit = {
         |   val startTime = System.currentTimeMillis()
         |   body
         |   val endTime = System.currentTimeMillis()
         |   // scalastyle:off println
         |   println(s"Time taken: ${endTime - startTime} ms")
         | }
         |
         | timeTakenMs {
         |   val jarsDir = new File("/Users/hzyaoqin/spark/dist/jars")
         |   val jarsArchive = File.createTempFile(UUID.randomUUID().toString, ".zip",
         |     new File("/Users/hzyaoqin/spark/dist/logs"))
         |   Using.resource(new ZipOutputStream(
         |     new BufferedOutputStream(new FileOutputStream(jarsArchive), 65536))) { jarsStream =>
         |     jarsStream.setLevel(0)
         |     jarsDir.listFiles().foreach { f =>
         |       if (f.isFile) {
         |         jarsStream.putNextEntry(new ZipEntry(f.getName))
         |         Using.resource(new BufferedInputStream(new FileInputStream(f), 65536)) { in =>
         |           in.transferTo(jarsStream)
         |         }
         |         jarsStream.closeEntry()
         |       }
         |     }
         |   }
         | }
    ```
    - Time taken: 490 ms
    - Time taken: 486 ms
    - Time taken: 476 ms
    - Time taken: 485 ms
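
    The rewrite also swaps the explicit try/finally for `scala.util.Using.resource`,
    which closes the resource even when the body throws. A minimal sketch of that
    standard-library behavior (the `Probe` class is invented for illustration):

    ```scala
    import java.io.Closeable

    import scala.util.Using

    // Hypothetical resource that records whether close() was called.
    class Probe extends Closeable {
      var closed = false
      override def close(): Unit = closed = true
    }

    val p = new Probe
    // The body's exception still propagates to the caller...
    try Using.resource(p) { _ => throw new RuntimeException("boom") }
    catch { case _: RuntimeException => }
    // ...but the resource was closed on the way out, with no explicit finally.
    assert(p.closed)
    ```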
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #51704 from yaooqinn/SPARK-52995.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8b621e82afe2..514bc41a7df1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{File, FileFilter, FileNotFoundException, FileOutputStream, InterruptedIOException, IOException, OutputStreamWriter}
+import java.io.{BufferedInputStream, BufferedOutputStream, File, FileFilter, FileInputStream, FileNotFoundException, FileOutputStream, InterruptedIOException, IOException, OutputStreamWriter}
 import java.net.{InetAddress, UnknownHostException, URI, URL}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
@@ -28,6 +28,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 import scala.collection.immutable.{Map => IMap}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.jdk.CollectionConverters._
+import scala.util.Using
 import scala.util.control.NonFatal
 
 import com.google.common.base.Objects
@@ -711,21 +712,21 @@ private[spark] class Client(
             sparkConf.getenv("SPARK_HOME")))
           val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
             new File(Utils.getLocalDir(sparkConf)))
-          val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive))
-
-          try {
-            jarsStream.setLevel(0)
-            jarsDir.listFiles().foreach { f =>
-              if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
-                jarsStream.putNextEntry(new ZipEntry(f.getName))
-                Files.copy(f.toPath, jarsStream)
+          val bufferSize = sparkConf.get(BUFFER_SIZE)
+          Using.resource(new ZipOutputStream(
+            new BufferedOutputStream(new FileOutputStream(jarsArchive), bufferSize))) {
+            jarsStream =>
+              jarsStream.setLevel(0)
+              jarsDir.listFiles().foreach { f =>
+                if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
+                  jarsStream.putNextEntry(new ZipEntry(f.getName))
+                  Using.resource(new BufferedInputStream(new FileInputStream(f), bufferSize)) {
+                    _.transferTo(jarsStream)
+                  }
+                }
                 jarsStream.closeEntry()
               }
-            }
-          } finally {
-            jarsStream.close()
           }
-
           distribute(jarsArchive.toURI.getPath,
             resType = LocalResourceType.ARCHIVE,
             destName = Some(LOCALIZED_LIB_DIR))
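
The committed code sizes both buffers from Spark's existing BUFFER_SIZE config
entry rather than the 65536 hard-coded in the benchmark snippets. A hedged
sketch of tuning it, assuming the standard "spark.buffer.size" key (default
65536) that backs BUFFER_SIZE:

```scala
import org.apache.spark.SparkConf

// Assumption for illustration: with this patch, spark.buffer.size also
// sizes the buffered streams used to build the jar archive in yarn.Client.
val conf = new SparkConf().set("spark.buffer.size", "131072") // 128 KiB
```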

