Repository: spark Updated Branches: refs/heads/master 4262fb0d5 -> 00074b577
[SPARK-19062] Utils.writeByteBuffer bug fix This commit changes Utils.writeByteBuffer so that it does not change the position of the ByteBuffer that it writes out, and adds a unit test for this functionality. cc mridulm Author: Kay Ousterhout <[email protected]> Closes #16462 from kayousterhout/SPARK-19062. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00074b57 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00074b57 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00074b57 Branch: refs/heads/master Commit: 00074b57786cfe4a0d67d617d04d3a1ea21c9ae5 Parents: 4262fb0 Author: Kay Ousterhout <[email protected]> Authored: Wed Jan 4 11:21:09 2017 -0800 Committer: Kay Ousterhout <[email protected]> Committed: Wed Jan 4 11:21:09 2017 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/util/Utils.scala | 4 ++++ .../org/apache/spark/util/UtilsSuite.scala | 25 +++++++++++++++++++- 2 files changed, 28 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/00074b57/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 078cc3d..0dcf030 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -237,9 +237,11 @@ private[spark] object Utils extends Logging { if (bb.hasArray) { out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()) } else { + val originalPosition = bb.position() val bbval = new Array[Byte](bb.remaining()) bb.get(bbval) out.write(bbval) + bb.position(originalPosition) } } @@ -250,9 +252,11 @@ private[spark] object Utils extends Logging { if (bb.hasArray) { out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining()) } else { + val originalPosition = bb.position() val bbval = new Array[Byte](bb.remaining()) bb.get(bbval) out.write(bbval) + bb.position(originalPosition) } } http://git-wip-us.apache.org/repos/asf/spark/blob/00074b57/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index fb7b912..442a603 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -17,7 +17,8 @@ package org.apache.spark.util -import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, FileOutputStream, PrintStream} +import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataOutput, DataOutputStream, File, + FileOutputStream, PrintStream} import java.lang.{Double => JDouble, Float => JFloat} import java.net.{BindException, ServerSocket, URI} import java.nio.{ByteBuffer, ByteOrder} @@ -389,6 +390,28 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(Utils.deserializeLongValue(bbuf.array) === testval) } + test("writeByteBuffer should not change ByteBuffer position") { + // Test a buffer with an underlying array, for both writeByteBuffer methods. + val testBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4)) + assert(testBuffer.hasArray) + val bytesOut = new ByteBufferOutputStream(4096) + Utils.writeByteBuffer(testBuffer, bytesOut) + assert(testBuffer.position() === 0) + + val dataOut = new DataOutputStream(bytesOut) + Utils.writeByteBuffer(testBuffer, dataOut: DataOutput) + assert(testBuffer.position() === 0) + + // Test a buffer without an underlying array, for both writeByteBuffer methods. + val testDirectBuffer = ByteBuffer.allocateDirect(8) + assert(!testDirectBuffer.hasArray()) + Utils.writeByteBuffer(testDirectBuffer, bytesOut) + assert(testDirectBuffer.position() === 0) + + Utils.writeByteBuffer(testDirectBuffer, dataOut: DataOutput) + assert(testDirectBuffer.position() === 0) + } + test("get iterator size") { val empty = Seq[Int]() assert(Utils.getIteratorSize(empty.toIterator) === 0L) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
