[hotfix] [network] Various minor improvements
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/30eb8cd0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/30eb8cd0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/30eb8cd0 Branch: refs/heads/master Commit: 30eb8cd026a2e00397cea645814b384f5774366d Parents: 6abc8a9 Author: Nico Kruber <n...@data-artisans.com> Authored: Thu Feb 22 14:17:06 2018 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Mar 9 16:49:40 2018 +0100 ---------------------------------------------------------------------- .../network/netty/CreditBasedSequenceNumberingViewReader.java | 3 ++- .../flink/runtime/io/network/netty/PartitionRequestQueue.java | 5 ++++- .../runtime/io/network/netty/SequenceNumberingViewReader.java | 1 + .../runtime/io/network/partition/SubpartitionTestBase.java | 2 +- 4 files changed, 8 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/30eb8cd0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java index 9acbbac..8fc7ef4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java @@ -114,7 +114,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen */ @Override public boolean isAvailable() { - // BEWARE: this must be in sync with #isAvailable()! + // BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)! return hasBuffersAvailable() && (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent()); } @@ -130,6 +130,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen * current buffer and backlog including information about the next buffer */ private boolean isAvailable(BufferAndBacklog bufferAndBacklog) { + // BEWARE: this must be in sync with #isAvailable()! return bufferAndBacklog.isMoreAvailable() && (numCreditsAvailable > 0 || bufferAndBacklog.nextBufferIsEvent()); } http://git-wip-us.apache.org/repos/asf/flink/blob/30eb8cd0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java index d63a88e..8c05b82 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java @@ -38,6 +38,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdap import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayDeque; import java.util.Set; @@ -52,7 +54,7 @@ import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferRespo */ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { - private final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class); + private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class); private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener(); @@ -278,6 +280,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter { reader.setRegisteredAsAvailable(true); } + @Nullable private NetworkSequenceViewReader pollAvailableReader() { NetworkSequenceViewReader reader = availableReaders.poll(); if (reader != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/30eb8cd0/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java index 6a83af1..054046f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java @@ -148,6 +148,7 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network "requestLock=" + requestLock + ", receiverId=" + receiverId + ", sequenceNumber=" + sequenceNumber + + ", isRegisteredAsAvailable=" + isRegisteredAvailable + '}'; } } http://git-wip-us.apache.org/repos/asf/flink/blob/30eb8cd0/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java index 215726b..a3f18f6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java @@ -134,7 +134,7 @@ public abstract class SubpartitionTestBase extends TestLogger { assertTrue(view.isReleased()); } - protected void assertNextBuffer( + static void assertNextBuffer( ResultSubpartitionView readView, int expectedReadableBufferSize, boolean expectedIsMoreAvailable,