This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 16a0334879ac468b8cb5bda67116b5eab6549d1a
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Wed Feb 26 09:39:53 2020 +0100

    [FLINK-16316][operators] Implement new AbstractStreamOperatorV2 as a 
replacement for AbstractStreamOperator
    
    The new base class for operators tries to address couple of limitations in 
the AbstractStreamOperator like:
    - lack of support for multiple inputs
    - setup(...) method
---
 .../api/operators/AbstractStreamOperator.java      |  10 +-
 ...Operator.java => AbstractStreamOperatorV2.java} | 288 +++++++--------------
 .../api/operators/StreamOperatorParameters.java    |   6 +-
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  65 ++++-
 .../util/TestBoundedMultipleInputOperator.java     |   8 +-
 .../streaming/runtime/MultipleInputITCase.java     |  34 +--
 6 files changed, 182 insertions(+), 229 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 6507ce9..61925f4 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -72,7 +72,11 @@ import java.util.Optional;
  * the timer service, timer callbacks are also guaranteed not to be called 
concurrently with
  * methods on {@code StreamOperator}.
  *
- * @param <OUT> The output type of the operator
+ * <p>Note, this class is going to be removed and replaced in the future by 
{@link AbstractStreamOperatorV2}.
+ * However as {@link AbstractStreamOperatorV2} is currently experimental, 
{@link AbstractStreamOperator}
+ * has not been deprecated just yet.
+ *
+ * @param <OUT> The output type of the operator.
  */
 @PublicEvolving
 public abstract class AbstractStreamOperator<OUT>
@@ -385,15 +389,18 @@ public abstract class AbstractStreamOperator<OUT>
         * to interact with systems such as broadcast variables and managed 
state. This also allows
         * to register timers.
         */
+       @VisibleForTesting
        public StreamingRuntimeContext getRuntimeContext() {
                return runtimeContext;
        }
 
        @SuppressWarnings("unchecked")
+       @VisibleForTesting
        public <K> KeyedStateBackend<K> getKeyedStateBackend() {
                return stateHandler.getKeyedStateBackend();
        }
 
+       @VisibleForTesting
        public OperatorStateBackend getOperatorStateBackend() {
                return stateHandler.getOperatorStateBackend();
        }
@@ -402,6 +409,7 @@ public abstract class AbstractStreamOperator<OUT>
         * Returns the {@link ProcessingTimeService} responsible for getting 
the current
         * processing time and registering timers.
         */
