This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2e9d64295aa9 [SPARK-52995][YARN] Use Buffered I/O for creating spark jar archive 2e9d64295aa9 is described below commit 2e9d64295aa996347167654408fead0a8230866a Author: Kent Yao <y...@apache.org> AuthorDate: Tue Jul 29 08:13:06 2025 -0700 [SPARK-52995][YARN] Use Buffered I/O for creating spark jar archive ### What changes were proposed in this pull request? Use BufferedInputStream/BufferedOutputStream when packing the jars under SPARK_HOME/jars into a zip archive. ### Why are the changes needed? Buffering improves I/O efficiency and reduces system-call overhead, especially when `spark-submit` is run on busy gateway nodes. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Tested locally with logically equivalent code snippets. #### Before ```scala scala> import java.io.{File, FileOutputStream} | import java.nio.file.Files | import java.util.UUID | import java.util.zip.{ZipEntry, ZipOutputStream} | | | def timeTakenMs[T](body: => T): Unit = { | val startTime = System.currentTimeMillis() | body | val endTime = System.currentTimeMillis()// | // scalastyle: off println | println(s"Time taken: ${endTime - startTime} ms") | } | | timeTakenMs { | val jarsDir = new File("/Users/hzyaoqin/spark/dist/jars") | val jarsArchive = File.createTempFile(UUID.randomUUID().toString, ".zip", | new File("/Users/hzyaoqin/spark/dist/logs")) | val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive)) | jarsStream.setLevel(0) | jarsDir.listFiles().foreach { f => | if (f.isFile) { | jarsStream.putNextEntry(new ZipEntry(f.getName)) | Files.copy(f.toPath, jarsStream) | jarsStream.closeEntry() | } | } | } ``` - Time taken: 2218 ms - Time taken: 2253 ms - Time taken: 2211 ms - Time taken: 2318 ms #### After ```scala scala> | import java.io.{BufferedInputStream, BufferedOutputStream, File, FileInputStream, FileOutputStream} | import java.nio.file.Files | import java.util.UUID | import 
java.util.zip.{ZipEntry, ZipOutputStream} | | import scala.util.Using | | def timeTakenMs[T](body: => T): Unit = { | val startTime = System.currentTimeMillis() | body | val endTime = System.currentTimeMillis()// | // scalastyle: off println | println(s"Time taken: ${endTime - startTime} ms") | } | | timeTakenMs { | val jarsDir = new File("/Users/hzyaoqin/spark/dist/jars") | val jarsArchive = File.createTempFile(UUID.randomUUID().toString, ".zip", | new File("/Users/hzyaoqin/spark/dist/logs")) | Using.resource(new ZipOutputStream( | new BufferedOutputStream(new FileOutputStream(jarsArchive), 65536))) { jarsStream => | jarsStream.setLevel(0) | jarsDir.listFiles().foreach { f => | if (f.isFile) { | jarsStream.putNextEntry(new ZipEntry(f.getName)) | Using.resource(new BufferedInputStream(new FileInputStream(f), 65536)) { in => | in.transferTo(jarsStream) | } | jarsStream.closeEntry() | } | } | } | } ``` - Time taken: 490 ms - Time taken: 486 ms - Time taken: 476 ms - Time taken: 485 ms ### Was this patch authored or co-authored using generative AI tooling? no Closes #51704 from yaooqinn/SPARK-52995. 
Authored-by: Kent Yao <y...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../org/apache/spark/deploy/yarn/Client.scala | 27 +++++++++++----------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8b621e82afe2..514bc41a7df1 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn -import java.io.{File, FileFilter, FileNotFoundException, FileOutputStream, InterruptedIOException, IOException, OutputStreamWriter} +import java.io.{BufferedInputStream, BufferedOutputStream, File, FileFilter, FileInputStream, FileNotFoundException, FileOutputStream, InterruptedIOException, IOException, OutputStreamWriter} import java.net.{InetAddress, UnknownHostException, URI, URL} import java.nio.ByteBuffer import java.nio.charset.StandardCharsets @@ -28,6 +28,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream} import scala.collection.immutable.{Map => IMap} import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map} import scala.jdk.CollectionConverters._ +import scala.util.Using import scala.util.control.NonFatal import com.google.common.base.Objects @@ -711,21 +712,21 @@ private[spark] class Client( sparkConf.getenv("SPARK_HOME"))) val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip", new File(Utils.getLocalDir(sparkConf))) - val jarsStream = new ZipOutputStream(new FileOutputStream(jarsArchive)) - - try { - jarsStream.setLevel(0) - jarsDir.listFiles().foreach { f => - if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) { - jarsStream.putNextEntry(new ZipEntry(f.getName)) - Files.copy(f.toPath, jarsStream) + val bufferSize = 
sparkConf.get(BUFFER_SIZE) + Using.resource(new ZipOutputStream( + new BufferedOutputStream(new FileOutputStream(jarsArchive), bufferSize))) { + jarsStream => + jarsStream.setLevel(0) + jarsDir.listFiles().foreach { f => + if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) { + jarsStream.putNextEntry(new ZipEntry(f.getName)) + Using.resource(new BufferedInputStream(new FileInputStream(f), bufferSize)) { + _.transferTo(jarsStream) + } + } jarsStream.closeEntry() } - } - } finally { - jarsStream.close() } - distribute(jarsArchive.toURI.getPath, resType = LocalResourceType.ARCHIVE, destName = Some(LOCALIZED_LIB_DIR)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org