This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit a0751329da85dba31c5959c78cee7598dcddaabe Author: Piotr Nowojski <[email protected]> AuthorDate: Tue May 7 14:28:21 2019 +0200 [hotfix][network] Replace inputGatesWithData and enqueuedInputGatesWithData fields with single LinkedHashSet --- .../network/partition/consumer/UnionInputGate.java | 24 ++++++++++------------ 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java index fcae79f..7d457ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java @@ -25,8 +25,8 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; import org.apache.flink.shaded.guava18.com.google.common.collect.Sets; import java.io.IOException; -import java.util.ArrayDeque; -import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -70,13 +70,11 @@ public class UnionInputGate implements InputGate, InputGateListener { private final Set<InputGate> inputGatesWithRemainingData; - /** Gates, which notified this input gate about available data. */ - private final ArrayDeque<InputGate> inputGatesWithData = new ArrayDeque<>(); - /** - * Guardian against enqueuing an {@link InputGate} multiple times on {@code inputGatesWithData}. + * Gates, which notified this input gate about available data. We are using it as a FIFO + * queue of {@link InputGate}s to avoid starvation and provide some basic fairness. */ - private final Set<InputGate> enqueuedInputGatesWithData = new HashSet<>(); + private final LinkedHashSet<InputGate> inputGatesWithData = new LinkedHashSet<>(); /** The total number of input channels across all unioned input gates. */ private final int totalNumberOfInputChannels; @@ -214,7 +212,10 @@ public class UnionInputGate implements InputGate, InputGateListener { return Optional.empty(); } } - final InputGate inputGate = inputGatesWithData.remove(); + + Iterator<InputGate> inputGateIterator = inputGatesWithData.iterator(); + final InputGate inputGate = inputGateIterator.next(); + inputGateIterator.remove(); // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data. Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent(); @@ -222,15 +223,13 @@ public class UnionInputGate implements InputGate, InputGateListener { if (bufferOrEvent.isPresent() && bufferOrEvent.get().moreAvailable()) { // enqueue the inputGate at the end to avoid starvation inputGatesWithData.add(inputGate); - } else { - enqueuedInputGatesWithData.remove(inputGate); } if (bufferOrEvent.isPresent()) { return Optional.of(new InputGateWithData( inputGate, bufferOrEvent.get(), - !enqueuedInputGatesWithData.isEmpty())); + !inputGatesWithData.isEmpty())); } } } @@ -290,14 +289,13 @@ public class UnionInputGate implements InputGate, InputGateListener { int availableInputGates; synchronized (inputGatesWithData) { - if (enqueuedInputGatesWithData.contains(inputGate)) { + if (inputGatesWithData.contains(inputGate)) { return; } availableInputGates = inputGatesWithData.size(); inputGatesWithData.add(inputGate); - enqueuedInputGatesWithData.add(inputGate); if (availableInputGates == 0) { inputGatesWithData.notifyAll();
