Repository: spark
Updated Branches:
  refs/heads/master ce2cdc36e -> b58b1fdf9


[SPARK-26068][CORE] ChunkedByteBufferInputStream should handle empty chunks 
correctly

## What changes were proposed in this pull request?

An empty chunk in a ChunkedByteBuffer truncates the ChunkedByteBufferInputStream,
because `read()` assumes all chunks are non-empty.
The detailed reason is described in:
https://issues.apache.org/jira/browse/SPARK-26068

## How was this patch tested?
Modified the existing unit test to cover this case.

Closes #23040 from LinhongLiu/fix-empty-chunked-byte-buffer.

Lead-authored-by: Liu,Linhong <[email protected]>
Co-authored-by: Xianjin YE <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b58b1fdf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b58b1fdf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b58b1fdf

Branch: refs/heads/master
Commit: b58b1fdf906d9609321824fc0bb892b986763b3e
Parents: ce2cdc3
Author: Liu,Linhong <[email protected]>
Authored: Mon Nov 19 22:09:44 2018 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Nov 19 22:09:44 2018 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala   | 3 ++-
 .../test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala   | 2 +-
 2 files changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b58b1fdf/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala 
b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 870830f..128d6ff 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -222,7 +222,8 @@ private[spark] class ChunkedByteBufferInputStream(
     dispose: Boolean)
   extends InputStream {
 
-  private[this] var chunks = chunkedByteBuffer.getChunks().iterator
+  // Filter out empty chunks since `read()` assumes all chunks are non-empty.
+  private[this] var chunks = 
chunkedByteBuffer.getChunks().filter(_.hasRemaining).iterator
   private[this] var currentChunk: ByteBuffer = {
     if (chunks.hasNext) {
       chunks.next()

http://git-wip-us.apache.org/repos/asf/spark/blob/b58b1fdf/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala 
b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index ff117b1..083c5e6 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -90,7 +90,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite with 
SharedSparkContext {
     val empty = ByteBuffer.wrap(Array.empty[Byte])
     val bytes1 = ByteBuffer.wrap(Array.tabulate(256)(_.toByte))
     val bytes2 = ByteBuffer.wrap(Array.tabulate(128)(_.toByte))
-    val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, bytes2))
+    val chunkedByteBuffer = new ChunkedByteBuffer(Array(empty, bytes1, empty, 
bytes2))
     assert(chunkedByteBuffer.size === bytes1.limit() + bytes2.limit())
 
     val inputStream = chunkedByteBuffer.toInputStream(dispose = false)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to