This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5526ec7eefe858d9535f23a057b9f2c6c725ab92
Author: Nico Kruber <[email protected]>
AuthorDate: Tue Aug 21 17:16:29 2018 +0200

    [hotfix][network] simplify moreAvailable/wasEmpty logic

    If we only need the status of a queue being empty or not, we do not need to
    acquire the size.
---
 .../io/network/partition/consumer/RemoteInputChannel.java      | 10 +++++-----
 .../runtime/io/network/partition/consumer/SingleInputGate.java |  2 +-
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 28f3020..79d25c6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -191,16 +191,16 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 		checkError();
 
 		final Buffer next;
-		final int remaining;
+		final boolean moreAvailable;
 
 		synchronized (receivedBuffers) {
 			next = receivedBuffers.poll();
-			remaining = receivedBuffers.size();
+			moreAvailable = !receivedBuffers.isEmpty();
 		}
 
 		numBytesIn.inc(next.getSizeUnsafe());
 		numBuffersIn.inc();
-		return Optional.of(new BufferAndAvailability(next, remaining > 0, getSenderBacklog()));
+		return Optional.of(new BufferAndAvailability(next, moreAvailable, getSenderBacklog()));
 	}
 
 	// ------------------------------------------------------------------------
@@ -516,12 +516,12 @@ public class RemoteInputChannel extends InputChannel implements BufferRecycler,
 			synchronized (receivedBuffers) {
 				if (!isReleased.get()) {
 					if (expectedSequenceNumber == sequenceNumber) {
-						int available = receivedBuffers.size();
+						final boolean wasEmpty = receivedBuffers.isEmpty();
 
 						receivedBuffers.add(buffer);
 						expectedSequenceNumber++;
 
-						if (available == 0) {
+						if (wasEmpty) {
 							notifyChannelNonEmpty();
 						}
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 2e7d076..f51dc74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -545,7 +545,7 @@ public class SingleInputGate implements InputGate {
 
 			currentChannel = inputChannelsWithData.remove();
 			enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
-			moreAvailable = inputChannelsWithData.size() > 0;
+			moreAvailable = !inputChannelsWithData.isEmpty();
 		}
 
 		result = currentChannel.getNextBuffer();
