[FLINK-8736][network] fix wrong memory segment offsets for slices of slices
This closes #5551. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fb1c23a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fb1c23a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fb1c23a Branch: refs/heads/master Commit: 9fb1c23aaead71bd7e81a6a73be1b4206dac405f Parents: 6597e67 Author: Nico Kruber <[email protected]> Authored: Wed Feb 21 17:09:31 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Tue Feb 27 09:07:15 2018 +0100 ---------------------------------------------------------------------- .../buffer/ReadOnlySlicedNetworkBuffer.java | 13 +++-- .../buffer/ReadOnlySlicedBufferTest.java | 59 +++++++++++++++++++- 2 files changed, 64 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9fb1c23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java index e4b8113..52fb57a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java @@ -38,7 +38,7 @@ import java.nio.ReadOnlyBufferException; */ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implements Buffer { - private final int index; + private final int memorySegmentOffset; /** * Creates a buffer which shares the memory segment of the given buffer and exposed the given @@ -53,7 +53,7 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement */ 
ReadOnlySlicedNetworkBuffer(NetworkBuffer buffer, int index, int length) { super(new SlicedByteBuf(buffer, index, length)); - this.index = index; + this.memorySegmentOffset = buffer.getMemorySegmentOffset() + index; } /** @@ -66,10 +66,11 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement * @param buffer the buffer to derive from * @param index the index to start from * @param length the length of the slice + * @param memorySegmentOffset <tt>buffer</tt>'s absolute offset in the backing {@link MemorySegment} */ - private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length) { + private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length, int memorySegmentOffset) { super(new SlicedByteBuf(buffer, index, length)); - this.index = index; + this.memorySegmentOffset = memorySegmentOffset + index; } @Override @@ -102,7 +103,7 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement @Override public int getMemorySegmentOffset() { - return ((Buffer) unwrap()).getMemorySegmentOffset() + index; + return memorySegmentOffset; } @Override @@ -133,7 +134,7 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement @Override public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) { - return new ReadOnlySlicedNetworkBuffer(super.unwrap(), index, length); + return new ReadOnlySlicedNetworkBuffer(super.unwrap(), index, length, memorySegmentOffset); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/9fb1c23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java index 834ec74..529b0f4 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java @@ -33,6 +33,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ReadOnlyBufferException; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertSame; @@ -50,8 +51,10 @@ public class ReadOnlySlicedBufferTest { @Before public void setUp() throws Exception { final MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE); - buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, DATA_SIZE); - buffer.setSize(DATA_SIZE); + buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE, true, 0); + for (int i = 0; i < DATA_SIZE; ++i) { + buffer.writeByte(i); + } } @Test @@ -137,34 +140,64 @@ public class ReadOnlySlicedBufferTest { @Test public void testCreateSlice1() { + buffer.readByte(); // so that we do not start at position 0 ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(); + buffer.readByte(); // should not influence the second slice at all ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(); ByteBuf unwrap = slice2.unwrap(); assertSame(buffer, unwrap); + assertSame(slice1.getMemorySegment(), slice2.getMemorySegment()); + assertEquals(1, slice1.getMemorySegmentOffset()); + assertEquals(slice1.getMemorySegmentOffset(), slice2.getMemorySegmentOffset()); + + assertReadableBytes(slice1, 1, 2, 3, 4, 5, 6, 7, 8, 9); + assertReadableBytes(slice2, 1, 2, 3, 4, 5, 6, 7, 8, 9); } @Test public void testCreateSlice2() { + buffer.readByte(); // so that we do not start at position 0 ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(); + buffer.readByte(); // should not influence the second slice at all ReadOnlySlicedNetworkBuffer slice2 = 
slice1.readOnlySlice(1, 2); ByteBuf unwrap = slice2.unwrap(); assertSame(buffer, unwrap); + assertSame(slice1.getMemorySegment(), slice2.getMemorySegment()); + assertEquals(1, slice1.getMemorySegmentOffset()); + assertEquals(2, slice2.getMemorySegmentOffset()); + + assertReadableBytes(slice1, 1, 2, 3, 4, 5, 6, 7, 8, 9); + assertReadableBytes(slice2, 2, 3); } @Test public void testCreateSlice3() { ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 2); + buffer.readByte(); // should not influence the second slice at all ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(); ByteBuf unwrap = slice2.unwrap(); assertSame(buffer, unwrap); + assertSame(slice1.getMemorySegment(), slice2.getMemorySegment()); + assertEquals(1, slice1.getMemorySegmentOffset()); + assertEquals(1, slice2.getMemorySegmentOffset()); + + assertReadableBytes(slice1, 1, 2); + assertReadableBytes(slice2, 1, 2); } @Test public void testCreateSlice4() { ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 5); + buffer.readByte(); // should not influence the second slice at all ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(1, 2); ByteBuf unwrap = slice2.unwrap(); assertSame(buffer, unwrap); + assertSame(slice1.getMemorySegment(), slice2.getMemorySegment()); + assertEquals(1, slice1.getMemorySegmentOffset()); + assertEquals(2, slice2.getMemorySegmentOffset()); + + assertReadableBytes(slice1, 1, 2, 3, 4, 5); + assertReadableBytes(slice2, 2, 3); } @Test @@ -323,4 +356,26 @@ public class ReadOnlySlicedBufferTest { assertSame(buffer.alloc(), slice.alloc()); assertSame(allocator, slice.alloc()); } + + private static void assertReadableBytes(Buffer actualBuffer, int... 
expectedBytes) { + ByteBuffer actualBytesBuffer = actualBuffer.getNioBufferReadable(); + int[] actual = new int[actualBytesBuffer.limit()]; + for (int i = 0; i < actual.length; ++i) { + actual[i] = actualBytesBuffer.get(); + } + assertArrayEquals(expectedBytes, actual); + + // verify absolutely positioned read method: + ByteBuf buffer = (ByteBuf) actualBuffer; + for (int i = 0; i < buffer.readableBytes(); ++i) { + actual[i] = buffer.getByte(buffer.readerIndex() + i); + } + assertArrayEquals(expectedBytes, actual); + + // verify relatively positioned read method: + for (int i = 0; i < buffer.readableBytes(); ++i) { + actual[i] = buffer.readByte(); + } + assertArrayEquals(expectedBytes, actual); + } }
