[FLINK-4877] Refactor Operator TestHarnesses to use Common Base Class

This also introduces KeyedTwoInputStreamOperatorTestHarness which
is similar to KeyedOneInputStreamOperatorTestHarness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f305baab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f305baab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f305baab

Branch: refs/heads/master
Commit: f305baabbcd9b035b696b8b7a4334deaf1b28d23
Parents: 0859a69
Author: Aljoscha Krettek <[email protected]>
Authored: Thu Sep 29 16:04:29 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Fri Oct 21 19:03:05 2016 +0200

----------------------------------------------------------------------
 .../runtime/streamrecord/StreamRecord.java      |   2 +-
 .../util/AbstractStreamOperatorTestHarness.java | 366 +++++++++++++++++++
 .../KeyedOneInputStreamOperatorTestHarness.java |  18 +-
 .../KeyedTwoInputStreamOperatorTestHarness.java | 144 ++++++++
 .../util/OneInputStreamOperatorTestHarness.java | 328 +----------------
 .../util/TwoInputStreamOperatorTestHarness.java | 130 +------
 6 files changed, 537 insertions(+), 451 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 9f75161..da606a9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -171,7 +171,7 @@ public final class StreamRecord<T> extends StreamElement {
                else if (o != null && getClass() == o.getClass()) {
                        StreamRecord<?> that = (StreamRecord<?>) o;
                        return this.hasTimestamp == that.hasTimestamp &&
-                                       this.timestamp == that.timestamp &&
+                                       (!this.hasTimestamp || this.timestamp 
== that.timestamp) &&
                                        (this.value == null ? that.value == 
null : this.value.equals(that.value));
                }
                else {

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
new file mode 100644
index 0000000..a61d995
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.ClosableRegistry;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+/**
+ * Base class for {@code AbstractStreamOperator} test harnesses.
+ */
+public class AbstractStreamOperatorTestHarness<OUT> {
+
+       public static final int MAX_PARALLELISM = 10;
+
+       final protected StreamOperator<OUT> operator;
+
+       final protected ConcurrentLinkedQueue<Object> outputList;
+
+       final protected StreamConfig config;
+
+       final protected ExecutionConfig executionConfig;
+
+       final protected TestProcessingTimeService processingTimeService;
+
+       final protected StreamTask<?, ?> mockTask;
+
+       ClosableRegistry closableRegistry;
+
+       // use this as default for tests
+       protected AbstractStateBackend stateBackend = new MemoryStateBackend();
+
+       private final Object checkpointLock;
+
+       /**
+        * Whether setup() was called on the operator. This is reset when 
calling close().
+        */
+       private boolean setupCalled = false;
+       private boolean initializeCalled = false;
+
+       private volatile boolean wasFailedExternally = false;
+
+       public AbstractStreamOperatorTestHarness(StreamOperator<OUT> operator) 
throws Exception {
+               this(operator, new ExecutionConfig());
+       }
+
+       public AbstractStreamOperatorTestHarness(
+                       StreamOperator<OUT> operator,
+                       ExecutionConfig executionConfig) throws Exception {
+               this.operator = operator;
+               this.outputList = new ConcurrentLinkedQueue<>();
+               Configuration underlyingConfig = new Configuration();
+               this.config = new StreamConfig(underlyingConfig);
+               this.config.setCheckpointingEnabled(true);
+               this.executionConfig = executionConfig;
+               this.closableRegistry = new ClosableRegistry();
+               this.checkpointLock = new Object();
+
+               final Environment env = new MockEnvironment(
+                               "MockTask",
+                               3 * 1024 * 1024,
+                               new MockInputSplitProvider(),
+                               1024,
+                               underlyingConfig,
+                               executionConfig,
+                               MAX_PARALLELISM,
+                               1, 0);
+
+               mockTask = mock(StreamTask.class);
+               processingTimeService = new TestProcessingTimeService();
+               processingTimeService.setCurrentTime(0);
+
+               when(mockTask.getName()).thenReturn("Mock Task");
+               when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+               when(mockTask.getConfiguration()).thenReturn(config);
+               
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
+               when(mockTask.getEnvironment()).thenReturn(env);
+               when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
+               
when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
+               
when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
+
+               doAnswer(new Answer<Void>() {
+                       @Override
+                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
+                               wasFailedExternally = true;
+                               return null;
+                       }
+               }).when(mockTask).handleAsyncException(any(String.class), 
any(Throwable.class));
+
+               try {
+                       doAnswer(new Answer<CheckpointStreamFactory>() {
+                               @Override
+                               public CheckpointStreamFactory 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+
+                                       final StreamOperator<?> operator = 
(StreamOperator<?>) invocationOnMock.getArguments()[0];
+                                       return 
stateBackend.createStreamFactory(new JobID(), 
operator.getClass().getSimpleName());
+                               }
+                       
}).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
+               } catch (Exception e) {
+                       throw new RuntimeException(e.getMessage(), e);
+               }
+
+               try {
+                       doAnswer(new Answer<OperatorStateBackend>() {
+                               @Override
+                               public OperatorStateBackend 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+                                       final StreamOperator<?> operator = 
(StreamOperator<?>) invocationOnMock.getArguments()[0];
+                                       final Collection<OperatorStateHandle> 
stateHandles = (Collection<OperatorStateHandle>) 
invocationOnMock.getArguments()[1];
+                                       OperatorStateBackend osb;
+                                       if (null == stateHandles) {
+                                               osb = 
stateBackend.createOperatorStateBackend(env, 
operator.getClass().getSimpleName());
+                                       } else {
+                                               osb = 
stateBackend.restoreOperatorStateBackend(env, 
operator.getClass().getSimpleName(), stateHandles);
+                                       }
+                                       
mockTask.getCancelables().registerClosable(osb);
+                                       return osb;
+                               }
+                       
}).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), 
any(Collection.class));
+               } catch (Exception e) {
+                       throw new RuntimeException(e.getMessage(), e);
+               }
+
+               doAnswer(new Answer<ProcessingTimeService>() {
+                       @Override
+                       public ProcessingTimeService answer(InvocationOnMock 
invocation) throws Throwable {
+                               return processingTimeService;
+                       }
+               }).when(mockTask).getProcessingTimeService();
+       }
+
+       public void setStateBackend(AbstractStateBackend stateBackend) {
+               this.stateBackend = stateBackend;
+       }
+
+       public Object getCheckpointLock() {
+               return mockTask.getCheckpointLock();
+       }
+
+       public Environment getEnvironment() {
+               return this.mockTask.getEnvironment();
+       }
+
+       /**
+        * Get all the output from the task. This contains StreamRecords and 
Events interleaved.
+        */
+       public ConcurrentLinkedQueue<Object> getOutput() {
+               return outputList;
+       }
+
+       /**
+        * Get all the output from the task and clear the output buffer.
+        * This contains only StreamRecords.
+        */
+       @SuppressWarnings("unchecked")
+       public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
+               List<StreamRecord<? extends OUT>> resultElements = new 
LinkedList<>();
+               for (Object e: getOutput()) {
+                       if (e instanceof StreamRecord) {
+                               resultElements.add((StreamRecord<OUT>) e);
+                       }
+               }
+               return resultElements;
+       }
+
+       /**
+        * Calls
+        * {@link StreamOperator#setup(StreamTask, StreamConfig, Output)} ()}
+        */
+       public void setup() throws Exception {
+               operator.setup(mockTask, config, new MockOutput());
+               setupCalled = true;
+       }
+
+       /**
+        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
+        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
+        * if it was not called before.
+        */
+       public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
+               if (!setupCalled) {
+                       setup();
+               }
+               operator.initializeState(operatorStateHandles);
+               initializeCalled = true;
+       }
+
+
+       /**
+        * Calls {@link StreamOperator#open()}. This also
+        * calls {@link StreamOperator#setup(StreamTask, StreamConfig, Output)}
+        * if it was not called before.
+        */
+       public void open() throws Exception {
+               if (!initializeCalled) {
+                       initializeState(null);
+               }
+               operator.open();
+       }
+
+       /**
+        * Calls {@link StreamOperator#snapshotState(long, long, 
CheckpointStreamFactory)}.
+        */
+       public OperatorSnapshotResult snapshot(long checkpointId, long 
timestamp) throws Exception {
+
+               CheckpointStreamFactory streamFactory = 
stateBackend.createStreamFactory(
+                               new JobID(),
+                               "test_op");
+
+               return operator.snapshotState(checkpointId, timestamp, 
streamFactory);
+       }
+
+       /**
+        * Calls {@link 
StreamCheckpointedOperator#snapshotState(FSDataOutputStream, long, long)} if
+        * the operator implements this interface.
+        */
+       @Deprecated
+       public StreamStateHandle snapshotLegacy(long checkpointId, long 
timestamp) throws Exception {
+
+               CheckpointStreamFactory.CheckpointStateOutputStream outStream = 
stateBackend.createStreamFactory(
+                               new JobID(),
+                               
"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
+               if(operator instanceof StreamCheckpointedOperator) {
+                       ((StreamCheckpointedOperator) 
operator).snapshotState(outStream, checkpointId, timestamp);
+                       return outStream.closeAndGetHandle();
+               } else {
+                       throw new RuntimeException("Operator is not 
StreamCheckpointedOperator");
+               }
+       }
+
+       /**
+        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}
 ()}
+        */
+       public void notifyOfCompletedCheckpoint(long checkpointId) throws 
Exception {
+               operator.notifyOfCompletedCheckpoint(checkpointId);
+       }
+
+       /**
+        * Calls {@link 
StreamCheckpointedOperator#restoreState(FSDataInputStream)} if
+        * the operator implements this interface.
+        */     @Deprecated
+       public void restore(StreamStateHandle snapshot) throws Exception {
+               if(operator instanceof StreamCheckpointedOperator) {
+                       try (FSDataInputStream in = snapshot.openInputStream()) 
{
+                               ((StreamCheckpointedOperator) 
operator).restoreState(in);
+                       }
+               } else {
+                       throw new RuntimeException("Operator is not 
StreamCheckpointedOperator");
+               }
+       }
+
+       /**
+        * Calls close and dispose on the operator.
+        */
+       public void close() throws Exception {
+               operator.close();
+               operator.dispose();
+               if (processingTimeService != null) {
+                       processingTimeService.shutdownService();
+               }
+               setupCalled = false;
+       }
+
+       public void setProcessingTime(long time) throws Exception {
+               processingTimeService.setCurrentTime(time);
+       }
+
+       public long getProcessingTime() {
+               return processingTimeService.getCurrentProcessingTime();
+       }
+
+       public void setTimeCharacteristic(TimeCharacteristic 
timeCharacteristic) {
+               this.config.setTimeCharacteristic(timeCharacteristic);
+       }
+
+       public TimeCharacteristic getTimeCharacteristic() {
+               return this.config.getTimeCharacteristic();
+       }
+
+       public boolean wasFailedExternally() {
+               return wasFailedExternally;
+       }
+
+       private class MockOutput implements Output<StreamRecord<OUT>> {
+
+               private TypeSerializer<OUT> outputSerializer;
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       outputList.add(mark);
+               }
+
+               @Override
+               public void emitLatencyMarker(LatencyMarker latencyMarker) {
+                       outputList.add(latencyMarker);
+               }
+
+               @Override
+               public void collect(StreamRecord<OUT> element) {
+                       if (outputSerializer == null) {
+                               outputSerializer = 
TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
+                       }
+                       if (element.hasTimestamp()) {
+                               outputList.add(new 
StreamRecord<>(outputSerializer.copy(element.getValue()),element.getTimestamp()));
+                       } else {
+                               outputList.add(new 
StreamRecord<>(outputSerializer.copy(element.getValue())));
+                       }
+               }
+
+               @Override
+               public void close() {
+                       // ignore
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 9c9d11b..99527e7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -50,7 +50,6 @@ import static org.mockito.Mockito.doAnswer;
 /**
  * Extension of {@link OneInputStreamOperatorTestHarness} that allows the 
operator to get
  * a {@link KeyedStateBackend}.
- *
  */
 public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
                extends OneInputStreamOperatorTestHarness<IN, OUT> {
@@ -171,7 +170,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, 
OUT>
        }
 
        /**
-        * 
+        *
         */
        @Override
        public void restore(StreamStateHandle snapshot) throws Exception {
@@ -189,21 +188,12 @@ public class KeyedOneInputStreamOperatorTestHarness<K, 
IN, OUT>
                }
        }
 
-       /**
-        * Calls close and dispose on the operator.
-        */
-       public void close() throws Exception {
-               super.close();
-               if (keyedStateBackend != null) {
-                       keyedStateBackend.dispose();
-               }
-       }
-
        @Override
        public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
-               if (null != operatorStateHandles) {
-                       this.restoredKeyedState = 
operatorStateHandles.getManagedKeyedState();
+               if (operatorStateHandles != null) {
+                       restoredKeyedState = 
operatorStateHandles.getManagedKeyedState();
                }
+
                super.initializeState(operatorStateHandles);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
new file mode 100644
index 0000000..2e9885c
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.ClosureCleaner;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.Collection;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.anyInt;
+import static org.mockito.Mockito.doAnswer;
+
+/**
+ * Extension of {@link TwoInputStreamOperatorTestHarness} that allows the 
operator to get
+ * a {@link KeyedStateBackend}.
+ */
+public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
+               extends TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
+
+       // in case the operator creates one we store it here so that we
+       // can snapshot its state
+       private AbstractKeyedStateBackend<?> keyedStateBackend = null;
+
+       // when we restore we keep the state here so that we can call restore
+       // when the operator requests the keyed state backend
+       private Collection<KeyGroupsStateHandle> restoredKeyedState = null;
+
+       public KeyedTwoInputStreamOperatorTestHarness(
+                       TwoInputStreamOperator<IN1, IN2, OUT> operator,
+                       final KeySelector<IN1, K> keySelector1,
+                       final KeySelector<IN2, K> keySelector2,
+                       TypeInformation<K> keyType) throws Exception {
+               super(operator);
+
+               ClosureCleaner.clean(keySelector1, false);
+               ClosureCleaner.clean(keySelector2, false);
+               config.setStatePartitioner(0, keySelector1);
+               config.setStatePartitioner(1, keySelector2);
+               
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+               config.setNumberOfKeyGroups(MAX_PARALLELISM);
+
+               setupMockTaskCreateKeyedBackend();
+       }
+
+       public KeyedTwoInputStreamOperatorTestHarness(
+                       TwoInputStreamOperator<IN1, IN2, OUT> operator,
+                       ExecutionConfig executionConfig,
+                       KeySelector<IN1, K> keySelector1,
+                       KeySelector<IN2, K> keySelector2,
+                       TypeInformation<K> keyType) throws Exception {
+               super(operator, executionConfig);
+
+               ClosureCleaner.clean(keySelector1, false);
+               ClosureCleaner.clean(keySelector2, false);
+               config.setStatePartitioner(0, keySelector1);
+               config.setStatePartitioner(1, keySelector2);
+               
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+               config.setNumberOfKeyGroups(MAX_PARALLELISM);
+
+               setupMockTaskCreateKeyedBackend();
+       }
+
+       private void setupMockTaskCreateKeyedBackend() {
+
+               try {
+                       doAnswer(new Answer<KeyedStateBackend>() {
+                               @Override
+                               public KeyedStateBackend 
answer(InvocationOnMock invocationOnMock) throws Throwable {
+
+                                       final TypeSerializer keySerializer = 
(TypeSerializer) invocationOnMock.getArguments()[0];
+                                       final int numberOfKeyGroups = (Integer) 
invocationOnMock.getArguments()[1];
+                                       final KeyGroupRange keyGroupRange = 
(KeyGroupRange) invocationOnMock.getArguments()[2];
+
+                                       if(keyedStateBackend != null) {
+                                               keyedStateBackend.close();
+                                       }
+
+                                       if (restoredKeyedState == null) {
+                                               keyedStateBackend = 
stateBackend.createKeyedStateBackend(
+                                                               
mockTask.getEnvironment(),
+                                                               new JobID(),
+                                                               "test_op",
+                                                               keySerializer,
+                                                               
numberOfKeyGroups,
+                                                               keyGroupRange,
+                                                               
mockTask.getEnvironment().getTaskKvStateRegistry());
+                                               return keyedStateBackend;
+                                       } else {
+                                               keyedStateBackend = 
stateBackend.restoreKeyedStateBackend(
+                                                               
mockTask.getEnvironment(),
+                                                               new JobID(),
+                                                               "test_op",
+                                                               keySerializer,
+                                                               
numberOfKeyGroups,
+                                                               keyGroupRange,
+                                                               
restoredKeyedState,
+                                                               
mockTask.getEnvironment().getTaskKvStateRegistry());
+                                               restoredKeyedState = null;
+                                               return keyedStateBackend;
+                                       }
+                               }
+                       
}).when(mockTask).createKeyedStateBackend(any(TypeSerializer.class), anyInt(), 
any(KeyGroupRange.class));
+               } catch (Exception e) {
+                       throw new RuntimeException(e.getMessage(), e);
+               }
+       }
+
+       @Override
+       public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
+               if (restoredKeyedState != null) {
+                       restoredKeyedState = 
operatorStateHandles.getManagedKeyedState();
+               }
+
+               super.initializeState(operatorStateHandles);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 5b277bf..a3e095a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -18,89 +18,23 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.ClosableRegistry;
-import org.apache.flink.runtime.state.OperatorStateBackend;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.InstantiationUtil;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.RunnableFuture;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * A test harness for testing a {@link OneInputStreamOperator}.
  *
- * <p>
- * This mock task provides the operator with a basic runtime context and 
allows pushing elements
+ * <p>This mock task provides the operator with a basic runtime context and 
allows pushing elements
  * and watermarks into the operator. {@link java.util.Deque}s containing the 
emitted elements
  * and watermarks can be retrieved. You are free to modify these.
  */
-public class OneInputStreamOperatorTestHarness<IN, OUT> {
-
-       public static final int MAX_PARALLELISM = 10;
-
-       final OneInputStreamOperator<IN, OUT> operator;
-
-       final ConcurrentLinkedQueue<Object> outputList;
-
-       final StreamConfig config;
-
-       final ExecutionConfig executionConfig;
-
-       final TestProcessingTimeService processingTimeService;
-
-       StreamTask<?, ?> mockTask;
-
-       ClosableRegistry closableRegistry;
-
-       // use this as default for tests
-       AbstractStateBackend stateBackend = new MemoryStateBackend();
+public class OneInputStreamOperatorTestHarness<IN, OUT>
+               extends AbstractStreamOperatorTestHarness<OUT> {
 
-       private final Object checkpointLock;
-
-       /**
-        * Whether setup() was called on the operator. This is reset when 
calling close().
-        */
-       private boolean setupCalled = false;
-       private boolean initializeCalled = false;
-
-       private volatile boolean wasFailedExternally = false;
+       private final OneInputStreamOperator<IN, OUT> oneInputOperator;
 
        public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, 
OUT> operator) throws Exception {
                this(operator, new ExecutionConfig());
@@ -109,268 +43,24 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
        public OneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
                        ExecutionConfig executionConfig) throws Exception {
-               this.operator = operator;
-               this.outputList = new ConcurrentLinkedQueue<>();
-               Configuration underlyingConfig = new Configuration();
-               this.config = new StreamConfig(underlyingConfig);
-               this.config.setCheckpointingEnabled(true);
-               this.executionConfig = executionConfig;
-               this.closableRegistry = new ClosableRegistry();
-
-               this.checkpointLock = new Object();
-
-               final Environment env = new MockEnvironment("MockTwoInputTask", 
3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, 
executionConfig, MAX_PARALLELISM, 1, 0);
-               mockTask = mock(StreamTask.class);
-               processingTimeService = new TestProcessingTimeService();
-               processingTimeService.setCurrentTime(0);
-
-               when(mockTask.getName()).thenReturn("Mock Task");
-               when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
-               when(mockTask.getConfiguration()).thenReturn(config);
-               
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
-               when(mockTask.getEnvironment()).thenReturn(env);
-               when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-               
when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
-               
when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
-
-               doAnswer(new Answer<Void>() {
-                       @Override
-                       public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                               wasFailedExternally = true;
-                               return null;
-                       }
-               }).when(mockTask).handleAsyncException(any(String.class), 
any(Throwable.class));
-
-               try {
-                       doAnswer(new Answer<CheckpointStreamFactory>() {
-                               @Override
-                               public CheckpointStreamFactory 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-
-                                       final StreamOperator<?> operator = 
(StreamOperator<?>) invocationOnMock.getArguments()[0];
-                                       return 
stateBackend.createStreamFactory(new JobID(), 
operator.getClass().getSimpleName());
-                               }
-                       
}).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
-               } catch (Exception e) {
-                       throw new RuntimeException(e.getMessage(), e);
-               }
-
-               try {
-                       doAnswer(new Answer<OperatorStateBackend>() {
-                               @Override
-                               public OperatorStateBackend 
answer(InvocationOnMock invocationOnMock) throws Throwable {
-                                       final StreamOperator<?> operator = 
(StreamOperator<?>) invocationOnMock.getArguments()[0];
-                                       final Collection<OperatorStateHandle> 
stateHandles = (Collection<OperatorStateHandle>) 
invocationOnMock.getArguments()[1];
-                                       OperatorStateBackend osb;
-                                       if (null == stateHandles) {
-                                               osb = 
stateBackend.createOperatorStateBackend(env, 
operator.getClass().getSimpleName());
-                                       } else {
-                                               osb = 
stateBackend.restoreOperatorStateBackend(env, 
operator.getClass().getSimpleName(), stateHandles);
-                                       }
-                                       
mockTask.getCancelables().registerClosable(osb);
-                                       return osb;
-                               }
-                       
}).when(mockTask).createOperatorStateBackend(any(StreamOperator.class), 
any(Collection.class));
-               } catch (Exception e) {
-                       throw new RuntimeException(e.getMessage(), e);
-               }
-
-               doAnswer(new Answer<ProcessingTimeService>() {
-                       @Override
-                       public ProcessingTimeService answer(InvocationOnMock 
invocation) throws Throwable {
-                               return processingTimeService;
-                       }
-               }).when(mockTask).getProcessingTimeService();
-       }
-
-       public void setTimeCharacteristic(TimeCharacteristic 
timeCharacteristic) {
-               this.config.setTimeCharacteristic(timeCharacteristic);
-       }
-
-       public TimeCharacteristic getTimeCharacteristic() {
-               return this.config.getTimeCharacteristic();
-       }
-
-       public boolean wasFailedExternally() {
-               return wasFailedExternally;
-       }
-
-       public void setStateBackend(AbstractStateBackend stateBackend) {
-               this.stateBackend = stateBackend;
-       }
-
-       public Object getCheckpointLock() {
-               return mockTask.getCheckpointLock();
-       }
-
-       public Environment getEnvironment() {
-               return this.mockTask.getEnvironment();
-       }
-
-       /**
-        * Get all the output from the task. This contains StreamRecords and 
Events interleaved.
-        */
-       public ConcurrentLinkedQueue<Object> getOutput() {
-               return outputList;
-       }
-
-       /**
-        * Get all the output from the task and clear the output buffer.
-        * This contains only StreamRecords.
-        */
-       @SuppressWarnings("unchecked")
-       public List<StreamRecord<? extends OUT>> extractOutputStreamRecords() {
-               List<StreamRecord<? extends OUT>> resultElements = new 
LinkedList<>();
-               for (Object e: getOutput()) {
-                       if (e instanceof StreamRecord) {
-                               resultElements.add((StreamRecord<OUT>) e);
-                       }
-               }
-               return resultElements;
-       }
-
-       /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)} ()}
-        */
-       public void setup() throws Exception {
-               operator.setup(mockTask, config, new MockOutput());
-               setupCalled = true;
-       }
-
-       /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#setup(StreamTask, 
StreamConfig, Output)}
-        * if it was not called before.
-        */
-       public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
-               if (!setupCalled) {
-                       setup();
-               }
-               operator.initializeState(operatorStateHandles);
-               initializeCalled = true;
-       }
-
-       /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}.
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}
 if it
