This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b84dbf423a380e8e6d44694cdeb06eb968353aa
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue May 7 12:26:23 2019 +0200

    [hotfix][network] Refactor and simplify InputGate#getNextBufferOrEvent
    
    Previously in case of more data available, re-enquing a channel or an 
inputGate
    was done in a separate critical section, resulting with more complicated 
concurrency
    contract (critical section split into two). Side effect of this change
    is that now recursive getNextBuffer/pollNextBufferOrEvent are happening also
    under the lock, however they are non-blocking, so this shouldn't cause any 
issues.
---
 .../partition/consumer/SingleInputGate.java        | 21 +++++++------
 .../network/partition/consumer/UnionInputGate.java | 34 +++++++++++-----------
 2 files changed, 27 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index d40af83..750e678 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -552,21 +552,20 @@ public class SingleInputGate implements InputGate {
                                }
 
                                currentChannel = inputChannelsWithData.remove();
-                               
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+
+                               result = currentChannel.getNextBuffer();
+
+                               if (result.isPresent() && 
result.get().moreAvailable()) {
+                                       // enqueue the currentChannel at the 
end to avoid starvation
+                                       
inputChannelsWithData.add(currentChannel);
+                               } else {
+                                       
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+                               }
+
                                moreAvailable = 
!inputChannelsWithData.isEmpty();
                        }
-
-                       result = currentChannel.getNextBuffer();
                } while (!result.isPresent());
 
-               // this channel was now removed from the non-empty channels 
queue
-               // we re-add it in case it has more data, because in that case 
no "non-empty" notification
-               // will come for that channel
-               if (result.get().moreAvailable()) {
-                       queueChannel(currentChannel);
-                       moreAvailable = true;
-               }
-
                final Buffer buffer = result.get().buffer();
                numBytesIn.inc(buffer.getSizeUnsafe());
                if (buffer.isBuffer()) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 196743e..fcae79f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -184,13 +184,6 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
                InputGate inputGate = inputGateWithData.inputGate;
                BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
 
-               if (bufferOrEvent.moreAvailable()) {
-                       // this buffer or event was now removed from the 
non-empty gates queue
-                       // we re-add it in case it has more data, because in 
that case no "non-empty" notification
-                       // will come for that gate
-                       queueInputGate(inputGate);
-               }
-
                if (bufferOrEvent.isEvent()
                        && bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class
                        && inputGate.isFinished()) {
@@ -213,8 +206,6 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
 
        private Optional<InputGateWithData> waitAndGetNextInputGate(boolean 
blocking) throws IOException, InterruptedException {
                while (true) {
-                       InputGate inputGate;
-                       boolean moreInputGatesAvailable;
                        synchronized (inputGatesWithData) {
                                while (inputGatesWithData.size() == 0) {
                                        if (blocking) {
@@ -223,15 +214,24 @@ public class UnionInputGate implements InputGate, 
InputGateListener {
                                                return Optional.empty();
                                        }
                                }
-                               inputGate = inputGatesWithData.remove();
-                               enqueuedInputGatesWithData.remove(inputGate);
-                               moreInputGatesAvailable = 
enqueuedInputGatesWithData.size() > 0;
-                       }
+                               final InputGate inputGate = 
inputGatesWithData.remove();
+
+                               // In case of inputGatesWithData being 
inaccurate do not block on an empty inputGate, but just poll the data.
+                               Optional<BufferOrEvent> bufferOrEvent = 
inputGate.pollNextBufferOrEvent();
 
-                       // In case of inputGatesWithData being inaccurate do 
not block on an empty inputGate, but just poll the data.
-                       Optional<BufferOrEvent> bufferOrEvent = 
inputGate.pollNextBufferOrEvent();
-                       if (bufferOrEvent.isPresent()) {
-                               return Optional.of(new 
InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable));
+                               if (bufferOrEvent.isPresent() && 
bufferOrEvent.get().moreAvailable()) {
+                                       // enqueue the inputGate at the end to 
avoid starvation
+                                       inputGatesWithData.add(inputGate);
+                               } else {
+                                       
enqueuedInputGatesWithData.remove(inputGate);
+                               }
+
+                               if (bufferOrEvent.isPresent()) {
+                                       return Optional.of(new 
InputGateWithData(
+                                               inputGate,
+                                               bufferOrEvent.get(),
+                                               
!enqueuedInputGatesWithData.isEmpty()));
+                               }
                        }
                }
        }

Reply via email to