This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 48b54e3e40d91b4a79a56f327210e3e8e36557e9
Author: sunhaibotb <[email protected]>
AuthorDate: Wed Jun 12 18:14:21 2019 +0800

    [FLINK-11877][runtime] Implement the runtime handling of the 
InputSelectable interface
---
 .../flink/streaming/api/graph/StreamGraph.java     |   6 +-
 .../api/operators/SimpleOperatorFactory.java       |   5 +
 .../api/operators/StreamOperatorFactory.java       |   5 +
 .../streaming/runtime/io/StreamInputProcessor.java |   2 +-
 .../streaming/runtime/io/StreamTaskInput.java      |   5 +
 .../runtime/io/StreamTaskNetworkInput.java         |  12 +-
 .../io/StreamTwoInputSelectableProcessor.java      | 429 +++++++++++++++++++++
 .../tasks/TwoInputSelectableStreamTask.java        |  78 ++++
 .../tasks/StreamTaskSelectiveReadingTest.java      | 369 ++++++++++++++++++
 .../runtime/tasks/TwoInputStreamTaskTest.java      |  70 +++-
 .../tasks/TwoInputStreamTaskTestHarness.java       |  10 +-
 .../flink/table/generated/GeneratedClass.java      |   4 +
 .../table/runtime/CodeGenOperatorFactory.java      |   6 +
 .../runtime/StreamTaskSelectiveReadingITCase.java  | 171 ++++++++
 14 files changed, 1154 insertions(+), 18 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 34ceaf1..19dd21f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
+import org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
 import org.apache.flink.util.OutputTag;
 
@@ -225,7 +226,10 @@ public class StreamGraph extends StreamingPlan {
                        TypeInformation<OUT> outTypeInfo,
                        String operatorName) {
 
-               addNode(vertexID, slotSharingGroup, coLocationGroup, 
TwoInputStreamTask.class, taskOperatorFactory, operatorName);
+               Class<? extends AbstractInvokable> vertexClass = 
taskOperatorFactory.isOperatorSelectiveReading() ?
+                       TwoInputSelectableStreamTask.class : 
TwoInputStreamTask.class;
+
+               addNode(vertexID, slotSharingGroup, coLocationGroup, 
vertexClass, taskOperatorFactory, operatorName);
 
                TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && 
!(outTypeInfo instanceof MissingTypeInfo) ?
                                outTypeInfo.createSerializer(executionConfig) : 
null;
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
index 4bf3124..8b2ea06 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SimpleOperatorFactory.java
@@ -88,6 +88,11 @@ public class SimpleOperatorFactory<OUT> implements 
StreamOperatorFactory<OUT> {
        }
 
        @Override
+       public boolean isOperatorSelectiveReading() {
+               return operator instanceof InputSelectable;
+       }
+
+       @Override
        public boolean isOutputTypeConfigurable() {
                return operator instanceof OutputTypeConfigurable;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
index 31d2684..9a493f9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactory.java
@@ -59,6 +59,11 @@ public interface StreamOperatorFactory<OUT> extends 
Serializable {
        }
 
        /**
+        * Tests whether the operator is a selective-reading operator.
+        */
+       boolean isOperatorSelectiveReading();
+
+       /**
         * If the stream operator need access to the output type information at 
{@link StreamGraph}
         * generation. This can be useful for cases where the output type is 
specified by the returns
         * method and, thus, after the stream operator has been created.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index faa14b5..203f633 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -107,7 +107,7 @@ public class StreamInputProcessor<IN> {
                        inputGate,
                        taskManagerConfig,
                        taskName);
-               this.input = new StreamTaskNetworkInput(barrierHandler, 
inputSerializer, ioManager);
+               this.input = new StreamTaskNetworkInput(barrierHandler, 
inputSerializer, ioManager, 0);
 
                this.lock = checkNotNull(lock);
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java
index ea1d2c3..19fd765 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskInput.java
@@ -35,4 +35,9 @@ public interface StreamTaskInput extends 
NullableAsyncDataInput<StreamElement>,
         * it is unspecified.
         */
        int getLastChannel();
+
+       /**
+        * Returns the input index of this input.
+        */
+       int getInputIndex();
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index c82d91e..85e7f46 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -53,6 +53,8 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
 
        private final 
RecordDeserializer<DeserializationDelegate<StreamElement>>[] 
recordDeserializers;
 
+       private final int inputIndex;
+
        private int lastChannel = UNSPECIFIED;
 
        private RecordDeserializer<DeserializationDelegate<StreamElement>> 
currentRecordDeserializer = null;
@@ -63,7 +65,8 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
        public StreamTaskNetworkInput(
                        CheckpointBarrierHandler barrierHandler,
                        TypeSerializer<?> inputSerializer,
-                       IOManager ioManager) {
+                       IOManager ioManager,
+                       int inputIndex) {
                this.barrierHandler = barrierHandler;
                this.deserializationDelegate = new 
NonReusingDeserializationDelegate<>(
                        new StreamElementSerializer<>(inputSerializer));
@@ -74,6 +77,8 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
                        recordDeserializers[i] = new 
SpillingAdaptiveSpanningRecordDeserializer<>(
                                ioManager.getSpillingDirectoriesPaths());
                }
+
+               this.inputIndex = inputIndex;
        }
 
        @Override
@@ -132,6 +137,11 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
        }
 
        @Override
