This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 96edcce564f6 [SPARK-53191][CORE][SQL][MLLIB][YARN] Use Java 
`InputStream.readAllBytes` instead of `ByteStreams.toByteArray`
96edcce564f6 is described below

commit 96edcce564f635ddb601522b86eb8ee44adb3a1a
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Fri Aug 8 00:02:40 2025 -0700

    [SPARK-53191][CORE][SQL][MLLIB][YARN] Use Java `InputStream.readAllBytes` 
instead of `ByteStreams.toByteArray`
    
    ### What changes were proposed in this pull request?
    
    This PR replaces Guava's `ByteStreams.toByteArray` with Java's `InputStream.readAllBytes` to improve performance.
    
    ### Why are the changes needed?
    
    `InputStream.readAllBytes` has been available since Java 9 and is roughly 30% faster than `ByteStreams.toByteArray`, as the measurements below show.
    
    **BEFORE (ByteStreams.toByteArray)**
    
    ```scala
    scala> spark.time(com.google.common.io.ByteStreams.toByteArray(new java.io.FileInputStream("/tmp/1G.bin")).length)
    Time taken: 386 ms
    val res0: Int = 1073741824
    ```
    
    **AFTER (InputStream.readAllBytes)**
    
    ```scala
    scala> spark.time(new java.io.FileInputStream("/tmp/1G.bin").readAllBytes().length)
    Time taken: 248 ms
    val res0: Int = 1073741824
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #51919 from dongjoon-hyun/SPARK-53191.
    
    Authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java | 3 +--
 core/src/main/scala/org/apache/spark/input/PortableDataStream.scala | 4 ++--
 .../scala/org/apache/spark/input/WholeTextFileRecordReader.scala    | 4 ++--
 core/src/test/scala/org/apache/spark/CheckpointSuite.scala          | 3 +--
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  | 4 +---
 .../org/apache/spark/deploy/history/EventLogFileReadersSuite.scala  | 5 ++---
 .../scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala    | 6 +++---
 core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala   | 3 +--
 .../scala/org/apache/spark/ml/source/image/ImageFileFormat.scala    | 4 ++--
 .../test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala  | 4 +---
 .../src/main/scala/org/apache/spark/sql/catalyst/util/package.scala | 4 +---
 .../sql/execution/datasources/binaryfile/BinaryFileFormat.scala     | 4 ++--
 .../spark/sql/execution/datasources/json/JsonDataSource.scala       | 3 +--
 .../execution/datasources/binaryfile/BinaryFileFormatSuite.scala    | 4 ++--
 14 files changed, 22 insertions(+), 33 deletions(-)

diff --git 
a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
 
b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index bf0424a1506a..48376e5bf2d4 100644
--- 
a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ 
b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -36,7 +36,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.security.sasl.SaslException;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -282,7 +281,7 @@ public class SparkSaslSuite {
       verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class));
       verify(callback, never()).onFailure(anyInt(), any(Throwable.class));
 
-      byte[] received = 
ByteStreams.toByteArray(response.get().createInputStream());
+      byte[] received = response.get().createInputStream().readAllBytes();
       assertArrayEquals(data, received);
     } finally {
       file.delete();
diff --git 
a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala 
b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 3c3017a9a64c..9211bfec5cbd 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, 
DataInputStream, Da
 
 import scala.jdk.CollectionConverters._
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, 
TaskAttemptContext}
@@ -202,7 +202,7 @@ class PortableDataStream(
   def toArray(): Array[Byte] = {
     val stream = open()
     try {
-      ByteStreams.toByteArray(stream)
+      stream.readAllBytes()
     } finally {
       Closeables.close(stream, true)
     }
diff --git 
a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala 
b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 20ebfe035c08..ba975237cb93 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.input
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.conf.{Configurable => HConfigurable, Configuration}
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.InputSplit
@@ -71,7 +71,7 @@ private[spark] class WholeTextFileRecordReader(
   override def nextKeyValue(): Boolean = {
     if (!processed) {
       val fileIn = HadoopCodecStreams.createInputStream(getConf, path)
-      val innerBuffer = ByteStreams.toByteArray(fileIn)
+      val innerBuffer = fileIn.readAllBytes()
 
       value = new Text(innerBuffer)
       Closeables.close(fileIn, false)
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala 
b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 7a39ba4ab382..58512a2282ac 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -21,7 +21,6 @@ import java.io.File
 
 import scala.reflect.ClassTag
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.Path
 
 import 
org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME
@@ -612,7 +611,7 @@ class CheckpointStorageSuite extends SparkFunSuite with 
LocalSparkContext {
       val compressedInputStream = CompressionCodec.createCodec(conf)
         .compressedInputStream(fs.open(checkpointFile))
       try {
-        ByteStreams.toByteArray(compressedInputStream)
+        compressedInputStream.readAllBytes()
       } finally {
         compressedInputStream.close()
       }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 0116b2aa781a..dbecad2df768 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -25,7 +25,6 @@ import java.nio.file.{Files, Paths}
 import scala.collection.mutable.ArrayBuffer
 import scala.io.{Codec, Source}
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
 import org.scalatest.BeforeAndAfterEach
@@ -1880,8 +1879,7 @@ object SimpleApplicationTest {
 object UserClasspathFirstTest {
   def main(args: Array[String]): Unit = {
     val ccl = Thread.currentThread().getContextClassLoader()
-    val resource = ccl.getResourceAsStream("test.resource")
-    val bytes = ByteStreams.toByteArray(resource)
+    val bytes = ccl.getResourceAsStream("test.resource").readAllBytes()
     val contents = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8)
     if (contents != "USER") {
       throw new SparkException("Should have read user resource, but instead 
read: " + contents)
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
index cd8c876c5017..c1a93c7aa9a2 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
@@ -22,7 +22,6 @@ import java.net.URI
 import java.nio.file.Files
 import java.util.zip.{ZipInputStream, ZipOutputStream}
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfter
@@ -220,7 +219,7 @@ class SingleFileEventLogFileReaderSuite extends 
EventLogFileReadersSuite {
 
       val entry = is.getNextEntry
       assert(entry != null)
-      val actual = ByteStreams.toByteArray(is)
+      val actual = is.readAllBytes()
       val expected = Files.readAllBytes(new File(logPath.toString).toPath)
       assert(actual === expected)
       assert(is.getNextEntry === null)
@@ -367,7 +366,7 @@ class RollingEventLogFilesReaderSuite extends 
EventLogFileReadersSuite {
           val fileName = entry.getName.stripPrefix(logPath.getName + "/")
           assert(allFileNames.contains(fileName))
 
-          val actual = ByteStreams.toByteArray(is)
+          val actual = is.readAllBytes()
           val expected = Files.readAllBytes(new File(logPath.toString, 
fileName).toPath)
           assert(actual === expected)
         }
diff --git 
a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
index 94e51c0c5341..fe6530862cab 100644
--- a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -93,7 +93,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
 
     val inputStream = new ByteArrayInputStream(encryptedBytes)
     val wrappedInputStream = serializerManager.wrapStream(blockId, inputStream)
-    val decryptedBytes = ByteStreams.toByteArray(wrappedInputStream)
+    val decryptedBytes = wrappedInputStream.readAllBytes()
     val decryptedStr = new String(decryptedBytes, UTF_8)
     assert(decryptedStr === plainStr)
   }
@@ -141,7 +141,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
 
     val inStream = createCryptoInputStream(new FileInputStream(file), conf, 
key)
     try {
-      val inStreamData = ByteStreams.toByteArray(inStream)
+      val inStreamData = inStream.readAllBytes()
       assert(Arrays.equals(inStreamData, testData))
     } finally {
       inStream.close()
@@ -159,7 +159,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
 
     val inChannel = createReadableChannel(new 
FileInputStream(file).getChannel(), conf, key)
     try {
-      val inChannelData = 
ByteStreams.toByteArray(Channels.newInputStream(inChannel))
+      val inChannelData = Channels.newInputStream(inChannel).readAllBytes()
       assert(Arrays.equals(inChannelData, testData))
     } finally {
       inChannel.close()
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 8baaa68692a8..475312f8d3a9 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -21,7 +21,6 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.nio.file.Files
 import java.util.{Arrays, Random}
 
-import com.google.common.io.ByteStreams
 import io.netty.channel.FileRegion
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
@@ -172,7 +171,7 @@ class DiskStoreSuite extends SparkFunSuite {
   private def readViaInputStream(data: BlockData): Array[Byte] = {
     val is = data.toInputStream()
     try {
-      ByteStreams.toByteArray(is)
+      is.readAllBytes()
     } finally {
       is.close()
     }
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala 
b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
index f44cbd5b3acc..10a3101dbb60 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ml.source.image
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.mapreduce.Job
@@ -76,7 +76,7 @@ private[image] case class ImageFileFormat() extends 
FileFormat with DataSourceRe
         val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
         val stream = fs.open(path)
         val bytes = try {
-          ByteStreams.toByteArray(stream)
+          stream.readAllBytes()
         } finally {
           Closeables.close(stream, true)
         }
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 2559b241e461..32cf9ea66803 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -27,7 +27,6 @@ import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.io.Source
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.exceptions.TestFailedException
@@ -763,8 +762,7 @@ private object YarnClasspathTest extends Logging {
     var result = "failure"
     try {
       val ccl = Thread.currentThread().getContextClassLoader()
-      val resource = ccl.getResourceAsStream("test.resource")
-      val bytes = ByteStreams.toByteArray(resource)
+      val bytes = ccl.getResourceAsStream("test.resource").readAllBytes()
       result = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8)
     } catch {
       case t: Throwable =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 1532f0e67b4d..562a02e6a111 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.catalyst
 import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.TempResolvedColumn
 import org.apache.spark.sql.catalyst.expressions._
@@ -52,7 +50,7 @@ package object util extends Logging {
       classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
     val inStream = classLoader.getResourceAsStream(resource)
     try {
-      ByteStreams.toByteArray(inStream)
+      inStream.readAllBytes()
     } finally {
       inStream.close()
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index a1f2e1644924..e242609fa58b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.binaryfile
 
 import java.sql.Timestamp
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
@@ -118,7 +118,7 @@ case class BinaryFileFormat() extends FileFormat with 
DataSourceRegister {
             }
             val stream = fs.open(status.getPath)
             try {
-              writer.write(i, ByteStreams.toByteArray(stream))
+              writer.write(i, stream.readAllBytes())
             } finally {
               Closeables.close(stream, true)
             }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index bedf5ec62e4e..aaa5af478dbb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.InputStream
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Text
@@ -222,7 +221,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
           CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)
         }
       } { inputStream =>
-        UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
+        UTF8String.fromBytes(inputStream.readAllBytes())
       }
     }
     val streamParser = parser.options.encoding
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 62f2f2cb10a8..7d2166beb2d7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -23,7 +23,7 @@ import java.sql.Timestamp
 
 import scala.jdk.CollectionConverters._
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
 import org.mockito.Mockito.{mock, when}
 
@@ -134,7 +134,7 @@ class BinaryFileFormatSuite extends QueryTest with 
SharedSparkSession {
           val fcontent = {
             val stream = fs.open(fileStatus.getPath)
             val content = try {
-              ByteStreams.toByteArray(stream)
+              stream.readAllBytes()
             } finally {
               Closeables.close(stream, true)
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to