GitHub user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5423#discussion_r167818220
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ---
    @@ -169,18 +170,60 @@ public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedExcep
                        && bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class
                        && inputGate.isFinished()) {
     
    +                   checkState(!bufferOrEvent.moreAvailable());
                        if (!inputGatesWithRemainingData.remove(inputGate)) {
                                throw new IllegalStateException("Couldn't find input gate in set of remaining " +
                                        "input gates.");
                        }
                }
     
    +           if (bufferOrEvent.moreAvailable()) {
    +                   // this buffer or event was now removed from the non-empty gates queue
    +                   // we re-add it in case it has more data, because in that case no "non-empty" notification
    +                   // will come for that gate
    +                   queueInputGate(inputGate);
    +           }
    +
                // Set the channel index to identify the input channel (across all unioned input gates)
                final int channelIndexOffset = inputGateToIndexOffsetMap.get(inputGate);
     
                bufferOrEvent.setChannelIndex(channelIndexOffset + bufferOrEvent.getChannelIndex());
     
    -           return bufferOrEvent;
    +           return Optional.ofNullable(bufferOrEvent);
    +   }
    +
    +   @Override
    +   public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
    +           throw new UnsupportedOperationException();
    +   }
    +
    +   private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
    +           while (true) {
    +                   InputGate inputGate;
    +                   synchronized (inputGatesWithData) {
    +                           while (inputGatesWithData.size() == 0) {
    +                                   inputGatesWithData.wait();
    +                           }
    +                           inputGate = inputGatesWithData.remove();
    +                           enqueuedInputGatesWithData.remove(inputGate);
    +                   }
    +
    +                   // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
    --- End diff --
    
    Maybe we can add a comment explaining why this can happen now, i.e. by mentioning the output flusher.
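
To make the behaviour under discussion concrete, here is a minimal, self-contained sketch of the pattern in the diff: wait on a queue of gates believed to be non-empty, poll instead of blocking in case the queue is inaccurate, and re-queue a gate that still has data. `SketchGate` and `GateQueueSketch` are hypothetical stand-ins and not Flink classes, and the sketch does not model why the queue can become inaccurate.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for an input gate; not Flink's InputGate. */
final class SketchGate {
    private final ArrayDeque<String> items = new ArrayDeque<>();

    void add(String item) { items.add(item); }

    /** Non-blocking: returns empty if the gate currently has no data. */
    Optional<String> poll() { return Optional.ofNullable(items.poll()); }

    boolean moreAvailable() { return !items.isEmpty(); }
}

/** Hypothetical queue of gates that are believed to have data. */
final class GateQueueSketch {
    private final ArrayDeque<SketchGate> gatesWithData = new ArrayDeque<>();
    private final Set<SketchGate> enqueued = new HashSet<>();

    /** Enqueue a gate at most once and wake up any waiting consumer. */
    void queueGate(SketchGate gate) {
        synchronized (gatesWithData) {
            if (enqueued.add(gate)) {
                gatesWithData.add(gate);
                gatesWithData.notifyAll();
            }
        }
    }

    /** Blocks until some queued gate actually yields an item. */
    String next() {
        while (true) {
            SketchGate gate;
            synchronized (gatesWithData) {
                while (gatesWithData.isEmpty()) {
                    try {
                        gatesWithData.wait();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while waiting", e);
                    }
                }
                gate = gatesWithData.remove();
                enqueued.remove(gate);
            }
            // The queue may be inaccurate: poll the gate, never block on it.
            Optional<String> item = gate.poll();
            if (item.isPresent()) {
                // No further "non-empty" notification will arrive for data
                // that is already there, so re-queue the gate ourselves.
                if (gate.moreAvailable()) {
                    queueGate(gate);
                }
                return item.get();
            }
            // Stale entry: the gate was empty, so go back to waiting.
        }
    }
}
```

In this sketch a gate that was queued while empty is skipped, and the consumer only blocks on the queue itself, never on an individual gate.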

