This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6299b09bbba30932351d78d9897fc0e86c9dbe9
Author: sunhaibotb <[email protected]>
AuthorDate: Thu Jun 6 21:38:06 2019 +0800

    [FLINK-11877][runtime] Abstract AbstractTwoInputStreamTask from TwoInputStreamTask
---
 ...amTask.java => AbstractTwoInputStreamTask.java} | 55 ++++---------
 .../runtime/tasks/TwoInputStreamTask.java          | 90 ++++++----------------
 2 files changed, 36 insertions(+), 109 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java
similarity index 71%
copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java
index 2092c45..7c3ddaa 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java
@@ -25,32 +25,31 @@ import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
 import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 
 /**
- * A {@link StreamTask} for executing a {@link TwoInputStreamOperator}.
+ * Abstract class for executing a {@link TwoInputStreamOperator}.
  */
 @Internal
-public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, 
TwoInputStreamOperator<IN1, IN2, OUT>> {
+public abstract class AbstractTwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> {
 
-       private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
-
-       private final WatermarkGauge input1WatermarkGauge;
-       private final WatermarkGauge input2WatermarkGauge;
-       private final MinWatermarkGauge minInputWatermarkGauge;
+       protected final WatermarkGauge input1WatermarkGauge;
+       protected final WatermarkGauge input2WatermarkGauge;
+       protected final MinWatermarkGauge minInputWatermarkGauge;
 
        /**
         * Constructor for initialization, possibly with initial state 
(recovery / savepoint / etc).
         *
         * @param env The task environment for this task.
         */
-       public TwoInputStreamTask(Environment env) {
+       public AbstractTwoInputStreamTask(Environment env) {
                super(env);
+
                input1WatermarkGauge = new WatermarkGauge();
                input2WatermarkGauge = new WatermarkGauge();
                minInputWatermarkGauge = new 
MinWatermarkGauge(input1WatermarkGauge, input2WatermarkGauge);
@@ -86,20 +85,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                        }
                }
 
-               this.inputProcessor = new StreamTwoInputProcessor<>(
-                               inputList1, inputList2,
-                               inputDeserializer1, inputDeserializer2,
-                               this,
-                               configuration.getCheckpointMode(),
-                               getCheckpointLock(),
-                               getEnvironment().getIOManager(),
-                               
getEnvironment().getTaskManagerInfo().getConfiguration(),
-                               getStreamStatusMaintainer(),
-                               this.headOperator,
-                               
getEnvironment().getMetricGroup().getIOMetricGroup(),
-                               input1WatermarkGauge,
-                               input2WatermarkGauge,
-                               getTaskNameWithSubtaskAndId());
+               createInputProcessor(inputList1, inputList2, 
inputDeserializer1, inputDeserializer2);
 
                
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
minInputWatermarkGauge);
                
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
@@ -108,22 +94,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends 
StreamTask<OUT, TwoInputS
                
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
minInputWatermarkGauge::getValue);
        }
 
-       @Override
-       protected void performDefaultAction(ActionContext context) throws 
Exception {
-               if (!inputProcessor.processInput()) {
-                       context.allActionsCompleted();
-               }
-       }
-
-       @Override
-       protected void cleanup() throws Exception {
-               if (inputProcessor != null) {
-                       inputProcessor.cleanup();
-               }
-       }
-
-       @Override
-       protected void cancelTask() {
-
-       }
+       protected abstract void createInputProcessor(
+               Collection<InputGate> inputGates1,
+               Collection<InputGate> inputGates2,
+               TypeSerializer<IN1> inputDeserializer1,
+               TypeSerializer<IN2> inputDeserializer2) throws Exception;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 2092c45..8cfcac7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -21,91 +21,45 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.metrics.MetricNames;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
-import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
-import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
 
 /**
- * A {@link StreamTask} for executing a {@link TwoInputStreamOperator}.
+ * A {@link StreamTask} that executes a {@link TwoInputStreamOperator} but 
does not support
+ * the {@link TwoInputStreamOperator} to select input for reading.
  */
 @Internal
