Repository: flink
Updated Branches:
  refs/heads/flip-6 2486d3787 -> a19cae3b0


[hotfix] [tests] Migrate some test tasks to Java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da16b0a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da16b0a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da16b0a7

Branch: refs/heads/flip-6
Commit: da16b0a7b1bf5d771d07428ae485048ce540bbf7
Parents: 2486d37
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 14 23:54:29 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 16 22:01:32 2016 +0200

----------------------------------------------------------------------
 .../StackTraceSampleCoordinatorITCase.java      |  8 ++--
 .../checkpoint/CoordinatorShutdownTest.java     |  8 ++--
 .../ExecutionGraphMetricsTest.java              |  4 +-
 .../ExecutionGraphRestartTest.java              | 23 +++++------
 .../runtime/jobmanager/JobManagerTest.java      |  9 +++--
 .../flink/runtime/jobmanager/JobSubmitTest.java |  9 ++---
 .../runtime/taskmanager/TaskManagerTest.java    | 41 ++++++++++----------
 .../testtasks/BlockingNoOpInvokable.java        | 39 +++++++++++++++++++
 .../flink/runtime/testtasks/NoOpInvokable.java  | 30 ++++++++++++++
 .../runtime/testtasks/WaitingNoOpInvokable.java | 34 ++++++++++++++++
 .../TaskManagerLossFailsTasksTest.scala         |  3 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  1 +
 .../apache/flink/runtime/jobmanager/Tasks.scala | 20 ----------
 .../JobSubmissionFailsITCase.java               |  6 +--
 .../JobManagerHACheckpointRecoveryITCase.java   |  4 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |  4 +-
 .../jobmanager/JobManagerFailsITCase.scala      |  2 +-
 .../taskmanager/TaskManagerFailsITCase.scala    |  3 +-
 18 files changed, 170 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
index 9b1f608..d74af08 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
 import akka.testkit.JavaTestKit;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -31,14 +31,16 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -81,7 +83,7 @@ public class StackTraceSampleCoordinatorITCase extends 
TestLogger {
                        final int parallelism = 1;
 
                        final JobVertex task = new JobVertex("Task");
-                       
task.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+                       task.setInvokableClass(BlockingNoOpInvokable.class);
                        task.setParallelism(parallelism);
 
                        jobGraph.addVertex(task);

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ea4d322..a346a80 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -28,11 +28,13 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -58,7 +60,7 @@ public class CoordinatorShutdownTest {
                        
                        // build a test graph with snapshotting enabled
                        JobVertex vertex = new JobVertex("Test Vertex");
-                       vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       vertex.setInvokableClass(NoOpInvokable.class);
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
                        
                        JobGraph testGraph = new JobGraph("test job", vertex);
@@ -110,7 +112,7 @@ public class CoordinatorShutdownTest {
                        
                        // build a test graph with snapshotting enabled
                        JobVertex vertex = new JobVertex("Test Vertex");
-                       vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       vertex.setInvokableClass(NoOpInvokable.class);
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
 
                        JobGraph testGraph = new JobGraph("test job", vertex);

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 70c2bf9..1f8845d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -41,13 +41,13 @@ import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -89,7 +89,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
        
                        JobVertex jobVertex = new JobVertex("TestVertex");
                        jobVertex.setParallelism(parallelism);
-                       jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       jobVertex.setInvokableClass(NoOpInvokable.class);
                        JobGraph jobGraph = new JobGraph("Test Job", jobVertex);
        
                        Configuration config = new Configuration();

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 0d09e38..9fbda51 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -36,15 +36,16 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
@@ -102,8 +103,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                scheduler.newInstanceAvailable(instance);
 
-               JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, 
Tasks.NoOpInvokable.class);
-               JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, 
Tasks.NoOpInvokable.class);
+               JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, 
NoOpInvokable.class);
+               JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, 
NoOpInvokable.class);
 
                SlotSharingGroup sharingGroup = new SlotSharingGroup();
                groupVertex.setSlotSharingGroup(sharingGroup);
@@ -237,7 +238,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                        new FixedDelayRestartStrategy(Integer.MAX_VALUE, 
Long.MAX_VALUE));
 
                JobVertex jobVertex = new JobVertex("NoOpInvokable");
