This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b6299b09bbba30932351d78d9897fc0e86c9dbe9 Author: sunhaibotb <[email protected]> AuthorDate: Thu Jun 6 21:38:06 2019 +0800 [FLINK-11877][runtime] Abstract AbstractTwoInputStreamTask from TwoInputStreamTask --- ...amTask.java => AbstractTwoInputStreamTask.java} | 55 ++++--------- .../runtime/tasks/TwoInputStreamTask.java | 90 ++++++---------------- 2 files changed, 36 insertions(+), 109 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java similarity index 71% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java index 2092c45..7c3ddaa 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AbstractTwoInputStreamTask.java @@ -25,32 +25,31 @@ import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; -import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor; import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge; import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; import java.util.ArrayList; +import java.util.Collection; import java.util.List; /** - * A {@link StreamTask} for executing a {@link TwoInputStreamOperator}. + * Abstract class for executing a {@link TwoInputStreamOperator}. */ @Internal -public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> { +public abstract class AbstractTwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> { - private StreamTwoInputProcessor<IN1, IN2> inputProcessor; - - private final WatermarkGauge input1WatermarkGauge; - private final WatermarkGauge input2WatermarkGauge; - private final MinWatermarkGauge minInputWatermarkGauge; + protected final WatermarkGauge input1WatermarkGauge; + protected final WatermarkGauge input2WatermarkGauge; + protected final MinWatermarkGauge minInputWatermarkGauge; /** * Constructor for initialization, possibly with initial state (recovery / savepoint / etc). * * @param env The task environment for this task. */ - public TwoInputStreamTask(Environment env) { + public AbstractTwoInputStreamTask(Environment env) { super(env); + input1WatermarkGauge = new WatermarkGauge(); input2WatermarkGauge = new WatermarkGauge(); minInputWatermarkGauge = new MinWatermarkGauge(input1WatermarkGauge, input2WatermarkGauge); @@ -86,20 +85,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS } } - this.inputProcessor = new StreamTwoInputProcessor<>( - inputList1, inputList2, - inputDeserializer1, inputDeserializer2, - this, - configuration.getCheckpointMode(), - getCheckpointLock(), - getEnvironment().getIOManager(), - getEnvironment().getTaskManagerInfo().getConfiguration(), - getStreamStatusMaintainer(), - this.headOperator, - getEnvironment().getMetricGroup().getIOMetricGroup(), - input1WatermarkGauge, - input2WatermarkGauge, - getTaskNameWithSubtaskAndId()); + createInputProcessor(inputList1, inputList2, inputDeserializer1, inputDeserializer2); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge); headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge); @@ -108,22 +94,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue); } - @Override - protected void performDefaultAction(ActionContext context) throws Exception { - if (!inputProcessor.processInput()) { - context.allActionsCompleted(); - } - } - - @Override - protected void cleanup() throws Exception { - if (inputProcessor != null) { - inputProcessor.cleanup(); - } - } - - @Override - protected void cancelTask() { - - } + protected abstract void createInputProcessor( + Collection<InputGate> inputGates1, + Collection<InputGate> inputGates2, + TypeSerializer<IN1> inputDeserializer1, + TypeSerializer<IN2> inputDeserializer2) throws Exception; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java index 2092c45..8cfcac7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java @@ -21,91 +21,45 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.partition.consumer.InputGate; -import org.apache.flink.runtime.metrics.MetricNames; -import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor; -import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge; -import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; -import java.util.ArrayList; -import java.util.List; +import java.util.Collection; /** - * A {@link StreamTask} for executing a {@link TwoInputStreamOperator}. + * A {@link StreamTask} that executes a {@link TwoInputStreamOperator} but does not support + * the {@link TwoInputStreamOperator} to select input for reading. */ @Internal -public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputStreamOperator<IN1, IN2, OUT>> { +public class TwoInputStreamTask<IN1, IN2, OUT> extends AbstractTwoInputStreamTask<IN1, IN2, OUT> { private StreamTwoInputProcessor<IN1, IN2> inputProcessor; - private final WatermarkGauge input1WatermarkGauge; - private final WatermarkGauge input2WatermarkGauge; - private final MinWatermarkGauge minInputWatermarkGauge; - - /** - * Constructor for initialization, possibly with initial state (recovery / savepoint / etc). - * - * @param env The task environment for this task. - */ public TwoInputStreamTask(Environment env) { super(env); - input1WatermarkGauge = new WatermarkGauge(); - input2WatermarkGauge = new WatermarkGauge(); - minInputWatermarkGauge = new MinWatermarkGauge(input1WatermarkGauge, input2WatermarkGauge); } @Override - public void init() throws Exception { - StreamConfig configuration = getConfiguration(); - ClassLoader userClassLoader = getUserCodeClassLoader(); - - TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader); - TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader); - - int numberOfInputs = configuration.getNumberOfInputs(); - - ArrayList<InputGate> inputList1 = new ArrayList<InputGate>(); - ArrayList<InputGate> inputList2 = new ArrayList<InputGate>(); - - List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader); - - for (int i = 0; i < numberOfInputs; i++) { - int inputType = inEdges.get(i).getTypeNumber(); - InputGate reader = getEnvironment().getInputGate(i); - switch (inputType) { - case 1: - inputList1.add(reader); - break; - case 2: - inputList2.add(reader); - break; - default: - throw new RuntimeException("Invalid input type number: " + inputType); - } - } + protected void createInputProcessor( + Collection<InputGate> inputGates1, + Collection<InputGate> inputGates2, + TypeSerializer<IN1> inputDeserializer1, + TypeSerializer<IN2> inputDeserializer2) throws Exception { this.inputProcessor = new StreamTwoInputProcessor<>( - inputList1, inputList2, - inputDeserializer1, inputDeserializer2, - this, - configuration.getCheckpointMode(), - getCheckpointLock(), - getEnvironment().getIOManager(), - getEnvironment().getTaskManagerInfo().getConfiguration(), - getStreamStatusMaintainer(), - this.headOperator, - getEnvironment().getMetricGroup().getIOMetricGroup(), - input1WatermarkGauge, - input2WatermarkGauge, - getTaskNameWithSubtaskAndId()); - - headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge); - headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge); - headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge); - // wrap watermark gauge since registered metrics must be unique - getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue); + inputGates1, inputGates2, + inputDeserializer1, inputDeserializer2, + this, + getConfiguration().getCheckpointMode(), + getCheckpointLock(), + getEnvironment().getIOManager(), + getEnvironment().getTaskManagerInfo().getConfiguration(), + getStreamStatusMaintainer(), + this.headOperator, + getEnvironment().getMetricGroup().getIOMetricGroup(), + input1WatermarkGauge, + input2WatermarkGauge, + getTaskNameWithSubtaskAndId()); } @Override
