This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cd13fdfa0c8b0db7c0087d9fe40f1522ccb5d122
Author: sunhaibotb <[email protected]>
AuthorDate: Tue May 7 13:52:50 2019 +0200

    [hotfix][network] Implement UnionInputGate#pollNextBufferOrEvent method
---
 .../network/partition/consumer/UnionInputGate.java | 31 +++++++++++++++-------
 .../partition/consumer/SingleInputGateTest.java    | 12 +--------
 2 files changed, 23 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
index ea83004..196743e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGate.java
@@ -159,6 +159,15 @@ public class UnionInputGate implements InputGate, InputGateListener {
 
        @Override
        public Optional<BufferOrEvent> getNextBufferOrEvent() throws IOException, InterruptedException {
+               return getNextBufferOrEvent(true);
+       }
+
+       @Override
+       public Optional<BufferOrEvent> pollNextBufferOrEvent() throws IOException, InterruptedException {
+               return getNextBufferOrEvent(false);
+       }
+
+       private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
                if (inputGatesWithRemainingData.isEmpty()) {
                        return Optional.empty();
                }
@@ -166,7 +175,12 @@ public class UnionInputGate implements InputGate, InputGateListener {
                // Make sure to request the partitions, if they have not been requested before.
                requestPartitions();
 
-               InputGateWithData inputGateWithData = waitAndGetNextInputGate();
+               Optional<InputGateWithData> next = waitAndGetNextInputGate(blocking);
+               if (!next.isPresent()) {
+                       return Optional.empty();
+               }
+
+               InputGateWithData inputGateWithData = next.get();
                InputGate inputGate = inputGateWithData.inputGate;
                BufferOrEvent bufferOrEvent = inputGateWithData.bufferOrEvent;
 
@@ -197,18 +211,17 @@ public class UnionInputGate implements InputGate, InputGateListener {
                return Optional.of(bufferOrEvent);
        }
 
-       @Override
-       public Optional<BufferOrEvent> pollNextBufferOrEvent() throws UnsupportedOperationException {
-               throw new UnsupportedOperationException();
-       }
-
-       private InputGateWithData waitAndGetNextInputGate() throws IOException, InterruptedException {
+       private Optional<InputGateWithData> waitAndGetNextInputGate(boolean blocking) throws IOException, InterruptedException {
                while (true) {
                        InputGate inputGate;
                        boolean moreInputGatesAvailable;
                        synchronized (inputGatesWithData) {
                                while (inputGatesWithData.size() == 0) {
-                                       inputGatesWithData.wait();
+                                       if (blocking) {
+                                               inputGatesWithData.wait();
+                                       } else {
+                                               return Optional.empty();
+                                       }
                                }
                                inputGate = inputGatesWithData.remove();
                                enqueuedInputGatesWithData.remove(inputGate);
@@ -218,7 +231,7 @@ public class UnionInputGate implements InputGate, InputGateListener {
                        // In case of inputGatesWithData being inaccurate do not block on an empty inputGate, but just poll the data.
                        Optional<BufferOrEvent> bufferOrEvent = inputGate.pollNextBufferOrEvent();
                        if (bufferOrEvent.isPresent()) {
-                               return new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable);
+                               return Optional.of(new InputGateWithData(inputGate, bufferOrEvent.get(), moreInputGatesAvailable));
                        }
                }
        }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 71e4f5a..d82d571 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -619,17 +619,7 @@ public class SingleInputGateTest {
                assertEquals(expectedChannelIndex, bufferOrEvent.get().getChannelIndex());
                assertEquals(expectedMoreAvailable, bufferOrEvent.get().moreAvailable());
                if (!expectedMoreAvailable) {
-                       try {
-                               assertFalse(inputGate.pollNextBufferOrEvent().isPresent());
-                       }
-                       catch (UnsupportedOperationException ex) {
-                               /**
-                                * {@link UnionInputGate#pollNextBufferOrEvent()} is unsupported at the moment.
-                                */
-                               if (!(inputGate instanceof UnionInputGate)) {
-                                       throw ex;
-                               }
-                       }
+                       assertFalse(inputGate.pollNextBufferOrEvent().isPresent());
                }
        }
 }

Reply via email to