This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 76d2878b61bd [SPARK-53188][CORE][SQL] Support `readFully` in
`SparkStreamUtils` and `JavaUtils`
76d2878b61bd is described below
commit 76d2878b61bd0be878f30aff230b01cd0d5c96cb
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Thu Aug 7 21:19:04 2025 -0700
[SPARK-53188][CORE][SQL] Support `readFully` in `SparkStreamUtils` and
`JavaUtils`
### What changes were proposed in this pull request?
This PR adds a `readFully` method to `SparkStreamUtils` and `JavaUtils`. The new
method is built on the Java 9+ `InputStream.readNBytes` API.
```java
public static void readFully(InputStream in, byte[] arr, int off, int len)
throws IOException {
if (in == null || len < 0 || (off < 0 || off > arr.length - len)) {
throw new IllegalArgumentException("Invalid input argument");
}
if (len != in.readNBytes(arr, off, len)) {
throw new EOFException("Fail to read " + len + " bytes.");
}
}
```
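### Why are the changes needed?
To let Spark code call its own `readFully` utility instead of Guava's
`ByteStreams.readFully`. For example: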
```scala
- ByteStreams.readFully(is, rowBuffer, 0, rowSize)
+ Utils.readFully(is, rowBuffer, 0, rowSize)
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #51915 from dongjoon-hyun/SPARK-53188.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../spark/network/shuffle/ExternalBlockHandlerSuite.java | 3 +--
.../main/java/org/apache/spark/network/util/JavaUtils.java | 12 ++++++++++++
.../main/scala/org/apache/spark/util/SparkStreamUtils.scala | 5 +++++
.../util/collection/unsafe/sort/UnsafeSorterSpillReader.java | 4 ++--
.../scala/org/apache/spark/security/CryptoStreamUtils.scala | 3 +--
core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
.../scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 5 ++---
.../scala/org/apache/spark/io/CompressionCodecSuite.scala | 5 ++---
.../spark/storage/ShuffleBlockFetcherIteratorSuite.scala | 3 +--
dev/checkstyle.xml | 4 ++++
scalastyle-config.xml | 5 +++++
.../org/apache/spark/sql/execution/UnsafeRowSerializer.scala | 7 +++----
.../streaming/state/HDFSBackedStateStoreProvider.scala | 9 ++++-----
.../sql/execution/streaming/state/StateStoreChangelog.scala | 7 +++----
.../streaming/StreamingQueryHashPartitionVerifySuite.scala | 5 ++---
15 files changed, 48 insertions(+), 31 deletions(-)
diff --git
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index f7edc8837fde..2a3135e3c8ae 100644
---
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -27,7 +27,6 @@ import java.util.zip.Checksum;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.Timer;
-import com.google.common.io.ByteStreams;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -136,7 +135,7 @@ public class ExternalBlockHandlerSuite {
CheckedInputStream checkedIn = new CheckedInputStream(
blockMarkers[0].createInputStream(), checksum);
byte[] buffer = new byte[10];
- ByteStreams.readFully(checkedIn, buffer, 0, (int)
blockMarkers[0].size());
+ JavaUtils.readFully(checkedIn, buffer, 0, (int) blockMarkers[0].size());
long checksumByWriter = checkedIn.getChecksum().getValue();
// when checksumByWriter == checksumRecalculated and checksumByReader !=
checksumByWriter
diff --git
a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
index 9e2db5250566..3a2485520c66 100644
--- a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -610,6 +610,18 @@ public class JavaUtils {
}
}
+ /**
+ * Read len bytes exactly, otherwise throw exceptions.
+ */
+ public static void readFully(InputStream in, byte[] arr, int off, int len)
throws IOException {
+ if (in == null || len < 0 || (off < 0 || off > arr.length - len)) {
+ throw new IllegalArgumentException("Invalid input argument");
+ }
+ if (len != in.readNBytes(arr, off, len)) {
+ throw new EOFException("Fail to read " + len + " bytes.");
+ }
+ }
+
/**
* Copy the content of a URL into a file.
*/
diff --git
a/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
b/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
index e1a1c98987e8..d4e44b019120 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/SparkStreamUtils.scala
@@ -20,6 +20,7 @@ import java.io.{FileInputStream, FileOutputStream,
InputStream, OutputStream}
import java.nio.channels.{FileChannel, WritableByteChannel}
import java.nio.charset.StandardCharsets
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally
private[spark] trait SparkStreamUtils {
@@ -109,6 +110,10 @@ private[spark] trait SparkStreamUtils {
def toString(in: InputStream): String = {
new String(in.readAllBytes(), StandardCharsets.UTF_8)
}
+
+ def readFully(in: InputStream, arr: Array[Byte], off: Int, len: Int): Unit =
{
+ JavaUtils.readFully(in, arr, off, len)
+ }
}
private [spark] object SparkStreamUtils extends SparkStreamUtils
diff --git
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 0693f8cb1a80..6674fcd73b92 100644
---
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -17,7 +17,6 @@
package org.apache.spark.util.collection.unsafe.sort;
-import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
@@ -27,6 +26,7 @@ import org.apache.spark.internal.SparkLogger;
import org.apache.spark.internal.SparkLoggerFactory;
import org.apache.spark.io.NioBufferedFileInputStream;
import org.apache.spark.io.ReadAheadInputStream;
+import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockId;
import org.apache.spark.unsafe.Platform;
@@ -128,7 +128,7 @@ public final class UnsafeSorterSpillReader extends
UnsafeSorterIterator implemen
arr = new byte[recordLength];
baseObject = arr;
}
- ByteStreams.readFully(in, arr, 0, recordLength);
+ JavaUtils.readFully(in, arr, 0, recordLength);
numRecordsRemaining--;
if (numRecordsRemaining == 0) {
close();
diff --git
a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
index 8bdb80d65207..b230df93e1fa 100644
--- a/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
+++ b/core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala
@@ -26,7 +26,6 @@ import javax.crypto.spec.{IvParameterSpec, SecretKeySpec}
import scala.jdk.CollectionConverters._
-import com.google.common.io.ByteStreams
import org.apache.commons.crypto.random._
import org.apache.commons.crypto.stream._
@@ -84,7 +83,7 @@ private[spark] object CryptoStreamUtils extends Logging {
sparkConf: SparkConf,
key: Array[Byte]): InputStream = {
val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
- ByteStreams.readFully(is, iv)
+ JavaUtils.readFully(is, iv, 0, IV_LENGTH_IN_BYTES)
val params = new CryptoParams(key, sparkConf)
new CryptoInputStream(params.transformation, params.conf, is,
params.keySpec,
new IvParameterSpec(iv))
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fc9375025881..57f70489a860 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1589,7 +1589,7 @@ private[spark] object Utils
try {
stream.skipNBytes(effectiveStart)
- ByteStreams.readFully(stream, buff)
+ readFully(stream, buff, 0, buff.length)
} finally {
stream.close()
}
diff --git
a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index 68b181de2928..992bb37f44a0 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -20,11 +20,10 @@ package org.apache.spark.io
import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
ObjectInputStream, ObjectOutputStream}
import java.nio.ByteBuffer
-import com.google.common.io.ByteStreams
-
import org.apache.spark.{SharedSparkContext, SparkFunSuite}
import org.apache.spark.internal.config
import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.util.Utils
import org.apache.spark.util.io.ChunkedByteBuffer
class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext {
@@ -145,7 +144,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite with
SharedSparkContext {
val inputStream = chunkedByteBuffer.toInputStream(dispose = false)
val bytesFromStream = new Array[Byte](chunkedByteBuffer.size.toInt)
- ByteStreams.readFully(inputStream, bytesFromStream)
+ Utils.readFully(inputStream, bytesFromStream, 0, bytesFromStream.length)
assert(bytesFromStream === bytes1.array() ++ bytes2.array())
assert(chunkedByteBuffer.getChunks().head.position() === 0)
}
diff --git
a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
index ff971b72d891..d6f0bfd237e4 100644
--- a/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/CompressionCodecSuite.scala
@@ -20,10 +20,9 @@ package org.apache.spark.io
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.Locale
-import com.google.common.io.ByteStreams
-
import org.apache.spark.{SparkConf, SparkFunSuite,
SparkIllegalArgumentException}
import org.apache.spark.internal.config.IO_COMPRESSION_ZSTD_BUFFERPOOL_ENABLED
+import org.apache.spark.util.Utils
class CompressionCodecSuite extends SparkFunSuite {
val conf = new SparkConf(false)
@@ -158,7 +157,7 @@ class CompressionCodecSuite extends SparkFunSuite {
}
val concatenatedBytes = codec.compressedInputStream(new
ByteArrayInputStream(bytes1 ++ bytes2))
val decompressed: Array[Byte] = new Array[Byte](128)
- ByteStreams.readFully(concatenatedBytes, decompressed)
+ Utils.readFully(concatenatedBytes, decompressed, 0, decompressed.length)
assert(decompressed.toSeq === (0 to 127))
}
diff --git
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 3de78407ca16..211de2e8729e 100644
---
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -29,7 +29,6 @@ import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
import scala.concurrent.Future
-import com.google.common.io.ByteStreams
import io.netty.util.internal.OutOfDirectMemoryError
import org.apache.logging.log4j.Level
import org.mockito.ArgumentMatchers.{any, eq => meq}
@@ -289,7 +288,7 @@ class ShuffleBlockFetcherIteratorSuite extends
SparkFunSuite {
intercept[FetchFailedException] {
val inputStream = iterator.next()._2
// Consume the data to trigger the corruption
- ByteStreams.readFully(inputStream, new Array[Byte](100))
+ Utils.readFully(inputStream, new Array[Byte](100), 0, 100)
}
// The block will be fetched only once because corruption can't be
detected in
// maxBytesInFlight/3 of the data size
diff --git a/dev/checkstyle.xml b/dev/checkstyle.xml
index 78f02b957b47..f7c4801c9e6f 100644
--- a/dev/checkstyle.xml
+++ b/dev/checkstyle.xml
@@ -213,6 +213,10 @@
<property name="format" value="ByteStreams\.skipFully"/>
<property name="message" value="Use Java skipNBytes instead." />
</module>
+ <module name="RegexpSinglelineJava">
+ <property name="format" value="ByteStreams\.readFully"/>
+ <property name="message" value="Use readFully of
JavaUtils/SparkStreamUtils/Utils instead." />
+ </module>
<module name="RegexpSinglelineJava">
<property name="format" value="FileUtils.writeStringToFile"/>
<property name="message" value="Use
java.nio.file.Files.writeString instead." />
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index 36c31c8e3187..3b3dce52b36c 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -737,6 +737,11 @@ This file is divided into 3 sections:
<customMessage>Use Java `skipNBytes` instead.</customMessage>
</check>
+ <check customId="readFully" level="error"
class="org.scalastyle.file.RegexChecker" enabled="true">
+ <parameters><parameter
name="regex">\bByteStreams\.readFully\b</parameter></parameters>
+ <customMessage>Use readFully of JavaUtils/SparkStreamUtils/Utils
instead.</customMessage>
+ </check>
+
<check customId="maputils" level="error"
class="org.scalastyle.file.RegexChecker" enabled="true">
<parameters><parameter
name="regex">org\.apache\.commons\.collections4\.MapUtils\b</parameter></parameters>
<customMessage>Use org.apache.spark.util.collection.Utils
instead.</customMessage>
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
index 42fcfa8d60fa..9728d664998e 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala
@@ -22,13 +22,12 @@ import java.nio.ByteBuffer
import scala.reflect.ClassTag
-import com.google.common.io.ByteStreams
-
import org.apache.spark.SparkUnsupportedOperationException
import org.apache.spark.serializer.{DeserializationStream,
SerializationStream, Serializer, SerializerInstance}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Utils
/**
* Serializer for serializing [[UnsafeRow]]s during shuffle. Since UnsafeRows
are already stored as
@@ -125,7 +124,7 @@ private class UnsafeRowSerializerInstance(
if (rowBuffer.length < rowSize) {
rowBuffer = new Array[Byte](rowSize)
}
- ByteStreams.readFully(dIn, rowBuffer, 0, rowSize)
+ Utils.readFully(dIn, rowBuffer, 0, rowSize)
row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, rowSize)
rowSize = readSize()
if (rowSize == EOF) { // We are returning the last row in this
stream
@@ -160,7 +159,7 @@ private class UnsafeRowSerializerInstance(
if (rowBuffer.length < rowSize) {
rowBuffer = new Array[Byte](rowSize)
}
- ByteStreams.readFully(dIn, rowBuffer, 0, rowSize)
+ Utils.readFully(dIn, rowBuffer, 0, rowSize)
row.pointTo(rowBuffer, Platform.BYTE_ARRAY_OFFSET, rowSize)
row.asInstanceOf[T]
}
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 0ba4b1955c82..8f8ec84e2d10 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal
-import com.google.common.io.ByteStreams
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs._
@@ -662,7 +661,7 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
fileToRead, toString(), keySize)
} else {
val keyRowBuffer = new Array[Byte](keySize)
- ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
+ Utils.readFully(input, keyRowBuffer, 0, keySize)
val keyRow = new UnsafeRow(keySchema.fields.length)
keyRow.pointTo(keyRowBuffer, keySize)
@@ -672,7 +671,7 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
map.remove(keyRow)
} else {
val valueRowBuffer = new Array[Byte](valueSize)
- ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
+ Utils.readFully(input, valueRowBuffer, 0, valueSize)
val valueRow = new UnsafeRow(valueSchema.fields.length)
// If valueSize in existing file is not multiple of 8, floor it to
multiple of 8.
// This is a workaround for the following:
@@ -782,7 +781,7 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
fileToRead, toString(), keySize)
} else {
val keyRowBuffer = new Array[Byte](keySize)
- ByteStreams.readFully(input, keyRowBuffer, 0, keySize)
+ Utils.readFully(input, keyRowBuffer, 0, keySize)
val keyRow = new UnsafeRow(keySchema.fields.length)
keyRow.pointTo(keyRowBuffer, keySize)
@@ -793,7 +792,7 @@ private[sql] class HDFSBackedStateStoreProvider extends
StateStoreProvider with
fileToRead, toString(), valueSize)
} else {
val valueRowBuffer = new Array[Byte](valueSize)
- ByteStreams.readFully(input, valueRowBuffer, 0, valueSize)
+ Utils.readFully(input, valueRowBuffer, 0, valueSize)
val valueRow = new UnsafeRow(valueSchema.fields.length)
// If valueSize in existing file is not multiple of 8, floor it to
multiple of 8.
// This is a workaround for the following:
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
index ccb58ed05a6d..9858b2494f84 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreChangelog.scala
@@ -21,7 +21,6 @@ import java.io.{DataInputStream, DataOutputStream,
FileNotFoundException, IOExce
import scala.util.control.NonFatal
-import com.google.common.io.ByteStreams
import org.apache.hadoop.fs.{FSError, Path}
import org.json4s._
import org.json4s.jackson.Serialization
@@ -485,14 +484,14 @@ class StateStoreChangelogReaderV1(
} else {
// TODO: reuse the key buffer and value buffer across records.
val keyBuffer = new Array[Byte](keySize)
- ByteStreams.readFully(input, keyBuffer, 0, keySize)
+ Utils.readFully(input, keyBuffer, 0, keySize)
val valueSize = input.readInt()
if (valueSize < 0) {
// A deletion record
(RecordType.DELETE_RECORD, keyBuffer, null)
} else {
val valueBuffer = new Array[Byte](valueSize)
- ByteStreams.readFully(input, valueBuffer, 0, valueSize)
+ Utils.readFully(input, valueBuffer, 0, valueSize)
// A put record.
(RecordType.PUT_RECORD, keyBuffer, valueBuffer)
}
@@ -516,7 +515,7 @@ class StateStoreChangelogReaderV2(
private def parseBuffer(input: DataInputStream): Array[Byte] = {
val blockSize = input.readInt()
val blockBuffer = new Array[Byte](blockSize)
- ByteStreams.readFully(input, blockBuffer, 0, blockSize)
+ Utils.readFully(input, blockBuffer, 0, blockSize)
blockBuffer
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
index 3d8c20af3b38..a119edf6bdfb 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryHashPartitionVerifySuite.scala
@@ -22,14 +22,13 @@ import java.io.{BufferedWriter, DataInputStream,
DataOutputStream, File, FileInp
import scala.io.Source
import scala.util.Random
-import com.google.common.io.ByteStreams
-
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.{RandomDataGenerator, Row}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{BoundReference,
GenericInternalRow, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.types.{BinaryType, DataType, DoubleType,
FloatType, IntegerType, LongType, StringType, StructType, TimestampType}
+import org.apache.spark.util.Utils
/**
* To run the test suite:
@@ -110,7 +109,7 @@ class StreamingQueryHashPartitionVerifySuite extends
StreamTest {
val rows = (1 to numRows).map { _ =>
val rowSize = is.readInt()
val rowBuffer = new Array[Byte](rowSize)
- ByteStreams.readFully(is, rowBuffer, 0, rowSize)
+ Utils.readFully(is, rowBuffer, 0, rowSize)
val row = new UnsafeRow(1)
row.pointTo(rowBuffer, rowSize)
row
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]