-public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, 
TwoInputStreamOperator<IN1, IN2, OUT>> {
+public class TwoInputStreamTask<IN1, IN2, OUT> extends 
AbstractTwoInputStreamTask<IN1, IN2, OUT> {
 
        private StreamTwoInputProcessor<IN1, IN2> inputProcessor;
 
-       private final WatermarkGauge input1WatermarkGauge;
-       private final WatermarkGauge input2WatermarkGauge;
-       private final MinWatermarkGauge minInputWatermarkGauge;
-
-       /**
-        * Constructor for initialization, possibly with initial state 
(recovery / savepoint / etc).
-        *
-        * @param env The task environment for this task.
-        */
        public TwoInputStreamTask(Environment env) {
                super(env);
-               input1WatermarkGauge = new WatermarkGauge();
-               input2WatermarkGauge = new WatermarkGauge();
-               minInputWatermarkGauge = new 
MinWatermarkGauge(input1WatermarkGauge, input2WatermarkGauge);
        }
 
        @Override
-       public void init() throws Exception {
-               StreamConfig configuration = getConfiguration();
-               ClassLoader userClassLoader = getUserCodeClassLoader();
-
-               TypeSerializer<IN1> inputDeserializer1 = 
configuration.getTypeSerializerIn1(userClassLoader);
-               TypeSerializer<IN2> inputDeserializer2 = 
configuration.getTypeSerializerIn2(userClassLoader);
-
-               int numberOfInputs = configuration.getNumberOfInputs();
-
-               ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-               ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-
-               List<StreamEdge> inEdges = 
configuration.getInPhysicalEdges(userClassLoader);
-
-               for (int i = 0; i < numberOfInputs; i++) {
-                       int inputType = inEdges.get(i).getTypeNumber();
-                       InputGate reader = getEnvironment().getInputGate(i);
-                       switch (inputType) {
-                               case 1:
-                                       inputList1.add(reader);
-                                       break;
-                               case 2:
-                                       inputList2.add(reader);
-                                       break;
-                               default:
-                                       throw new RuntimeException("Invalid 
input type number: " + inputType);
-                       }
-               }
+       protected void createInputProcessor(
+               Collection<InputGate> inputGates1,
+               Collection<InputGate> inputGates2,
+               TypeSerializer<IN1> inputDeserializer1,
+               TypeSerializer<IN2> inputDeserializer2) throws Exception {
 
                this.inputProcessor = new StreamTwoInputProcessor<>(
-                               inputList1, inputList2,
-                               inputDeserializer1, inputDeserializer2,
-                               this,
-                               configuration.getCheckpointMode(),
-                               getCheckpointLock(),
-                               getEnvironment().getIOManager(),
-                               
getEnvironment().getTaskManagerInfo().getConfiguration(),
-                               getStreamStatusMaintainer(),
-                               this.headOperator,
-                               
getEnvironment().getMetricGroup().getIOMetricGroup(),
-                               input1WatermarkGauge,
-                               input2WatermarkGauge,
-                               getTaskNameWithSubtaskAndId());
-
-               
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
minInputWatermarkGauge);
-               
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, 
input1WatermarkGauge);
-               
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, 
input2WatermarkGauge);
-               // wrap watermark gauge since registered metrics must be unique
-               
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, 
minInputWatermarkGauge::getValue);
+                       inputGates1, inputGates2,
+                       inputDeserializer1, inputDeserializer2,
+                       this,
+                       getConfiguration().getCheckpointMode(),
+                       getCheckpointLock(),
+                       getEnvironment().getIOManager(),
+                       
getEnvironment().getTaskManagerInfo().getConfiguration(),
+                       getStreamStatusMaintainer(),
+                       this.headOperator,
+                       getEnvironment().getMetricGroup().getIOMetricGroup(),
+                       input1WatermarkGauge,
+                       input2WatermarkGauge,
+                       getTaskNameWithSubtaskAndId());
        }
 
        @Override

Reply via email to