[FLINK-4907] Add Num-Subtasks/Subtask-Index Parameter to Operator Test Harnesses
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e396a5a7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e396a5a7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e396a5a7 Branch: refs/heads/master Commit: e396a5a783f32e1eedef4008bf02c10525f65050 Parents: 9dc2635 Author: Aljoscha Krettek <[email protected]> Authored: Wed Oct 26 13:38:30 2016 +0200 Committer: Aljoscha Krettek <[email protected]> Committed: Wed Oct 26 23:26:28 2016 +0200 ---------------------------------------------------------------------- .../StreamOperatorSnapshotRestoreTest.java | 8 ++++++-- .../util/AbstractStreamOperatorTestHarness.java | 19 +++++++++++-------- .../KeyedOneInputStreamOperatorTestHarness.java | 8 +++++--- .../KeyedTwoInputStreamOperatorTestHarness.java | 8 +++++--- .../util/OneInputStreamOperatorTestHarness.java | 8 +++++--- .../util/TwoInputStreamOperatorTestHarness.java | 8 +++++--- .../streaming/util/WindowingTestHarness.java | 2 +- 7 files changed, 38 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index 997ceb7..c02a7c3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -68,7 +68,9 @@ public class StreamOperatorSnapshotRestoreTest { } }, TypeInformation.of(Integer.class), - MAX_PARALLELISM); + MAX_PARALLELISM, + 1 /* num subtasks */, + 0 /* subtask index */); testHarness.open(); @@ -92,7 +94,9 @@ public class StreamOperatorSnapshotRestoreTest { } }, TypeInformation.of(Integer.class), - MAX_PARALLELISM); + MAX_PARALLELISM, + 1 /* num subtasks */, + 0 /* subtask index */); testHarness.initializeState(handles); http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 7189ce0..dfc0af0 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -66,8 +66,6 @@ import static org.mockito.Mockito.*; */ public class AbstractStreamOperatorTestHarness<OUT> { - protected final static int DEFAULT_MAX_PARALLELISM = 1; - final protected StreamOperator<OUT> operator; final protected ConcurrentLinkedQueue<Object> outputList; @@ -80,6 +78,8 @@ public class AbstractStreamOperatorTestHarness<OUT> { final protected StreamTask<?, ?> mockTask; + final Environment environment; + ClosableRegistry closableRegistry; // use this as default for tests @@ -97,7 +97,9 @@ public class AbstractStreamOperatorTestHarness<OUT> { public AbstractStreamOperatorTestHarness( StreamOperator<OUT> operator, - int maxParallelism) throws Exception { + int maxParallelism, + int numSubtasks, + int subtaskIndex) throws Exception { this.operator = operator; this.outputList = new ConcurrentLinkedQueue<>(); Configuration underlyingConfig = new Configuration(); @@ -107,7 +109,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { this.closableRegistry = new ClosableRegistry(); this.checkpointLock = new Object(); - final Environment env = new MockEnvironment( + environment = new MockEnvironment( "MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), @@ -115,7 +117,8 @@ public class AbstractStreamOperatorTestHarness<OUT> { underlyingConfig, executionConfig, maxParallelism, - 1, 0); + numSubtasks, + subtaskIndex); mockTask = mock(StreamTask.class); processingTimeService = new TestProcessingTimeService(); @@ -125,7 +128,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { when(mockTask.getCheckpointLock()).thenReturn(checkpointLock); when(mockTask.getConfiguration()).thenReturn(config); when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig); - when(mockTask.getEnvironment()).thenReturn(env); + when(mockTask.getEnvironment()).thenReturn(environment); when(mockTask.getExecutionConfig()).thenReturn(executionConfig); when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader()); when(mockTask.getCancelables()).thenReturn(this.closableRegistry); @@ -159,9 +162,9 @@ public class AbstractStreamOperatorTestHarness<OUT> { final Collection<OperatorStateHandle> stateHandles = (Collection<OperatorStateHandle>) invocationOnMock.getArguments()[1]; OperatorStateBackend osb; if (null == stateHandles) { - osb = stateBackend.createOperatorStateBackend(env, operator.getClass().getSimpleName()); + osb = stateBackend.createOperatorStateBackend(environment, operator.getClass().getSimpleName()); } else { - osb = stateBackend.restoreOperatorStateBackend(env, operator.getClass().getSimpleName(), stateHandles); + osb = stateBackend.restoreOperatorStateBackend(environment, operator.getClass().getSimpleName(), stateHandles); } mockTask.getCancelables().registerClosable(osb); return osb; http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 25563a3..7d87eb8 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -65,8 +65,10 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> OneInputStreamOperator<IN, OUT> operator, final KeySelector<IN, K> keySelector, TypeInformation<K> keyType, - int maxParallelism) throws Exception { - super(operator, maxParallelism); + int maxParallelism, + int numSubtasks, + int subtaskIndex) throws Exception { + super(operator, maxParallelism, numSubtasks, subtaskIndex); ClosureCleaner.clean(keySelector, false); config.setStatePartitioner(0, keySelector); @@ -80,7 +82,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> OneInputStreamOperator<IN, OUT> operator, final KeySelector<IN, K> keySelector, TypeInformation<K> keyType) throws Exception { - this(operator, keySelector, keyType, DEFAULT_MAX_PARALLELISM); + this(operator, keySelector, keyType, 1, 1, 0); } private void setupMockTaskCreateKeyedBackend() { http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java index 1a01ea3..0aa91d9 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedTwoInputStreamOperatorTestHarness.java @@ -57,8 +57,10 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType, - int maxParallelism) throws Exception { - super(operator, maxParallelism); + int maxParallelism, + int numSubtasks, + int subtaskIndex) throws Exception { + super(operator, maxParallelism, numSubtasks, subtaskIndex); ClosureCleaner.clean(keySelector1, false); ClosureCleaner.clean(keySelector2, false); @@ -74,7 +76,7 @@ public class KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> final KeySelector<IN1, K> keySelector1, final KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception { - this(operator, keySelector1, keySelector2, keyType, DEFAULT_MAX_PARALLELISM); + this(operator, keySelector1, keySelector2, keyType, 1, 1, 0); } private void setupMockTaskCreateKeyedBackend() { http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 8be9c63..105922b 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -36,13 +36,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> private final OneInputStreamOperator<IN, OUT> oneInputOperator; public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) throws Exception { - this(operator, DEFAULT_MAX_PARALLELISM); + this(operator, 1, 1, 0); } public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, - int maxParallelism) throws Exception { - super(operator, maxParallelism); + int maxParallelism, + int numTubtasks, + int subtaskIndex) throws Exception { + super(operator, maxParallelism, numTubtasks, subtaskIndex); this.oneInputOperator = operator; } http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java index c6f6918..90bdcb2 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java @@ -35,13 +35,15 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT>extends AbstractStr private final TwoInputStreamOperator<IN1, IN2, OUT> twoInputOperator; public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) throws Exception { - this(operator, DEFAULT_MAX_PARALLELISM); + this(operator, 1, 1, 0); } public TwoInputStreamOperatorTestHarness( TwoInputStreamOperator<IN1, IN2, OUT> operator, - int maxParallelism) throws Exception { - super(operator, maxParallelism); + int maxParallelism, + int numSubtasks, + int subtaskIndex) throws Exception { + super(operator, maxParallelism, numSubtasks, subtaskIndex); this.twoInputOperator = operator; } http://git-wip-us.apache.org/repos/asf/flink/blob/e396a5a7/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java index db3a89c..25deb54 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java @@ -76,7 +76,7 @@ public class WindowingTestHarness<K, IN, W extends Window> { trigger, allowedLateness); - testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType, 1); + testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, keyType); } /**
