Repository: spark Updated Branches: refs/heads/master ce2cdc36e -> b58b1fdf9
[SPARK-26068][CORE] ChunkedByteBufferInputStream should handle empty chunks correctly ## What changes were proposed in this pull request? An empty chunk in a ChunkedByteBuffer will truncate the ChunkedByteBufferInputStream. The detailed reason is described in: https://issues.apache.org/jira/browse/SPARK-26068 ## How was this patch tested? Modified the existing unit test to cover this case. Closes #23040 from LinhongLiu/fix-empty-chunked-byte-buffer. Lead-authored-by: Liu,Linhong <[email protected]> Co-authored-by: Xianjin YE <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b58b1fdf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b58b1fdf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b58b1fdf Branch: refs/heads/master Commit: b58b1fdf906d9609321824fc0bb892b986763b3e Parents: ce2cdc3 Author: Liu,Linhong <[email protected]> Authored: Mon Nov 19 22:09:44 2018 +0800 Committer: Wenchen Fan <[email protected]> Committed: Mon Nov 19 22:09:44 2018 +0800 ---------------------------------------------------------------------- .../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 3 ++- .../test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b58b1fdf/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala index 870830f..128d6ff 100644 --- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala +++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala @@ -222,7 +222,8 @@ private[spark] 
class ChunkedByteBufferInputStream( dispose: Boolean) extends InputStream { - private[this] var chunks = chunkedByteBuffer.getChunks().iterator + // Filter out empty chunks since `read()` assumes all chunks are non-empty. + private[this] var chunks = chunkedByteBuffer.getChunks().filter(_.hasRemaining).iterator private[this] var currentChunk: ByteBuffer = { if (chunks.hasNext) { chunks.next() http://git-wip-us.apache.org/repos/asf/spark/blob/b58b1fdf/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala index ff117b1..083c5e6 100644 --- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala +++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala @@ -90,7 +90,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite with SharedSparkContext { val empty = ByteBuffer.wrap(Array.empty[Byte]) val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte)) val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte)) - val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, bytes2)) + val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, empty, bytes2)) assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit()) val inputStream = chunkedByteBuffer.toInputStream(dispose = false) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
