Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r168486202 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java --- @@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class && inputGate.isFinished()) { + checkState(!bufferOrEvent.moreAvailable()); if (!inputGatesWithRemainingData.remove(inputGate)) { throw new IllegalStateException("Couldn't find input gate in set of remaining " + "input gates."); } } + if (bufferOrEvent.moreAvailable()) { + // this buffer or event was now removed from the non-empty gates queue + // we re-add it in case it has more data, because in that case no "non-empty" notification + // will come for that gate + queueInputGate(inputGate); + } + // Set the channel index to identify the input channel (across all unioned input gates) final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate); bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex()); - return bufferOrEvent; + return Optional.ofNullable(bufferOrEvent); + } + + @Override + public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException { + while (true) { + InputGate inputGate; + synchronized (inputGatesWithData) { + while (inputGatesWithData.size() == 0) { + inputGatesWithData.wait(); + } + inputGate = inputGatesWithData.remove(); + enqueuedInputGatesWithData.remove(inputGate); + } + + // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. 
--- End diff -- This is kind of a bad place for such a comment - it can become outdated without any control :/ What does `UnionInputGate` know about the `OutputFlusher` on the sender side? This code should just assume that there are no guarantees about data notifications being accurate. The comment should instead be placed in some high-level network stack documentation.
---