[FLINK-4907] Add Num-Subtasks/Subtask-Index Parameter to Operator Test Harnesses


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e396a5a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e396a5a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e396a5a7

Branch: refs/heads/master
Commit: e396a5a783f32e1eedef4008bf02c10525f65050
Parents: 9dc2635
Author: Aljoscha Krettek <[email protected]>
Authored: Wed Oct 26 13:38:30 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Wed Oct 26 23:26:28 2016 +0200

----------------------------------------------------------------------
 .../StreamOperatorSnapshotRestoreTest.java       |  8 ++++++--
 .../util/AbstractStreamOperatorTestHarness.java  | 19 +++++++++++--------
 .../KeyedOneInputStreamOperatorTestHarness.java  |  8 +++++---
 .../KeyedTwoInputStreamOperatorTestHarness.java  |  8 +++++---
 .../util/OneInputStreamOperatorTestHarness.java  |  8 +++++---
 .../util/TwoInputStreamOperatorTestHarness.java  |  8 +++++---
 .../streaming/util/WindowingTestHarness.java     |  2 +-
 7 files changed, 38 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
index 997ceb7..c02a7c3 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java
@@ -68,7 +68,9 @@ public class StreamOperatorSnapshotRestoreTest {
                                                        }
                                                },
                                                TypeInformation.of(Integer.class),
-                                               MAX_PARALLELISM);
+                                               MAX_PARALLELISM,
+                                               1 /* num subtasks */,
+                                               0 /* subtask index */);
 
                testHarness.open();
 
@@ -92,7 +94,9 @@ public class StreamOperatorSnapshotRestoreTest {
                                        }
                                },
                                TypeInformation.of(Integer.class),
-                               MAX_PARALLELISM);
+                               MAX_PARALLELISM,
+                               1 /* num subtasks */,
+                               0 /* subtask index */);
 
                testHarness.initializeState(handles);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 7189ce0..dfc0af0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -66,8 +66,6 @@ import static org.mockito.Mockito.*;
  */
 public class AbstractStreamOperatorTestHarness<OUT> {
 
-       protected final static int DEFAULT_MAX_PARALLELISM = 1;
-
        final protected StreamOperator<OUT> operator;
 
        final protected ConcurrentLinkedQueue<Object> outputList;
@@ -80,6 +78,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
        final protected StreamTask<?, ?> mockTask;
 
+       final Environment environment;
+
        ClosableRegistry closableRegistry;
 
        // use this as default for tests
@@ -97,7 +97,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
        public AbstractStreamOperatorTestHarness(
                        StreamOperator<OUT> operator,
-                       int maxParallelism) throws Exception {
+                       int maxParallelism,
+                       int numSubtasks,
+                       int subtaskIndex) throws Exception {
                this.operator = operator;
                this.outputList = new ConcurrentLinkedQueue<>();
                Configuration underlyingConfig = new Configuration();
@@ -107,7 +109,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                this.closableRegistry = new ClosableRegistry();
                this.checkpointLock = new Object();
 
-               final Environment env = new MockEnvironment(
+               environment = new MockEnvironment(
                                "MockTask",
                                3 * 1024 * 1024,
                                new MockInputSplitProvider(),
@@ -115,7 +117,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                                underlyingConfig,
                                executionConfig,
                                maxParallelism,
-                               1, 0);
+                               numSubtasks,
+                               subtaskIndex);
 
                mockTask = mock(StreamTask.class);
                processingTimeService = new TestProcessingTimeService();
@@ -125,7 +128,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
                when(mockTask.getConfiguration()).thenReturn(config);
                when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
-               when(mockTask.getEnvironment()).thenReturn(env);
+               when(mockTask.getEnvironment()).thenReturn(environment);
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
                when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
                when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
@@ -159,9 +162,9 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                                        final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1];
                                        OperatorStateBackend osb;
                                        if (null == stateHandles) {
-                                               osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName());
+                                               osb = stateBackend.createOperatorStateBackend(environment, operator.getClass().getSimpleName());
                                        } else {
-                                               osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles);
+                                               osb = stateBackend.restoreOperatorStateBackend(environment, operator.getClass().getSimpleName(), stateHandles);
                                        }
                                        mockTask.getCancelables().registerClosable(osb);
                                        return osb;

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 25563a3..7d87eb8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -65,8 +65,10 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
                        OneInputStreamOperator<IN, OUT> operator,
                        final KeySelector<IN, K> keySelector,
                        TypeInformation<K> keyType,
