This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 213e085cc75174e458a35e669059aa142e971d6b Author: Nico Kruber <[email protected]> AuthorDate: Thu Sep 13 12:23:33 2018 +0200 [hotfix][network] some minor improvements around the network stack --- .../api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java | 4 ++-- .../org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java | 2 +- .../io/network/netty/CreditBasedPartitionRequestClientHandler.java | 3 +-- .../flink/runtime/io/network/netty/PartitionRequestClientHandler.java | 3 +-- 4 files changed, 5 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 196287b..8630ace 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -502,8 +502,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit int segmentRemaining = numBytes; // check where to go. if we have a partial length, we need to complete it first if (this.lengthBuffer.position() > 0) { - int toPut = Math.min(this.lengthBuffer.remaining(), numBytes); - segment.get(offset, this.lengthBuffer, toPut); + int toPut = Math.min(this.lengthBuffer.remaining(), segmentRemaining); + segment.get(segmentPosition, this.lengthBuffer, toPut); // did we complete the length? if (this.lengthBuffer.hasRemaining()) { return; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java index 489be39..05b7582 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBuffer.java @@ -93,7 +93,7 @@ public class NetworkBuffer extends AbstractReferenceCountedByteBuf implements Bu /** * Creates a new buffer instance backed by the given <tt>memorySegment</tt> with <tt>0</tt> for - * the <tt>readerIndex</tt> and <tt>writerIndex</tt>. + * the <tt>readerIndex</tt> and <tt>size</tt> as <tt>writerIndex</tt>. * * @param memorySegment * backing memory segment (defines {@link #maxCapacity}) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index 9aa3920..90daf75 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java @@ -327,8 +327,7 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap nettyBuffer.readBytes(byteArray); MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray); - Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false); - buffer.setSize(receivedSize); + Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize); inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java index 367c62d..796e86f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java @@ -337,8 +337,7 @@ class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter impleme nettyBuffer.readBytes(byteArray); MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray); - Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false); - buffer.setSize(receivedSize); + Buffer buffer = new NetworkBuffer(memSeg, FreeingBufferRecycler.INSTANCE, false, receivedSize); inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);
