This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.5
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 26f4355fc30c0e8e8e186b0f7148aea79d5446bc
Author: Nico Kruber <[email protected]>
AuthorDate: Thu Sep 13 12:23:33 2018 +0200

    [hotfix][network] some minor improvements around the network stack
---
 .../api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java | 4 ++--
 .../org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java     | 2 +-
 .../io/network/netty/CreditBasedPartitionRequestClientHandler.java    | 3 +--
 .../flink/runtime/io/network/netty/PartitionRequestClientHandler.java | 3 +--
 4 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 196287b..8630ace 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -502,8 +502,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit
                        int segmentRemaining = numBytes;
                        // check where to go. if we have a partial length, we 
need to complete it first
                        if (this.lengthBuffer.position() > 0) {
-                               int toPut = 
Math.min(this.lengthBuffer.remaining(), numBytes);
-                               segment.get(offset, this.lengthBuffer, toPut);
+                               int toPut = 
Math.min(this.lengthBuffer.remaining(), segmentRemaining);
+                               segment.get(segmentPosition, this.lengthBuffer, 
toPut);
                                // did we complete the length?
                                if (this.lengthBuffer.hasRemaining()) {
                                        return;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
index deb0f4d..a5bf30f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java
@@ -92,7 +92,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu
 
        /**
         * Creates a new buffer instance backed by the given 
<tt>memorySegment</tt> with <tt>0</tt> for
-        * the <tt>readerIndex</tt> and <tt>writerIndex</tt>.
+        * the <tt>readerIndex</tt> and <tt>size</tt> as <tt>writerIndex</tt>.
         *
         * @param memorySegment
         *              backing memory segment (defines {@link #maxCapacity})
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 9aa3920..90daf75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -327,8 +327,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
                                nettyBuffer.readBytes(byteArray);
 
                                MemorySegment memSeg = 
MemorySegmentFactory.wrap(byteArray);
-                               Buffer buffer = new NetworkBuffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false);
-                               buffer.setSize(receivedSize);
+                               Buffer buffer = new NetworkBuffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
                                inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
                        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index 367c62d..796e86f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -337,8 +337,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme
                                nettyBuffer.readBytes(byteArray);
 
                                MemorySegment memSeg = 
MemorySegmentFactory.wrap(byteArray);
-                               Buffer buffer = new NetworkBuffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false);
-                               buffer.setSize(receivedSize);
+                               Buffer buffer = new NetworkBuffer(memSeg, 
FreeingBufferRecycler.INSTANCE, false, receivedSize);
 
                                inputChannel.onBuffer(buffer, 
bufferOrEvent.sequenceNumber, -1);
 

Reply via email to