[hotfix] [network] Various minor improvements

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7e2e25cf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7e2e25cf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7e2e25cf

Branch: refs/heads/release-1.5
Commit: 7e2e25cfa6be737803a74b75ffb231568a51513e
Parents: adae7d9
Author: Nico Kruber <n...@data-artisans.com>
Authored: Thu Feb 22 14:17:06 2018 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Mar 9 17:01:45 2018 +0100

----------------------------------------------------------------------
 .../network/netty/CreditBasedSequenceNumberingViewReader.java   | 3 ++-
 .../flink/runtime/io/network/netty/PartitionRequestQueue.java   | 5 ++++-
 .../runtime/io/network/netty/SequenceNumberingViewReader.java   | 1 +
 .../runtime/io/network/partition/SubpartitionTestBase.java      | 2 +-
 4 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7e2e25cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
index 9acbbac..8fc7ef4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java
@@ -114,7 +114,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
         */
        @Override
        public boolean isAvailable() {
-               // BEWARE: this must be in sync with #isAvailable()!
+               // BEWARE: this must be in sync with #isAvailable(BufferAndBacklog)!
                return hasBuffersAvailable() &&
                        (numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
        }
@@ -130,6 +130,7 @@ class CreditBasedSequenceNumberingViewReader implements BufferAvailabilityListen
         *              current buffer and backlog including information about the next buffer
         */
        private boolean isAvailable(BufferAndBacklog bufferAndBacklog) {
+               // BEWARE: this must be in sync with #isAvailable()!
                return bufferAndBacklog.isMoreAvailable() &&
                        (numCreditsAvailable > 0 || bufferAndBacklog.nextBufferIsEvent());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e2e25cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
index d63a88e..8c05b82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java
@@ -38,6 +38,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdap
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Set;
@@ -52,7 +54,7 @@ import static org.apache.flink.runtime.io.network.netty.NettyMessage.BufferRespo
  */
 class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
 
-       private final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class);
+       private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestQueue.class);
 
        private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
 
@@ -278,6 +280,7 @@ class PartitionRequestQueue extends ChannelInboundHandlerAdapter {
                reader.setRegisteredAsAvailable(true);
        }
 
+       @Nullable
        private NetworkSequenceViewReader pollAvailableReader() {
                NetworkSequenceViewReader reader = availableReaders.poll();
                if (reader != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/7e2e25cf/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
index 6a83af1..054046f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java
@@ -148,6 +148,7 @@ class SequenceNumberingViewReader implements BufferAvailabilityListener, Network
                        "requestLock=" + requestLock +
                        ", receiverId=" + receiverId +
                        ", sequenceNumber=" + sequenceNumber +
+                       ", isRegisteredAsAvailable=" + isRegisteredAvailable +
                        '}';
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7e2e25cf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
index 215726b..a3f18f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java
@@ -134,7 +134,7 @@ public abstract class SubpartitionTestBase extends TestLogger {
                assertTrue(view.isReleased());
        }
 
-       protected void assertNextBuffer(
+       static void assertNextBuffer(
                        ResultSubpartitionView readView,
                        int expectedReadableBufferSize,
                        boolean expectedIsMoreAvailable,

Reply via email to