+       public int getInputIndex() {
+               return inputIndex;
+       }
+
+       @Override
        public boolean isFinished() {
                return isFinished;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
new file mode 100644
index 0000000..7374f97
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
+import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Input reader for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputSelectableStreamTask}
+ * in the case that the operator is InputSelectable.
+ *
+ * @param <IN1> The type of the records that arrive on the first input
+ * @param <IN2> The type of the records that arrive on the second input
+ */
+@Internal
+public class StreamTwoInputSelectableProcessor<IN1, IN2> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);
+
+       private static final CompletableFuture<?> UNAVAILABLE = new 
CompletableFuture<>();
+
+       private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;
+       private final InputSelectable inputSelector;
+
+       private final Object lock;
+
+       private final StreamTaskInput input1;
+       private final StreamTaskInput input2;
+
+       /**
+        * Valves that control how watermarks and stream statuses from the 2 
inputs are forwarded.
+        */
+       private final StatusWatermarkValve statusWatermarkValve1;
+       private final StatusWatermarkValve statusWatermarkValve2;
+
+       /**
+        * Stream statuses of the two inputs. We need to keep track of them to determine when
+        * to forward stream status changes downstream.
+       private StreamStatus firstStatus;
+       private StreamStatus secondStatus;
+
+       private int availableInputsMask;
+
+       private int lastReadInputIndex;
+
+       private InputSelection inputSelection;
+
+       private Counter numRecordsIn;
+
+       private boolean isPrepared;
+
+       public StreamTwoInputSelectableProcessor(
+               Collection<InputGate> inputGates1,
+               Collection<InputGate> inputGates2,
+               TypeSerializer<IN1> inputSerializer1,
+               TypeSerializer<IN2> inputSerializer2,
+               Object lock,
+               IOManager ioManager,
+               StreamStatusMaintainer streamStatusMaintainer,
+               TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
+               WatermarkGauge input1WatermarkGauge,
+               WatermarkGauge input2WatermarkGauge) {
+
+               checkState(streamOperator instanceof InputSelectable);
+
+               this.streamOperator = checkNotNull(streamOperator);
+               this.inputSelector = (InputSelectable) streamOperator;
+
+               this.lock = checkNotNull(lock);
+
+               InputGate unionedInputGate1 = 
InputGateUtil.createInputGate(inputGates1.toArray(new InputGate[0]));
+               InputGate unionedInputGate2 = 
InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0]));
+
+               // create an input instance for each of the two inputs
+               this.input1 = new StreamTaskNetworkInput(new 
BarrierDiscarder(unionedInputGate1), inputSerializer1, ioManager, 0);
+               this.input2 = new StreamTaskNetworkInput(new 
BarrierDiscarder(unionedInputGate2), inputSerializer2, ioManager, 1);
+
+               this.statusWatermarkValve1 = new StatusWatermarkValve(
+                       unionedInputGate1.getNumberOfInputChannels(),
+                       new ForwardingValveOutputHandler(streamOperator, lock, 
streamStatusMaintainer, input1WatermarkGauge, 0));
+               this.statusWatermarkValve2 = new StatusWatermarkValve(
+                       unionedInputGate2.getNumberOfInputChannels(),
+                       new ForwardingValveOutputHandler(streamOperator, lock, 
streamStatusMaintainer, input2WatermarkGauge, 1));
+
+               this.firstStatus = StreamStatus.ACTIVE;
+               this.secondStatus = StreamStatus.ACTIVE;
+
+               this.availableInputsMask = (int) new 
InputSelection.Builder().select(1).select(2).build().getInputMask();
+
+               this.lastReadInputIndex = 1; // always try to read from the 
first input
+
+               this.isPrepared = false;
+
+       }
+
+       public boolean processInput() throws Exception {
+               if (!isPrepared) {
+                       // these preparations are not done in the constructor because they
+                       // must be executed only after all operators have been opened.
+                       prepareForProcessing();
+               }
+
+               int readingInputIndex = selectNextReadingInputIndex();
+               if (readingInputIndex == -1) {
+                       return false;
+               }
+               lastReadInputIndex = readingInputIndex;
+
+               StreamElement recordOrMark;
+               if (readingInputIndex == 0) {
+                       recordOrMark = input1.pollNextNullable();
+                       if (recordOrMark != null) {
+                               processElement1(recordOrMark, 
input1.getLastChannel());
+                       }
+               } else {
+                       recordOrMark = input2.pollNextNullable();
+                       if (recordOrMark != null) {
+                               processElement2(recordOrMark, 
input2.getLastChannel());
+                       }
+               }
+
+               if (recordOrMark == null) {
+                       setUnavailableInput(readingInputIndex);
+               }
+
+               return !checkFinished();
+       }
+
+       public void cleanup() throws Exception {
+               Exception ex = null;
+               try {
+                       input1.close();
+               } catch (Exception e) {
+                       ex = ExceptionUtils.firstOrSuppressed(e, ex);
+               }
+
+               try {
+                       input2.close();
+               } catch (Exception e) {
+                       ex = ExceptionUtils.firstOrSuppressed(e, ex);
+               }
+
+               if (ex != null) {
+                       throw ex;
+               }
+       }
+
+       private int selectNextReadingInputIndex()
+               throws InterruptedException, ExecutionException, IOException {
+
+               int readingInputIndex;
+               while ((readingInputIndex = 
inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, 
lastReadInputIndex)) == -1) {
+                       if (!waitForAvailableInput(inputSelection)) {
+                               return -1;
+                       }
+               }
+
+               // to avoid starvation, if the input selection is ALL and availableInputsMask is not ALL,
+               // always try to check and set the availability of the other input
+               // TODO: because this can be a costly operation (checking volatile inside CompletableFuture)
+               //  this might be optimized to only check once per processed NetworkBuffer
+               if (availableInputsMask < 3 && inputSelection.isALLMaskOf2()) {
+                       checkAndSetAvailable(1 - readingInputIndex);
+               }
+
+               return readingInputIndex;
+       }
+
+       private void processElement1(StreamElement recordOrMark, int channel) 
throws Exception {
+               if (recordOrMark.isRecord()) {
+                       StreamRecord<IN1> record = recordOrMark.asRecord();
+                       synchronized (lock) {
+                               numRecordsIn.inc();
+                               streamOperator.setKeyContextElement1(record);
+                               streamOperator.processElement1(record);
+                               inputSelection = inputSelector.nextSelection();
+                       }
+               }
+               else if (recordOrMark.isWatermark()) {
+                       
statusWatermarkValve1.inputWatermark(recordOrMark.asWatermark(), channel);
+               } else if (recordOrMark.isStreamStatus()) {
+                       
statusWatermarkValve1.inputStreamStatus(recordOrMark.asStreamStatus(), channel);
+               } else if (recordOrMark.isLatencyMarker()) {
+                       synchronized (lock) {
+                               
streamOperator.processLatencyMarker1(recordOrMark.asLatencyMarker());
+                       }
+               } else {
+                       throw new UnsupportedOperationException("Unknown type 
of StreamElement on input1");
+               }
+       }
+
+       private void processElement2(StreamElement recordOrMark, int channel) 
throws Exception {
+               if (recordOrMark.isRecord()) {
+                       StreamRecord<IN2> record = recordOrMark.asRecord();
+                       synchronized (lock) {
+                               numRecordsIn.inc();
+                               streamOperator.setKeyContextElement2(record);
+                               streamOperator.processElement2(record);
+                               inputSelection = inputSelector.nextSelection();
+                       }
+               }
+               else if (recordOrMark.isWatermark()) {
+                       
statusWatermarkValve2.inputWatermark(recordOrMark.asWatermark(), channel);
+               } else if (recordOrMark.isStreamStatus()) {
+                       
statusWatermarkValve2.inputStreamStatus(recordOrMark.asStreamStatus(), channel);
+               } else if (recordOrMark.isLatencyMarker()) {
+                       synchronized (lock) {
+                               
streamOperator.processLatencyMarker2(recordOrMark.asLatencyMarker());
+                       }
+               } else {
+                       throw new UnsupportedOperationException("Unknown type 
of StreamElement on input2");
+               }
+       }
+
+       private void prepareForProcessing() {
+               // Note: the first call to nextSelection() on the operator must be made after this operator
+               // is opened to ensure that any changes to the input selection made in its open()
+               // method take effect.
+               inputSelection = inputSelector.nextSelection();
+
+               try {
+                       numRecordsIn = ((OperatorMetricGroup) streamOperator
+                               
.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+               } catch (Exception e) {
+                       LOG.warn("An exception occurred during the metrics 
setup.", e);
+                       numRecordsIn = new SimpleCounter();
+               }
+
+               isPrepared = true;
+       }
+
+       private void checkAndSetAvailable(int inputIndex) {
+               StreamTaskInput input = getInput(inputIndex);
+               if (!input.isFinished() && input.isAvailable().isDone()) {
+                       setAvailableInput(inputIndex);
+               }
+       }
+
+       /**
+        * @return false if both of the inputs are finished, true otherwise.
+        */
+       private boolean waitForAvailableInput(InputSelection inputSelection)
+               throws ExecutionException, InterruptedException, IOException {
+
+               if (inputSelection.isALLMaskOf2()) {
+                       return waitForAvailableEitherInput();
+               } else {
+                       waitForOneInput(
+                               (inputSelection.getInputMask() == 
InputSelection.FIRST.getInputMask()) ? input1 : input2);
+                       return true;
+               }
+       }
+
+       private boolean waitForAvailableEitherInput()
+               throws ExecutionException, InterruptedException {
+
+               CompletableFuture<?> future1 = input1.isFinished() ? 
UNAVAILABLE : input1.isAvailable();
+               CompletableFuture<?> future2 = input2.isFinished() ? 
UNAVAILABLE : input2.isAvailable();
+
+               if (future1 == UNAVAILABLE && future2 == UNAVAILABLE) {
+                       return false;
+               }
+
+               // block until at least one of the inputs becomes available
+               CompletableFuture.anyOf(future1, future2).get();
+
+               if (future1.isDone()) {
+                       setAvailableInput(input1.getInputIndex());
+               }
+               if (future2.isDone()) {
+                       setAvailableInput(input2.getInputIndex());
+               }
+
+               return true;
+       }
+
+       private void waitForOneInput(StreamTaskInput input)
+               throws IOException, ExecutionException, InterruptedException {
+
+               if (input.isFinished()) {
+                       throw new IOException("Could not read the finished 
input: input" + (input.getInputIndex() + 1) +  ".");
+               }
+
+               input.isAvailable().get();
+               setAvailableInput(input.getInputIndex());
+       }
+
+       private boolean checkFinished() {
+               if (getInput(lastReadInputIndex).isFinished()) {
+                       inputSelection = (lastReadInputIndex == 0) ? 
InputSelection.SECOND : InputSelection.FIRST;
+                       // TODO: add the runtime handling of the BoundedMultiInput interface
+               }
+
+               return input1.isFinished() && input2.isFinished();
+       }
+
+       private void setAvailableInput(int inputIndex) {
+               availableInputsMask |= 1 << inputIndex;
+       }
+
+       private void setUnavailableInput(int inputIndex) {
+               availableInputsMask &= ~(1 << inputIndex);
+       }
+
+       private StreamTaskInput getInput(int inputIndex) {
+               return inputIndex == 0 ? input1 : input2;
+       }
+
+       private class ForwardingValveOutputHandler implements 
StatusWatermarkValve.ValveOutputHandler {
+
+               private final TwoInputStreamOperator<IN1, IN2, ?> operator;
+
+               private final Object lock;
+
+               private final StreamStatusMaintainer streamStatusMaintainer;
+
+               private final WatermarkGauge inputWatermarkGauge;
+
+               private final int inputIndex;
+
+               private ForwardingValveOutputHandler(
+                       TwoInputStreamOperator<IN1, IN2, ?> operator,
+                       Object lock,
+                       StreamStatusMaintainer streamStatusMaintainer,
+                       WatermarkGauge inputWatermarkGauge,
+                       int inputIndex) {
+
+                       this.operator = checkNotNull(operator);
+                       this.lock = checkNotNull(lock);
+
+                       this.streamStatusMaintainer = 
checkNotNull(streamStatusMaintainer);
+
+                       this.inputWatermarkGauge = inputWatermarkGauge;
+
+                       this.inputIndex = inputIndex;
+               }
+
+               @Override
+               public void handleWatermark(Watermark watermark) {
+                       try {
+                               synchronized (lock) {
+                                       
inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
+                                       if (inputIndex == 0) {
+                                               
operator.processWatermark1(watermark);
+                                       } else {
+                                               
operator.processWatermark2(watermark);
+                                       }
+                               }
+                       } catch (Exception e) {
+                               throw new RuntimeException("Exception occurred 
while processing valve output watermark of input"
+                                       + (inputIndex + 1) + ": ", e);
+                       }
+               }
+
+               @Override
+               public void handleStreamStatus(StreamStatus streamStatus) {
+                       try {
+                               synchronized (lock) {
+                                       final StreamStatus anotherStreamStatus;
+                                       if (inputIndex == 0) {
+                                               firstStatus = streamStatus;
+                                               anotherStreamStatus = 
secondStatus;
+                                       } else {
+                                               secondStatus = streamStatus;
+                                               anotherStreamStatus = 
firstStatus;
+                                       }
+
+                                       // check if we need to toggle the 
task's stream status
+                                       if 
(!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
+                                               if (streamStatus.isActive()) {
+                                                       // we're no longer idle 
if at least one input has become active
+                                                       
streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
+                                               } else if 
(anotherStreamStatus.isIdle()) {
+                                                       // we're idle once both 
inputs are idle
+                                                       
streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
+                                               }
+                                       }
+                               }
+                       } catch (Exception e) {
+                               throw new RuntimeException("Exception occurred 
while processing valve output stream status of input"
+                                       + (inputIndex + 1) + ": ", e);
+                       }
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
new file mode 100644
index 0000000..33e1f50
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputSelectableStreamTask.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor;
+
+import java.util.Collection;
+
+/**
+ * A {@link StreamTask} that executes a {@link TwoInputStreamOperator} and lets
+ * the operator select which of its inputs to read from.
+ */
+@Internal
+public class TwoInputSelectableStreamTask<IN1, IN2, OUT> extends 
AbstractTwoInputStreamTask<IN1, IN2, OUT> {
+
+       private StreamTwoInputSelectableProcessor<IN1, IN2> inputProcessor;
+
+       public TwoInputSelectableStreamTask(Environment env) {
+               super(env);
+       }
+
+       @Override
+       protected void createInputProcessor(
+               Collection<InputGate> inputGates1,
+               Collection<InputGate> inputGates2,
+               TypeSerializer<IN1> inputDeserializer1,
+               TypeSerializer<IN2> inputDeserializer2) {
+
+               this.inputProcessor = new StreamTwoInputSelectableProcessor<>(
+                       inputGates1, inputGates2,
+                       inputDeserializer1, inputDeserializer2,
+                       this,
+                       getEnvironment().getIOManager(),
+                       getStreamStatusMaintainer(),
+                       this.headOperator,
+                       input1WatermarkGauge,
+                       input2WatermarkGauge);
+       }
+
+       @Override
+       protected void performDefaultAction(ActionContext context) throws 
Exception {
+               if (!inputProcessor.processInput()) {
+                       context.allActionsCompleted();
+               }
+       }
+
+       @Override
+       protected void cleanup() throws Exception {
+               if (inputProcessor != null) {
+                       inputProcessor.cleanup();
+               }
+       }
+
+       @Override
+       protected void cancelTask() {
+
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
new file mode 100644
index 0000000..a56dc9b
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSelectiveReadingTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test selective reading.
+ */
+public class StreamTaskSelectiveReadingTest {
+
+       /**
+        * Reads with an operator that always selects {@code InputSelection.ALL} while all input
+        * is queued before processing starts, and checks the exact output order: records of the
+        * two inputs alternate until the first input is exhausted.
+        */
+       @Test
+       public void testAnyOrderedReading() throws Exception {
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-1"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 1"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-2"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 2"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-3"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4"));
+
+               testBase(new AnyReadingStreamOperator("Operator0"), true, 
expectedOutput, true);
+       }
+
+       /**
+        * Reads with an operator that always selects {@code InputSelection.ALL} while the task is
+        * already processing as input arrives. Only the set of emitted records is checked
+        * (order-insensitive comparison), not their order.
+        */
+       @Test
+       public void testAnyUnorderedReading() throws Exception {
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-1"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 1"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-2"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 2"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-3"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4"));
+
+               testBase(new AnyReadingStreamOperator("Operator0"), false, 
expectedOutput, false);
+       }
+
+       /**
+        * Reads with {@link SequentialReadingStreamOperator} while the task is already processing
+        * and checks the exact output order: all records of the first input come before any
+        * record of the second input.
+        */
+       @Test
+       public void testSequentialReading() throws Exception {
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-1"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-2"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-3"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 1"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 2"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4"));
+
+               testBase(new SequentialReadingStreamOperator("Operator0"), 
false, expectedOutput, true);
+       }
+
+       /**
+        * Reads with a {@code SpecialRuleReadingStreamOperator} configured for 3 records on the
+        * first input, 4 on the second and at most 2 consecutive records per input, and checks
+        * the exact output order: two records from input 1, two from input 2, the last record of
+        * input 1, then the remaining two of input 2.
+        */
+       @Test
+       public void testSpecialRuleReading() throws Exception {
+               ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-1"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-2"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 1"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 2"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-1]: 
Hello-3"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 3"));
+               expectedOutput.add(new StreamRecord<>("[Operator0-2]: 4"));
+
+               testBase(new SpecialRuleReadingStreamOperator("Operator0", 3, 
4, 2), false, expectedOutput, true);
+       }
+
+       @Test
+       public void testReadFinishedInput() {
+               try {
+                       testBase(new TestReadFinishedInputStreamOperator(), 
false, new ConcurrentLinkedQueue<>(), true);
+                       fail("should throw an IOException");
+               } catch (Throwable t) {
+                       assertTrue("wrong exception, should be IOException",
+                               ExceptionUtils.findThrowableWithMessage(t, 
"Could not read the finished input: input1").isPresent());
+               }
+       }
+
+       /**
+        * Shared driver for the selective-reading tests.
+        *
+        * <p>Starts a {@code TestSelectiveReadingTask} with the given operator, feeds three strings
+        * ("Hello-1".."Hello-3") and four integers (1..4) through the harness, ends the input,
+        * waits up to 10 seconds for the task to complete and compares the emitted records with
+        * {@code expectedOutput}.
+        *
+        * @param streamOperator operator under test, installed as the task's stream operator
+        * @param prepareDataBeforeProcessing if {@code true}, the task is only released to process
+        *        input after all records were fed and the input was ended; if {@code false}, the
+        *        task is released right away and the test waits for the first record to be
+        *        processed before feeding the rest
+        * @param expectedOutput records the task is expected to emit
+        * @param orderedCheck if {@code true}, the output is compared with
+        *        {@code TestHarnessUtil.assertOutputEquals}; otherwise the string values of both
+        *        sides are sorted before comparison, so order is ignored
+        */
+       private void testBase(
+               TwoInputStreamOperator<String, Integer, String> streamOperator,
+               boolean prepareDataBeforeProcessing,
+               ConcurrentLinkedQueue<Object> expectedOutput,
+               boolean orderedCheck) throws Exception {
+
+               final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(
+                       TestSelectiveReadingTask::new,
+                       BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setupOutputForSingletonOperatorChain();
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               streamConfig.setStreamOperator(streamOperator);
+               streamConfig.setOperatorID(new OperatorID());
+
+               testHarness.invoke();
+               testHarness.waitForTaskRunning();
+
+               // the task blocks in its default action until startProcessing() is called
+               boolean isProcessing = false;
+               if (!prepareDataBeforeProcessing) {
+                       ((TestSelectiveReadingTask) 
testHarness.getTask()).startProcessing();
+                       isProcessing = true;
+               }
+
+               // NOTE(review): the two int arguments are presumably the input gate and channel
+               // indices (0 = first input, 1 = second input) - confirm against the harness
+               testHarness.processElement(new StreamRecord<>("Hello-1"), 0, 0);
+
+               // wait until the input is processed to test the listening and 
blocking logic
+               if (!prepareDataBeforeProcessing) {
+                       testHarness.waitForInputProcessing();
+               }
+
+               testHarness.processElement(new StreamRecord<>("Hello-2"), 0, 0);
+               testHarness.processElement(new StreamRecord<>("Hello-3"), 0, 0);
+
+               testHarness.processElement(new StreamRecord<>(1), 1, 0);
+               testHarness.processElement(new StreamRecord<>(2), 1, 0);
+               testHarness.processElement(new StreamRecord<>(3), 1, 0);
+               testHarness.processElement(new StreamRecord<>(4), 1, 0);
+
+               testHarness.endInput();
+
+               // release the task now if it was held back while the data was prepared
+               if (!isProcessing) {
+                       ((TestSelectiveReadingTask) 
testHarness.getTask()).startProcessing();
+               }
+               testHarness.waitForTaskCompletion(10_000L);
+
+               LinkedBlockingQueue<Object> output = testHarness.getOutput();
+               if (orderedCheck) {
+                       TestHarnessUtil.assertOutputEquals("Output was not 
correct.", expectedOutput, output);
+               } else {
+                       // order-insensitive comparison: sort the string values of both sides
+                       String[] expectedResult = expectedOutput.stream()
+                               .map(record -> ((StreamRecord) 
record).getValue().toString())
+                               .toArray(String[]::new);
+                       Arrays.sort(expectedResult);
+
+                       String[] result = output.stream()
+                               .map(record -> ((StreamRecord) 
record).getValue().toString())
+                               .toArray(String[]::new);
+                       Arrays.sort(result);
+
+                       assertArrayEquals("Output was not correct.", 
expectedResult, result);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // Utilities
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A {@link TwoInputSelectableStreamTask} whose default action blocks until
+        * {@link #startProcessing()} has been called, so that a test can queue input before the
+        * task reads any of it.
+        */
+       private static class TestSelectiveReadingTask<IN1, IN2, OUT> extends TwoInputSelectableStreamTask<IN1, IN2, OUT> {
+
+               // volatile so the unsynchronized fast-path check below sees the update
+               private volatile boolean started;
+
+               TestSelectiveReadingTask(Environment env) {
+                       super(env);
+                       started = false;
+               }
+
+               /**
+                * Blocks until processing has been started, then delegates to the parent's
+                * default action.
+                */
+               @Override
+               protected void performDefaultAction(ActionContext context) throws Exception {
+                       if (!started) {
+                               synchronized (this) {
+                                       // Re-check under the monitor and wait in a loop: this closes the
+                                       // window in which startProcessing() could notify before we wait
+                                       // (lost wakeup) and tolerates spurious wakeups.
+                                       while (!started) {
+                                               this.wait();
+                                       }
+                               }
+                       }
+
+                       super.performDefaultAction(context);
+               }
+
+               /**
+                * Releases the task so that it starts processing its input.
+                */
+               public void startProcessing() {
+                       synchronized (this) {
+                               // set the flag under the same monitor the waiter uses for its check
+                               started = true;
+                               this.notifyAll();
+                       }
+               }
+       }
+
+       /**
+        * Test operator that always selects {@code InputSelection.ALL} and forwards every record
+        * as a string prefixed with the operator name and the number of the input it came from.
+        */
+       private static class AnyReadingStreamOperator extends 
AbstractStreamOperator<String>
+               implements TwoInputStreamOperator<String, Integer, String>, 
InputSelectable {
+
+               // name used in the "[name-N]: " prefix of emitted records
+               private final String name;
+
+               AnyReadingStreamOperator(String name) {
+                       super();
+
+                       this.name = name;
+               }
+
+               @Override
+               public InputSelection nextSelection() {
+                       return InputSelection.ALL;
+               }
+
+               // emits "[name-1]: value" for records of the first input
+               @Override
+               public void processElement1(StreamRecord<String> element) {
+                       output.collect(element.replace("[" + name + "-1]: " + 
element.getValue()));
+               }
+
+               // emits "[name-2]: value" for records of the second input
+               @Override
+               public void processElement2(StreamRecord<Integer> element) {
+                       output.collect(element.replace("[" + name + "-2]: " + 
element.getValue()));
+               }
+       }
+
+       /**
+        * Test operator for sequential reading.
+        *
+        * <p>It selects {@code InputSelection.FIRST} initially and switches to
+        * {@code InputSelection.SECOND} once a record of the second input has been processed.
+        * Every record is forwarded as a string prefixed with the operator name and the number of
+        * the input it came from.
+        *
+        * <p>NOTE(review): nothing in this class moves the selection away from {@code FIRST}
+        * before a record of the second input arrives; presumably the input processor hands over
+        * to the second input once the first one is finished - confirm.
+        */
+       public static class SequentialReadingStreamOperator extends 
AbstractStreamOperator<String>
+               implements TwoInputStreamOperator<String, Integer, String>, 
InputSelectable {
+
+               // name used in the "[name-N]: " prefix of emitted records
+               private final String name;
+
+               // selection returned by nextSelection(); FIRST until processElement2() runs
+               private InputSelection inputSelection;
+
+               public SequentialReadingStreamOperator(String name) {
+                       super();
+
+                       this.name = name;
+                       this.inputSelection = InputSelection.FIRST;
+               }
+
+               @Override
+               public InputSelection nextSelection() {
+                       return inputSelection;
+               }
+
+               // emits "[name-1]: value"; leaves the selection unchanged
+               @Override
+               public void processElement1(StreamRecord<String> element) {
+                       output.collect(element.replace("[" + name + "-1]: " + 
element.getValue()));
+               }
+
+               // emits "[name-2]: value" and from then on selects the second input
+               @Override
+               public void processElement2(StreamRecord<Integer> element) {
+                       output.collect(element.replace("[" + name + "-2]: " + 
element.getValue()));
+
+                       this.inputSelection = InputSelection.SECOND;
+               }
+       }
+
+       /**
+        * Test operator that reads from one input until {@code maxContinuousReadingRecords}
+        * records were processed in a row and then switches to the other input, provided the
+        * other input has not yet delivered its expected number of records. It starts with
+        * {@code InputSelection.FIRST}. Every record is forwarded as a string prefixed with the
+        * operator name and the number of the input it came from.
+        */
+       private static class SpecialRuleReadingStreamOperator extends 
AbstractStreamOperator<String>
+               implements TwoInputStreamOperator<String, Integer, String>, 
InputSelectable {
+
+               // name used in the "[name-N]: " prefix of emitted records
+               private final String name;
+
+               // total number of records expected on each input
+               private final int input1Records;
+               private final int input2Records;
+
+               // number of records after which the switch to the other input is considered
+               private final int maxContinuousReadingRecords;
+
+               // number of records processed so far from each input
+               private int input1ReadingRecords;
+               private int input2ReadingRecords;
+
+               // records processed since the counter was last reset
+               private int continuousReadingRecords;
+               // selection returned by nextSelection()
+               private InputSelection inputSelection;
+
+               SpecialRuleReadingStreamOperator(String name, int 
input1Records, int input2Records, int maxContinuousReadingRecords) {
+                       super();
+
+                       this.name = name;
+                       this.input1Records = input1Records;
+                       this.input2Records = input2Records;
+                       this.maxContinuousReadingRecords = 
maxContinuousReadingRecords;
+
+                       this.input1ReadingRecords = 0;
+                       this.input2ReadingRecords = 0;
+                       this.continuousReadingRecords = 0;
+                       this.inputSelection = InputSelection.FIRST;
+               }
+
+               @Override
+               public InputSelection nextSelection() {
+                       return inputSelection;
+               }
+
+               // emits "[name-1]: value" and decides which input to select next
+               @Override
+               public void processElement1(StreamRecord<String> element) {
+                       output.collect(element.replace("[" + name + "-1]: " + 
element.getValue()));
+
+                       input1ReadingRecords++;
+                       continuousReadingRecords++;
+                       if (continuousReadingRecords == 
maxContinuousReadingRecords) {
+                               continuousReadingRecords = 0;
+                               // switch only if the second input still has records left to deliver
+                               if (input2ReadingRecords < input2Records) {
+                                       inputSelection = InputSelection.SECOND;
+                                       return;
+                               }
+                       }
+
+                       // otherwise keep reading the first input
+                       inputSelection = InputSelection.FIRST;
+               }
+
+               // emits "[name-2]: value" and decides which input to select next
+               @Override
+               public void processElement2(StreamRecord<Integer> element) {
+                       output.collect(element.replace("[" + name + "-2]: " + 
element.getValue()));
+
+                       input2ReadingRecords++;
+                       continuousReadingRecords++;
+                       if (continuousReadingRecords == 
maxContinuousReadingRecords) {
+                               continuousReadingRecords = 0;
+                               // switch only if the first input still has records left to deliver
+                               if (input1ReadingRecords < input1Records) {
+                                       inputSelection = InputSelection.FIRST;
+                                       return;
+                               }
+                       }
+
+                       // otherwise keep reading the second input
+                       inputSelection = InputSelection.SECOND;
+               }
+       }
+
+       /**
+        * Test operator that always selects {@code InputSelection.FIRST} and discards every
+        * record it receives. Used by {@code testReadFinishedInput}, which expects the task to
+        * fail with "Could not read the finished input: input1".
+        */
+       private static class TestReadFinishedInputStreamOperator extends 
AbstractStreamOperator<String>
+               implements TwoInputStreamOperator<String, Integer, String>, 
InputSelectable {
+
+               // set to FIRST in the constructor and never changed afterwards
+               private InputSelection inputSelection;
+
+               TestReadFinishedInputStreamOperator() {
+                       super();
+
+                       this.inputSelection = InputSelection.FIRST;
+               }
+
+               @Override
+               public InputSelection nextSelection() {
+                       return inputSelection;
+               }
+
+               // records of the first input are dropped
+               @Override
+               public void processElement1(StreamRecord<String> element) {
+
+               }
+
+               // records of the second input are dropped
+               @Override
+               public void processElement2(StreamRecord<Integer> element) {
+
+               }
+       }
+}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index f80983f..b30a789 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -40,6 +40,8 @@ import 
org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.InputSelectable;
+import org.apache.flink.streaming.api.operators.InputSelection;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -49,6 +51,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.HashSet;
@@ -58,16 +62,26 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Tests for {@link 
org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests
- * implicitly also test the {@link 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}.
+ * Tests for {@link TwoInputStreamTask} and {@link 
TwoInputSelectableStreamTask}. These tests
+ * implicitly also test the {@link 
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}
+ * and {@link 
org.apache.flink.streaming.runtime.io.StreamTwoInputSelectableProcessor}.
  *
  * <p>Note:<br>
  * We only use a {@link CoStreamMap} operator here. We also test the 
individual operators but Map is
  * used as a representative to test TwoInputStreamTask, since 
TwoInputStreamTask is used for all
  * TwoInputStreamOperators.
  */
+@RunWith(Parameterized.class)
 public class TwoInputStreamTaskTest {
 
+       @Parameterized.Parameter
+       public boolean isInputSelectable;
+
+       @Parameterized.Parameters(name = "isInputSelectable = {0}")
+       public static List<Boolean> parameters() {
+               return Arrays.asList(Boolean.FALSE, Boolean.TRUE);
+       }
+
        /**
         * This test verifies that open() and close() are correctly called. 
This test also verifies
         * that timestamps of emitted elements are correct. {@link CoStreamMap} 
assigns the input
@@ -78,12 +92,13 @@ public class TwoInputStreamTaskTest {
        public void testOpenCloseAndTimestamps() throws Exception {
                final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness =
                                new TwoInputStreamTaskTestHarness<>(
-                                               TwoInputStreamTask::new,
+                                               isInputSelectable ? 
TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
                                                BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
+               CoStreamMap<String, Integer, String> coMapOperator = 
isInputSelectable ?
+                       new AnyReadingCoStreamMap<>(new 
TestOpenCloseMapFunction()) : new CoStreamMap<>(new TestOpenCloseMapFunction());
                streamConfig.setStreamOperator(coMapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
@@ -125,7 +140,7 @@ public class TwoInputStreamTaskTest {
 
                final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness =
                        new TwoInputStreamTaskTestHarness<String, Integer, 
String>(
-                               TwoInputStreamTask::new,
+                               isInputSelectable ? 
TwoInputSelectableStreamTask::new : TwoInputStreamTask::new,
                                2, 2, new int[] {1, 2},
                                BasicTypeInfo.STRING_TYPE_INFO,
                                BasicTypeInfo.INT_TYPE_INFO,
@@ -133,7 +148,8 @@ public class TwoInputStreamTaskTest {
                testHarness.setupOutputForSingletonOperatorChain();
 
                StreamConfig streamConfig = testHarness.getStreamConfig();
-               CoStreamMap<String, Integer, String> coMapOperator = new 
CoStreamMap<String, Integer, String>(new IdentityMap());
+               CoStreamMap<String, Integer, String> coMapOperator = 
isInputSelectable ?
+                       new AnyReadingCoStreamMap<>(new IdentityMap()) : new 
CoStreamMap<>(new IdentityMap());
                streamConfig.setStreamOperator(coMapOperator);
                streamConfig.setOperatorID(new OperatorID());
 
@@ -235,6 +251,11 @@ public class TwoInputStreamTaskTest {
        @Test
        @SuppressWarnings("unchecked")
        public void testCheckpointBarriers() throws Exception {
+               if (isInputSelectable) {
+                       // In the case of selective reading, checkpoints are 
not currently supported, and we skip this test
+                       return;
+               }
+
                final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness =
                                new TwoInputStreamTaskTestHarness<String, 
Integer, String>(
                                                TwoInputStreamTask::new,
@@ -319,6 +340,11 @@ public class TwoInputStreamTaskTest {
        @Test
        @SuppressWarnings("unchecked")
        public void testOvertakingCheckpointBarriers() throws Exception {
+               if (isInputSelectable) {
+                       // In the case of selective reading, checkpoints are 
not currently supported, and we skip this test
+                       return;
+               }
+
                final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness =
                                new TwoInputStreamTaskTestHarness<>(
                                                TwoInputStreamTask::new,
@@ -395,7 +421,9 @@ public class TwoInputStreamTaskTest {
 
        @Test
        public void testOperatorMetricReuse() throws Exception {
-               final TwoInputStreamTaskTestHarness<String, String, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+               final TwoInputStreamTaskTestHarness<String, String, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(
+                       isInputSelectable ? TwoInputSelectableStreamTask::new : 
TwoInputStreamTask::new,
+                       BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
                testHarness.setupOperatorChain(new OperatorID(), new 
DuplicatingOperator())
                        .chain(new OperatorID(), new 
OneInputStreamTaskTest.DuplicatingOperator(), 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
@@ -442,7 +470,8 @@ public class TwoInputStreamTaskTest {
                testHarness.waitForTaskCompletion();
        }
 
-       static class DuplicatingOperator extends AbstractStreamOperator<String> 
implements TwoInputStreamOperator<String, String, String> {
+       static class DuplicatingOperator extends AbstractStreamOperator<String>
+               implements TwoInputStreamOperator<String, String, String>, 
InputSelectable {
 
                @Override
                public void processElement1(StreamRecord<String> element) {
@@ -455,13 +484,21 @@ public class TwoInputStreamTaskTest {
                        output.collect(element);
                        output.collect(element);
                }
+
+               @Override
+               public InputSelection nextSelection() {
+                       return InputSelection.ALL;
+               }
        }
 
        @Test
        public void testWatermarkMetrics() throws Exception {
-               final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(TwoInputStreamTask::new, 
BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, 
BasicTypeInfo.STRING_TYPE_INFO);
+               final TwoInputStreamTaskTestHarness<String, Integer, String> 
testHarness = new TwoInputStreamTaskTestHarness<>(
+                       isInputSelectable ? TwoInputSelectableStreamTask::new : 
TwoInputStreamTask::new,
+                       BasicTypeInfo.STRING_TYPE_INFO, 
BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
 
-               CoStreamMap<String, Integer, String> headOperator = new 
CoStreamMap<>(new IdentityMap());
+               CoStreamMap<String, Integer, String> headOperator = 
isInputSelectable ?
+                       new AnyReadingCoStreamMap<>(new IdentityMap()) : new 
CoStreamMap<>(new IdentityMap());
                final OperatorID headOperatorId = new OperatorID();
 
                OneInputStreamTaskTest.WatermarkMetricOperator chainedOperator 
= new OneInputStreamTaskTest.WatermarkMetricOperator();
@@ -621,5 +658,18 @@ public class TwoInputStreamTaskTest {
                        return value.toString();
                }
        }
+
+       /**
+        * A {@link CoStreamMap} that additionally implements {@link InputSelectable} and always
+        * selects {@code InputSelection.ALL}.
+        */
+       private static class AnyReadingCoStreamMap<IN1, IN2, OUT> extends 
CoStreamMap<IN1, IN2, OUT>
+               implements InputSelectable {
+
+               public AnyReadingCoStreamMap(CoMapFunction<IN1, IN2, OUT> 
mapper) {
+                       super(mapper);
+               }
+
+               @Override
+               public InputSelection nextSelection() {
+                       return InputSelection.ALL;
+               }
+       }
 }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
index 4c1c424..aa900ed 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -36,7 +36,7 @@ import java.util.function.Function;
 
 
 /**
- * Test harness for testing a {@link TwoInputStreamTask}.
+ * Test harness for testing a {@link TwoInputStreamTask} or a {@link 
TwoInputSelectableStreamTask}.
  *
  * <p>This mock Invokable provides the task with a basic runtime context and 
allows pushing elements
  * and watermarks into the task. {@link #getOutput()} can be used to get the 
emitted elements
@@ -71,7 +71,7 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> 
extends StreamTaskTest
         * it should be assigned to the first (1), or second (2) input of the 
task.
         */
        public TwoInputStreamTaskTestHarness(
-                       Function<Environment, ? extends TwoInputStreamTask<IN1, 
IN2, OUT>> taskFactory,
+                       Function<Environment, ? extends 
AbstractTwoInputStreamTask<IN1, IN2, OUT>> taskFactory,
                        int numInputGates,
                        int numInputChannelsPerGate,
                        int[] inputGateAssignment,
@@ -98,7 +98,7 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> 
extends StreamTaskTest
         * second task input.
         */
        public TwoInputStreamTaskTestHarness(
-                       Function<Environment, ? extends TwoInputStreamTask<IN1, 
IN2, OUT>> taskFactory,
+                       Function<Environment, ? extends 
AbstractTwoInputStreamTask<IN1, IN2, OUT>> taskFactory,
                        TypeInformation<IN1> inputType1,
                        TypeInformation<IN2> inputType2,
                        TypeInformation<OUT> outputType) {
@@ -169,8 +169,8 @@ public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> 
extends StreamTaskTest
 
        @Override
        @SuppressWarnings("unchecked")
-       public TwoInputStreamTask<IN1, IN2, OUT> getTask() {
-               return (TwoInputStreamTask<IN1, IN2, OUT>) super.getTask();
+       public AbstractTwoInputStreamTask<IN1, IN2, OUT> getTask() {
+               return (AbstractTwoInputStreamTask<IN1, IN2, OUT>) 
super.getTask();
        }
 }
 
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
index 92af598..97495e5 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/generated/GeneratedClass.java
@@ -91,4 +91,8 @@ public abstract class GeneratedClass<T> implements 
Serializable {
        public Object[] getReferences() {
                return references;
        }
+
+       /**
+        * Returns the class produced by {@code compile(classLoader)}.
+        *
+        * @param classLoader class loader passed on to {@code compile}
+        */
+       public Class<?> getClass(ClassLoader classLoader) {
+               return compile(classLoader);
+       }
 }
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/CodeGenOperatorFactory.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/CodeGenOperatorFactory.java
index 7f1d6f8..61d6fb8 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/CodeGenOperatorFactory.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/CodeGenOperatorFactory.java
@@ -19,6 +19,7 @@ package org.apache.flink.table.runtime;
 
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
@@ -56,6 +57,11 @@ public class CodeGenOperatorFactory<OUT> implements 
StreamOperatorFactory<OUT> {
                return strategy;
        }
 
+       /**
+        * Reports whether the generated operator class implements {@link InputSelectable}.
+        * The class is obtained from {@code generatedClass} using the current thread's context
+        * class loader.
+        */
+       @Override
+       public boolean isOperatorSelectiveReading() {
+               return 
InputSelectable.class.isAssignableFrom(generatedClass.getClass(Thread.currentThread().getContextClassLoader()));
+       }
+
        public GeneratedClass<? extends StreamOperator<OUT>> 
getGeneratedClass() {
                return generatedClass;
        }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java
new file mode 100644
index 0000000..693de32
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskSelectiveReadingITCase.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import 
org.apache.flink.streaming.runtime.tasks.StreamTaskSelectiveReadingTest.SequentialReadingStreamOperator;
+import org.apache.flink.test.streaming.runtime.util.TestListResultSink;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for selective reading of {@code TwoInputSelectableStreamTask}.
+ */
+public class StreamTaskSelectiveReadingITCase {
+       /**
+        * Runs a two-input job built around a {@code SequentialReadingStreamOperator} and checks
+        * that the sink first receives the six string records in emission order, followed by the
+        * six integer records (compared after sorting).
+        */
+       @Test
+       public void testSequentialReading() throws Exception {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setParallelism(1);
+
+               // String input: runs with the environment parallelism of 1.
+               final String[] stringElements = {"Hello-1", "Hello-2", "Hello-3", "Hello-4", "Hello-5", "Hello-6"};
+               final DataStream<String> source0 = env.addSource(new TestStringSource("Source0", stringElements));
+
+               // Integer input: two parallel subtasks; subtask i emits each element multiplied by (i + 1).
+               final Integer[] integerElements = {1, 2, 3};
+               final DataStream<Integer> source1 = env
+                       .addSource(new TestIntegerSource("Source1", integerElements))
+                       .setParallelism(2);
+
+               final TestListResultSink<String> resultSink = new TestListResultSink<>();
+
+               final TwoInputStreamOperator<String, Integer, String> sequentialOperator =
+                       new SequentialReadingStreamOperator("Operator0");
+               sequentialOperator.setChainingStrategy(ChainingStrategy.NEVER);
+
+               source0.connect(source1)
+                       .transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, sequentialOperator)
+                       .addSink(resultSink);
+
+               env.execute("Selective reading test");
+
+               final List<String> result = resultSink.getResult();
+
+               // Leading part of the output, compared element by element in order.
+               final List<String> expectedOrdered = Arrays.asList(
+                       "[Operator0-1]: [Source0-0]: Hello-1",
+                       "[Operator0-1]: [Source0-0]: Hello-2",
+                       "[Operator0-1]: [Source0-0]: Hello-3",
+                       "[Operator0-1]: [Source0-0]: Hello-4",
+                       "[Operator0-1]: [Source0-0]: Hello-5",
+                       "[Operator0-1]: [Source0-0]: Hello-6"
+               );
+
+               // Trailing part of the output; both sides are sorted before the comparison.
+               final List<String> expectedSorted = Arrays.asList(
+                       "[Operator0-2]: 1",
+                       "[Operator0-2]: 2",
+                       "[Operator0-2]: 3",
+                       "[Operator0-2]: 2",
+                       "[Operator0-2]: 4",
+                       "[Operator0-2]: 6"
+               );
+               Collections.sort(expectedSorted);
+
+               final int orderedCount = expectedOrdered.size();
+               final int totalCount = orderedCount + expectedSorted.size();
+               assertEquals(totalCount, result.size());
+               assertEquals(expectedOrdered, result.subList(0, orderedCount));
+
+               final List<String> actualTrailing = result.subList(orderedCount, totalCount);
+               Collections.sort(actualTrailing);
+               assertEquals(expectedSorted, actualTrailing);
+       }
+
+       /**
+        * Base class of the test sources: emits the given elements once, each mapped through
+        * {@code outValue} together with the index of the running subtask, and then returns.
+        */
+       private abstract static class TestSource<T> extends RichParallelSourceFunction<T> {
+               private static final long serialVersionUID = 1L;
+
+               protected final String name;
+               private final T[] elements;
+
+               private volatile boolean running = true;
+               private transient RuntimeContext runtimeContext;
+
+               public TestSource(String name, T[] elements) {
+                       this.name = name;
+                       this.elements = elements;
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       runtimeContext = getRuntimeContext();
+               }
+
+               @Override
+               public void run(SourceContext<T> ctx) throws Exception {
+                       // Ends once every element has been emitted or cancel() has cleared the flag.
+                       for (int position = 0; running && position < elements.length; position++) {
+                               // Each record is emitted while holding the checkpoint lock.
+                               synchronized (ctx.getCheckpointLock()) {
+                                       ctx.collect(outValue(elements[position], runtimeContext.getIndexOfThisSubtask()));
+                               }
+                       }
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+
+               /** Maps an input element to the value emitted by the subtask with the given index. */
+               protected abstract T outValue(T inValue, int subTaskIndex);
+       }
+
+       /** Source that prefixes every string with the source name and the subtask index. */
+       private static class TestStringSource extends TestSource<String> {
+
+               public TestStringSource(String name, String[] elements) {
+                       super(name, elements);
+               }
+
+               @Override
+               protected String outValue(String inValue, int subTaskIndex) {
+                       final StringBuilder tagged = new StringBuilder();
+                       tagged.append("[").append(name).append("-").append(subTaskIndex).append("]: ");
+                       tagged.append(inValue);
+                       return tagged.toString();
+               }
+       }
+
+       /** Source that multiplies every integer by the one-based index of the emitting subtask. */
+       private static class TestIntegerSource extends TestSource<Integer> {
+
+               public TestIntegerSource(String name, Integer[] elements) {
+                       super(name, elements);
+               }
+
+               @Override
+               protected Integer outValue(Integer inValue, int subTaskIndex) {
+                       final int factor = subTaskIndex + 1;
+                       return inValue * factor;
+               }
+       }
+}

Reply via email to