This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit cd13fdfa0c8b0db7c0087d9fe40f1522ccb5d122 Author: sunhaibotb <[email protected]> AuthorDate: Tue May 7 13:52:50 2019 +0200 [hotfix][network] Implement UnionInputGate#pollNextBufferOrEvent method --- .../network/partition/consumer/UnionInputGate.java | 31 +++++++++++++++------- .../partition/consumer/SingleInputGateTest.java | 12 +-------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index ea83004..196743e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -159,6 +159,15 @@ public class UnionInputGate implements InputGate, InputGateListener { @Override public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException { + return getNextBufferOrEvent(true); + } + + @Override + public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException { + return getNextBufferOrEvent(false); + } + + private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException { if (inputGatesWithRemainingData.isEmpty()) { return Optional.empty(); } @@ -166,7 +175,12 @@ public class UnionInputGate implements InputGate, InputGateListener { // Make sure to request the partitions, if they have not been requested before. requestPartitions(); - InputGateWithData inputGateWithData = waitAndGetNextInputGate(); + Optional<InputGateWithData> next = waitAndGetNextInputGate(blocking); + if (!next.isPresent()) { + return Optional.empty(); + } + + InputGateWithData inputGateWithData = next.get(); InputGate inputGate = inputGateWithData.inputGate; BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent; @@ -197,18 +211,17 @@ public class UnionInputGate implements InputGate, InputGateListener { return Optional.of(bufferOrEvent); } - @Override - public Optional<BufferOrEvent> pollNextBufferOrEvent() throws UnsupportedOperationException { - throw new UnsupportedOperationException(); - } - - private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException { + private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException { while (true) { InputGate inputGate; boolean moreInputGatesAvailable; synchronized (inputGatesWithData) { while (inputGatesWithData.size() == 0) { - inputGatesWithData.wait(); + if (blocking) { + inputGatesWithData.wait(); + } else { + return Optional.empty(); + } } inputGate = inputGatesWithData.remove(); enqueuedInputGatesWithData.remove(inputGate); @@ -218,7 +231,7 @@ public class UnionInputGate implements InputGate, InputGateListener { // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent(); if (bufferOrEvent.isPresent()) { - return new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable); + return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable)); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java index 71e4f5a..d82d571 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java @@ -619,17 +619,7 @@ public class SingleInputGateTest { assertEquals(expectedChannelIndex, bufferOrEvent.get().getChannelIndex()); assertEquals(expectedMoreAvailable, bufferOrEvent.get().moreAvailable()); if (!expectedMoreAvailable) { - try { - assertFalse(inputGate.pollNextBufferOrEvent().isPresent()); - } - catch (UnsupportedOperationException ex) { - /** - * {@link UnionInputGate#pollNextBufferOrEvent()} is unsupported at the moment. - */ - if (!(inputGate instanceof UnionInputGate)) { - throw ex; - } - } + assertFalse(inputGate.pollNextBufferOrEvent().isPresent()); } } }
