This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a0751329da85dba31c5959c78cee7598dcddaabe
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue May 7 14:28:21 2019 +0200

    [hotfix][network] Replace inputGatesWithData and enqueuedInputGatesWithData fields with single LinkedHashSet
---
 .../network/partition/consumer/UnionInputGate.java | 24 ++++++++++------------
 1 file changed, 11 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index fcae79f..7d457ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -25,8 +25,8 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
 import org.apache.flink.shaded.guava18.com.google.common.collect.Sets;
 
 import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -70,13 +70,11 @@ public class UnionInputGate implements InputGate, InputGateListener {
 
        private final Set<InputGate> inputGatesWithRemainingData;
 
-       /** Gates, which notified this input gate about available data. */
-       private final ArrayDeque<InputGate> inputGatesWithData = new 
ArrayDeque<>();
-
        /**
-        * Guardian against enqueuing an {@link InputGate} multiple times on 
{@code inputGatesWithData}.
+        * Gates, which notified this input gate about available data. We are 
using it as a FIFO
+        * queue of {@link InputGate}s to avoid starvation and provide some 
basic fairness.
         */
-       private final Set<InputGate> enqueuedInputGatesWithData = new 
HashSet<>();
+       private final LinkedHashSet<InputGate> inputGatesWithData = new 
LinkedHashSet<>();
 
        /** The total number of input channels across all unioned input gates. 
*/
        private final int totalNumberOfInputChannels;
@@ -214,7 +212,10 @@ public class UnionInputGate implements InputGate, InputGateListener {
                                                return Optional.empty();
                                        }
                                }
-                               final InputGate inputGate = 
inputGatesWithData.remove();
+
+                               Iterator<InputGate> inputGateIterator = 
inputGatesWithData.iterator();
+                               final InputGate inputGate = 
inputGateIterator.next();
+                               inputGateIterator.remove();
 
                                // In case of inputGatesWithData being 
inaccurate do not block on an empty inputGate, but just poll the data.
                                Optional<BufferOrEvent> bufferOrEvent = 
inputGate.pollNextBufferOrEvent();
@@ -222,15 +223,13 @@ public class UnionInputGate implements InputGate, InputGateListener {
                                if (bufferOrEvent.isPresent() && 
bufferOrEvent.get().moreAvailable()) {
                                        // enqueue the inputGate at the end to 
avoid starvation
                                        inputGatesWithData.add(inputGate);
-                               } else {
-                                       
enqueuedInputGatesWithData.remove(inputGate);
                                }
 
                                if (bufferOrEvent.isPresent()) {
                                        return Optional.of(new 
InputGateWithData(
                                                inputGate,
                                                bufferOrEvent.get(),
-                                               
!enqueuedInputGatesWithData.isEmpty()));
+                                               !inputGatesWithData.isEmpty()));
                                }
                        }
                }
@@ -290,14 +289,13 @@ public class UnionInputGate implements InputGate, InputGateListener {
                int availableInputGates;
 
                synchronized (inputGatesWithData) {
-                       if (enqueuedInputGatesWithData.contains(inputGate)) {
+                       if (inputGatesWithData.contains(inputGate)) {
                                return;
                        }
 
                        availableInputGates = inputGatesWithData.size();
 
                        inputGatesWithData.add(inputGate);
-                       enqueuedInputGatesWithData.add(inputGate);
 
                        if (availableInputGates == 0) {
                                inputGatesWithData.notifyAll();

Reply via email to