-        * was not called before.
-        */
-       public void open() throws Exception {
-               if (!initializeCalled) {
-                       initializeState(null);
-               }
-               operator.open();
-       }
-
-       /**
-        *
-        */
-       public OperatorSnapshotResult snapshot(long checkpointId, long 
timestamp) throws Exception {
-
-               CheckpointStreamFactory streamFactory = 
stateBackend.createStreamFactory(
-                               new JobID(),
-                               "test_op");
-
-               return operator.snapshotState(checkpointId, timestamp, 
streamFactory);
-       }
-
-       /**
-        *
-        */
-       @Deprecated
-       public StreamStateHandle snapshotLegacy(long checkpointId, long 
timestamp) throws Exception {
-
-               CheckpointStreamFactory.CheckpointStateOutputStream outStream = 
stateBackend.createStreamFactory(
-                               new JobID(),
-                               
"test_op").createCheckpointStateOutputStream(checkpointId, timestamp);
-               if(operator instanceof StreamCheckpointedOperator) {
-                       ((StreamCheckpointedOperator) 
operator).snapshotState(outStream, checkpointId, timestamp);
-                       return outStream.closeAndGetHandle();
-               } else {
-                       throw new RuntimeException("Operator is not 
StreamCheckpointedOperator");
-               }
-       }
-
-       /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#notifyOfCompletedCheckpoint(long)}
 ()}
