This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 96edcce564f6 [SPARK-53191][CORE][SQL][MLLIB][YARN] Use Java `InputStream.readAllBytes` instead of `ByteStreams.toByteArray`
96edcce564f6 is described below

commit 96edcce564f635ddb601522b86eb8ee44adb3a1a
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Fri Aug 8 00:02:40 2025 -0700

[SPARK-53191][CORE][SQL][MLLIB][YARN] Use Java `InputStream.readAllBytes` instead of `ByteStreams.toByteArray`

### What changes were proposed in this pull request?

This PR aims to use Java `InputStream.readAllBytes` instead of `ByteStreams.toByteArray` in order to improve performance.

### Why are the changes needed?

Since Java 9, we can use `readAllBytes`, which is roughly 30% faster than `ByteStreams.toByteArray`.

**BEFORE (ByteStreams.toByteArray)**
```scala
scala> spark.time(com.google.common.io.ByteStreams.toByteArray(new java.io.FileInputStream("/tmp/1G.bin")).length)
Time taken: 386 ms
val res0: Int = 1073741824
```

**AFTER (InputStream.readAllBytes)**
```scala
scala> spark.time(new java.io.FileInputStream("/tmp/1G.bin").readAllBytes().length)
Time taken: 248 ms
val res0: Int = 1073741824
```
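The same before/after shape recurs at every call site in the diff below. A minimal sketch of the pattern, assuming Guava `Closeables`-based cleanup as in `PortableDataStream.toArray()` (the `readFully` helper and the path are illustrative, not part of the patch):

```scala
import java.io.{FileInputStream, InputStream}
import com.google.common.io.Closeables

// Illustrative helper: only the read call changes; resource handling stays put.
def readFully(path: String): Array[Byte] = {
  val stream: InputStream = new FileInputStream(path)
  try {
    // Before: com.google.common.io.ByteStreams.toByteArray(stream)
    stream.readAllBytes() // Java 9+; reads to EOF, does not close the stream
  } finally {
    Closeables.close(stream, true) // unchanged close semantics
  }
}
```

Since neither call closes its source stream, the surrounding `try`/`finally` blocks are untouched throughout the patch.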
### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51919 from dongjoon-hyun/SPARK-53191.

Authored-by: Dongjoon Hyun <dongj...@apache.org>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java | 3 +--
 core/src/main/scala/org/apache/spark/input/PortableDataStream.scala | 4 ++--
 .../scala/org/apache/spark/input/WholeTextFileRecordReader.scala    | 4 ++--
 core/src/test/scala/org/apache/spark/CheckpointSuite.scala          | 3 +--
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  | 4 +---
 .../org/apache/spark/deploy/history/EventLogFileReadersSuite.scala  | 5 ++---
 .../scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala    | 6 +++---
 core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala   | 3 +--
 .../scala/org/apache/spark/ml/source/image/ImageFileFormat.scala    | 4 ++--
 .../test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala  | 4 +---
 .../src/main/scala/org/apache/spark/sql/catalyst/util/package.scala | 4 +---
 .../sql/execution/datasources/binaryfile/BinaryFileFormat.scala     | 4 ++--
 .../spark/sql/execution/datasources/json/JsonDataSource.scala       | 3 +--
 .../execution/datasources/binaryfile/BinaryFileFormatSuite.scala    | 4 ++--
 14 files changed, 22 insertions(+), 33 deletions(-)

diff --git a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
index bf0424a1506a..48376e5bf2d4 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/sasl/SparkSaslSuite.java
@@ -36,7 +36,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.security.sasl.SaslException;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -282,7 +281,7 @@ public class SparkSaslSuite {
       verify(callback, times(1)).onSuccess(anyInt(), any(ManagedBuffer.class));
       verify(callback, never()).onFailure(anyInt(), any(Throwable.class));
 
-      byte[] received = ByteStreams.toByteArray(response.get().createInputStream());
+      byte[] received = response.get().createInputStream().readAllBytes();
       assertArrayEquals(data, received);
     } finally {
       file.delete();
diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
index 3c3017a9a64c..9211bfec5cbd 100644
--- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
+++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala
@@ -21,7 +21,7 @@ import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, Da
 
 import scala.jdk.CollectionConverters._
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
@@ -202,7 +202,7 @@ class PortableDataStream(
   def toArray(): Array[Byte] = {
     val stream = open()
     try {
-      ByteStreams.toByteArray(stream)
+      stream.readAllBytes()
     } finally {
       Closeables.close(stream, true)
     }
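For context on the `PortableDataStream.toArray()` change above: `toArray()` is the materialization path for `SparkContext.binaryFiles`, so it is one of the hotter call sites in this patch. A hedged usage sketch (the directory path is illustrative; `sc` is an active `SparkContext`):

```scala
// Each element of binaryFiles is a (path, PortableDataStream) pair; toArray()
// opens the stream, reads it fully (now via readAllBytes), and closes it.
val firstFileBytes: Array[Byte] = sc.binaryFiles("/tmp/data")
  .mapValues(_.toArray())
  .values
  .first()
```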
diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
index 20ebfe035c08..ba975237cb93 100644
--- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.input
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.conf.{Configurable => HConfigurable, Configuration}
 import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapreduce.InputSplit
@@ -71,7 +71,7 @@ private[spark] class WholeTextFileRecordReader(
   override def nextKeyValue(): Boolean = {
     if (!processed) {
       val fileIn = HadoopCodecStreams.createInputStream(getConf, path)
-      val innerBuffer = ByteStreams.toByteArray(fileIn)
+      val innerBuffer = fileIn.readAllBytes()
 
       value = new Text(innerBuffer)
       Closeables.close(fileIn, false)
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index 7a39ba4ab382..58512a2282ac 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -21,7 +21,6 @@ import java.io.File
 
 import scala.reflect.ClassTag
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.config.CACHE_CHECKPOINT_PREFERRED_LOCS_EXPIRE_TIME
@@ -612,7 +611,7 @@ class CheckpointStorageSuite extends SparkFunSuite with LocalSparkContext {
       val compressedInputStream = CompressionCodec.createCodec(conf)
         .compressedInputStream(fs.open(checkpointFile))
       try {
-        ByteStreams.toByteArray(compressedInputStream)
+        compressedInputStream.readAllBytes()
       } finally {
         compressedInputStream.close()
       }
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 0116b2aa781a..dbecad2df768 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -25,7 +25,6 @@ import java.nio.file.{Files, Paths}
 import scala.collection.mutable.ArrayBuffer
 import scala.io.{Codec, Source}
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
 import org.scalatest.BeforeAndAfterEach
@@ -1880,8 +1879,7 @@ object SimpleApplicationTest {
 object UserClasspathFirstTest {
   def main(args: Array[String]): Unit = {
     val ccl = Thread.currentThread().getContextClassLoader()
-    val resource = ccl.getResourceAsStream("test.resource")
-    val bytes = ByteStreams.toByteArray(resource)
+    val bytes = ccl.getResourceAsStream("test.resource").readAllBytes()
     val contents = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8)
     if (contents != "USER") {
       throw new SparkException("Should have read user resource, but instead read: " + contents)
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
index cd8c876c5017..c1a93c7aa9a2 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
@@ -22,7 +22,6 @@ import java.net.URI
 import java.nio.file.Files
 import java.util.zip.{ZipInputStream, ZipOutputStream}
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfter
@@ -220,7 +219,7 @@ class SingleFileEventLogFileReaderSuite extends EventLogFileReadersSuite {
       val entry = is.getNextEntry
       assert(entry != null)
 
-      val actual = ByteStreams.toByteArray(is)
+      val actual = is.readAllBytes()
       val expected = Files.readAllBytes(new File(logPath.toString).toPath)
       assert(actual === expected)
       assert(is.getNextEntry === null)
@@ -367,7 +366,7 @@ class RollingEventLogFilesReaderSuite extends EventLogFileReadersSuite {
         val fileName = entry.getName.stripPrefix(logPath.getName + "/")
         assert(allFileNames.contains(fileName))
 
-        val actual = ByteStreams.toByteArray(is)
+        val actual = is.readAllBytes()
         val expected = Files.readAllBytes(new File(logPath.toString, fileName).toPath)
         assert(actual === expected)
       }
diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
index 94e51c0c5341..fe6530862cab 100644
--- a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -93,7 +93,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
 
     val inputStream = new ByteArrayInputStream(encryptedBytes)
     val wrappedInputStream = serializerManager.wrapStream(blockId, inputStream)
-    val decryptedBytes = ByteStreams.toByteArray(wrappedInputStream)
+    val decryptedBytes = wrappedInputStream.readAllBytes()
     val decryptedStr = new String(decryptedBytes, UTF_8)
     assert(decryptedStr === plainStr)
   }
@@ -141,7 +141,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
     val inStream = createCryptoInputStream(new FileInputStream(file), conf, key)
     try {
-      val inStreamData = ByteStreams.toByteArray(inStream)
+      val inStreamData = inStream.readAllBytes()
       assert(Arrays.equals(inStreamData, testData))
     } finally {
       inStream.close()
     }
@@ -159,7 +159,7 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
 
     val inChannel = createReadableChannel(new FileInputStream(file).getChannel(), conf, key)
     try {
-      val inChannelData = ByteStreams.toByteArray(Channels.newInputStream(inChannel))
+      val inChannelData = Channels.newInputStream(inChannel).readAllBytes()
       assert(Arrays.equals(inChannelData, testData))
     } finally {
       inChannel.close()
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
index 8baaa68692a8..475312f8d3a9 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskStoreSuite.scala
@@ -21,7 +21,6 @@ import java.nio.{ByteBuffer, MappedByteBuffer}
 import java.nio.file.Files
 import java.util.{Arrays, Random}
 
-import com.google.common.io.ByteStreams
 import io.netty.channel.FileRegion
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
@@ -172,7 +171,7 @@ class DiskStoreSuite extends SparkFunSuite {
   private def readViaInputStream(data: BlockData): Array[Byte] = {
     val is = data.toInputStream()
     try {
-      ByteStreams.toByteArray(is)
+      is.readAllBytes()
     } finally {
       is.close()
     }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
index f44cbd5b3acc..10a3101dbb60 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/image/ImageFileFormat.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ml.source.image
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.mapreduce.Job
@@ -76,7 +76,7 @@ private[image] case class ImageFileFormat() extends FileFormat with DataSourceRe
       val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
       val stream = fs.open(path)
       val bytes = try {
-        ByteStreams.toByteArray(stream)
+        stream.readAllBytes()
       } finally {
         Closeables.close(stream, true)
       }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 2559b241e461..32cf9ea66803 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -27,7 +27,6 @@ import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.io.Source
 
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.exceptions.TestFailedException
@@ -763,8 +762,7 @@ private object YarnClasspathTest extends Logging {
     var result = "failure"
     try {
       val ccl = Thread.currentThread().getContextClassLoader()
-      val resource = ccl.getResourceAsStream("test.resource")
-      val bytes = ByteStreams.toByteArray(resource)
+      val bytes = ccl.getResourceAsStream("test.resource").readAllBytes()
       result = new String(bytes, 0, bytes.length, StandardCharsets.UTF_8)
     } catch {
       case t: Throwable =>
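A note on the two `getResourceAsStream` call sites above (`UserClasspathFirstTest` and `YarnClasspathTest`): `ClassLoader.getResourceAsStream` returns `null` for a missing resource, so the chained form fails with a `NullPointerException` in that case, just as `ByteStreams.toByteArray(resource)` would have. A standalone sketch of the new shape (resource name as in the tests):

```scala
import java.nio.charset.StandardCharsets

// Chained form used in both tests; a missing resource yields null and an NPE,
// matching the behavior of the replaced ByteStreams-based code.
val ccl = Thread.currentThread().getContextClassLoader()
val bytes = ccl.getResourceAsStream("test.resource").readAllBytes()
val contents = new String(bytes, StandardCharsets.UTF_8)
```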
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index 1532f0e67b4d..562a02e6a111 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -20,8 +20,6 @@ package org.apache.spark.sql.catalyst
 import java.io._
 import java.nio.charset.StandardCharsets.UTF_8
 
-import com.google.common.io.ByteStreams
-
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.analysis.TempResolvedColumn
 import org.apache.spark.sql.catalyst.expressions._
@@ -52,7 +50,7 @@ package object util extends Logging {
       classLoader: ClassLoader = Utils.getSparkClassLoader): Array[Byte] = {
     val inStream = classLoader.getResourceAsStream(resource)
     try {
-      ByteStreams.toByteArray(inStream)
+      inStream.readAllBytes()
     } finally {
       inStream.close()
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
index a1f2e1644924..e242609fa58b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormat.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.binaryfile
 
 import java.sql.Timestamp
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.Job
@@ -118,7 +118,7 @@ case class BinaryFileFormat() extends FileFormat with DataSourceRegister {
           }
           val stream = fs.open(status.getPath)
           try {
-            writer.write(i, ByteStreams.toByteArray(stream))
+            writer.write(i, stream.readAllBytes())
           } finally {
             Closeables.close(stream, true)
           }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
index bedf5ec62e4e..aaa5af478dbb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.datasources.json
 import java.io.InputStream
 
 import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
-import com.google.common.io.ByteStreams
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.io.Text
@@ -222,7 +221,7 @@ object MultiLineJsonDataSource extends JsonDataSource {
         CodecStreams.createInputStreamWithCloseResource(conf, file.toPath)
       }
     } { inputStream =>
-      UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
+      UTF8String.fromBytes(inputStream.readAllBytes())
     }
   }
   val streamParser = parser.options.encoding
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
index 62f2f2cb10a8..7d2166beb2d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/binaryfile/BinaryFileFormatSuite.scala
@@ -23,7 +23,7 @@ import java.sql.Timestamp
 
 import scala.jdk.CollectionConverters._
 
-import com.google.common.io.{ByteStreams, Closeables}
+import com.google.common.io.Closeables
 import org.apache.hadoop.fs.{FileStatus, FileSystem, GlobFilter, Path}
 import org.mockito.Mockito.{mock, when}
 
@@ -134,7 +134,7 @@ class BinaryFileFormatSuite extends QueryTest with SharedSparkSession {
     val fcontent = {
      val stream = fs.open(fileStatus.getPath)
       val content = try {
-        ByteStreams.toByteArray(stream)
+        stream.readAllBytes()
       } finally {
         Closeables.close(stream, true)
       }
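Appended for reviewers: a self-contained equivalence check of the two calls (illustrative; not part of the commit). Both read to EOF and return the same bytes, and neither closes the source stream:

```scala
import java.io.ByteArrayInputStream
import java.util.Arrays
import com.google.common.io.ByteStreams

// Compare Guava and JDK reads over the same in-memory data.
val data = Array.tabulate[Byte](1024)(i => (i % 128).toByte)
val viaGuava = ByteStreams.toByteArray(new ByteArrayInputStream(data))
val viaJdk = new ByteArrayInputStream(data).readAllBytes()
assert(Arrays.equals(viaGuava, viaJdk))
```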
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org