This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 261efa8f6a321a9303705c35681c0d67880213bb
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu May 9 10:42:33 2019 +0200

    [hotfix][network] Refactor InputGates code
---
 .../io/network/partition/consumer/InputGate.java   | 17 ++++++
 .../partition/consumer/SingleInputGate.java        | 51 +++++++++++------
 .../network/partition/consumer/UnionInputGate.java | 64 +++++++++++-----------
 3 files changed, 83 insertions(+), 49 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
index 83e18e9..03ac822 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGate.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * An input gate consumes one or more partitions of a single produced 
intermediate result.
  *
@@ -112,4 +114,19 @@ public abstract class InputGate implements AutoCloseable {
                        isAvailable = new CompletableFuture<>();
                }
        }
+
+       /**
+        * Simple pojo for INPUT, DATA and moreAvailable.
+        */
+       protected static class InputWithData<INPUT, DATA> {
+               protected final INPUT input;
+               protected final DATA data;
+               protected final boolean moreAvailable;
+
+               InputWithData(INPUT input, DATA data, boolean moreAvailable) {
+                       this.input = checkNotNull(input);
+                       this.data = checkNotNull(data);
+                       this.moreAvailable = moreAvailable;
+               }
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index c0830fa..19912b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -529,12 +529,21 @@ public class SingleInputGate extends InputGate {
                }
 
                requestPartitions();
+               Optional<InputWithData<InputChannel, BufferAndAvailability>> 
next = waitAndGetNextData(blocking);
+               if (!next.isPresent()) {
+                       return Optional.empty();
+               }
 
-               InputChannel currentChannel;
-               boolean moreAvailable;
-               Optional<BufferAndAvailability> result = Optional.empty();
+               InputWithData<InputChannel, BufferAndAvailability> 
inputWithData = next.get();
+               return Optional.of(transformToBufferOrEvent(
+                       inputWithData.data.buffer(),
+                       inputWithData.moreAvailable,
+                       inputWithData.input));
+       }
 
-               do {
+       private Optional<InputWithData<InputChannel, BufferAndAvailability>> 
waitAndGetNextData(boolean blocking)
+                       throws IOException, InterruptedException {
+               while (true) {
                        synchronized (inputChannelsWithData) {
                                while (inputChannelsWithData.size() == 0) {
                                        if (isReleased) {
@@ -550,29 +559,38 @@ public class SingleInputGate extends InputGate {
                                        }
                                }
 
-                               currentChannel = inputChannelsWithData.remove();
+                               InputChannel inputChannel = 
inputChannelsWithData.remove();
 
-                               result = currentChannel.getNextBuffer();
+                               Optional<BufferAndAvailability> result = 
inputChannel.getNextBuffer();
 
                                if (result.isPresent() && 
result.get().moreAvailable()) {
-                                       // enqueue the currentChannel at the 
end to avoid starvation
-                                       
inputChannelsWithData.add(currentChannel);
+                                       // enqueue the inputChannel at the end 
to avoid starvation
+                                       inputChannelsWithData.add(inputChannel);
                                } else {
-                                       
enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
+                                       
enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
                                }
 
-                               moreAvailable = 
!inputChannelsWithData.isEmpty();
-
-                               if (!moreAvailable) {
+                               if (inputChannelsWithData.isEmpty()) {
                                        resetIsAvailable();
                                }
+
+                               if (result.isPresent()) {
+                                       return Optional.of(new InputWithData<>(
+                                               inputChannel,
+                                               result.get(),
+                                               
!inputChannelsWithData.isEmpty()));
+                               }
                        }
-               } while (!result.isPresent());
+               }
+       }
 
-               final Buffer buffer = result.get().buffer();
+       private BufferOrEvent transformToBufferOrEvent(
+                       Buffer buffer,
+                       boolean moreAvailable,
+                       InputChannel currentChannel) throws IOException, 
InterruptedException {
                numBytesIn.inc(buffer.getSizeUnsafe());
                if (buffer.isBuffer()) {
-                       return Optional.of(new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable));
+                       return new BufferOrEvent(buffer, 
currentChannel.getChannelIndex(), moreAvailable);
                }
                else {
                        final AbstractEvent event = 
EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
@@ -591,11 +609,10 @@ public class SingleInputGate extends InputGate {
                                }
 
                                currentChannel.notifySubpartitionConsumed();
-
                                currentChannel.releaseAllResources();
                        }
 
-                       return Optional.of(new BufferOrEvent(event, 
currentChannel.getChannelIndex(), moreAvailable));
+                       return new BufferOrEvent(event, 
currentChannel.getChannelIndex(), moreAvailable);
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index 986d1bb..5019cfc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -182,36 +182,22 @@ public class UnionInputGate extends InputGate {
                // Make sure to request the partitions, if they have not been 
requested before.
                requestPartitions();
 
-               Optional<InputGateWithData> next = 
waitAndGetNextInputGate(blocking);
+               Optional<InputWithData<InputGate, BufferOrEvent>> next = 
waitAndGetNextData(blocking);
                if (!next.isPresent()) {
                        return Optional.empty();
                }
 
-               InputGateWithData inputGateWithData = next.get();
-               InputGate inputGate = inputGateWithData.inputGate;
-               BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
+               InputWithData<InputGate, BufferOrEvent> inputWithData = 
next.get();
 
-               if (bufferOrEvent.isEvent()
-                       && bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class
-                       && inputGate.isFinished()) {
-
-                       checkState(!bufferOrEvent.moreAvailable());
-                       if (!inputGatesWithRemainingData.remove(inputGate)) {
-                               throw new IllegalStateException("Couldn't find 
input gate in set of remaining " +
-                                       "input gates.");
-                       }
-               }
-
-               // Set the channel index to identify the input channel (across 
all unioned input gates)
-               final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate);
-
-               bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
-               bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || 
inputGateWithData.moreInputGatesAvailable);
-
-               return Optional.of(bufferOrEvent);
+               handleEndOfPartitionEvent(inputWithData.data, 
inputWithData.input);
+               return Optional.of(adjustForUnionInputGate(
+                       inputWithData.data,
+                       inputWithData.input,
+                       inputWithData.moreAvailable));
        }
 