-               jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+               jobVertex.setInvokableClass(NoOpInvokable.class);
                jobVertex.setParallelism(NUM_TASKS);
 
                JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
@@ -369,8 +370,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                scheduler.newInstanceAvailable(instance);
 
-               JobVertex sender = newJobVertex("Task1", 1, 
Tasks.NoOpInvokable.class);
-               JobVertex receiver = newJobVertex("Task2", 1, 
Tasks.NoOpInvokable.class);
+               JobVertex sender = newJobVertex("Task1", 1, 
NoOpInvokable.class);
+               JobVertex receiver = newJobVertex("Task2", 1, 
NoOpInvokable.class);
                JobGraph jobGraph = new JobGraph("Pointwise job", sender, 
receiver);
                ExecutionGraph eg = newExecutionGraph(new 
FixedDelayRestartStrategy(1, 1000));
                
eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
@@ -432,7 +433,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                scheduler.newInstanceAvailable(instance);
 
-               JobVertex vertex = newJobVertex("Test Vertex", 1, 
Tasks.NoOpInvokable.class);
+               JobVertex vertex = newJobVertex("Test Vertex", 1, 
NoOpInvokable.class);
 
                ExecutionConfig executionConfig = new ExecutionConfig();
                
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -477,7 +478,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                scheduler.newInstanceAvailable(instance);
 
-               JobVertex vertex = newJobVertex("Test Vertex", 1, 
Tasks.NoOpInvokable.class);
+               JobVertex vertex = newJobVertex("Test Vertex", 1, 
NoOpInvokable.class);
 
                ExecutionConfig executionConfig = new ExecutionConfig();
                
executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -525,7 +526,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                scheduler.newInstanceAvailable(instance);
 
                JobVertex sender = new JobVertex("Task");
-               sender.setInvokableClass(Tasks.NoOpInvokable.class);
+               sender.setInvokableClass(NoOpInvokable.class);
                sender.setParallelism(NUM_TASKS);
 
                JobGraph jobGraph = new JobGraph("Pointwise job", sender);
@@ -639,7 +640,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                Scheduler scheduler = new 
Scheduler(TestingUtils.defaultExecutionContext());
                scheduler.newInstanceAvailable(instance);
 
-               JobVertex sender = newJobVertex("Task", NUM_TASKS, 
Tasks.NoOpInvokable.class);
+               JobVertex sender = newJobVertex("Task", NUM_TASKS, 
NoOpInvokable.class);
 
                JobGraph jobGraph = new JobGraph("Pointwise job", sender);
 
@@ -656,7 +657,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
                return new Tuple2<>(eg, instance);
        }
 
