[FLINK-8588][runtime] Handle sliced buffers in RecordDeserializer

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1310c725
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1310c725
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1310c725

Branch: refs/heads/master
Commit: 1310c725ab022323eec42c563e6f3a5c59479ed1
Parents: 2c0f4d4
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Wed Jan 31 15:35:24 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:39 2018 +0100

----------------------------------------------------------------------
 .../api/serialization/RecordDeserializer.java   |  3 --
 ...llingAdaptiveSpanningRecordDeserializer.java | 42 +++++++--------
 .../flink/runtime/io/network/buffer/Buffer.java | 14 ++++-
 .../io/network/buffer/NetworkBuffer.java        |  5 ++
 .../buffer/ReadOnlySlicedNetworkBuffer.java     |  9 ++++
 .../SpanningRecordSerializationTest.java        | 54 ++++++++++++++++----
 6 files changed, 89 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
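
In short: a Buffer handed to the deserializer may now be a read-only slice whose readable data does
not start at position 0 of the backing MemorySegment, so getMemorySegment() has to be paired with the
new getMemorySegmentOffset(). A minimal sketch of the offset-aware read path (only the accessor names
come from the diff below; the local variables and the final read are illustrative):

    int offset = buffer.getMemorySegmentOffset();      // 0 for a plain NetworkBuffer, > 0 for a slice
    MemorySegment segment = buffer.getMemorySegment();
    int numBytes = buffer.getSize();
    // the readable bytes live in segment[offset, offset + numBytes), not in [0, numBytes)
    byte firstByte = segment.get(offset);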


http://git-wip-us.apache.org/repos/asf/flink/blob/1310c725/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
index 4f48d86..10a1b4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordDeserializer.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
 import java.io.IOException;
@@ -57,8 +56,6 @@ public interface RecordDeserializer<T extends IOReadableWritable> {
 
        DeserializationResult getNextRecord(T target) throws IOException;
 
-       void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException;
-
        void setNextBuffer(Buffer buffer) throws IOException;
 
        Buffer getCurrentBuffer();

http://git-wip-us.apache.org/repos/asf/flink/blob/1310c725/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 985a93e..fded258 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -66,10 +66,17 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
        public void setNextBuffer(Buffer buffer) throws IOException {
                currentBuffer = buffer;
 
+               int offset = buffer.getMemorySegmentOffset();
                MemorySegment segment = buffer.getMemorySegment();
                int numBytes = buffer.getSize();
 
-               setNextMemorySegment(segment, numBytes);
+               // check if some spanning record deserialization is pending
+               if (this.spanningWrapper.getNumGatheredBytes() > 0) {
+                       this.spanningWrapper.addNextChunkFromMemorySegment(segment, offset, numBytes);
+               }
+               else {
+                       this.nonSpanningWrapper.initializeFromMemorySegment(segment, offset, numBytes + offset);
+               }
        }
 
        @Override
@@ -80,17 +87,6 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
        }
 
        @Override
-       public void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException {
-               // check if some spanning record deserialization is pending
-               if (this.spanningWrapper.getNumGatheredBytes() > 0) {
-                       this.spanningWrapper.addNextChunkFromMemorySegment(segment, numBytes);
-               }
-               else {
-                       this.nonSpanningWrapper.initializeFromMemorySegment(segment, 0, numBytes);
-               }
-       }
-
-       @Override
        public DeserializationResult getNextRecord(T target) throws IOException {
                // always check the non-spanning wrapper first.
                // this should be the majority of the cases for small records
@@ -500,14 +496,13 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
                        partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
                }
 
-               private void addNextChunkFromMemorySegment(MemorySegment segment, int numBytesInSegment) throws IOException {
-                       int segmentPosition = 0;
-
+               private void addNextChunkFromMemorySegment(MemorySegment segment, int offset, int numBytes) throws IOException {
+                       int segmentPosition = offset;
+                       int segmentRemaining = numBytes;
                        // check where to go. if we have a partial length, we need to complete it first
                        if (this.lengthBuffer.position() > 0) {
-                               int toPut = Math.min(this.lengthBuffer.remaining(), numBytesInSegment);
-                               segment.get(0, this.lengthBuffer, toPut);
-
+                               int toPut = Math.min(this.lengthBuffer.remaining(), numBytes);
+                               segment.get(offset, this.lengthBuffer, toPut);
                                // did we complete the length?
                                if (this.lengthBuffer.hasRemaining()) {
                                        return;
@@ -515,8 +510,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
                                        this.recordLength = this.lengthBuffer.getInt(0);
 
                                        this.lengthBuffer.clear();
-                                       segmentPosition = toPut;
-
+                                       segmentPosition += toPut;
+                                       segmentRemaining -= toPut;
                                        if (this.recordLength > THRESHOLD_FOR_SPILLING) {
                                                this.spillingChannel = createSpillingChannel();
                                        }
@@ -525,8 +520,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
                        // copy as much as we need or can for this next spanning record
                        int needed = this.recordLength - this.accumulatedRecordBytes;
-                       int available = numBytesInSegment - segmentPosition;
-                       int toCopy = Math.min(needed, available);
+                       int toCopy = Math.min(needed, segmentRemaining);
 
                        if (spillingChannel != null) {
                                // spill to file
@@ -540,11 +534,11 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
 
                        this.accumulatedRecordBytes += toCopy;
 
-                       if (toCopy < available) {
+                       if (toCopy < segmentRemaining) {
                                // there is more data in the segment
                                this.leftOverData = segment;
                                this.leftOverStart = segmentPosition + toCopy;
-                               this.leftOverLimit = numBytesInSegment;
+                               this.leftOverLimit = numBytes + offset;
                        }
 
                        if (accumulatedRecordBytes == recordLength) {

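For intuition, a hypothetical trace of the new addNextChunkFromMemorySegment(segment, offset, numBytes)
bookkeeping (the concrete numbers are made up for illustration and are not part of the commit):

    // slice with offset = 100, numBytes = 50; lengthBuffer already holds 3 of the 4 length bytes
    segmentPosition = 100; segmentRemaining = 50;
    toPut = min(lengthBuffer.remaining() = 1, numBytes = 50) = 1;  // length byte read at index 100, not 0
    segmentPosition += 1;  // = 101
    segmentRemaining -= 1; // = 49; recordLength read as, say, 40
    toCopy = min(needed = 40, segmentRemaining = 49) = 40;
    // 40 < 49 => leftover data: leftOverStart = 101 + 40 = 141, leftOverLimit = numBytes + offset = 150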
http://git-wip-us.apache.org/repos/asf/flink/blob/1310c725/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
index c649262..96b18ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/Buffer.java
@@ -60,13 +60,25 @@ public interface Buffer {
        void tagAsEvent();
 
        /**
-        * Returns the underlying memory segment.
+        * Returns the underlying memory segment. This method is dangerous since it ignores read only protections and omits
+        * slices. Use it only along the {@link #getMemorySegmentOffset()}.
+        *
+        * <p>This method will be removed in the future. For writing use {@link BufferBuilder}.
         *
         * @return the memory segment backing this buffer
         */
+       @Deprecated
        MemorySegment getMemorySegment();
 
        /**
+        * This method will be removed in the future. For writing use {@link BufferBuilder}.
+        *
+        * @return the offset where this (potential slice) {@link Buffer}'s data start in the underlying memory segment.
+        */
+       @Deprecated
+       int getMemorySegmentOffset();
+
+       /**
         * Gets the buffer's recycler.
         *
         * @return buffer recycler

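Until getMemorySegment() is removed, read-side callers that still need the raw segment are expected to
combine it with the new offset, roughly as below (a sketch; the helper name copyToArray is made up, only
the Buffer/MemorySegment accessors are real):

    // copies the readable bytes of a (possibly sliced) Buffer into a fresh array -- illustrative only
    static byte[] copyToArray(Buffer buffer) {
        byte[] target = new byte[buffer.getSize()];
        buffer.getMemorySegment().get(buffer.getMemorySegmentOffset(), target, 0, target.length);
        return target;
    }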
http://git-wip-us.apache.org/repos/asf/flink/blob/1310c725/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index 0f3fa0e..deb0f4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -132,6 +132,11 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
        }
 
        @Override
+       public int getMemorySegmentOffset() {
+               return 0;
+       }
+
+       @Override
        public BufferRecycler getRecycler(){
                return recycler;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/1310c725/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
index d5f81d0..e4b8113 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/ReadOnlySlicedNetworkBuffer.java
@@ -38,6 +38,8 @@ import java.nio.ReadOnlyBufferException;
  */
 public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implements Buffer {
 
+       private final int index;
+
        /**
         * Creates a buffer which shares the memory segment of the given buffer and exposed the given
         * sub-region only.
@@ -51,6 +53,7 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
         */
        ReadOnlySlicedNetworkBuffer(NetworkBuffer buffer, int index, int length) {
                super(new SlicedByteBuf(buffer, index, length));
+               this.index = index;
        }
 
        /**
@@ -66,6 +69,7 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
         */
        private ReadOnlySlicedNetworkBuffer(ByteBuf buffer, int index, int length) {
                super(new SlicedByteBuf(buffer, index, length));
+               this.index = index;
        }
 
        @Override
@@ -97,6 +101,11 @@ public final class ReadOnlySlicedNetworkBuffer extends ReadOnlyByteBuf implement
        }
 
        @Override
+       public int getMemorySegmentOffset() {
+               return ((Buffer) unwrap()).getMemorySegmentOffset() + index;
+       }
+
+       @Override
        public BufferRecycler getRecycler() {
                return ((Buffer) unwrap()).getRecycler();
        }

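Note that the offsets compose through nested slices, since unwrap() may itself return a slice:

    // hypothetical values: the parent slice starts at offset 20, this slice was created at index 10 within it
    // => getMemorySegmentOffset() == 20 + 10 == 30, i.e. the data's position in the original MemorySegment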
http://git-wip-us.apache.org/repos/asf/flink/blob/1310c725/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
index abcdb1a..aa09681 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializationTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.io.network.api.serialization;
 
 import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
+import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
 import org.apache.flink.testutils.serialization.types.IntType;
 import org.apache.flink.testutils.serialization.types.SerializationTestType;
@@ -29,18 +31,20 @@ import org.apache.flink.testutils.serialization.types.Util;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 
 import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.buildSingleBuffer;
-import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createBufferBuilder;
+import static org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils.createFilledBufferBuilder;
 
 /**
  * Tests for the {@link SpillingAdaptiveSpanningRecordDeserializer}.
  */
 public class SpanningRecordSerializationTest {
+       private static final Random RANDOM = new Random(42);
 
        @Test
        public void testIntRecordsSpanningMultipleSegments() throws Exception {
@@ -124,8 +128,7 @@ public class SpanningRecordSerializationTest {
 
                // -------------------------------------------------------------------------------------------------------------
 
-               BufferBuilder bufferBuilder = createBufferBuilder(segmentSize);
-               serializer.setNextBufferBuilder(bufferBuilder);
+               BufferConsumerAndSerializerResult serializationResult = setNextBufferForSerializer(serializer, segmentSize);
 
                int numRecords = 0;
                for (SerializationTestType record : records) {
@@ -137,7 +140,7 @@ public class SpanningRecordSerializationTest {
                        // serialize record
                        if (serializer.addRecord(record).isFullBuffer()) {
                                // buffer is full => start deserializing
-                               deserializer.setNextBuffer(buildSingleBuffer(bufferBuilder));
+                               deserializer.setNextBuffer(serializationResult.buildBuffer());
 
                                while (!serializedRecords.isEmpty()) {
                                        SerializationTestType expected = serializedRecords.poll();
@@ -153,18 +156,15 @@ public class SpanningRecordSerializationTest {
                                }
 
                                // move buffers as long as necessary (for long records)
-                               bufferBuilder = createBufferBuilder(segmentSize);
-                               serializer.clear();
-                               while (serializer.setNextBufferBuilder(bufferBuilder).isFullBuffer()) {
-                                       deserializer.setNextBuffer(buildSingleBuffer(bufferBuilder));
-                                       bufferBuilder = createBufferBuilder(segmentSize);
+                               while ((serializationResult = setNextBufferForSerializer(serializer, segmentSize)).isFullBuffer()) {
+                                       deserializer.setNextBuffer(serializationResult.buildBuffer());
                                        serializer.clear();
                                }
                        }
                }
 
                // deserialize left over records
-               deserializer.setNextBuffer(buildSingleBuffer(bufferBuilder));
+               deserializer.setNextBuffer(serializationResult.buildBuffer());
 
                while (!serializedRecords.isEmpty()) {
                        SerializationTestType expected = serializedRecords.poll();
@@ -182,4 +182,38 @@ public class SpanningRecordSerializationTest {
                Assert.assertFalse(serializer.hasSerializedData());
                Assert.assertFalse(deserializer.hasUnfinishedData());
        }
+
+       private static BufferConsumerAndSerializerResult setNextBufferForSerializer(
+                       RecordSerializer<SerializationTestType> serializer,
+                       int segmentSize) throws IOException {
+               // create a bufferBuilder with some random starting offset to properly test handling buffer slices in the
+               // deserialization code.
+               int startingOffset = segmentSize > 2 ? RANDOM.nextInt(segmentSize / 2) : 0;
+               BufferBuilder bufferBuilder = createFilledBufferBuilder(segmentSize + startingOffset, startingOffset);
+               BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
+               bufferConsumer.build().recycleBuffer();
+
+               serializer.clear();
+               return new BufferConsumerAndSerializerResult(
+                       bufferConsumer,
+                       serializer.setNextBufferBuilder(bufferBuilder));
+       }
+
+       private static class BufferConsumerAndSerializerResult {
+               private final BufferConsumer bufferConsumer;
+               private final RecordSerializer.SerializationResult serializationResult;
+
+               public BufferConsumerAndSerializerResult(BufferConsumer bufferConsumer, RecordSerializer.SerializationResult serializationResult) {
+                       this.bufferConsumer = bufferConsumer;
+                       this.serializationResult = serializationResult;
+               }
+
+               public Buffer buildBuffer() {
+                       return buildSingleBuffer(bufferConsumer);
+               }
+
+               public boolean isFullBuffer() {
+                       return serializationResult.isFullBuffer();
+               }
+       }
 }
