This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b84dbf423a380e8e6d44694cdeb06eb968353aa Author: Piotr Nowojski <[email protected]> AuthorDate: Tue May 7 12:26:23 2019 +0200 [hotfix][network] Refactor and simplify InputGate#getNextBufferOrEvent Previously, when more data was available, re-enqueuing a channel or an inputGate was done in a separate critical section, resulting in a more complicated concurrency contract (critical section split into two). A side effect of this change is that the recursive getNextBuffer/pollNextBufferOrEvent calls now also happen under the lock; however, they are non-blocking, so this shouldn't cause any issues. --- .../partition/consumer/SingleInputGate.java | 21 +++++++------ .../network/partition/consumer/UnionInputGate.java | 34 +++++++++++----------- 2 files changed, 27 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index d40af83..750e678 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -552,21 +552,20 @@ public class SingleInputGate implements InputGate { } currentChannel = inputChannelsWithData.remove(); - enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex()); + + result = currentChannel.getNextBuffer(); + + if (result.isPresent() && result.get().moreAvailable()) { + // enqueue the currentChannel at the end to avoid starvation + inputChannelsWithData.add(currentChannel); + } else { + enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex()); + } + moreAvailable = !inputChannelsWithData.isEmpty(); } - - result = currentChannel.getNextBuffer(); } while (!result.isPresent()); - // this channel was now removed from the non-empty channels queue - // we re-add it in case it 
has more data, because in that case no "non-empty" notification - // will come for that channel - if (result.get().moreAvailable()) { - queueChannel(currentChannel); - moreAvailable = true; - } - final Buffer buffer = result.get().buffer(); numBytesIn.inc(buffer.getSizeUnsafe()); if (buffer.isBuffer()) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index 196743e..fcae79f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -184,13 +184,6 @@ public class UnionInputGate implements InputGate, InputGateListener { InputGate inputGate = inputGateWithData.inputGate; BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent; - if (bufferOrEvent.moreAvailable()) { - // this buffer or event was now removed from the non-empty gates queue - // we re-add it in case it has more data, because in that case no "non-empty" notification - // will come for that gate - queueInputGate(inputGate); - } - if (bufferOrEvent.isEvent() && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && inputGate.isFinished()) { @@ -213,8 +206,6 @@ public class UnionInputGate implements InputGate, InputGateListener { private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException { while (true) { - InputGate inputGate; - boolean moreInputGatesAvailable; synchronized (inputGatesWithData) { while (inputGatesWithData.size() == 0) { if (blocking) { @@ -223,15 +214,24 @@ public class UnionInputGate implements InputGate, InputGateListener { return Optional.empty(); } } - inputGate = inputGatesWithData.remove(); - enqueuedInputGatesWithData.remove(inputGate); - moreInputGatesAvailable = 
enqueuedInputGatesWithData.size() > 0; - } + final InputGate inputGate = inputGatesWithData.remove(); + + // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. + Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent(); - // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. - Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent(); - if (bufferOrEvent.isPresent()) { - return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable)); + if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) { + // enqueue the inputGate at the end to avoid starvation + inputGatesWithData.add(inputGate); + } else { + enqueuedInputGatesWithData.remove(inputGate); + } + + if (bufferOrEvent.isPresent()) { + return Optional.of(new InputGateWithData( + inputGate, + bufferOrEvent.get(), + !enqueuedInputGatesWithData.isEmpty())); + } } } }