-       private static JobVertex newJobVertex(String task1, int numTasks, 
Class<Tasks.NoOpInvokable> invokable) {
+       private static JobVertex newJobVertex(String task1, int numTasks, 
Class<NoOpInvokable> invokable) {
                JobVertex groupVertex = new JobVertex(task1);
                groupVertex.setInvokableClass(invokable);
                groupVertex.setParallelism(numTasks);

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 183477a..3019b06 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -80,6 +80,7 @@ import 
org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -169,7 +170,7 @@ public class JobManagerTest {
                                                // Create a task
                                                final JobVertex sender = new 
JobVertex("Sender");
                                                sender.setParallelism(1);
-                                               
sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+                                               
sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
                                                
sender.createAndAddResultDataSet(rid, PIPELINED);
 
                                                final JobGraph jobGraph = new 
JobGraph("Blocking test job", sender);
@@ -340,7 +341,7 @@ public class JobManagerTest {
                                                // Create a task
                                                final JobVertex sender = new 
JobVertex("Sender");
                                                sender.setParallelism(1);
-                                               
sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+                                               
sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
 
                                                final JobGraph jobGraph = new 
JobGraph("Non-Stoppable batching test job", sender);
                                                final JobID jid = 
jobGraph.getJobID();
@@ -442,11 +443,11 @@ public class JobManagerTest {
                JobGraph jobGraph = new JobGraph("croissant");
                JobVertex jobVertex1 = new JobVertex("cappuccino");
                jobVertex1.setParallelism(4);
-               jobVertex1.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+               jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
 
                JobVertex jobVertex2 = new JobVertex("americano");
                jobVertex2.setParallelism(4);
-               jobVertex2.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+               jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
 
                jobGraph.addVertex(jobVertex1);
                jobGraph.addVertex(jobVertex2);

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 959b9a7..f793524 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
-import org.apache.flink.api.common.ExecutionConfig;
+
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -28,12 +27,12 @@ import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.client.JobExecutionException;
-import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.NetUtils;
 
@@ -108,7 +107,7 @@ public class JobSubmitTest {
                try {
                        // create a simple job graph
                        JobVertex jobVertex = new JobVertex("Test Vertex");
-                       jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       jobVertex.setInvokableClass(NoOpInvokable.class);
                        JobGraph jg = new JobGraph("test job", jobVertex);
 
                        // request the blob port from the job manager
@@ -172,7 +171,7 @@ public class JobSubmitTest {
                                }
                        };
 
-                       jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       jobVertex.setInvokableClass(NoOpInvokable.class);
                        JobGraph jg = new JobGraph("test job", jobVertex);
 
                        // submit the job

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d1909fe..22a68b1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -67,6 +67,7 @@ import 
org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.SerializedValue;
@@ -1099,25 +1100,25 @@ public class TaskManagerTest extends TestLogger {
 
                                // Single blocking task
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
-                                       new JobID(),
-                                       new AllocationID(),
-                                       "Job",
-                                       new JobVertexID(),
-                                       new ExecutionAttemptID(),
-                                       new SerializedValue<>(new 
ExecutionConfig()),
-                                       "Task",
-                                       1,
-                                       0,
-                                       1,
-                                       0,
-                                       new Configuration(),
-                                       new Configuration(),
-                                       
Tasks.BlockingNoOpInvokable.class.getName(),
-                                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                       
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                       Collections.<BlobKey>emptyList(),
-                                       Collections.<URL>emptyList(),
-                                       0);
+                                               new JobID(),
+                                               new AllocationID(),
+                                               "Job",
+                                               new JobVertexID(),
+                                               new ExecutionAttemptID(),
+                                               new SerializedValue<>(new 
ExecutionConfig()),
+                                               "Task",
+                                               1,
+                                               0,
+                                               1,
+                                               0,
+                                               new Configuration(),
+                                               new Configuration(),
+                                               
BlockingNoOpInvokable.class.getName(),
+                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                                               
Collections.<BlobKey>emptyList(),
+                                               Collections.<URL>emptyList(),
+                                               0);
 
                                // Submit the task
                                new Within(d) {
@@ -1220,7 +1221,7 @@ public class TaskManagerTest extends TestLogger {
                                                                        // Look 
for BlockingNoOpInvokable#invoke
                                                                        for 
(StackTraceElement elem : trace) {
                                                                                
if (elem.getClassName().equals(
-                                                                               
                Tasks.BlockingNoOpInvokable.class.getName())) {
+                                                                               
                BlockingNoOpInvokable.class.getName())) {
 
                                                                                
        assertEquals("invoke", elem.getMethodName());
                                                                                
        success = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
new file mode 100644
index 0000000..c9adba8
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A task that does nothing but blocks indefinitely, until the executing 
thread is interrupted.
+ */
+public class BlockingNoOpInvokable extends AbstractInvokable {
+
+       @Override
+       public void invoke() throws Exception {
+               final Object o = new Object();
+               //noinspection SynchronizationOnLocalVariableOrMethodParameter
+               synchronized (o) {
+                       //noinspection InfiniteLoopStatement
+                       while (true) {
+                               o.wait();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
new file mode 100644
index 0000000..fa9949a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A simple task that does nothing and finishes immediately.
+ */
+public class NoOpInvokable extends AbstractInvokable {
+
+       @Override
+       public void invoke() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
new file mode 100644
index 0000000..de7d59a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.testtasks;
+
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+/**
+ * A simple task that does nothing and finishes after a short delay of 100 
milliseconds.
+ */
+public class WaitingNoOpInvokable extends AbstractInvokable {
+
+       private static final long waitingTime = 100L;
+
+       @Override
+       public void invoke() throws Exception {
+               Thread.sleep(waitingTime);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
index c30d244..ff0286d 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, 
JobStatus, JobVertex}
 import org.apache.flink.runtime.jobmanager.Tasks
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.testingUtils.TestingUtils
+import org.apache.flink.runtime.testtasks.NoOpInvokable
 import org.apache.flink.util.SerializedValue
 import org.junit.runner.RunWith
 import org.scalatest.junit.JUnitRunner
@@ -48,7 +49,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with 
Matchers {
         scheduler.newInstanceAvailable(instance2)
 
         val sender = new JobVertex("Task")
-        sender.setInvokableClass(classOf[Tasks.NoOpInvokable])
+        sender.setInvokableClass(classOf[NoOpInvokable])
         sender.setParallelism(20)
 
         val jobGraph = new JobGraph("Pointwise job", sender)

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 0569297..4aa0565 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableExcepti
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._
 import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils}
+import org.apache.flink.runtime.testtasks._
 import org.junit.runner.RunWith
 import org.mockito.Mockito
 import org.mockito.Mockito._

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 87c123a..fabd66b 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -25,26 +25,6 @@ import org.apache.flink.types.IntValue
 
 
 object Tasks {
-  class BlockingNoOpInvokable extends AbstractInvokable {
-    override def invoke(): Unit = {
-      val o = new Object()
-      o.synchronized{
-        o.wait()
-      }
-    }
-  }
-
-  class NoOpInvokable extends AbstractInvokable{
-    override def invoke(): Unit = {}
-  }
-
-  class WaitingNoOpInvokable extends AbstractInvokable{
-    val waitingTime = 100L
-
-    override def invoke(): Unit = {
-      Thread.sleep(waitingTime)
-    }
-  }
 
   class Sender extends AbstractInvokable{
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
index 178656d..256b1ae 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -63,7 +63,7 @@ public class JobSubmissionFailsITCase {
                        cluster.start();
                        
                        final JobVertex jobVertex = new JobVertex("Working job 
vertex.");
-                       jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       jobVertex.setInvokableClass(NoOpInvokable.class);
                        workingJobGraph = new JobGraph("Working testing job", 
jobVertex);
                }
                catch (Exception e) {
@@ -113,7 +113,7 @@ public class JobSubmissionFailsITCase {
        public void testExceptionInInitializeOnMaster() {
                try {
                        final JobVertex failingJobVertex = new 
FailingJobVertex("Failing job vertex");
-                       
failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       failingJobVertex.setInvokableClass(NoOpInvokable.class);
 
                        final JobGraph failingJobGraph = new JobGraph("Failing 
testing job", failingJobVertex);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 49eaeb7..92b90da 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -31,10 +31,10 @@ import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
@@ -372,7 +372,7 @@ public class JobManagerHACheckpointRecoveryITCase extends 
TestLogger {
 
                        // BLocking JobGraph
                        JobVertex blockingVertex = new JobVertex("Blocking 
vertex");
-                       
blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+                       
blockingVertex.setInvokableClass(BlockingNoOpInvokable.class);
                        JobGraph jobGraph = new JobGraph(blockingVertex);
 
                        // Submit the job in detached mode

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
index bf39c4b..236e922 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.leaderelection.TestingListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -48,6 +47,7 @@ import 
org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
 import org.apache.flink.runtime.testutils.JobManagerProcess;
@@ -461,7 +461,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends 
TestLogger {
                JobGraph jobGraph = new JobGraph("Blocking program");
 
                JobVertex jobVertex = new JobVertex("Blocking Vertex");
-               jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
+               jobVertex.setInvokableClass(BlockingNoOpInvokable.class);
 
                jobGraph.addVertex(jobVertex);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
index 1b2838d..258282e 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala
@@ -23,7 +23,7 @@ import akka.testkit.{ImplicitSender, TestKit}
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, 
NoOpInvokable}
+import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, 
NoOpInvokable}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.Acknowledge
 import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered

http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
index 3b39b3f..e141cc2 100644
--- 
a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
+++ 
b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala
@@ -25,7 +25,8 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.client.JobExecutionException
 import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, 
JobVertex}
-import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, 
BlockingReceiver, NoOpInvokable, Sender}
+import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender}
+import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, 
NoOpInvokable}
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import 
org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager,
 RegisteredAtJobManager}
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._

Reply via email to