This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 2e9d64295aa9 [SPARK-52995][YARN] Use Buffered I/O for creating spark
jar archive
2e9d64295aa9 is described below
commit 2e9d64295aa996347167654408fead0a8230866a
Author: Kent Yao <[email protected]>
AuthorDate: Tue Jul 29 08:13:06 2025 -0700
[SPARK-52995][YARN] Use Buffered I/O for creating spark jar archive
### What changes were proposed in this pull request?
Use BufferedInputStream/BufferedOutputStream to read spark_home/jars to a
zip archive
### Why are the changes needed?
We can improve the I/O efficiency and reduce system call costs, especially
for cases where `spark-submit` is called on busy gateway nodes
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Tested locally with logically equivalent code snippets.
#### Before
```scala
scala> import java.io.{File, FileOutputStream}
| import java.nio.file.Files
| import java.util.UUID
| import java.util.zip.{ZipEntry, ZipOutputStream}
|
|
| def timeTakenMs[T](body: => T): Unit = {
| val startTime = System.currentTimeMillis()
| body
| val endTime = System.currentTimeMillis()
| // scalastyle:off println
| println(s"Time taken: ${endTime - startTime} ms")
| }
|
| timeTakenMs {
| val jarsDir = new File("/Users/hzyaoqin/spark/dist/jars")
| val jarsArchive = File.createTempFile(UUID.randomUUID().toString,
".zip",
| new File("/Users/hzyaoqin/spark/dist/logs"))
| val jarsStream = new ZipOutputStream(new
FileOutputStream(jarsArchive))
| jarsStream.setLevel(0)
| jarsDir.listFiles().foreach { f =>
| if (f.isFile) {
| jarsStream.putNextEntry(new ZipEntry(f.getName))
| Files.copy(f.toPath, jarsStream)
| jarsStream.closeEntry()
| }
| }
| }
```
- Time taken: 2218 ms
- Time taken: 2253 ms
- Time taken: 2211 ms
- Time taken: 2318 ms
#### After
```scala
scala>
| import java.io.{BufferedInputStream, BufferedOutputStream, File,
FileInputStream, FileOutputStream}
| import java.nio.file.Files
| import java.util.UUID
| import java.util.zip.{ZipEntry, ZipOutputStream}
|
| import scala.util.Using
|
| def timeTakenMs[T](body: => T): Unit = {
| val startTime = System.currentTimeMillis()
| body
| val endTime = System.currentTimeMillis()
| // scalastyle:off println
| println(s"Time taken: ${endTime - startTime} ms")
| }
|
| timeTakenMs {
| val jarsDir = new File("/Users/hzyaoqin/spark/dist/jars")
| val jarsArchive = File.createTempFile(UUID.randomUUID().toString,
".zip",
| new File("/Users/hzyaoqin/spark/dist/logs"))
| Using.resource(new ZipOutputStream(
| new BufferedOutputStream(new FileOutputStream(jarsArchive),
65536))) { jarsStream =>
| jarsStream.setLevel(0)
| jarsDir.listFiles().foreach { f =>
| if (f.isFile) {
| jarsStream.putNextEntry(new ZipEntry(f.getName))
| Using.resource(new BufferedInputStream(new
FileInputStream(f), 65536)) { in =>
| in.transferTo(jarsStream)
| }
| jarsStream.closeEntry()
| }
| }
| }
| }
```
- Time taken: 490 ms
- Time taken: 486 ms
- Time taken: 476 ms
- Time taken: 485 ms
### Was this patch authored or co-authored using generative AI tooling?
no
Closes #51704 from yaooqinn/SPARK-52995.
Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../org/apache/spark/deploy/yarn/Client.scala | 27 +++++++++++-----------
1 file changed, 14 insertions(+), 13 deletions(-)
diff --git
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 8b621e82afe2..514bc41a7df1 100644
---
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.io.{File, FileFilter, FileNotFoundException, FileOutputStream,
InterruptedIOException, IOException, OutputStreamWriter}
+import java.io.{BufferedInputStream, BufferedOutputStream, File, FileFilter,
FileInputStream, FileNotFoundException, FileOutputStream,
InterruptedIOException, IOException, OutputStreamWriter}
import java.net.{InetAddress, UnknownHostException, URI, URL}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
@@ -28,6 +28,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
import scala.collection.immutable.{Map => IMap}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer,
Map}
import scala.jdk.CollectionConverters._
+import scala.util.Using
import scala.util.control.NonFatal
import com.google.common.base.Objects
@@ -711,21 +712,21 @@ private[spark] class Client(
sparkConf.getenv("SPARK_HOME")))
val jarsArchive = File.createTempFile(LOCALIZED_LIB_DIR, ".zip",
new File(Utils.getLocalDir(sparkConf)))
- val jarsStream = new ZipOutputStream(new
FileOutputStream(jarsArchive))
-
- try {
- jarsStream.setLevel(0)
- jarsDir.listFiles().foreach { f =>
- if (f.isFile &&
f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
- jarsStream.putNextEntry(new ZipEntry(f.getName))
- Files.copy(f.toPath, jarsStream)
+ val bufferSize = sparkConf.get(BUFFER_SIZE)
+ Using.resource(new ZipOutputStream(
+ new BufferedOutputStream(new FileOutputStream(jarsArchive),
bufferSize))) {
+ jarsStream =>
+ jarsStream.setLevel(0)
+ jarsDir.listFiles().foreach { f =>
+ if (f.isFile &&
f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
+ jarsStream.putNextEntry(new ZipEntry(f.getName))
+ Using.resource(new BufferedInputStream(new
FileInputStream(f), bufferSize)) {
+ _.transferTo(jarsStream)
+ }
+ }
jarsStream.closeEntry()
}
- }
- } finally {
- jarsStream.close()
}
-
distribute(jarsArchive.toURI.getPath,
resType = LocalResourceType.ARCHIVE,
destName = Some(LOCALIZED_LIB_DIR))
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]