+       @VisibleForTesting
        public ProcessingTimeService getProcessingTimeService() {
                return processingTimeService;
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
similarity index 66%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
index 6507ce9..a4ad021 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.KeyedStateStore;
@@ -56,118 +56,98 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Locale;
 import java.util.Optional;
 
 /**
- * Base class for all stream operators. Operators that contain a user function 
should extend the class
- * {@link AbstractUdfStreamOperator} instead (which is a specialized subclass 
of this class).
+ * New base class for all stream operators, intended to eventually replace 
{@link AbstractStreamOperator}.
+ * Currently intended to work smoothly just with {@link 
MultipleInputStreamOperator}.
  *
- * <p>For concrete implementations, one of the following two interfaces must 
also be implemented, to
- * mark the operator as unary or binary:
- * {@link OneInputStreamOperator} or {@link TwoInputStreamOperator}.
+ * <p>One note-able difference in comparison to {@link AbstractStreamOperator} 
is lack of
+ * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in 
favor of initialisation
+ * in the constructor, and removed some tight coupling with classes like 
{@link StreamTask}.
  *
- * <p>Methods of {@code StreamOperator} are guaranteed not to be called 
concurrently. Also, if using
- * the timer service, timer callbacks are also guaranteed not to be called 
concurrently with
- * methods on {@code StreamOperator}.
+ * <p>Methods are guaranteed not to be called concurrently.
  *
  * @param <OUT> The output type of the operator
  */
-@PublicEvolving
-public abstract class AbstractStreamOperator<OUT>
-               implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, 
CheckpointedStreamOperator, Serializable {
-
-       private static final long serialVersionUID = 1L;
-
+@Experimental
+public abstract class AbstractStreamOperatorV2<OUT> implements 
StreamOperator<OUT>, CheckpointedStreamOperator {
        /** The logger used by the operator class and its subclasses. */
-       protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractStreamOperator.class);
-
-       // ----------- configuration properties -------------
-
-       // A sane default for most operators
-       protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
-
-       // ---------------- runtime fields ------------------
-
-       /** The task that contains this operator (and other operators in the 
same chain). */
-       private transient StreamTask<?, ?> container;
-
-       protected transient StreamConfig config;
-
-       protected transient Output<StreamRecord<OUT>> output;
-
-       /** The runtime context for UDFs. */
-       private transient StreamingRuntimeContext runtimeContext;
-
-       // ---------------- key/value state ------------------
-
-       /**
-        * {@code KeySelector} for extracting a key from an element being 
processed. This is used to
-        * scope keyed state to a key. This is null if the operator is not a 
keyed operator.
-        *
-        * <p>This is for elements from the first input.
-        */
-       private transient KeySelector<?, ?> stateKeySelector1;
-
-       /**
-        * {@code KeySelector} for extracting a key from an element being 
processed. This is used to
-        * scope keyed state to a key. This is null if the operator is not a 
keyed operator.
-        *
-        * <p>This is for elements from the second input.
-        */
-       private transient KeySelector<?, ?> stateKeySelector2;
+       protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractStreamOperatorV2.class);
 
-       private transient StreamOperatorStateHandler stateHandler;
-
-       private transient InternalTimeServiceManager<?> timeServiceManager;
-
-       // --------------- Metrics ---------------------------
+       protected final StreamConfig config;
+       protected final Output<StreamRecord<OUT>> output;
+       private final StreamingRuntimeContext runtimeContext;
+       private final ExecutionConfig executionConfig;
+       private final ClassLoader userCodeClassLoader;
+       private final CloseableRegistry cancelables;
+       private final long[] inputWatermarks;
 
        /** Metric group for the operator. */
-       protected transient OperatorMetricGroup metrics;
-
-       protected transient LatencyStats latencyStats;
+       protected final OperatorMetricGroup metrics;
+       protected final LatencyStats latencyStats;
+       protected final ProcessingTimeService processingTimeService;
 
-       // ---------------- time handler ------------------
-
-       protected transient ProcessingTimeService processingTimeService;
-
-       // ---------------- two-input operator watermarks ------------------
+       private StreamOperatorStateHandler stateHandler;
+       private InternalTimeServiceManager<?> timeServiceManager;
 
        // We keep track of watermarks from both inputs, the combined input is 
the minimum
        // Once the minimum advances we emit a new watermark for downstream 
operators
        private long combinedWatermark = Long.MIN_VALUE;
-       private long input1Watermark = Long.MIN_VALUE;
-       private long input2Watermark = Long.MIN_VALUE;
-
-       // 
------------------------------------------------------------------------
-       //  Life Cycle
-       // 
------------------------------------------------------------------------
 
-       @Override
-       public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
Output<StreamRecord<OUT>> output) {
-               final Environment environment = containingTask.getEnvironment();
-               this.container = containingTask;
-               this.config = config;
+       public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> 
parameters, int numberOfInputs) {
+               inputWatermarks = new long[numberOfInputs];
+               Arrays.fill(inputWatermarks, Long.MIN_VALUE);
+               final Environment environment = 
parameters.getContainingTask().getEnvironment();
+               config = parameters.getStreamConfig();
+               CountingOutput<OUT> countingOutput;
+               OperatorMetricGroup operatorMetricGroup;
                try {
-                       OperatorMetricGroup operatorMetricGroup = 
environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
-                       this.output = new CountingOutput(output, 
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
+                       operatorMetricGroup = 
environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), 
config.getOperatorName());
+                       countingOutput = new 
CountingOutput(parameters.getOutput(), 
operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
                        if (config.isChainStart()) {
                                
operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
                        }
                        if (config.isChainEnd()) {
                                
operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask();
                        }
-                       this.metrics = operatorMetricGroup;
                } catch (Exception e) {
                        LOG.warn("An error occurred while instantiating task 
metrics.", e);
-                       this.metrics = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
-                       this.output = output;
+                       countingOutput = null;
+                       operatorMetricGroup = null;
+               }
+
+               if (countingOutput == null || operatorMetricGroup == null) {
+                       metrics = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+                       output = parameters.getOutput();
+               }
+               else {
+                       metrics = operatorMetricGroup;
+                       output = countingOutput;
                }
 
+               latencyStats = createLatencyStats(
+                       environment.getTaskManagerInfo().getConfiguration(),
+                       
parameters.getContainingTask().getIndexInSubtaskGroup());
+
+               processingTimeService = 
Preconditions.checkNotNull(parameters.getProcessingTimeService());
+               executionConfig = 
parameters.getContainingTask().getExecutionConfig();
+               userCodeClassLoader = 
parameters.getContainingTask().getUserCodeClassLoader();
+               cancelables = parameters.getContainingTask().getCancelables();
+
+               runtimeContext = new StreamingRuntimeContext(
+                       environment,
+                       environment.getAccumulatorRegistry().getUserMap(),
+                       operatorMetricGroup,
+                       getOperatorID(),
+                       processingTimeService,
+                       null);
+       }
+
+       private LatencyStats createLatencyStats(Configuration 
taskManagerConfig, int indexInSubtaskGroup) {
                try {
-                       Configuration taskManagerConfig = 
environment.getTaskManagerInfo().getConfiguration();
                        int historySize = 
taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE);
                        if (historySize <= 0) {
                                LOG.warn("{} has been set to a value equal or 
below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
@@ -187,40 +167,20 @@ public abstract class AbstractStreamOperator<OUT>
                                        granularity);
                        }
                        TaskManagerJobMetricGroup jobMetricGroup = 
this.metrics.parent().parent();
-                       this.latencyStats = new 
LatencyStats(jobMetricGroup.addGroup("latency"),
+                       return new 
LatencyStats(jobMetricGroup.addGroup("latency"),
                                historySize,
-                               container.getIndexInSubtaskGroup(),
+                               indexInSubtaskGroup,
                                getOperatorID(),
                                granularity);
                } catch (Exception e) {
                        LOG.warn("An error occurred while instantiating latency 
metrics.", e);
-                       this.latencyStats = new LatencyStats(
+                       return new LatencyStats(
                                
UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
                                1,
                                0,
                                new OperatorID(),
                                LatencyStats.Granularity.SINGLE);
                }
-
-               this.runtimeContext = new StreamingRuntimeContext(
-                       environment,
-                       environment.getAccumulatorRegistry().getUserMap(),
-                       getMetricGroup(),
-                       getOperatorID(),
-                       getProcessingTimeService(),
-                       null);
-
-               stateKeySelector1 = config.getStatePartitioner(0, 
getUserCodeClassloader());
-               stateKeySelector2 = config.getStatePartitioner(1, 
getUserCodeClassloader());
-       }
-
-       /**
-        * @deprecated The {@link ProcessingTimeService} instance should be 
passed by the operator
-        * constructor and this method will be removed along with {@link 
SetupableStreamOperator}.
-        */
-       @Deprecated
-       public void setProcessingTimeService(ProcessingTimeService 
processingTimeService) {
-               this.processingTimeService = 
Preconditions.checkNotNull(processingTimeService);
        }
 
        @Override
@@ -230,14 +190,8 @@ public abstract class AbstractStreamOperator<OUT>
 
        @Override
        public final void initializeState(StreamTaskStateInitializer 
streamTaskStateManager) throws Exception {
-
                final TypeSerializer<?> keySerializer = 
config.getStateKeySerializer(getUserCodeClassloader());
 
-               final StreamTask<?, ?> containingTask =
-                       Preconditions.checkNotNull(getContainingTask());
-               final CloseableRegistry streamTaskCloseableRegistry =
-                       
Preconditions.checkNotNull(containingTask.getCancelables());
-
                final StreamOperatorStateContext context =
                        streamTaskStateManager.streamOperatorStateContext(
                                getOperatorID(),
@@ -245,13 +199,12 @@ public abstract class AbstractStreamOperator<OUT>
                                getProcessingTimeService(),
                                this,
                                keySerializer,
-                               streamTaskCloseableRegistry,
+                               cancelables,
                                metrics);
 
-               stateHandler = new StreamOperatorStateHandler(context, 
getExecutionConfig(), streamTaskCloseableRegistry);
+               stateHandler = new StreamOperatorStateHandler(context, 
getExecutionConfig(), cancelables);
                timeServiceManager = context.internalTimerServiceManager();
                stateHandler.initializeOperatorState(this);
-               
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
        }
 
        /**
@@ -301,10 +254,7 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        @Override
-       public final OperatorSnapshotFutures snapshotState(
-                       long checkpointId,
-                       long timestamp,
-                       CheckpointOptions checkpointOptions,
+       public final OperatorSnapshotFutures snapshotState(long checkpointId, 
long timestamp, CheckpointOptions checkpointOptions,
                        CheckpointStreamFactory factory) throws Exception {
                return stateHandler.snapshotState(
                        this,
@@ -350,19 +300,15 @@ public abstract class AbstractStreamOperator<OUT>
         * @return The job's execution config.
         */
        public ExecutionConfig getExecutionConfig() {
-               return container.getExecutionConfig();
+               return executionConfig;
        }
 
        public StreamConfig getOperatorConfig() {
                return config;
        }
 
-       public StreamTask<?, ?> getContainingTask() {
-               return container;
-       }
-
        public ClassLoader getUserCodeClassloader() {
-               return container.getUserCodeClassLoader();
+               return userCodeClassLoader;
        }
 
        /**
@@ -390,10 +336,12 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        @SuppressWarnings("unchecked")
+       @VisibleForTesting
        public <K> KeyedStateBackend<K> getKeyedStateBackend() {
-               return stateHandler.getKeyedStateBackend();
+               return (KeyedStateBackend<K>) 
stateHandler.getKeyedStateBackend();
        }
 
+       @VisibleForTesting
        public OperatorStateBackend getOperatorStateBackend() {
                return stateHandler.getOperatorStateBackend();
        }
@@ -402,6 +350,7 @@ public abstract class AbstractStreamOperator<OUT>
         * Returns the {@link ProcessingTimeService} responsible for getting 
the current
         * processing time and registering timers.
         */
+       @VisibleForTesting
        public ProcessingTimeService getProcessingTimeService() {
                return processingTimeService;
        }
@@ -417,8 +366,8 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        protected <N, S extends State, T> S getOrCreateKeyedState(
-                       TypeSerializer<N> namespaceSerializer,
-                       StateDescriptor<S, T> stateDescriptor) throws Exception 
{
+               TypeSerializer<N> namespaceSerializer,
+               StateDescriptor<S, T> stateDescriptor) throws Exception {
                return stateHandler.getOrCreateKeyedState(namespaceSerializer, 
stateDescriptor);
        }
 
@@ -429,25 +378,13 @@ public abstract class AbstractStreamOperator<OUT>
         * @throws Exception Thrown, if the state backend cannot create the 
key/value state.
         */
        protected <S extends State, N> S getPartitionedState(
-                       N namespace,
-                       TypeSerializer<N> namespaceSerializer,
-                       StateDescriptor<S, ?> stateDescriptor) throws Exception 
{
+               N namespace,
+               TypeSerializer<N> namespaceSerializer,
+               StateDescriptor<S, ?> stateDescriptor) throws Exception {
                return stateHandler.getPartitionedState(namespace, 
namespaceSerializer, stateDescriptor);
        }
 
-       @Override
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       public void setKeyContextElement1(StreamRecord record) throws Exception 
{
-               setKeyContextElement(record, stateKeySelector1);
-       }
-
-       @Override
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       public void setKeyContextElement2(StreamRecord record) throws Exception 
{
-               setKeyContextElement(record, stateKeySelector2);
-       }
-
-       private <T> void setKeyContextElement(StreamRecord<T> record, 
KeySelector<T, ?> selector) throws Exception {
+       protected <T> void internalSetKeyContextElement(StreamRecord<T> record, 
KeySelector<T, ?> selector) throws Exception {
                if (selector != null) {
                        Object key = selector.getKey(record.getValue());
                        setCurrentKey(key);
@@ -464,43 +401,11 @@ public abstract class AbstractStreamOperator<OUT>
                return stateHandler.getCurrentKey();
        }
 
-       public KeyedStateStore getKeyedStateStore() {
+       public Optional<KeyedStateStore> getKeyedStateStore() {
                if (stateHandler == null) {
-                       return null;
+                       return Optional.empty();
                }
-               return stateHandler.getKeyedStateStore().orElse(null);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Context and chaining properties
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public final void setChainingStrategy(ChainingStrategy strategy) {
-               this.chainingStrategy = strategy;
-       }
-
-       @Override
-       public final ChainingStrategy getChainingStrategy() {
-               return chainingStrategy;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Metrics
-       // 
------------------------------------------------------------------------
-
-       // ------- One input stream
-       public void processLatencyMarker(LatencyMarker latencyMarker) throws 
Exception {
-               reportOrForwardLatencyMarker(latencyMarker);
-       }
-
-       // ------- Two input stream
-       public void processLatencyMarker1(LatencyMarker latencyMarker) throws 
Exception {
-               reportOrForwardLatencyMarker(latencyMarker);
-       }
-
-       public void processLatencyMarker2(LatencyMarker latencyMarker) throws 
Exception {
-               reportOrForwardLatencyMarker(latencyMarker);
+               return stateHandler.getKeyedStateStore();
        }
 
        protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
@@ -536,6 +441,7 @@ public abstract class AbstractStreamOperator<OUT>
         *
         * @param <N> The type of the timer namespace.
         */
+       @VisibleForTesting
        public <K, N> InternalTimerService<N> getInternalTimerService(
                        String name,
                        TypeSerializer<N> namespaceSerializer,
@@ -558,18 +464,12 @@ public abstract class AbstractStreamOperator<OUT>
                output.emitWatermark(mark);
        }
 
-       public void processWatermark1(Watermark mark) throws Exception {
-               input1Watermark = mark.getTimestamp();
-               long newMin = Math.min(input1Watermark, input2Watermark);
-               if (newMin > combinedWatermark) {
-                       combinedWatermark = newMin;
-                       processWatermark(new Watermark(combinedWatermark));
+       protected void reportWatermark(Watermark mark, int inputId) throws 
Exception {
+               inputWatermarks[inputId] = mark.getTimestamp();
+               long newMin = mark.getTimestamp();
+               for (long inputWatermark : inputWatermarks) {
+                       newMin = Math.min(inputWatermark, newMin);
                }
-       }
-
-       public void processWatermark2(Watermark mark) throws Exception {
-               input2Watermark = mark.getTimestamp();
-               long newMin = Math.min(input1Watermark, input2Watermark);
                if (newMin > combinedWatermark) {
                        combinedWatermark = newMin;
                        processWatermark(new Watermark(combinedWatermark));
@@ -591,6 +491,16 @@ public abstract class AbstractStreamOperator<OUT>
                return timeServiceManager == null ? 0 : 
timeServiceManager.numEventTimeTimers();
        }
 
+       @Override
+       public void setKeyContextElement1(StreamRecord<?> record) throws 
Exception {
+               throw new IllegalStateException("This method should never be 
called. Use Input class instead");
+       }
+
+       @Override
+       public void setKeyContextElement2(StreamRecord<?> record) throws 
Exception {
+               throw new IllegalStateException("This method should never be 
called. Use Input class instead");
+       }
+
        protected Optional<InternalTimeServiceManager<?>> 
getTimeServiceManager() {
                return Optional.ofNullable(timeServiceManager);
        }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
index 1f6f1a0..5aaffcc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorParameters.java
@@ -25,9 +25,9 @@ import 
org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 /**
- * Helper  class to construct {@link StreamOperatorBase}. Wraps couple of 
internal parameters
- * to simplify for users construction of classes extending {@link 
StreamOperatorBase} and to
- * allow for backward compatible changes in the {@link StreamOperatorBase}'s 
constructor.
+ * Helper  class to construct {@link AbstractStreamOperatorV2}. Wraps couple 
of internal parameters
+ * to simplify for users construction of classes extending {@link 
AbstractStreamOperatorV2} and to
+ * allow for backward compatible changes in the {@link 
AbstractStreamOperatorV2}'s constructor.
  *
  * @param <OUT> The output type of an operator that will be constructed using 
{@link StreamOperatorParameters}.
  */
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index fc0144b..88c4f9a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -30,9 +30,12 @@ import 
org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
 import org.apache.flink.streaming.runtime.io.InputStatus;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
@@ -68,7 +71,7 @@ public class MultipleInputStreamTaskTest {
                                .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                                .addInput(BasicTypeInfo.INT_TYPE_INFO)
                                .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO)
-                               .setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperator())
+                               .setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperatorFactory())
                                .build()) {
 
                        long initialTime = 0L;
@@ -98,7 +101,7 @@ public class MultipleInputStreamTaskTest {
                                .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
                                .addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
                                .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
-                               .setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperator())
+                               .setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperatorFactory())
                                .build()) {
                        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
                        long initialTime = 0L;
@@ -148,7 +151,7 @@ public class MultipleInputStreamTaskTest {
                                .addInput(BasicTypeInfo.STRING_TYPE_INFO, 2)
                                .addInput(BasicTypeInfo.INT_TYPE_INFO, 2)
                                .addInput(BasicTypeInfo.DOUBLE_TYPE_INFO, 2)
-                               .setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperator())
+                               .setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperatorFactory())
                                .build()) {
                        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
                        long initialTime = 0L;
@@ -217,7 +220,7 @@ public class MultipleInputStreamTaskTest {
                                .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                                .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                                .addInput(BasicTypeInfo.STRING_TYPE_INFO)
-                               .setupOperatorChain(new DuplicatingOperator())
+                               .setupOperatorChain(new 
DuplicatingOperatorFactory())
                                .chain(new 
OneInputStreamTaskTest.DuplicatingOperator(), 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                                .chain(new 
OneInputStreamTaskTest.DuplicatingOperator(), 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                                .finish()
@@ -246,9 +249,13 @@ public class MultipleInputStreamTaskTest {
                }
        }
 
-       static class DuplicatingOperator extends AbstractStreamOperator<String>
+       static class DuplicatingOperator extends 
AbstractStreamOperatorV2<String>
                implements MultipleInputStreamOperator<String> {
 
+               public DuplicatingOperator(StreamOperatorParameters<String> 
parameters) {
+                       super(parameters, 3);
+               }
+
                @Override
                public List<Input> getInputs() {
                        return Arrays.asList(new DuplicatingInput(), new 
DuplicatingInput(), new DuplicatingInput());
@@ -270,7 +277,7 @@ public class MultipleInputStreamTaskTest {
                                .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                                .addInput(BasicTypeInfo.STRING_TYPE_INFO)
                                .addInput(BasicTypeInfo.STRING_TYPE_INFO)
-                               .setupOperatorChain(new 
TestBoundedMultipleInputOperator("Operator0"))
+                               .setupOperatorChain(new 
TestBoundedMultipleInputOperatorFactory())
                                .chain(new 
TestBoundedOneInputStreamOperator("Operator1"), 
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()))
                                .finish()
                                .build();
@@ -317,7 +324,7 @@ public class MultipleInputStreamTaskTest {
                                        
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
                                        
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
                                        
.addInput(BasicTypeInfo.STRING_TYPE_INFO)
-                                       
.setupOutputForSingletonOperatorChain(new MapToStringMultipleInputOperator())
+                                       
.setupOutputForSingletonOperatorChain(new 
MapToStringMultipleInputOperatorFactory())
                                        .build()) {
                        ArrayDeque<Object> expectedOutput = new ArrayDeque<>();
 
@@ -352,12 +359,16 @@ public class MultipleInputStreamTaskTest {
        // This must only be used in one test, otherwise the static fields will 
be changed
        // by several tests concurrently
        private static class MapToStringMultipleInputOperator
-                       extends AbstractStreamOperator<String> implements 
MultipleInputStreamOperator<String> {
+                       extends AbstractStreamOperatorV2<String> implements 
MultipleInputStreamOperator<String> {
                private static final long serialVersionUID = 1L;
 
                private boolean openCalled;
                private boolean closeCalled;
 
+               public 
MapToStringMultipleInputOperator(StreamOperatorParameters<String> parameters) {
+                       super(parameters, 3);
+               }
+
                @Override
                public void open() throws Exception {
                        super.open();
@@ -418,5 +429,41 @@ public class MultipleInputStreamTaskTest {
                        return value.toString();
                }
        }
+
+       private static class TestBoundedMultipleInputOperatorFactory extends 
AbstractStreamOperatorFactory<String> {
+               @Override
+               public <T extends StreamOperator<String>> T 
createStreamOperator(StreamOperatorParameters<String> parameters) {
+                       return (T) new 
TestBoundedMultipleInputOperator("Operator0", parameters);
+               }
+
+               @Override
+               public Class<? extends StreamOperator<String>> 
getStreamOperatorClass(ClassLoader classLoader) {
+                       return TestBoundedMultipleInputOperator.class;
+               }
+       }
+
+       private static class DuplicatingOperatorFactory extends 
AbstractStreamOperatorFactory<String> {
+               @Override
+               public <T extends StreamOperator<String>> T 
createStreamOperator(StreamOperatorParameters<String> parameters) {
+                       return (T) new DuplicatingOperator(parameters);
+               }
+
+               @Override
+               public Class<? extends StreamOperator<String>> 
getStreamOperatorClass(ClassLoader classLoader) {
+                       return DuplicatingOperator.class;
+               }
+       }
+
+       private static class MapToStringMultipleInputOperatorFactory extends 
AbstractStreamOperatorFactory<String> {
+               @Override
+               public <T extends StreamOperator<String>> T 
createStreamOperator(StreamOperatorParameters<String> parameters) {
+                       return (T) new 
MapToStringMultipleInputOperator(parameters);
+               }
+
+               @Override
+               public Class<? extends StreamOperator<String>> 
getStreamOperatorClass(ClassLoader classLoader) {
+                       return MapToStringMultipleInputOperator.class;
+               }
+       }
 }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
index 89d242a..e6ebe84 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestBoundedMultipleInputOperator.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.streaming.util;
 
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.BoundedMultiInput;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Arrays;
@@ -30,14 +31,15 @@ import java.util.List;
 /**
  * A test operator class implementing {@link BoundedMultiInput}.
  */
-public class TestBoundedMultipleInputOperator extends 
AbstractStreamOperator<String>
+public class TestBoundedMultipleInputOperator extends 
AbstractStreamOperatorV2<String>
        implements MultipleInputStreamOperator<String>, BoundedMultiInput {
 
        private static final long serialVersionUID = 1L;
 
        private final String name;
 
-       public TestBoundedMultipleInputOperator(String name) {
+       public TestBoundedMultipleInputOperator(String name, 
StreamOperatorParameters<String> parameters) {
+               super(parameters, 3);
                this.name = name;
        }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
index f81bac9..32b6424 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/MultipleInputITCase.java
@@ -21,12 +21,11 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
 import org.apache.flink.streaming.api.operators.Input;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
 import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -84,13 +83,14 @@ public class MultipleInputITCase extends AbstractTestBase {
 
        /**
         * 3 input operator that sums all of it inputs.
-        * TODO: provide non {@link SetupableStreamOperator} variant of {@link 
AbstractStreamOperator}?
-        * TODO: provide non {@link AbstractStreamOperator} seems to 
pre-override processWatermark1/2 and other
-        * methods that are not defined there?
         */
-       public static class SumAllInputOperator extends 
AbstractStreamOperator<Long> implements MultipleInputStreamOperator<Long> {
+       public static class SumAllInputOperator extends 
AbstractStreamOperatorV2<Long> implements MultipleInputStreamOperator<Long> {
                private long sum;
 
+               public SumAllInputOperator(StreamOperatorParameters<Long> 
parameters) {
+                       super(parameters, 3);
+               }
+
                @Override
                public List<Input> getInputs() {
                        return Arrays.asList(
@@ -114,29 +114,15 @@ public class MultipleInputITCase extends AbstractTestBase 
{
        /**
         * Factory for {@link SumAllInputOperator}.
         */
-       public static class SumAllInputOperatorFactory implements 
StreamOperatorFactory<Long> {
-               private ChainingStrategy chainingStrategy;
-
+       public static class SumAllInputOperatorFactory extends 
AbstractStreamOperatorFactory<Long> {
                @Override
                public <T extends StreamOperator<Long>> T 
createStreamOperator(StreamOperatorParameters<Long> parameters) {
-                       SumAllInputOperator sumAllInputOperator = new 
SumAllInputOperator();
-                       
sumAllInputOperator.setup(parameters.getContainingTask(), 
parameters.getStreamConfig(), parameters.getOutput());
-                       return (T) sumAllInputOperator;
-               }
-
-               @Override
-               public void setChainingStrategy(ChainingStrategy 
chainingStrategy) {
-                       this.chainingStrategy = chainingStrategy;
-               }
-
-               @Override
-               public ChainingStrategy getChainingStrategy() {
-                       return chainingStrategy;
+                       return (T) new SumAllInputOperator(parameters);
                }
 
                @Override
                public Class<? extends StreamOperator> 
getStreamOperatorClass(ClassLoader classLoader) {
-                       throw new UnsupportedOperationException();
+                       return SumAllInputOperator.class;
                }
        }
 }

Reply via email to