This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8b80ea04d0b1 [SPARK-53190][CORE] Use Java `InputStream.transferTo` instead of `ByteStreams.copy`
8b80ea04d0b1 is described below

commit 8b80ea04d0b14d19b819cd4648b5ddd3e1c42650
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Aug 7 23:02:24 2025 -0700

    [SPARK-53190][CORE] Use Java `InputStream.transferTo` instead of `ByteStreams.copy`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to use the Java 9+ API `InputStream.transferTo` instead of Guava's `ByteStreams.copy`.
    
    Note that this also improves the spill-merge copy path in `UnsafeShuffleWriter`.
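    
    The rewrite is mechanical at each call site. A minimal sketch of the pattern, using toy in-memory streams for illustration:
    
    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
    
    val in = new ByteArrayInputStream("payload".getBytes("UTF-8"))
    val out = new ByteArrayOutputStream()
    
    // Before: com.google.common.io.ByteStreams.copy(in, out)
    // After (Java 9+); like ByteStreams.copy, transferTo returns the byte count.
    val copied: Long = in.transferTo(out)
    ```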
    
    ### Why are the changes needed?
    
    Java `transferTo` is **faster** than `ByteStreams.copy`. First, copying a 4 GiB file to `/dev/null`:
    
    ```scala
    scala> import java.io._
    import java.io._
    
    scala> spark.time(new FileInputStream("/tmp/4G.bin").transferTo(new FileOutputStream("/dev/null")))
    Time taken: 5 ms
    val res2: Long = 4294967296
    
    scala> spark.time(com.google.common.io.ByteStreams.copy(new FileInputStream("/tmp/4G.bin"), new FileOutputStream("/dev/null")))
    Time taken: 772 ms
    val res3: Long = 4294967296
    ```
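    
    The `/dev/null` result likely reflects a fast path inside `FileInputStream.transferTo` on recent JDKs; `ByteStreams.copy`, by contrast, always drives a user-space buffer loop, roughly like the sketch below (an illustration, not the actual Guava source):
    
    ```scala
    // Illustrative sketch of the generic copy loop: every byte is read into a
    // heap buffer and written back out, so nothing is offloaded to the OS.
    def bufferCopy(in: java.io.InputStream, out: java.io.OutputStream): Long = {
      val buf = new Array[Byte](8192) // Guava-sized buffer; the size is illustrative
      var total = 0L
      var n = in.read(buf)
      while (n != -1) {
        out.write(buf, 0, n)
        total += n
        n = in.read(buf)
      }
      total
    }
    ```
    
    The file-to-file runs below show `transferTo` stays ahead even when both sides are regular files: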
    
    ```scala
    $ bin/spark-shell --driver-memory 12G
    ...
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
          /_/
    
    Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 21.0.8)
    ...
    scala> spark.time(new java.io.FileInputStream("/tmp/4G.bin").transferTo(new java.io.FileOutputStream("/tmp/4G.bin.java")))
    Time taken: 1209 ms
    val res0: Long = 4294967296
    ```
    
    ```scala
    $ bin/spark-shell --driver-memory 12G
    ...
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
          /_/
    
    Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 21.0.8)
    ...
    scala> spark.time(com.google.common.io.ByteStreams.copy(new java.io.FileInputStream("/tmp/4G.bin"), new java.io.FileOutputStream("/tmp/4G.bin.google")))
    Time taken: 1899 ms
    val res0: Long = 4294967296
    ```
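    
    One caveat: the channel overload `ByteStreams.copy(ReadableByteChannel, WritableByteChannel)` has no direct `InputStream.transferTo` counterpart, so the one channel-based call site in `CryptoStreamUtilsSuite` keeps Guava under a scalastyle suppression (see the diff below). If a Guava-free channel copy were ever desired, one hypothetical option (not part of this patch) is to bridge the channels through streams:
    
    ```scala
    import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
    
    // Hypothetical alternative, not used by this patch: wrap both channels in
    // streams and let InputStream.transferTo drive the copy.
    def channelCopy(in: ReadableByteChannel, out: WritableByteChannel): Long =
      Channels.newInputStream(in).transferTo(Channels.newOutputStream(out))
    ```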
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51918 from dongjoon-hyun/SPARK-53190.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 3 +--
 core/src/main/scala/org/apache/spark/TestUtils.scala                 | 5 ++---
 core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala      | 4 ++--
 .../scala/org/apache/spark/deploy/history/EventLogFileReaders.scala  | 3 +--
 .../src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 +-
 .../scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala     | 4 +++-
 dev/checkstyle.xml                                                   | 4 ++++
 scalastyle-config.xml                                                | 5 +++++
 8 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index e725df593a82..36a148762736 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -33,7 +33,6 @@ import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 
 import org.apache.spark.*;
@@ -404,7 +403,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
                  partitionInputStream = compressionCodec.compressedInputStream(
                       partitionInputStream);
                 }
-                ByteStreams.copy(partitionInputStream, partitionOutput);
+                partitionInputStream.transferTo(partitionOutput);
                 copySpillThrewException = false;
               } finally {
                Closeables.close(partitionInputStream, copySpillThrewException);
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 915fb86bdcc6..aadfb2125cd6 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -37,7 +37,6 @@ import scala.reflect.{classTag, ClassTag}
 import scala.sys.process.Process
 import scala.util.Try
 
-import com.google.common.io.ByteStreams
 import org.apache.logging.log4j.LogManager
 import org.apache.logging.log4j.core.LoggerContext
 import org.apache.logging.log4j.core.appender.ConsoleAppender
@@ -96,7 +95,7 @@ private[spark] object TestUtils extends SparkTestUtils {
     files.foreach { case (k, v) =>
       val entry = new JarEntry(k)
       jarStream.putNextEntry(entry)
-      ByteStreams.copy(new ByteArrayInputStream(v.getBytes(StandardCharsets.UTF_8)), jarStream)
+      new ByteArrayInputStream(v.getBytes(StandardCharsets.UTF_8)).transferTo(jarStream)
     }
     jarStream.close()
     jarFile.toURI.toURL
@@ -132,7 +131,7 @@ private[spark] object TestUtils extends SparkTestUtils {
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
-      ByteStreams.copy(in, jarStream)
+      in.transferTo(jarStream)
       in.close()
     }
     jarStream.close()
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index d315155ec44a..24f1f5a60eec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -24,7 +24,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.jdk.CollectionConverters._
 
-import com.google.common.io.{ByteStreams, Files}
+import com.google.common.io.Files
 
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.internal.{LogEntry, Logging, MessageWithContext}
@@ -251,7 +251,7 @@ private[deploy] object RPackageUtils extends Logging {
         val fis = new FileInputStream(file)
         val zipEntry = new ZipEntry(relPath)
         zipOutputStream.putNextEntry(zipEntry)
-        ByteStreams.copy(fis, zipOutputStream)
+        fis.transferTo(zipOutputStream)
         zipOutputStream.closeEntry()
         fis.close()
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
index 8827fcde7b73..1721ef51807a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
@@ -21,7 +21,6 @@ import java.io.{BufferedInputStream, InputStream}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.hdfs.DFSInputStream
 
@@ -52,7 +51,7 @@ abstract class EventLogFileReader(
       entryName: String): Unit = {
    Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream =>
       zipStream.putNextEntry(new ZipEntry(entryName))
-      ByteStreams.copy(inputStream, zipStream)
+      inputStream.transferTo(zipStream)
       zipStream.closeEntry()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 9bb38fc43938..83f646de8372 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -923,7 +923,7 @@ private[storage] class PartiallySerializedBlock[T](
     verifyNotConsumedAndNotDiscarded()
     consumed = true
    // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
-    ByteStreams.copy(unrolledBuffer.toInputStream(dispose = true), os)
+    unrolledBuffer.toInputStream(dispose = true).transferTo(os)
     memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
     redirectableOutputStream.setOutputStream(os)
     while (rest.hasNext) {
diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
index 83a9eb1df98f..94e51c0c5341 100644
--- a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -134,7 +134,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
 
    val outStream = createCryptoOutputStream(new FileOutputStream(file), conf, key)
     try {
-      ByteStreams.copy(new ByteArrayInputStream(testData), outStream)
+      new ByteArrayInputStream(testData).transferTo(outStream)
     } finally {
       outStream.close()
     }
@@ -150,7 +150,9 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
    val outChannel = createWritableChannel(new FileOutputStream(file).getChannel(), conf, key)
     try {
      val inByteChannel = Channels.newChannel(new ByteArrayInputStream(testData))
+      // scalastyle:off bytestreamscopy
       ByteStreams.copy(inByteChannel, outChannel)
+      // scalastyle:on bytestreamscopy
     } finally {
       outChannel.close()
     }
diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml
index f7c4801c9e6f..00f3e0d9e5ca 100644
--- a/dev/checkstyle.xml
+++ b/dev/checkstyle.xml
@@ -209,6 +209,10 @@
             <property name="format" value="Files\.asCharSink"/>
             <property name="message" value="Use 
java.nio.file.Files.writeString instead." />
         </module>
+        <module name="RegexpSinglelineJava">
+            <property name="format" value="ByteStreams\.copy"/>
+            <property name="message" value="Use Java transferTo instead." />
+        </module>
         <module name="RegexpSinglelineJava">
             <property name="format" value="ByteStreams\.skipFully"/>
             <property name="message" value="Use Java skipNBytes instead." />
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 3b3dce52b36c..e6d0007cae48 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -732,6 +732,11 @@ This file is divided into 3 sections:
     <customMessage>Use Java `write` instead.</customMessage>
   </check>
 
+  <check customId="bytestreamscopy" level="error" 
class="org.scalastyle.file.RegexChecker" enabled="true">
    <parameters><parameter name="regex">\bByteStreams\.copy\b</parameter></parameters>
+    <customMessage>Use Java transferTo instead.</customMessage>
+  </check>
+
   <check customId="skipFully" level="error" 
class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter 
name="regex">\bByteStreams\.skipFully\b</parameter></parameters>
     <customMessage>Use Java `skipNBytes` instead.</customMessage>

