This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 04b5cbf07775c086b1df33f94b77a99acc3f4615
Author: Piotr Nowojski <[email protected]>
AuthorDate: Fri Jun 21 11:57:24 2019 +0200

    [FLINK-12479][operators] Integrate StreamInputProcessor with mailbox
---
 .../streaming/runtime/io/StreamInputProcessor.java |   7 +-
 .../runtime/io/StreamOneInputProcessor.java        |  43 ++++---
 .../runtime/io/StreamTwoInputProcessor.java        |  21 +++-
 .../io/StreamTwoInputSelectableProcessor.java      | 125 ++++++++++-----------
 .../flink/streaming/runtime/tasks/StreamTask.java  |   9 +-
 .../tasks/StreamTaskSelectiveReadingTest.java      |  10 +-
 6 files changed, 122 insertions(+), 93 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 0b263d0..1de31bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.AvailabilityListener;
 
 import java.io.Closeable;
 
@@ -26,9 +27,11 @@ import java.io.Closeable;
  * Interface for processing records by {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
  */
 @Internal
-public interface StreamInputProcessor extends Closeable {
+public interface StreamInputProcessor extends AvailabilityListener, Closeable {
        /**
-        * @return true if {@link StreamTaskInput} is finished.
+        * @return true if {@link StreamInputProcessor} estimates that more records can be processed
+        * immediately. Otherwise false, which means that there are no more records available at the
+        * moment and the caller should check {@link #isFinished()} and/or {@link #isAvailable()}.
         */
        boolean processInput() throws Exception;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
index 4be6d54..3be72ee 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamOneInputProcessor.java
@@ -43,6 +43,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -129,29 +130,29 @@ public final class StreamOneInputProcessor<IN> implements 
StreamInputProcessor {
        }
 
        @Override
+       public boolean isFinished() {
+               return input.isFinished();
+       }
+
+       @Override
+       public CompletableFuture<?> isAvailable() {
+               return input.isAvailable();
+       }
+
+       @Override
        public boolean processInput() throws Exception {
                initializeNumRecordsIn();
 
                StreamElement recordOrMark = input.pollNextNullable();
-               if (recordOrMark == null) {
-                       input.isAvailable().get();
-                       return !checkFinished();
-               }
-               int channel = input.getLastChannel();
-               checkState(channel != StreamTaskInput.UNSPECIFIED);
+               if (recordOrMark != null) {
+                       int channel = input.getLastChannel();
+                       checkState(channel != StreamTaskInput.UNSPECIFIED);
 
-               processElement(recordOrMark, channel);
-               return true;
-       }
-
-       private boolean checkFinished() throws Exception {
-               boolean isFinished = input.isFinished();
-               if (isFinished) {
-                       synchronized (lock) {
-                               operatorChain.endInput(1);
-                       }
+                       processElement(recordOrMark, channel);
                }
-               return isFinished;
+               checkFinished();
+
+               return recordOrMark != null;
        }
 
        private void processElement(StreamElement recordOrMark, int channel) 
throws Exception {
@@ -180,6 +181,14 @@ public final class StreamOneInputProcessor<IN> implements 
StreamInputProcessor {
                }
        }
 
+       private void checkFinished() throws Exception {
+               if (input.isFinished()) {
+                       synchronized (lock) {
+                               operatorChain.endInput(1);
+                       }
+               }
+       }
+
        private void initializeNumRecordsIn() {
                if (numRecordsIn == null) {
                        try {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 00bdab9..c888477 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -56,6 +56,7 @@ import java.io.IOException;
 import java.util.BitSet;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -207,6 +208,19 @@ public final class StreamTwoInputProcessor<IN1, IN2> 
implements StreamInputProce
        }
 
        @Override
+       public boolean isFinished() {
+               return isFinished;
+       }
+
+       @Override
+       public CompletableFuture<?> isAvailable() {
+               if (currentRecordDeserializer != null) {
+                       return AVAILABLE;
+               }
+               return barrierHandler.isAvailable();
+       }
+
+       @Override
        public boolean processInput() throws Exception {
                if (isFinished) {
                        return false;
@@ -259,7 +273,6 @@ public final class StreamTwoInputProcessor<IN1, IN2> 
implements StreamInputProce
                                                                
streamOperator.processElement1(record);
                                                        }
                                                        return true;
-
                                                }
                                        }
                                        else {
@@ -295,15 +308,13 @@ public final class StreamTwoInputProcessor<IN1, IN2> 
implements StreamInputProce
                        if (bufferOrEvent.isPresent()) {
                                processBufferOrEvent(bufferOrEvent.get());
                        } else {
-                               if (!barrierHandler.isFinished()) {
-                                       barrierHandler.isAvailable().get();
-                               } else {
+                               if (barrierHandler.isFinished()) {
                                        isFinished = true;
                                        if (!barrierHandler.isEmpty()) {
                                                throw new 
IllegalStateException("Trailing data in checkpoint barrier handler.");
                                        }
-                                       return false;
                                }
+                               return false;
                        }
                }
        }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index 65464db..4adc2dc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -47,7 +47,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -156,7 +155,21 @@ public final class StreamTwoInputSelectableProcessor<IN1, 
IN2> implements Stream
                this.lastReadInputIndex = 1; // always try to read from the 
first input
 
                this.isPrepared = false;
+       }
 
+       @Override
+       public boolean isFinished() {
+               return input1.isFinished() && input2.isFinished();
+       }
+
+       @Override
+       public CompletableFuture<?> isAvailable() {
+               if (inputSelection.isALLMaskOf2()) {
+                       return isAnyInputAvailable();
+               } else {
+                       StreamTaskInput input = (inputSelection.getInputMask() 
== InputSelection.FIRST.getInputMask()) ? input1 : input2;
+                       return input.isAvailable();
+               }
        }
 
        @Override
@@ -179,18 +192,29 @@ public final class StreamTwoInputSelectableProcessor<IN1, 
IN2> implements Stream
                        if (recordOrMark != null) {
                                processElement1(recordOrMark, 
input1.getLastChannel());
                        }
+                       checkFinished(input1, lastReadInputIndex);
                } else {
                        recordOrMark = input2.pollNextNullable();
                        if (recordOrMark != null) {
                                processElement2(recordOrMark, 
input2.getLastChannel());
                        }
+                       checkFinished(input2, lastReadInputIndex);
                }
 
                if (recordOrMark == null) {
                        setUnavailableInput(readingInputIndex);
                }
 
-               return !checkFinished();
+               return recordOrMark != null;
+       }
+
+       private void checkFinished(StreamTaskInput input, int inputIndex) 
throws Exception {
+               if (input.isFinished()) {
+                       synchronized (lock) {
+                               operatorChain.endInput(getInputId(inputIndex));
+                               inputSelection = inputSelector.nextSelection();
+                       }
+               }
        }
 
        @Override
@@ -213,14 +237,12 @@ public final class StreamTwoInputSelectableProcessor<IN1, 
IN2> implements Stream
                }
        }
 