-        */
-       public void notifyOfCompletedCheckpoint(long checkpointId) throws 
Exception {
-               operator.notifyOfCompletedCheckpoint(checkpointId);
-       }
+               super(operator, executionConfig);
 
-       /**
-        *
-        */
-       @Deprecated
-       public void restore(StreamStateHandle snapshot) throws Exception {
-               if(operator instanceof StreamCheckpointedOperator) {
-                       try (FSDataInputStream in = snapshot.openInputStream()) 
{
-                               ((StreamCheckpointedOperator) 
operator).restoreState(in);
-                       }
-               } else {
-                       throw new RuntimeException("Operator is not 
StreamCheckpointedOperator");
-               }
-       }
-
-       /**
-        * Calls close and dispose on the operator.
-        */
-       public void close() throws Exception {
-               operator.close();
-               operator.dispose();
-               if (processingTimeService != null) {
-                       processingTimeService.shutdownService();
-               }
-               setupCalled = false;
+               this.oneInputOperator = operator;
        }
 
        public void processElement(StreamRecord<IN> element) throws Exception {
                operator.setKeyContextElement1(element);
-               operator.processElement(element);
+               oneInputOperator.processElement(element);
        }
 
        public void processElements(Collection<StreamRecord<IN>> elements) 
throws Exception {
                for (StreamRecord<IN> element: elements) {
                        operator.setKeyContextElement1(element);
-                       operator.processElement(element);
-               }
-       }
-
-       public void setProcessingTime(long time) throws Exception {
-               synchronized (checkpointLock) {
-                       processingTimeService.setCurrentTime(time);
+                       oneInputOperator.processElement(element);
                }
        }
 
        public void processWatermark(Watermark mark) throws Exception {
-               operator.processWatermark(mark);
-       }
-
-       private class MockOutput implements Output<StreamRecord<OUT>> {
-
-               private TypeSerializer<OUT> outputSerializer;
-
-               @Override
-               public void emitWatermark(Watermark mark) {
-                       outputList.add(mark);
-               }
-
-               @Override
-               public void emitLatencyMarker(LatencyMarker latencyMarker) {
-                       outputList.add(latencyMarker);
-               }
-
-               @Override
-               public void collect(StreamRecord<OUT> element) {
-                       if (outputSerializer == null) {
-                               outputSerializer = 
TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
-                       }
-                       outputList.add(new 
StreamRecord<OUT>(outputSerializer.copy(element.getValue()),
-                                       element.getTimestamp()));
-               }
-
-               @Override
-               public void close() {
-                       // ignore
-               }
+               oneInputOperator.processWatermark(mark);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f305baab/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index 7df6848..95eea98 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -19,26 +19,9 @@
 package org.apache.flink.streaming.util;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.ClosableRegistry;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * A test harness for testing a {@link TwoInputStreamOperator}.
@@ -48,122 +31,35 @@ import static org.mockito.Mockito.when;
  * and watermarks into the operator. {@link java.util.Deque}s containing the 
emitted elements
  * and watermarks can be retrieved. you are free to modify these.
  */
-public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
-
-       TwoInputStreamOperator<IN1, IN2, OUT> operator;
-
-       final ConcurrentLinkedQueue<Object> outputList;
-
-       final ExecutionConfig executionConfig;
-
-       final Object checkpointLock;
+public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends 
AbstractStreamOperatorTestHarness<OUT> {
 
-       final ClosableRegistry closableRegistry;
+       private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
 
-       boolean initializeCalled = false;
-
-       public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, 
IN2, OUT> operator) {
-               this(operator, new StreamConfig(new Configuration()));
+       public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, 
IN2, OUT> operator) throws Exception {
+               this(operator, new ExecutionConfig());
        }
                
-       public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, 
IN2, OUT> operator, StreamConfig config) {
-               this.operator = operator;
-               this.outputList = new ConcurrentLinkedQueue<Object>();
-               this.executionConfig = new ExecutionConfig();
-               this.checkpointLock = new Object();
-               this.closableRegistry = new ClosableRegistry();
-
-               Environment env = new MockEnvironment("MockTwoInputTask", 3 * 
1024 * 1024, new MockInputSplitProvider(), 1024);
-               StreamTask<?, ?> mockTask = mock(StreamTask.class);
-               when(mockTask.getName()).thenReturn("Mock Task");
-               when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
-               when(mockTask.getConfiguration()).thenReturn(config);
-               when(mockTask.getEnvironment()).thenReturn(env);
-               when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-               
when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
-
-               operator.setup(mockTask, new StreamConfig(new Configuration()), 
new MockOutput());
-       }
-
-       /**
-        * Get all the output from the task. This contains StreamRecords and 
Events interleaved. Use
-        * {@link 
org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
-        * to extract only the StreamRecords.
-        */
-       public ConcurrentLinkedQueue<Object> getOutput() {
-               return outputList;
-       }
+       public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, 
IN2, OUT> operator, ExecutionConfig executionConfig) throws Exception {
+               super(operator, executionConfig);
 
-       /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#initializeState(OperatorStateHandles)}.
-        */
-       public void initializeState(OperatorStateHandles operatorStateHandles) 
throws Exception {
-               operator.initializeState(operatorStateHandles);
-               initializeCalled = true;
-       }
-
-       /**
-        * Calls {@link 
org.apache.flink.streaming.api.operators.StreamOperator#open()}.
-        */
-       public void open() throws Exception {
-               if(!initializeCalled) {
-                       initializeState(mock(OperatorStateHandles.class));
-               }
-
-               operator.open();
-       }
-
-       /**
-        * Calls close on the operator.
-        */
-       public void close() throws Exception {
-               operator.close();
+               this.twoInputOperator = operator;
        }
 
        public void processElement1(StreamRecord<IN1> element) throws Exception 
{
-               operator.processElement1(element);
+               twoInputOperator.setKeyContextElement1(element);
+               twoInputOperator.processElement1(element);
        }
 
        public void processElement2(StreamRecord<IN2> element) throws Exception 
{
-               operator.processElement2(element);
+               twoInputOperator.setKeyContextElement2(element);
+               twoInputOperator.processElement2(element);
        }
 
        public void processWatermark1(Watermark mark) throws Exception {
-               operator.processWatermark1(mark);
+               twoInputOperator.processWatermark1(mark);
        }
 
        public void processWatermark2(Watermark mark) throws Exception {
-               operator.processWatermark2(mark);
-       }
-
-       private class MockOutput implements Output<StreamRecord<OUT>> {
-
-               private TypeSerializer<OUT> outputSerializer;
-
-               @Override
-               @SuppressWarnings("unchecked")
-               public void emitWatermark(Watermark mark) {
-                       outputList.add(mark);
-               }
-
-               @Override
-               public void emitLatencyMarker(LatencyMarker latencyMarker) {
-                       outputList.add(latencyMarker);
-               }
-
-               @Override
-               @SuppressWarnings("unchecked")
-               public void collect(StreamRecord<OUT> element) {
-                       if (outputSerializer == null) {
-                               outputSerializer = 
TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
-                       }
-                       outputList.add(new 
StreamRecord<>(outputSerializer.copy(element.getValue()),
-                                       element.getTimestamp()));
-               }
-
-               @Override
-               public void close() {
-                       // ignore
-               }
+               twoInputOperator.processWatermark2(mark);
        }
 }

Reply via email to