-       private Optional<InputGateWithData> waitAndGetNextInputGate(boolean 
blocking) throws IOException, InterruptedException {
+       private Optional<InputWithData<InputGate, BufferOrEvent>> 
waitAndGetNextData(boolean blocking)
+                       throws IOException, InterruptedException {
                while (true) {
                        synchronized (inputGatesWithData) {
                                while (inputGatesWithData.size() == 0) {
@@ -242,7 +228,7 @@ public class UnionInputGate extends InputGate {
                                }
 
                                if (bufferOrEvent.isPresent()) {
-                                       return Optional.of(new 
InputGateWithData(
+                                       return Optional.of(new InputWithData<>(
                                                inputGate,
                                                bufferOrEvent.get(),
                                                !inputGatesWithData.isEmpty()));
@@ -251,15 +237,29 @@ public class UnionInputGate extends InputGate {
                }
        }
 
-       private static class InputGateWithData {
-               private final InputGate inputGate;
-               private final BufferOrEvent bufferOrEvent;
-               private final boolean moreInputGatesAvailable;
+       private BufferOrEvent adjustForUnionInputGate(
+               BufferOrEvent bufferOrEvent,
+               InputGate inputGate,
+               boolean moreInputGatesAvailable) {
+               // Set the channel index to identify the input channel (across 
all unioned input gates)
+               final int channelIndexOffset = 
inputGateToIndexOffsetMap.get(inputGate);
+
+               bufferOrEvent.setChannelIndex(channelIndexOffset + 
bufferOrEvent.getChannelIndex());
+               bufferOrEvent.setMoreAvailable(bufferOrEvent.moreAvailable() || 
moreInputGatesAvailable);
 
-               InputGateWithData(InputGate inputGate, BufferOrEvent 
bufferOrEvent, boolean moreInputGatesAvailable) {
-                       this.inputGate = checkNotNull(inputGate);
-                       this.bufferOrEvent = checkNotNull(bufferOrEvent);
-                       this.moreInputGatesAvailable = moreInputGatesAvailable;
+               return bufferOrEvent;
+       }
+
+       private void handleEndOfPartitionEvent(BufferOrEvent bufferOrEvent, 
InputGate inputGate) {
+               if (bufferOrEvent.isEvent()
+                       && bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class
+                       && inputGate.isFinished()) {
+
+                       checkState(!bufferOrEvent.moreAvailable());
+                       if (!inputGatesWithRemainingData.remove(inputGate)) {
+                               throw new IllegalStateException("Couldn't find 
input gate in set of remaining " +
+                                       "input gates.");
+                       }
                }
        }
 

Reply via email to