-                       int maxParallelism) throws Exception {
-               super(operator, maxParallelism);
+                       int maxParallelism,
+                       int numSubtasks,
+                       int subtaskIndex) throws Exception {
+               super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
                ClosureCleaner.clean(keySelector, false);
                config.setStatePartitioner(0, keySelector);
@@ -80,7 +82,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
                        OneInputStreamOperator<IN, OUT> operator,
                        final KeySelector<IN, K> keySelector,
                        TypeInformation<K> keyType) throws Exception {
-               this(operator, keySelector, keyType, DEFAULT_MAX_PARALLELISM);
+               this(operator, keySelector, keyType, 1, 1, 0);
        }
 
        private void setupMockTaskCreateKeyedBackend() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
index 1a01ea3..0aa91d9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java
@@ -57,8 +57,10 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
                        KeySelector<IN1, K> keySelector1,
                        KeySelector<IN2, K> keySelector2,
                        TypeInformation<K> keyType,
-                       int maxParallelism) throws Exception {
-               super(operator, maxParallelism);
+                       int maxParallelism,
+                       int numSubtasks,
+                       int subtaskIndex) throws Exception {
+               super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
                ClosureCleaner.clean(keySelector1, false);
                ClosureCleaner.clean(keySelector2, false);
@@ -74,7 +76,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT>
                        final KeySelector<IN1, K> keySelector1,
                        final KeySelector<IN2, K> keySelector2,
                        TypeInformation<K> keyType) throws Exception {
-               this(operator, keySelector1, keySelector2, keyType, DEFAULT_MAX_PARALLELISM);
+               this(operator, keySelector1, keySelector2, keyType, 1, 1, 0);
        }
 
        private void setupMockTaskCreateKeyedBackend() {

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 8be9c63..105922b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -36,13 +36,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
        private final OneInputStreamOperator<IN, OUT> oneInputOperator;
 
        public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception {
-               this(operator, DEFAULT_MAX_PARALLELISM);
+               this(operator, 1, 1, 0);
        }
 
        public OneInputStreamOperatorTestHarness(
                        OneInputStreamOperator<IN, OUT> operator,
-                       int maxParallelism) throws Exception {
-               super(operator, maxParallelism);
+                       int maxParallelism,
+                       int numTubtasks,
+                       int subtaskIndex) throws Exception {
+               super(operator, maxParallelism, numTubtasks, subtaskIndex);
 
                this.oneInputOperator = operator;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index c6f6918..90bdcb2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -35,13 +35,15 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStr
        private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator;
 
        public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception {
-               this(operator, DEFAULT_MAX_PARALLELISM);
+               this(operator, 1, 1, 0);
        }
                
        public TwoInputStreamOperatorTestHarness(
                        TwoInputStreamOperator<IN1, IN2, OUT> operator,
-                       int maxParallelism) throws Exception {
-               super(operator, maxParallelism);
+                       int maxParallelism,
+                       int numSubtasks,
+                       int subtaskIndex) throws Exception {
+               super(operator, maxParallelism, numSubtasks, subtaskIndex);
 
                this.twoInputOperator = operator;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index db3a89c..25deb54 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -76,7 +76,7 @@ public class WindowingTestHarness<K, IN, W extends Window> {
                                trigger,
                                allowedLateness);
 
-               testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType, 1);
+               testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType);
        }
 
        /**

Reply via email to