This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5526ec7eefe858d9535f23a057b9f2c6c725ab92
Author: Nico Kruber <[email protected]>
AuthorDate: Tue Aug 21 17:16:29 2018 +0200

    [hotfix][network] simplify moreAvailable/wasEmpty logic
    
    If we only need the status of a queue being empty or not, we do not need to
    acquire the size.
---
 .../io/network/partition/consumer/RemoteInputChannel.java      | 10 +++++-----
 .../runtime/io/network/partition/consumer/SingleInputGate.java |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 28f3020..79d25c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -191,16 +191,16 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
                checkError();
 
                final Buffer next;
-               final int remaining;
+               final boolean moreAvailable;
 
                synchronized (receivedBuffers) {
                        next = receivedBuffers.poll();
-                       remaining = receivedBuffers.size();
+                       moreAvailable = !receivedBuffers.isEmpty();
                }
 
                numBytesIn.inc(next.getSizeUnsafe());
                numBuffersIn.inc();
-               return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
+               return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
        }
 
        // ------------------------------------------------------------------------
@@ -516,12 +516,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
                        synchronized (receivedBuffers) {
                                if (!isReleased.get()) {
                                        if (expectedSequenceNumber == sequenceNumber) {
-                                               int available = receivedBuffers.size();
+                                               final boolean wasEmpty = receivedBuffers.isEmpty();
 
                                                receivedBuffers.add(buffer);
                                                expectedSequenceNumber++;
 
-                                               if (available == 0) {
+                                               if (wasEmpty) {
                                                        notifyChannelNonEmpty();
                                                }
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 2e7d076..f51dc74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -545,7 +545,7 @@ public class SingleInputGate implements InputGate {
 
                                currentChannel = inputChannelsWithData.remove();
                                enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
-                               moreAvailable = inputChannelsWithData.size() > 0;
+                               moreAvailable = !inputChannelsWithData.isEmpty();
                        }
 
                        result = currentChannel.getNextBuffer();

Reply via email to