[FLINK-8736][network] fix memory segment offsets for slices of slices being 
wrong

This closes #5551.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9fb1c23a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9fb1c23a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9fb1c23a

Branch: refs/heads/master
Commit: 9fb1c23aaead71bd7e81a6a73be1b4206dac405f
Parents: 6597e67
Author: Nico Kruber <[email protected]>
Authored: Wed Feb 21 17:09:31 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Tue Feb 27 09:07:15 2018 +0100

----------------------------------------------------------------------
 .../buffer/ReadOnlySlicedNetworkBuffer.java     | 13 +++--
 .../buffer/ReadOnlySlicedBufferTest.java        | 59 +++++++++++++++++++-
 2 files changed, 64 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9fb1c23a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
index e4b8113..52fb57a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -38,7 +38,7 @@ import java.nio.ReadOnlyBufferException;
  */
 public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf 
implements Buffer {
 
-       private final int index;
+       private final int memorySegmentOffset;
 
        /**
         * Creates a buffer which shares the memory segment of the given buffer 
and exposed the given
@@ -53,7 +53,7 @@ public final class ReadOnlySlicedNetworkBuffer extends 
ReadOnlyByteBuf implement
         */
        ReadOnlySlicedNetworkBuffer(NetworkBuffer buffer, int index, int 
length) {
                super(new SlicedByteBuf(buffer, index, length));
-               this.index = index;
+               this.memorySegmentOffset = buffer.getMemorySegmentOffset() + 
index;
        }
 
        /**
@@ -66,10 +66,11 @@ public final class ReadOnlySlicedNetworkBuffer extends 
ReadOnlyByteBuf implement
         * @param buffer the buffer to derive from
         * @param index the index to start from
         * @param length the length of the slice
+        * @param memorySegmentOffset <tt>buffer</tt>'s absolute offset in the 
backing {@link MemorySegment}
         */
-       private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int 
length) {
+       private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int 
length, int memorySegmentOffset) {
                super(new SlicedByteBuf(buffer, index, length));
-               this.index = index;
+               this.memorySegmentOffset = memorySegmentOffset + index;
        }
 
        @Override
@@ -102,7 +103,7 @@ public final class ReadOnlySlicedNetworkBuffer extends 
ReadOnlyByteBuf implement
 
        @Override
        public int getMemorySegmentOffset() {
-               return ((Buffer) unwrap()).getMemorySegmentOffset() + index;
+               return memorySegmentOffset;
        }
 
        @Override
@@ -133,7 +134,7 @@ public final class ReadOnlySlicedNetworkBuffer extends 
ReadOnlyByteBuf implement
 
        @Override
        public ReadOnlySlicedNetworkBuffer readOnlySlice(int index, int length) 
{
-               return new ReadOnlySlicedNetworkBuffer(super.unwrap(), index, 
length);
+               return new ReadOnlySlicedNetworkBuffer(super.unwrap(), index, 
length, memorySegmentOffset);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9fb1c23a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
index 834ec74..529b0f4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedBufferTest.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ReadOnlyBufferException;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
@@ -50,8 +51,10 @@ public class ReadOnlySlicedBufferTest {
        @Before
        public void setUp() throws Exception {
                final MemorySegment segment = 
MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
-               buffer = new NetworkBuffer(segment, 
FreeingBufferRecycler.INSTANCE, true, DATA_SIZE);
-               buffer.setSize(DATA_SIZE);
+               buffer = new NetworkBuffer(segment, 
FreeingBufferRecycler.INSTANCE, true, 0);
+               for (int i = 0; i < DATA_SIZE; ++i) {
+                       buffer.writeByte(i);
+               }
        }
 
        @Test
@@ -137,34 +140,64 @@ public class ReadOnlySlicedBufferTest {
 
        @Test
        public void testCreateSlice1() {
+               buffer.readByte(); // so that we do not start at position 0
                ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice();
+               buffer.readByte(); // should not influence the second slice at 
all
                ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice();
                ByteBuf unwrap = slice2.unwrap();
                assertSame(buffer, unwrap);
+               assertSame(slice1.getMemorySegment(), 
slice2.getMemorySegment());
+               assertEquals(1, slice1.getMemorySegmentOffset());
+               assertEquals(slice1.getMemorySegmentOffset(), 
slice2.getMemorySegmentOffset());
+
+               assertReadableBytes(slice1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+               assertReadableBytes(slice2, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        }
 
        @Test
        public void testCreateSlice2() {
+               buffer.readByte(); // so that we do not start at position 0
                ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice();
+               buffer.readByte(); // should not influence the second slice at 
all
                ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(1, 2);
                ByteBuf unwrap = slice2.unwrap();
                assertSame(buffer, unwrap);
+               assertSame(slice1.getMemorySegment(), 
slice2.getMemorySegment());
+               assertEquals(1, slice1.getMemorySegmentOffset());
+               assertEquals(2, slice2.getMemorySegmentOffset());
+
+               assertReadableBytes(slice1, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+               assertReadableBytes(slice2, 2, 3);
        }
 
        @Test
        public void testCreateSlice3() {
                ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 2);
+               buffer.readByte(); // should not influence the second slice at 
all
                ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice();
                ByteBuf unwrap = slice2.unwrap();
                assertSame(buffer, unwrap);
+               assertSame(slice1.getMemorySegment(), 
slice2.getMemorySegment());
+               assertEquals(1, slice1.getMemorySegmentOffset());
+               assertEquals(1, slice2.getMemorySegmentOffset());
+
+               assertReadableBytes(slice1, 1, 2);
+               assertReadableBytes(slice2, 1, 2);
        }
 
        @Test
        public void testCreateSlice4() {
                ReadOnlySlicedNetworkBuffer slice1 = buffer.readOnlySlice(1, 5);
+               buffer.readByte(); // should not influence the second slice at 
all
                ReadOnlySlicedNetworkBuffer slice2 = slice1.readOnlySlice(1, 2);
                ByteBuf unwrap = slice2.unwrap();
                assertSame(buffer, unwrap);
+               assertSame(slice1.getMemorySegment(), 
slice2.getMemorySegment());
+               assertEquals(1, slice1.getMemorySegmentOffset());
+               assertEquals(2, slice2.getMemorySegmentOffset());
+
+               assertReadableBytes(slice1, 1, 2, 3, 4, 5);
+               assertReadableBytes(slice2, 2, 3);
        }
 
        @Test
@@ -323,4 +356,26 @@ public class ReadOnlySlicedBufferTest {
                assertSame(buffer.alloc(), slice.alloc());
                assertSame(allocator, slice.alloc());
        }
+
+       private static void assertReadableBytes(Buffer actualBuffer, int... 
expectedBytes) {
+               ByteBuffer actualBytesBuffer = 
actualBuffer.getNioBufferReadable();
+               int[] actual = new int[actualBytesBuffer.limit()];
+               for (int i = 0; i < actual.length; ++i) {
+                       actual[i] = actualBytesBuffer.get();
+               }
+               assertArrayEquals(expectedBytes, actual);
+
+               // verify absolutely positioned read method:
+               ByteBuf buffer = (ByteBuf) actualBuffer;
+               for (int i = 0; i < buffer.readableBytes(); ++i) {
+                       actual[i] = buffer.getByte(buffer.readerIndex() + i);
+               }
+               assertArrayEquals(expectedBytes, actual);
+
+               // verify relatively positioned read method:
+               for (int i = 0; i < buffer.readableBytes(); ++i) {
+                       actual[i] = buffer.readByte();
+               }
+               assertArrayEquals(expectedBytes, actual);
+       }
 }

Reply via email to