StefanRRichter commented on a change in pull request #8361: [FLINK-12434][network] Replace listeners with CompletableFuture in InputGates URL: https://github.com/apache/flink/pull/8361#discussion_r282024637
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java ########## @@ -223,15 +214,24 @@ public void requestPartitions() throws IOException, InterruptedException { return Optional.empty(); } } - inputGate = inputGatesWithData.remove(); - enqueuedInputGatesWithData.remove(inputGate); - moreInputGatesAvailable = enqueuedInputGatesWithData.size() > 0; - } + final InputGate inputGate = inputGatesWithData.remove(); + + // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. + Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent(); - // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. - Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent(); - if (bufferOrEvent.isPresent()) { - return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable)); + if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) { + // enqueue the inputGate at the end to avoid starvation + inputGatesWithData.add(inputGate); + } else { + enqueuedInputGatesWithData.remove(inputGate); + } + + if (bufferOrEvent.isPresent()) { Review comment: Similar to the loop head in the single input, I think this can be fused by splitting the if in line 222. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services