Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5423#discussion_r168725339
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
---
@@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws
IOException, InterruptedExcep
&& bufferOrEvent.getEvent().getClass() ==
EndOfPartitionEvent.class
&& inputGate.isFinished()) {
+ checkState(!bufferOrEvent.moreAvailable());
if (!inputGatesWithRemainingData.remove(inputGate)) {
throw new IllegalStateException("Couldn't find
input gate in set of remaining " +
"input gates.");
}
}
+ if (bufferOrEvent.moreAvailable()) {
+ // this buffer or event was now removed from the
non-empty gates queue
+ // we re-add it in case it has more data, because in
that case no "non-empty" notification
+ // will come for that gate
+ queueInputGate(inputGate);
+ }
+
// Set the channel index to identify the input channel (across
all unioned input gates)
final int channelIndexOffset =
inputGateToIndexOffsetMap.get(inputGate);
bufferOrEvent.setChannelIndex(channelIndexOffset +
bufferOrEvent.getChannelIndex());
- return bufferOrEvent;
+ return Optional.ofNullable(bufferOrEvent);
+ }
+
+ @Override
+ public Optional<BufferOrEvent> pollNextBufferOrEvent() throws
IOException, InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ private InputGateWithData waitAndGetNextInputGate() throws IOException,
InterruptedException {
+ while (true) {
+ InputGate inputGate;
+ synchronized (inputGatesWithData) {
+ while (inputGatesWithData.size() == 0) {
+ inputGatesWithData.wait();
+ }
+ inputGate = inputGatesWithData.remove();
+ enqueuedInputGatesWithData.remove(inputGate);
+ }
+
+ // In case of inputGatesWithData being inaccurate do
not block on an empty inputGate, but just poll the data.
--- End diff --
That sounds ok, maybe with some short pointer here to the high-level doc or
else there is an increased change that somebody misses it.
---