This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8b80ea04d0b1 [SPARK-53190][CORE] Use Java `InputStream.transferTo` instead of `ByteStreams.copy`
8b80ea04d0b1 is described below

commit 8b80ea04d0b14d19b819cd4648b5ddd3e1c42650
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu Aug 7 23:02:24 2025 -0700

[SPARK-53190][CORE] Use Java `InputStream.transferTo` instead of `ByteStreams.copy`

### What changes were proposed in this pull request?

This PR aims to use the Java 9+ API `InputStream.transferTo` instead of Guava's `ByteStreams.copy`. Note that this improves `UnsafeShuffleWriter`.

### Why are the changes needed?

Java `transferTo` is **faster** than `ByteStreams.copy`.

```scala
scala> import java.io._
import java.io._

scala> spark.time(new FileInputStream("/tmp/4G.bin").transferTo(new FileOutputStream("/dev/null")))
Time taken: 5 ms
val res2: Long = 4294967296

scala> spark.time(com.google.common.io.ByteStreams.copy(new FileInputStream("/tmp/4G.bin"), new FileOutputStream("/dev/null")))
Time taken: 772 ms
val res3: Long = 4294967296
```

```scala
$ bin/spark-shell --driver-memory 12G
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/

Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 21.0.8)
...

scala> spark.time(new java.io.FileInputStream("/tmp/4G.bin").transferTo(new java.io.FileOutputStream("/tmp/4G.bin.java")))
Time taken: 1209 ms
val res0: Long = 4294967296
```

```scala
$ bin/spark-shell --driver-memory 12G
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/

Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 21.0.8)
...

scala> spark.time(com.google.common.io.ByteStreams.copy(new java.io.FileInputStream("/tmp/4G.bin"), new java.io.FileOutputStream("/tmp/4G.bin.google")))
Time taken: 1899 ms
val res0: Long = 4294967296
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51918 from dongjoon-hyun/SPARK-53190.
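For context, the swap is mechanical for stream-to-stream copies: since Java 9, `InputStream.transferTo(OutputStream)` returns the number of bytes transferred and, like `ByteStreams.copy`, closes neither stream. A minimal sketch of the before/after pattern (class and variable names are illustrative only, not taken from the patch):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

public class TransferToExample {
  public static void main(String[] args) throws IOException {
    InputStream in = new ByteArrayInputStream(new byte[1024]);
    OutputStream out = new ByteArrayOutputStream();

    // Before (Guava): long copied = com.google.common.io.ByteStreams.copy(in, out);
    // After (Java 9+): same return value and, like ByteStreams.copy,
    // leaves both streams open for the caller to close.
    long copied = in.transferTo(out);
    System.out.println(copied); // 1024
  }
}
```

The large gap in the `/dev/null` benchmark is plausibly explained by recent JDKs specializing `transferTo` for file streams to delegate to `FileChannel.transferTo`, which can push the copy into the OS (e.g. `sendfile`), whereas `ByteStreams.copy` always loops over a user-space buffer.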
Authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 3 +--
 core/src/main/scala/org/apache/spark/TestUtils.scala                 | 5 ++---
 core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala      | 4 ++--
 .../scala/org/apache/spark/deploy/history/EventLogFileReaders.scala  | 3 +--
 .../src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala | 2 +-
 .../scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala     | 4 +++-
 dev/checkstyle.xml                                                   | 4 ++++
 scalastyle-config.xml                                                | 5 +++++
 8 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
index e725df593a82..36a148762736 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java
@@ -33,7 +33,6 @@ import scala.reflect.ClassTag;
 import scala.reflect.ClassTag$;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.io.ByteStreams;
 import com.google.common.io.Closeables;
 
 import org.apache.spark.*;
@@ -404,7 +403,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
             partitionInputStream = compressionCodec.compressedInputStream(
               partitionInputStream);
           }
-          ByteStreams.copy(partitionInputStream, partitionOutput);
+          partitionInputStream.transferTo(partitionOutput);
           copySpillThrewException = false;
         } finally {
           Closeables.close(partitionInputStream, copySpillThrewException);
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 915fb86bdcc6..aadfb2125cd6 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -37,7 +37,6 @@ import scala.reflect.{classTag, ClassTag}
 import scala.sys.process.Process
 import scala.util.Try
 
-import com.google.common.io.ByteStreams
 import org.apache.logging.log4j.LogManager
 import org.apache.logging.log4j.core.LoggerContext
 import org.apache.logging.log4j.core.appender.ConsoleAppender
@@ -96,7 +95,7 @@ private[spark] object TestUtils extends SparkTestUtils {
     files.foreach { case (k, v) =>
       val entry = new JarEntry(k)
       jarStream.putNextEntry(entry)
-      ByteStreams.copy(new ByteArrayInputStream(v.getBytes(StandardCharsets.UTF_8)), jarStream)
+      new ByteArrayInputStream(v.getBytes(StandardCharsets.UTF_8)).transferTo(jarStream)
     }
     jarStream.close()
     jarFile.toURI.toURL
@@ -132,7 +131,7 @@ private[spark] object TestUtils extends SparkTestUtils {
       jarStream.putNextEntry(jarEntry)
 
       val in = new FileInputStream(file)
-      ByteStreams.copy(in, jarStream)
+      in.transferTo(jarStream)
       in.close()
     }
     jarStream.close()
diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index d315155ec44a..24f1f5a60eec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -24,7 +24,7 @@ import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.jdk.CollectionConverters._
 
-import com.google.common.io.{ByteStreams, Files}
+import com.google.common.io.Files
 
 import org.apache.spark.api.r.RUtils
 import org.apache.spark.internal.{LogEntry, Logging, MessageWithContext}
@@ -251,7 +251,7 @@ private[deploy] object RPackageUtils extends Logging {
       val fis = new FileInputStream(file)
       val zipEntry = new ZipEntry(relPath)
       zipOutputStream.putNextEntry(zipEntry)
-      ByteStreams.copy(fis, zipOutputStream)
+      fis.transferTo(zipOutputStream)
       zipOutputStream.closeEntry()
       fis.close()
     }
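The jar- and zip-writing call sites above (and the event-log one below) all rely on the same property: `transferTo` streams an entry body into the archive without closing the target, so `closeEntry()` can still be called afterwards. A self-contained sketch of that pattern, with placeholder file names rather than anything from the patch:

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

public class ZipEntryCopyExample {
  public static void main(String[] args) throws IOException {
    try (ZipOutputStream zipOut = new ZipOutputStream(new FileOutputStream("example.zip"));
         FileInputStream fileIn = new FileInputStream("input.txt")) {
      zipOut.putNextEntry(new ZipEntry("input.txt"));
      fileIn.transferTo(zipOut); // writes the entry body; zipOut stays open
      zipOut.closeEntry();       // still valid: transferTo did not close zipOut
    }
  }
}
```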
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
index 8827fcde7b73..1721ef51807a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventLogFileReaders.scala
@@ -21,7 +21,6 @@ import java.io.{BufferedInputStream, InputStream}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.hdfs.DFSInputStream
 
@@ -52,7 +51,7 @@ abstract class EventLogFileReader(
       entryName: String): Unit = {
     Utils.tryWithResource(fileSystem.open(path, 1 * 1024 * 1024)) { inputStream =>
       zipStream.putNextEntry(new ZipEntry(entryName))
-      ByteStreams.copy(inputStream, zipStream)
+      inputStream.transferTo(zipStream)
       zipStream.closeEntry()
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 9bb38fc43938..83f646de8372 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -923,7 +923,7 @@ private[storage] class PartiallySerializedBlock[T](
     verifyNotConsumedAndNotDiscarded()
     consumed = true
     // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
-    ByteStreams.copy(unrolledBuffer.toInputStream(dispose = true), os)
+    unrolledBuffer.toInputStream(dispose = true).transferTo(os)
     memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
     redirectableOutputStream.setOutputStream(os)
     while (rest.hasNext) {
diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
index 83a9eb1df98f..94e51c0c5341 100644
--- a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -134,7 +134,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
 
     val outStream = createCryptoOutputStream(new FileOutputStream(file), conf, key)
     try {
-      ByteStreams.copy(new ByteArrayInputStream(testData), outStream)
+      new ByteArrayInputStream(testData).transferTo(outStream)
     } finally {
       outStream.close()
     }
@@ -150,7 +150,9 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
     val outChannel = createWritableChannel(new FileOutputStream(file).getChannel(), conf, key)
     try {
       val inByteChannel = Channels.newChannel(new ByteArrayInputStream(testData))
+      // scalastyle:off bytestreamscopy
       ByteStreams.copy(inByteChannel, outChannel)
+      // scalastyle:on bytestreamscopy
     } finally {
       outChannel.close()
     }
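Note that the channel-to-channel call in the test above stays on Guava, behind a scalastyle exemption, because `java.nio` offers no general analogue of `InputStream.transferTo` for arbitrary channel pairs (`FileChannel.transferTo` requires a `FileChannel` source, and this test wraps a byte array). Roughly, the retained `ByteStreams.copy(ReadableByteChannel, WritableByteChannel)` amounts to a buffer loop like the following sketch (an approximation, not Guava's actual code):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;

final class ChannelCopy {
  // Buffer-loop copy between blocking channels; closes neither channel.
  static long copy(ReadableByteChannel from, WritableByteChannel to) throws IOException {
    ByteBuffer buf = ByteBuffer.allocate(8192);
    long total = 0;
    while (from.read(buf) != -1) {
      buf.flip();                  // switch the buffer to draining mode
      while (buf.hasRemaining()) {
        total += to.write(buf);    // write may be partial; loop until drained
      }
      buf.clear();                 // switch back to filling mode
    }
    return total;
  }
}
```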
diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml
index f7c4801c9e6f..00f3e0d9e5ca 100644
--- a/dev/checkstyle.xml
+++ b/dev/checkstyle.xml
@@ -209,6 +209,10 @@
         <property name="format" value="Files\.asCharSink"/>
         <property name="message" value="Use java.nio.file.Files.writeString instead." />
     </module>
+    <module name="RegexpSinglelineJava">
+        <property name="format" value="ByteStreams\.copy"/>
+        <property name="message" value="Use Java transferTo instead." />
+    </module>
     <module name="RegexpSinglelineJava">
         <property name="format" value="ByteStreams\.skipFully"/>
         <property name="message" value="Use Java skipNBytes instead." />
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 3b3dce52b36c..e6d0007cae48 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -732,6 +732,11 @@ This file is divided into 3 sections:
     <customMessage>Use Java `write` instead.</customMessage>
   </check>
 
+  <check customId="bytestreamscopy" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter name="regex">\bByteStreams\.copy\b</parameter></parameters>
+    <customMessage>Use Java transferTo instead.</customMessage>
+  </check>
+
   <check customId="skipFully" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
     <parameters><parameter name="regex">\bByteStreams\.skipFully\b</parameter></parameters>
     <customMessage>Use Java `skipNBytes` instead.</customMessage>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org