Repository: flink
Updated Branches:
  refs/heads/master 428419d59 -> 1e475c768


[FLINK-4842] Introduce test to enforce order of operator / udf lifecycles


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1e475c76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1e475c76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1e475c76

Branch: refs/heads/master
Commit: 1e475c768ae0d7e13746a3ca6aa258141016d419
Parents: cab9cd4
Author: Stefan Richter <[email protected]>
Authored: Thu Oct 13 11:32:19 2016 +0200
Committer: Aljoscha Krettek <[email protected]>
Committed: Thu Oct 20 16:14:21 2016 +0200

----------------------------------------------------------------------
 .../AbstractUdfStreamOperatorLifecycleTest.java | 293 +++++++++++++++++++
 .../AbstractUdfStreamOperatorTest.java          | 219 --------------
 .../streaming/runtime/tasks/StreamTaskTest.java |   4 +-
 3 files changed, 295 insertions(+), 221 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1e475c76/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
new file mode 100644
index 0000000..cbb833b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Serializable;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * This test secures the lifecycle of AbstractUdfStreamOperator, including it's UDF handling.
+ */
+public class AbstractUdfStreamOperatorLifecycleTest {
+
+       private static final List<String> EXPECTED_CALL_ORDER_FULL = 
Arrays.asList(
+                       "OPERATOR::setup",
+                       "UDF::setRuntimeContext",
+                       "OPERATOR::initializeState",
+                       "OPERATOR::open",
+                       "UDF::open",
+                       "OPERATOR::run",
+                       "UDF::run",
+                       "OPERATOR::snapshotState",
+                       "OPERATOR::close",
+                       "UDF::close",
+                       "OPERATOR::dispose");
+
+       private static final List<String> EXPECTED_CALL_ORDER_CANCEL_RUNNING = 
Arrays.asList(
+                       "OPERATOR::setup",
+                       "UDF::setRuntimeContext",
+                       "OPERATOR::initializeState",
+                       "OPERATOR::open",
+                       "UDF::open",
+                       "OPERATOR::run",
+                       "UDF::run",
+                       "OPERATOR::cancel",
+                       "UDF::cancel",
+                       "OPERATOR::dispose",
+                       "UDF::close");
+
+       private static final String ALL_METHODS_STREAM_OPERATOR = "[close[], 
dispose[], getChainingStrategy[], " +
+                       "getMetricGroup[], initializeState[class 
org.apache.flink.streaming.runtime.tasks.OperatorStateHandles], " +
+                       "notifyOfCompletedCheckpoint[long], open[], 
setChainingStrategy[class " +
+                       
"org.apache.flink.streaming.api.operators.ChainingStrategy], 
setKeyContextElement1[class " +
+                       
"org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
+                       "setKeyContextElement2[class 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord], " +
+                       "setup[class 
org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
+                       "org.apache.flink.streaming.api.graph.StreamConfig, 
interface " +
+                       "org.apache.flink.streaming.api.operators.Output], 
snapshotState[long, long, " +
+                       "interface 
org.apache.flink.runtime.state.CheckpointStreamFactory]]";
+
+       private static final String ALL_METHODS_RICH_FUNCTION = "[close[], 
getIterationRuntimeContext[], getRuntimeContext[]" +
+                       ", open[class 
org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
+                       
"org.apache.flink.api.common.functions.RuntimeContext]]";
+
+       private static final List<String> ACTUAL_ORDER_TRACKING =
+                       Collections.synchronizedList(new 
ArrayList<String>(EXPECTED_CALL_ORDER_FULL.size()));
+
+       @Test
+       public void testAllMethodsRegisteredInTest() {
+               List<String> methodsWithSignatureString = new ArrayList<>();
+               for (Method method : StreamOperator.class.getMethods()) {
+                       methodsWithSignatureString.add(method.getName() + 
Arrays.toString(method.getParameterTypes()));
+               }
+               Collections.sort(methodsWithSignatureString);
+               Assert.assertEquals("It seems like new methods have been 
introduced to " + StreamOperator.class +
+                               ". Please register them with this test and 
ensure to document their position in the lifecycle " +
+                               "(if applicable).", 
ALL_METHODS_STREAM_OPERATOR, methodsWithSignatureString.toString());
+
+               methodsWithSignatureString = new ArrayList<>();
+               for (Method method : RichFunction.class.getMethods()) {
+                       methodsWithSignatureString.add(method.getName() + 
Arrays.toString(method.getParameterTypes()));
+               }
+               Collections.sort(methodsWithSignatureString);
+               Assert.assertEquals("It seems like new methods have been 
introduced to " + RichFunction.class +
+                               ". Please register them with this test and 
ensure to document their position in the lifecycle " +
+                               "(if applicable).", ALL_METHODS_RICH_FUNCTION, 
methodsWithSignatureString.toString());
+       }
+
+       @Test
+       public void testLifeCycleFull() throws Exception {
+               ACTUAL_ORDER_TRACKING.clear();
+
+               Configuration taskManagerConfig = new Configuration();
+               StreamConfig cfg = new StreamConfig(new Configuration());
+               MockSourceFunction srcFun = new MockSourceFunction();
+
+               cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, 
true));
+               cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               Task task = StreamTaskTest.createTask(SourceStreamTask.class, 
cfg, taskManagerConfig);
+
+               task.startTaskThread();
+
+               LifecycleTrackingStreamSource.runStarted.await();
+
+               // wait for clean termination
+               task.getExecutingThread().join();
+               assertEquals(ExecutionState.FINISHED, task.getExecutionState());
+               assertEquals(EXPECTED_CALL_ORDER_FULL, ACTUAL_ORDER_TRACKING);
+       }
+
+       @Test
+       public void testLifeCycleCancel() throws Exception {
+               ACTUAL_ORDER_TRACKING.clear();
+
+               Configuration taskManagerConfig = new Configuration();
+               StreamConfig cfg = new StreamConfig(new Configuration());
+               MockSourceFunction srcFun = new MockSourceFunction();
+               cfg.setStreamOperator(new LifecycleTrackingStreamSource(srcFun, 
false));
+               cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+
+               Task task = StreamTaskTest.createTask(SourceStreamTask.class, 
cfg, taskManagerConfig);
+
+               task.startTaskThread();
+               LifecycleTrackingStreamSource.runStarted.await();
+
+               // this should cancel the task even though it is blocked on runFinished
+               task.cancelExecution();
+
+               // wait for clean termination
+               task.getExecutingThread().join();
+               assertEquals(ExecutionState.CANCELED, task.getExecutionState());
+               assertEquals(EXPECTED_CALL_ORDER_CANCEL_RUNNING, 
ACTUAL_ORDER_TRACKING);
+       }
+
+       private static class MockSourceFunction extends 
RichSourceFunction<Long> {
+
+               private static final long serialVersionUID = 1L;
+
+               @Override
+               public void run(SourceContext<Long> ctx) {
+                       ACTUAL_ORDER_TRACKING.add("UDF::run");
+               }
+
+               @Override
+               public void cancel() {
+                       ACTUAL_ORDER_TRACKING.add("UDF::cancel");
+               }
+
+               @Override
+               public void setRuntimeContext(RuntimeContext t) {
+                       ACTUAL_ORDER_TRACKING.add("UDF::setRuntimeContext");
+                       super.setRuntimeContext(t);
+               }
+
+               @Override
+               public void open(Configuration parameters) throws Exception {
+                       ACTUAL_ORDER_TRACKING.add("UDF::open");
+                       super.open(parameters);
+               }
+
+               @Override
+               public void close() throws Exception {
+                       ACTUAL_ORDER_TRACKING.add("UDF::close");
+                       super.close();
+               }
+       }
+
+       private static class LifecycleTrackingStreamSource<OUT, SRC extends 
SourceFunction<OUT>>
+                       extends StreamSource<OUT, SRC> implements Serializable {
+
+               private static final long serialVersionUID = 
2431488948886850562L;
+               private transient Thread testCheckpointer;
+
+               private final boolean simulateCheckpointing;
+
+               static OneShotLatch runStarted;
+               static OneShotLatch runFinish;
+
+               public LifecycleTrackingStreamSource(SRC sourceFunction, 
boolean simulateCheckpointing) {
+                       super(sourceFunction);
+                       this.simulateCheckpointing = simulateCheckpointing;
+                       runStarted = new OneShotLatch();
+                       runFinish = new OneShotLatch();
+               }
+
+               @Override
+               public void run(Object lockingObject, Output<StreamRecord<OUT>> 
collector) throws Exception {
+                       ACTUAL_ORDER_TRACKING.add("OPERATOR::run");
+                       super.run(lockingObject, collector);
+                       runStarted.trigger();
+                       runFinish.await();
+               }
+
+               @Override
+               public void setup(StreamTask<?, ?> containingTask, StreamConfig 
config, Output<StreamRecord<OUT>> output) {
+                       ACTUAL_ORDER_TRACKING.add("OPERATOR::setup");
+                       super.setup(containingTask, config, output);
+                       if (simulateCheckpointing) {
+                               testCheckpointer = new Thread() {
+                                       @Override
+                                       public void run() {
+                                               long id = 0;
+                                               while (true) {
+                                                       try {
+                                                               
Thread.sleep(50);
+                                                               if 
(getContainingTask().isCanceled() || getContainingTask().triggerCheckpoint(
+                                                                               
new CheckpointMetaData(id++, System.currentTimeMillis()))) {
+                                                                       
LifecycleTrackingStreamSource.runFinish.trigger();
+                                                                       break;
+                                                               }
+                                                       } catch (Exception e) {
+                                                               
e.printStackTrace();
+                                                               Assert.fail();
+                                                       }
+                                               }
+                                       }
+                               };
+                               testCheckpointer.start();
+                       }
+               }
+
+               @Override
+               public void snapshotState(StateSnapshotContext context) throws 
Exception {
+                       ACTUAL_ORDER_TRACKING.add("OPERATOR::snapshotState");
+                       super.snapshotState(context);
+               }
+
+               @Override
+               public void initializeState(StateInitializationContext context) 
throws Exception {
+                       ACTUAL_ORDER_TRACKING.add("OPERATOR::initializeState");
+                       super.initializeState(context);
+               }
+
+               @Override
+               public void open() throws Exception {
+                       ACTUAL_ORDER_TRACKING.add("OPERATOR::open");
+                       super.open();
+               }
+
+               @Override
+               public void close() throws Exception {
+                       ACTUAL_ORDER_TRACKING.add("OPERATOR::close");
+                       super.close();
+               }
+
+               @Override
+               public void cancel() {
+                       ACTUAL_ORDER_TRACKING.add("OPERATOR::cancel");
+                       super.cancel();
+               }
+
+               @Override
+               public void dispose() throws Exception {
+                       ACTUAL_ORDER_TRACKING.add("OPERATOR::dispose");
+                       super.dispose();
+                       if (simulateCheckpointing) {
+                               testCheckpointer.join();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e475c76/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java
deleted file mode 100644
index f5d633c..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorTest.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.filecache.FileCache;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memory.MemoryManager;
-import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.runtime.taskmanager.CheckpointResponder;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.taskmanager.TaskManagerConnection;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import java.io.Serializable;
-import java.net.URL;
-import java.util.Collections;
-import java.util.concurrent.Executor;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class AbstractUdfStreamOperatorTest {
-
-       @Test
-       public void testLifeCycle() throws Exception {
-
-               Configuration taskManagerConfig = new Configuration();
-
-               StreamConfig cfg = new StreamConfig(new Configuration());
-               cfg.setStreamOperator(new LifecycleTrackingStreamSource(new 
MockSourceFunction()));
-               cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-               Task task = createTask(SourceStreamTask.class, cfg, 
taskManagerConfig);
-
-               task.startTaskThread();
-
-               // wait for clean termination
-               task.getExecutingThread().join();
-               assertEquals(ExecutionState.FINISHED, task.getExecutionState());
-       }
-
-       private static class MockSourceFunction extends 
RichSourceFunction<Long> {
-
-               private static final long serialVersionUID = 1L;
-
-               @Override
-               public void run(SourceContext<Long> ctx) {
-               }
-
-               @Override
-               public void cancel() {
-               }
-
-               @Override
-               public void setRuntimeContext(RuntimeContext t) {
-                       System.out.println("!setRuntimeContext");
-                       super.setRuntimeContext(t);
-               }
-
-               @Override
-               public void open(Configuration parameters) throws Exception {
-                       System.out.println("!open");
-                       super.open(parameters);
-               }
-
-               @Override
-               public void close() throws Exception {
-                       System.out.println("!close");
-                       super.close();
-               }
-       }
-
-       private Task createTask(
-                       Class<? extends AbstractInvokable> invokable,
-                       StreamConfig taskConfig,
-                       Configuration taskManagerConfig) throws Exception {
-
-               LibraryCacheManager libCache = mock(LibraryCacheManager.class);
-               
when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
-
-               ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
-               ResultPartitionConsumableNotifier consumableNotifier = 
mock(ResultPartitionConsumableNotifier.class);
-               PartitionStateChecker partitionStateChecker = 
mock(PartitionStateChecker.class);
-               Executor executor = mock(Executor.class);
-
-               NetworkEnvironment network = mock(NetworkEnvironment.class);
-               
when(network.getResultPartitionManager()).thenReturn(partitionManager);
-               
when(network.getDefaultIOMode()).thenReturn(IOManager.IOMode.SYNC);
-               when(network.createKvStateTaskRegistry(any(JobID.class), 
any(JobVertexID.class)))
-                               .thenReturn(mock(TaskKvStateRegistry.class));
-
-               TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-                               new JobID(), "Job Name", new JobVertexID(), new 
ExecutionAttemptID(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                               "Test Task", 1, 0, 1, 0,
-                               new Configuration(),
-                               taskConfig.getConfiguration(),
-                               invokable.getName(),
-                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                               Collections.<BlobKey>emptyList(),
-                               Collections.<URL>emptyList(),
-                               0);
-
-               return new Task(
-                               tdd,
-                               mock(MemoryManager.class),
-                               mock(IOManager.class),
-                               network,
-                               mock(BroadcastVariableManager.class),
-                               mock(TaskManagerConnection.class),
-                               mock(InputSplitProvider.class),
-                               mock(CheckpointResponder.class),
-                               libCache,
-                               mock(FileCache.class),
-                               new TaskManagerRuntimeInfo("localhost", 
taskManagerConfig, System.getProperty("java.io.tmpdir")),
-                               new UnregisteredTaskMetricsGroup(),
-                               consumableNotifier,
-                               partitionStateChecker,
-                               executor);
-       }
-
-       static class LifecycleTrackingStreamSource<OUT, SRC extends 
SourceFunction<OUT>>
-                       extends StreamSource<OUT, SRC> implements Serializable {
-
-               //private transient final AtomicInteger currentState;
-
-               private static final long serialVersionUID = 
2431488948886850562L;
-
-               public LifecycleTrackingStreamSource(SRC sourceFunction) {
-                       super(sourceFunction);
-               }
-
-               @Override
-               public void setup(StreamTask<?, ?> containingTask, StreamConfig 
config, Output<StreamRecord<OUT>> output) {
-                       System.out.println("setup");
-                       super.setup(containingTask, config, output);
-               }
-
-               @Override
-               public void snapshotState(StateSnapshotContext context) throws 
Exception {
-                       System.out.println("snapshotState");
-                       super.snapshotState(context);
-               }
-
-               @Override
-               public void initializeState(StateInitializationContext context) 
throws Exception {
-                       System.out.println("initializeState");
-                       super.initializeState(context);
-               }
-
-               @Override
-               public void open() throws Exception {
-                       System.out.println("open");
-                       super.open();
-               }
-
-               @Override
-               public void close() throws Exception {
-                       System.out.println("close");
-                       super.close();
-               }
-
-               @Override
-               public void dispose() throws Exception {
-                       super.dispose();
-                       System.out.println("dispose");
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1e475c76/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 8aae19f..94f6d5a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -200,13 +200,13 @@ public class StreamTaskTest {
                }
        }
 
-       private Task createTask(
+       public static Task createTask(
                        Class<? extends AbstractInvokable> invokable,
                        StreamConfig taskConfig,
                        Configuration taskManagerConfig) throws Exception {
 
                LibraryCacheManager libCache = mock(LibraryCacheManager.class);
-               when(libCache.getClassLoader(any(JobID.class))).thenReturn(getClass().getClassLoader());
+               when(libCache.getClassLoader(any(JobID.class))).thenReturn(StreamTaskTest.class.getClassLoader());
                
                ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);

Reply via email to