[FLINK-6796] [tests] Use Environment's class loader in AbstractStreamOperatorTestHarness
Generalize KeyedOneInputStreamOperatorTestHarness. Generalize AbstractStreamOperatorTestHarness. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e35c575a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e35c575a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e35c575a Branch: refs/heads/master Commit: e35c575ac41f2bdb3855170be883d0e21aa0379e Parents: 7aad0ec Author: Till Rohrmann <[email protected]> Authored: Wed May 31 15:14:11 2017 +0200 Committer: Tzu-Li (Gordon) Tai <[email protected]> Committed: Tue Jun 13 06:38:16 2017 +0200 ---------------------------------------------------------------------- .../operators/testutils/MockEnvironment.java | 43 +++++++++++++++++++- .../util/AbstractStreamOperatorTestHarness.java | 18 ++++---- .../KeyedOneInputStreamOperatorTestHarness.java | 16 ++++++++ .../util/OneInputStreamOperatorTestHarness.java | 7 +--- 4 files changed, 66 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e35c575a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java index 49175c7..4f0242e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.flink.types.Record; import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.Preconditions; import org.mockito.invocation.InvocationOnMock; import 
org.mockito.stubbing.Answer; @@ -97,12 +98,23 @@ public class MockEnvironment implements Environment { private final int bufferSize; + private final ClassLoader userCodeClassLoader; + public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) { this(taskName, memorySize, inputSplitProvider, bufferSize, new Configuration(), new ExecutionConfig()); } public MockEnvironment(String taskName, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration taskConfiguration, ExecutionConfig executionConfig) { - this(taskName, memorySize, inputSplitProvider, bufferSize, taskConfiguration, executionConfig, 1, 1, 0); + this( + taskName, + memorySize, + inputSplitProvider, + bufferSize, + taskConfiguration, + executionConfig, + 1, + 1, + 0); } public MockEnvironment( @@ -115,6 +127,31 @@ public class MockEnvironment implements Environment { int maxParallelism, int parallelism, int subtaskIndex) { + this( + taskName, + memorySize, + inputSplitProvider, + bufferSize, + taskConfiguration, + executionConfig, + maxParallelism, + parallelism, + subtaskIndex, + Thread.currentThread().getContextClassLoader()); + + } + + public MockEnvironment( + String taskName, + long memorySize, + MockInputSplitProvider inputSplitProvider, + int bufferSize, + Configuration taskConfiguration, + ExecutionConfig executionConfig, + int maxParallelism, + int parallelism, + int subtaskIndex, + ClassLoader userCodeClassLoader) { this.taskInfo = new TaskInfo(taskName, maxParallelism, subtaskIndex, parallelism, 0); this.jobConfiguration = new Configuration(); this.taskConfiguration = taskConfiguration; @@ -131,6 +168,8 @@ public class MockEnvironment implements Environment { KvStateRegistry registry = new KvStateRegistry(); this.kvStateRegistry = registry.createTaskRegistry(jobID, getJobVertexId()); + + this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader); } @@ -254,7 +293,7 @@ public class 
MockEnvironment implements Environment { @Override public ClassLoader getUserClassLoader() { - return getClass().getClassLoader(); + return userCodeClassLoader; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e35c575a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 0a517f0..6f2d349 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; @@ -109,7 +110,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { CloseableRegistry closableRegistry; // use this as default for tests - protected AbstractStateBackend stateBackend = new MemoryStateBackend(); + protected StateBackend stateBackend = new MemoryStateBackend(); private final Object checkpointLock; @@ -132,9 +133,6 @@ public class AbstractStreamOperatorTestHarness<OUT> { this( operator, - maxParallelism, - numSubtasks, - subtaskIndex, new MockEnvironment( "MockTask", 3 * 1024 * 1024, @@ -149,9 +147,6 @@ public class AbstractStreamOperatorTestHarness<OUT> { public 
AbstractStreamOperatorTestHarness( StreamOperator<OUT> operator, - int maxParallelism, - int numSubtasks, - int subtaskIndex, final Environment environment) throws Exception { this.operator = operator; this.outputList = new ConcurrentLinkedQueue<>(); @@ -192,7 +187,7 @@ public class AbstractStreamOperatorTestHarness<OUT> { when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig); when(mockTask.getEnvironment()).thenReturn(environment); when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader()); + when(mockTask.getUserCodeClassLoader()).thenReturn(environment.getUserClassLoader()); when(mockTask.getCancelables()).thenReturn(this.closableRegistry); when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer); @@ -226,8 +221,8 @@ public class AbstractStreamOperatorTestHarness<OUT> { OperatorStateBackend osb; osb = stateBackend.createOperatorStateBackend( - environment, - operator.getClass().getSimpleName()); + environment, + operator.getClass().getSimpleName()); mockTask.getCancelables().registerClosable(osb); @@ -248,9 +243,10 @@ public class AbstractStreamOperatorTestHarness<OUT> { return processingTimeService; } }).when(mockTask).getProcessingTimeService(); + } - public void setStateBackend(AbstractStateBackend stateBackend) { + public void setStateBackend(StateBackend stateBackend) { this.stateBackend = stateBackend; } http://git-wip-us.apache.org/repos/asf/flink/blob/e35c575a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java index 8f4908a..0d42d9f 100644 --- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java @@ -26,6 +26,7 @@ import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.KeyGroupRange; @@ -92,6 +93,21 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> this(operator, keySelector, keyType, 1, 1, 0); } + public KeyedOneInputStreamOperatorTestHarness( + final OneInputStreamOperator<IN, OUT> operator, + final KeySelector<IN, K> keySelector, + final TypeInformation<K> keyType, + final Environment environment) throws Exception { + + super(operator, environment); + + ClosureCleaner.clean(keySelector, false); + config.setStatePartitioner(0, keySelector); + config.setStateKeySerializer(keyType.createSerializer(executionConfig)); + + setupMockTaskCreateKeyedBackend(); + } + private void setupMockTaskCreateKeyedBackend() { try { http://git-wip-us.apache.org/repos/asf/flink/blob/e35c575a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java index 652d016..8a0996f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java @@ -53,7 +53,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> OneInputStreamOperator<IN, OUT> operator, TypeSerializer<IN> typeSerializerIn, Environment environment) throws Exception { - this(operator, 1, 1, 0, environment); + this(operator, environment); config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn)); } @@ -74,11 +74,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> public OneInputStreamOperatorTestHarness( OneInputStreamOperator<IN, OUT> operator, - int maxParallelism, - int numTubtasks, - int subtaskIndex, Environment environment) throws Exception { - super(operator, maxParallelism, numTubtasks, subtaskIndex, environment); + super(operator, environment); this.oneInputOperator = operator; }
