[FLINK-6796] [tests] Use Environment's class loader in AbstractStreamOperatorTestHarness

Generalize KeyedOneInputStreamOperatorTestHarness

Generalize AbstractStreamOperatorTestHarness


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54679660
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54679660
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54679660

Branch: refs/heads/release-1.3
Commit: 546796602e9b8a42f98b76c8d34219eeeb44f33c
Parents: 6ae0995
Author: Till Rohrmann <[email protected]>
Authored: Wed May 31 15:14:11 2017 +0200
Committer: Tzu-Li (Gordon) Tai <[email protected]>
Committed: Tue Jun 13 07:21:09 2017 +0200

----------------------------------------------------------------------
 .../operators/testutils/MockEnvironment.java    | 43 +++++++++++++++++++-
 .../util/AbstractStreamOperatorTestHarness.java | 18 ++++----
 .../KeyedOneInputStreamOperatorTestHarness.java | 16 ++++++++
 .../util/OneInputStreamOperatorTestHarness.java |  7 +---
 4 files changed, 66 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54679660/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 49175c7..4f0242e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -51,6 +51,7 @@ import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 
+import org.apache.flink.util.Preconditions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -97,12 +98,23 @@ public class MockEnvironment implements Environment {
 
        private final int bufferSize;
 
+       private final ClassLoader userCodeClassLoader;
+
        public MockEnvironment(String taskName, long memorySize, 
MockInputSplitProvider inputSplitProvider, int bufferSize) {
                this(taskName, memorySize, inputSplitProvider, bufferSize, new 
Configuration(), new ExecutionConfig());
        }
 
        public MockEnvironment(String taskName, long memorySize, 
MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration 
taskConfiguration, ExecutionConfig executionConfig) {
-               this(taskName, memorySize, inputSplitProvider, bufferSize, 
taskConfiguration, executionConfig, 1, 1, 0);
+               this(
+                       taskName,
+                       memorySize,
+                       inputSplitProvider,
+                       bufferSize,
+                       taskConfiguration,
+                       executionConfig,
+                       1,
+                       1,
+                       0);
        }
 
        public MockEnvironment(
@@ -115,6 +127,31 @@ public class MockEnvironment implements Environment {
                        int maxParallelism,
                        int parallelism,
                        int subtaskIndex) {
+               this(
+                       taskName,
+                       memorySize,
+                       inputSplitProvider,
+                       bufferSize,
+                       taskConfiguration,
+                       executionConfig,
+                       maxParallelism,
+                       parallelism,
+                       subtaskIndex,
+                       Thread.currentThread().getContextClassLoader());
+
+       }
+
+       public MockEnvironment(
+                       String taskName,
+                       long memorySize,
+                       MockInputSplitProvider inputSplitProvider,
+                       int bufferSize,
+                       Configuration taskConfiguration,
+                       ExecutionConfig executionConfig,
+                       int maxParallelism,
+                       int parallelism,
+                       int subtaskIndex,
+                       ClassLoader userCodeClassLoader) {
                this.taskInfo = new TaskInfo(taskName, maxParallelism, 
subtaskIndex, parallelism, 0);
                this.jobConfiguration = new Configuration();
                this.taskConfiguration = taskConfiguration;
@@ -131,6 +168,8 @@ public class MockEnvironment implements Environment {
 
                KvStateRegistry registry = new KvStateRegistry();
                this.kvStateRegistry = registry.createTaskRegistry(jobID, 
getJobVertexId());
+
+               this.userCodeClassLoader = 
Preconditions.checkNotNull(userCodeClassLoader);
        }
 
 
@@ -254,7 +293,7 @@ public class MockEnvironment implements Environment {
 
        @Override
        public ClassLoader getUserClassLoader() {
-               return getClass().getClassLoader();
+               return userCodeClassLoader;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/54679660/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 7a8488f..fdbf990 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -44,6 +44,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -107,7 +108,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
        CloseableRegistry closableRegistry;
 
        // use this as default for tests
-       protected AbstractStateBackend stateBackend = new MemoryStateBackend();
+       protected StateBackend stateBackend = new MemoryStateBackend();
 
        private final Object checkpointLock;
 
@@ -130,9 +131,6 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
                this(
                        operator,
-                       maxParallelism,
-                       numSubtasks,
-                       subtaskIndex,
                        new MockEnvironment(
                                "MockTask",
                                3 * 1024 * 1024,
@@ -147,9 +145,6 @@ public class AbstractStreamOperatorTestHarness<OUT> {
 
        public AbstractStreamOperatorTestHarness(
                        StreamOperator<OUT> operator,
-                       int maxParallelism,
-                       int numSubtasks,
-                       int subtaskIndex,
                        final Environment environment) throws Exception {
                this.operator = operator;
                this.outputList = new ConcurrentLinkedQueue<>();
@@ -190,7 +185,7 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
                when(mockTask.getEnvironment()).thenReturn(environment);
                when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-               
when(mockTask.getUserCodeClassLoader()).thenReturn(this.getClass().getClassLoader());
+               
when(mockTask.getUserCodeClassLoader()).thenReturn(environment.getUserClassLoader());
                
when(mockTask.getCancelables()).thenReturn(this.closableRegistry);
                
when(mockTask.getStreamStatusMaintainer()).thenReturn(mockStreamStatusMaintainer);
 
@@ -224,8 +219,8 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                                        OperatorStateBackend osb;
 
                                        osb = 
stateBackend.createOperatorStateBackend(
-                                                       environment,
-                                                       
operator.getClass().getSimpleName());
+                                               environment,
+                                               
operator.getClass().getSimpleName());
 
                                        
mockTask.getCancelables().registerClosable(osb);
 
@@ -246,9 +241,10 @@ public class AbstractStreamOperatorTestHarness<OUT> {
                                return processingTimeService;
                        }
                }).when(mockTask).getProcessingTimeService();
+
        }
 
-       public void setStateBackend(AbstractStateBackend stateBackend) {
+       public void setStateBackend(StateBackend stateBackend) {
                this.stateBackend = stateBackend;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/54679660/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index c6d0bce..322de5b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -92,6 +93,21 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
                this(operator, keySelector, keyType, 1, 1, 0);
        }
 
+       public KeyedOneInputStreamOperatorTestHarness(
+                       final OneInputStreamOperator<IN, OUT> operator,
+                       final  KeySelector<IN, K> keySelector,
+                       final TypeInformation<K> keyType,
+                       final Environment environment) throws Exception {
+
+               super(operator, environment);
+
+               ClosureCleaner.clean(keySelector, false);
+               config.setStatePartitioner(0, keySelector);
+               
config.setStateKeySerializer(keyType.createSerializer(executionConfig));
+
+               setupMockTaskCreateKeyedBackend();
+       }
+
        private void setupMockTaskCreateKeyedBackend() {
 
                try {

http://git-wip-us.apache.org/repos/asf/flink/blob/54679660/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index ced8cca..8f4e784 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -52,7 +52,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
                OneInputStreamOperator<IN, OUT> operator,
                TypeSerializer<IN> typeSerializerIn,
                Environment environment) throws Exception {
-               this(operator, 1, 1, 0, environment);
+               this(operator, environment);
 
                
config.setTypeSerializerIn1(Preconditions.checkNotNull(typeSerializerIn));
        }
@@ -73,11 +73,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT>
 
        public OneInputStreamOperatorTestHarness(
                OneInputStreamOperator<IN, OUT> operator,
-               int maxParallelism,
-               int numTubtasks,
-               int subtaskIndex,
                Environment environment) throws Exception {
-               super(operator, maxParallelism, numTubtasks, subtaskIndex, 
environment);
+               super(operator, environment);
 
                this.oneInputOperator = operator;
        }

Reply via email to