-       private int selectNextReadingInputIndex()
-               throws InterruptedException, ExecutionException, IOException {
-
-               int readingInputIndex;
-               while ((readingInputIndex = 
inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, 
lastReadInputIndex)) == -1) {
-                       if (!waitForAvailableInput(inputSelection)) {
-                               return -1;
-                       }
+       private int selectNextReadingInputIndex() throws IOException {
+               updateAvailability();
+               checkInputSelectionAgainstIsFinished();
+               int readingInputIndex = 
inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, 
lastReadInputIndex);
+               if (readingInputIndex == -1) {
+                       return -1;
                }
 
                // to avoid starvation, if the input selection is ALL and availableInputsMask is not ALL,
@@ -234,6 +256,27 @@ public final class StreamTwoInputSelectableProcessor<IN1, 
IN2> implements Stream
                return readingInputIndex;
        }
 
+       private void checkInputSelectionAgainstIsFinished() throws IOException {
+               if (inputSelection.isALLMaskOf2()) {
+                       return;
+               }
+               if (inputSelection.isInputSelected(1) && input1.isFinished()) {
+                       throw new IOException("Can not make a progress: only 
first input is selected but it is already finished");
+               }
+               if (inputSelection.isInputSelected(2) && input2.isFinished()) {
+                       throw new IOException("Can not make a progress: only 
second input is selected but it is already finished");
+               }
+       }
+
+       private void updateAvailability() {
+               if (!input1.isFinished() && input1.isAvailable() == AVAILABLE) {
+                       setAvailableInput(input1.getInputIndex());
+               }
+               if (!input2.isFinished() && input2.isAvailable() == AVAILABLE) {
+                       setAvailableInput(input2.getInputIndex());
+               }
+       }
+
        private void processElement1(StreamElement recordOrMark, int channel) 
throws Exception {
                if (recordOrMark.isRecord()) {
                        StreamRecord<IN1> record = recordOrMark.asRecord();
@@ -304,64 +347,20 @@ public final class StreamTwoInputSelectableProcessor<IN1, 
IN2> implements Stream
                }
        }
 
-       /**
-        * @return false if both of the inputs are finished, true otherwise.
-        */
-       private boolean waitForAvailableInput(InputSelection inputSelection)
-               throws ExecutionException, InterruptedException, IOException {
-
-               if (inputSelection.isALLMaskOf2()) {
-                       return waitForAvailableEitherInput();
-               } else {
-                       waitForOneInput(
-                               (inputSelection.getInputMask() == 
InputSelection.FIRST.getInputMask()) ? input1 : input2);
-                       return true;
+       private CompletableFuture<?> isAnyInputAvailable() {
+               if (input1.isFinished()) {
+                       return input2.isFinished() ? AVAILABLE : 
input2.isAvailable();
                }
-       }
-
-       private boolean waitForAvailableEitherInput()
-               throws ExecutionException, InterruptedException {
 
-               CompletableFuture<?> future1 = input1.isFinished() ? 
UNAVAILABLE : input1.isAvailable();
-               CompletableFuture<?> future2 = input2.isFinished() ? 
UNAVAILABLE : input2.isAvailable();
-
-               if (future1 == UNAVAILABLE && future2 == UNAVAILABLE) {
-                       return false;
+               if (input2.isFinished()) {
+                       return input1.isAvailable();
                }
 
-               // block to wait for a available input
-               CompletableFuture.anyOf(future1, future2).get();
-
-               if (future1.isDone()) {
-                       setAvailableInput(input1.getInputIndex());
-               }
-               if (future2.isDone()) {
-                       setAvailableInput(input2.getInputIndex());
-               }
+               CompletableFuture<?> input1Available = input1.isAvailable();
+               CompletableFuture<?> input2Available = input2.isAvailable();
 
-               return true;
-       }
-
-       private void waitForOneInput(StreamTaskInput input)
-               throws IOException, ExecutionException, InterruptedException {
-
-               if (input.isFinished()) {
-                       throw new IOException("Could not read the finished 
input: input" + (input.getInputIndex() + 1) +  ".");
-               }
-
-               input.isAvailable().get();
-               setAvailableInput(input.getInputIndex());
-       }
-
-       private boolean checkFinished() throws Exception {
-               if (getInput(lastReadInputIndex).isFinished()) {
-                       synchronized (lock) {
-                               
operatorChain.endInput(getInputId(lastReadInputIndex));
-                               inputSelection = inputSelector.nextSelection();
-                       }
-               }
-
-               return input1.isFinished() && input2.isFinished();
+               return (input1Available == AVAILABLE || input2Available == 
AVAILABLE) ?
+                       AVAILABLE : CompletableFuture.anyOf(input1Available, 
input2Available);
        }
 
        private void setAvailableInput(int inputIndex) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7960a2f..53142a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.DefaultActionContext;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxExecutor;
 import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.MailboxProcessor;
+import 
org.apache.flink.streaming.runtime.tasks.mailbox.execution.SuspendedMailboxDefaultAction;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -267,7 +268,13 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
         */
        protected void performDefaultAction(DefaultActionContext context) 
throws Exception {
                if (!inputProcessor.processInput()) {
-                       context.allActionsCompleted();
+                       if (inputProcessor.isFinished()) {
+                               context.allActionsCompleted();
+                       }
+                       else {
+                               SuspendedMailboxDefaultAction 
suspendedDefaultAction = context.suspendDefaultAction();
+                               
inputProcessor.isAvailable().thenRun(suspendedDefaultAction::resume);
+                       }
                }
        }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
index c7c1440..16e5bd1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
@@ -39,7 +39,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -104,13 +103,14 @@ public class StreamTaskSelectiveReadingTest {
        }
 
        @Test
-       public void testReadFinishedInput() {
+       public void testReadFinishedInput() throws Exception {
                try {
                        testBase(new TestReadFinishedInputStreamOperator(), 
false, new ConcurrentLinkedQueue<>(), true);
                        fail("should throw an IOException");
-               } catch (Throwable t) {
-                       assertTrue("wrong exception, should be IOException",
-                               ExceptionUtils.findThrowableWithMessage(t, 
"Could not read the finished input: input1").isPresent());
+               } catch (Exception t) {
+                       if (!ExceptionUtils.findThrowableWithMessage(t, "only 
first input is selected but it is already finished").isPresent()) {
+                               throw t;
+                       }
                }
        }
 

Reply via email to