Repository: flink Updated Branches: refs/heads/flip-6 2486d3787 -> a19cae3b0
[hotfix] [tests] Migrate some test tasks to Java Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/da16b0a7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/da16b0a7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/da16b0a7 Branch: refs/heads/flip-6 Commit: da16b0a7b1bf5d771d07428ae485048ce540bbf7 Parents: 2486d37 Author: Stephan Ewen <se...@apache.org> Authored: Fri Oct 14 23:54:29 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Sun Oct 16 22:01:32 2016 +0200 ---------------------------------------------------------------------- .../StackTraceSampleCoordinatorITCase.java | 8 ++-- .../checkpoint/CoordinatorShutdownTest.java | 8 ++-- .../ExecutionGraphMetricsTest.java | 4 +- .../ExecutionGraphRestartTest.java | 23 +++++------ .../runtime/jobmanager/JobManagerTest.java | 9 +++-- .../flink/runtime/jobmanager/JobSubmitTest.java | 9 ++--- .../runtime/taskmanager/TaskManagerTest.java | 41 ++++++++++---------- .../testtasks/BlockingNoOpInvokable.java | 39 +++++++++++++++++++ .../flink/runtime/testtasks/NoOpInvokable.java | 30 ++++++++++++++ .../runtime/testtasks/WaitingNoOpInvokable.java | 34 ++++++++++++++++ .../TaskManagerLossFailsTasksTest.scala | 3 +- .../runtime/jobmanager/JobManagerITCase.scala | 1 + .../apache/flink/runtime/jobmanager/Tasks.scala | 20 ---------- .../JobSubmissionFailsITCase.java | 6 +-- .../JobManagerHACheckpointRecoveryITCase.java | 4 +- .../JobManagerHAJobGraphRecoveryITCase.java | 4 +- .../jobmanager/JobManagerFailsITCase.scala | 2 +- .../taskmanager/TaskManagerFailsITCase.scala | 3 +- 18 files changed, 170 insertions(+), 78 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java index 9b1f608..d74af08 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorITCase.java @@ -20,7 +20,7 @@ package org.apache.flink.runtime.webmonitor; import akka.actor.ActorSystem; import akka.testkit.JavaTestKit; -import org.apache.flink.api.common.ExecutionConfig; + import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -31,14 +31,16 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.util.TestLogger; + import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -81,7 +83,7 @@ public class StackTraceSampleCoordinatorITCase extends TestLogger { final int parallelism = 1; final JobVertex task = new JobVertex("Task"); - task.setInvokableClass(Tasks.BlockingNoOpInvokable.class); + task.setInvokableClass(BlockingNoOpInvokable.class); task.setParallelism(parallelism); jobGraph.addVertex(task); http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index ea4d322..a346a80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -28,11 +28,13 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; + import org.junit.Test; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -58,7 +60,7 @@ public class CoordinatorShutdownTest { // build a test graph with snapshotting enabled JobVertex vertex = new JobVertex("Test Vertex"); - vertex.setInvokableClass(Tasks.NoOpInvokable.class); + vertex.setInvokableClass(NoOpInvokable.class); List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); JobGraph testGraph = new JobGraph("test job", vertex); @@ -110,7 +112,7 @@ public class CoordinatorShutdownTest { // build a test graph with snapshotting enabled JobVertex vertex = new JobVertex("Test Vertex"); - vertex.setInvokableClass(Tasks.NoOpInvokable.class); + vertex.setInvokableClass(NoOpInvokable.class); List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); JobGraph testGraph = new JobGraph("test job", vertex); http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java index 70c2bf9..1f8845d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java @@ -41,13 +41,13 @@ import org.apache.flink.runtime.instance.Slot; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.messages.Messages; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; @@ -89,7 +89,7 @@ public class ExecutionGraphMetricsTest extends TestLogger { JobVertex jobVertex = new JobVertex("TestVertex"); jobVertex.setParallelism(parallelism); - jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + jobVertex.setInvokableClass(NoOpInvokable.class); JobGraph jobGraph = new JobGraph("Test Job", jobVertex); Configuration config = new Configuration(); http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java index 0d09e38..9fbda51 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java @@ -36,15 +36,16 @@ import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.SerializedValue; import org.apache.flink.util.TestLogger; import org.junit.Test; + import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; @@ -102,8 +103,8 @@ public class ExecutionGraphRestartTest extends TestLogger { Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance); - JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, Tasks.NoOpInvokable.class); - JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, Tasks.NoOpInvokable.class); + JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, NoOpInvokable.class); + JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, NoOpInvokable.class); SlotSharingGroup sharingGroup = new SlotSharingGroup(); groupVertex.setSlotSharingGroup(sharingGroup); @@ -237,7 +238,7 @@ public class ExecutionGraphRestartTest extends TestLogger { new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE)); JobVertex jobVertex = new JobVertex("NoOpInvokable"); - jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + jobVertex.setInvokableClass(NoOpInvokable.class); jobVertex.setParallelism(NUM_TASKS); JobGraph jobGraph = new JobGraph("TestJob", jobVertex); @@ -369,8 +370,8 @@ public class ExecutionGraphRestartTest extends TestLogger { Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance); - JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class); - JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class); + JobVertex sender = newJobVertex("Task1", 1, NoOpInvokable.class); + JobVertex receiver = newJobVertex("Task2", 1, NoOpInvokable.class); JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver); ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000)); eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources()); @@ -432,7 +433,7 @@ public class ExecutionGraphRestartTest extends TestLogger { Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance); - JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class); + JobVertex vertex = newJobVertex("Test Vertex", 1, NoOpInvokable.class); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart( @@ -477,7 +478,7 @@ public class ExecutionGraphRestartTest extends TestLogger { Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance); - JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class); + JobVertex vertex = newJobVertex("Test Vertex", 1, NoOpInvokable.class); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart( @@ -525,7 +526,7 @@ public class ExecutionGraphRestartTest extends TestLogger { scheduler.newInstanceAvailable(instance); JobVertex sender = new JobVertex("Task"); - sender.setInvokableClass(Tasks.NoOpInvokable.class); + sender.setInvokableClass(NoOpInvokable.class); sender.setParallelism(NUM_TASKS); JobGraph jobGraph = new JobGraph("Pointwise job", sender); @@ -639,7 +640,7 @@ public class ExecutionGraphRestartTest extends TestLogger { Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext()); scheduler.newInstanceAvailable(instance); - JobVertex sender = newJobVertex("Task", NUM_TASKS, Tasks.NoOpInvokable.class); + JobVertex sender = newJobVertex("Task", NUM_TASKS, NoOpInvokable.class); JobGraph jobGraph = new JobGraph("Pointwise job", sender); @@ -656,7 +657,7 @@ public class ExecutionGraphRestartTest extends TestLogger { return new Tuple2<>(eg, instance); } - private static JobVertex newJobVertex(String task1, int numTasks, Class<Tasks.NoOpInvokable> invokable) { + private static JobVertex newJobVertex(String task1, int numTasks, Class<NoOpInvokable> invokable) { JobVertex groupVertex = new JobVertex(task1); groupVertex.setInvokableClass(invokable); groupVertex.setParallelism(numTasks); http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index 183477a..3019b06 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -80,6 +80,7 @@ import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist; import org.apache.flink.runtime.testingUtils.TestingTaskManager; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testutils.StoppableInvokable; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -169,7 +170,7 @@ public class JobManagerTest { // Create a task final JobVertex sender = new JobVertex("Sender"); sender.setParallelism(1); - sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block + sender.setInvokableClass(BlockingNoOpInvokable.class); // just block sender.createAndAddResultDataSet(rid, PIPELINED); final JobGraph jobGraph = new JobGraph("Blocking test job", sender); @@ -340,7 +341,7 @@ public class JobManagerTest { // Create a task final JobVertex sender = new JobVertex("Sender"); sender.setParallelism(1); - sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block + sender.setInvokableClass(BlockingNoOpInvokable.class); // just block final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender); final JobID jid = jobGraph.getJobID(); @@ -442,11 +443,11 @@ public class JobManagerTest { JobGraph jobGraph = new JobGraph("croissant"); JobVertex jobVertex1 = new JobVertex("cappuccino"); jobVertex1.setParallelism(4); - jobVertex1.setInvokableClass(Tasks.BlockingNoOpInvokable.class); + jobVertex1.setInvokableClass(BlockingNoOpInvokable.class); JobVertex jobVertex2 = new JobVertex("americano"); jobVertex2.setParallelism(4); - jobVertex2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); + jobVertex2.setInvokableClass(BlockingNoOpInvokable.class); jobGraph.addVertex(jobVertex1); jobGraph.addVertex(jobVertex2); http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java index 959b9a7..f793524 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java @@ -18,9 +18,8 @@ package org.apache.flink.runtime.jobmanager; -import akka.actor.ActorRef; import akka.actor.ActorSystem; -import org.apache.flink.api.common.ExecutionConfig; + import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -28,12 +27,12 @@ import org.apache.flink.runtime.akka.ListeningBehaviour; import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.util.NetUtils; @@ -108,7 +107,7 @@ public class JobSubmitTest { try { // create a simple job graph JobVertex jobVertex = new JobVertex("Test Vertex"); - jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + jobVertex.setInvokableClass(NoOpInvokable.class); JobGraph jg = new JobGraph("test job", jobVertex); // request the blob port from the job manager @@ -172,7 +171,7 @@ public class JobSubmitTest { } }; - jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + jobVertex.setInvokableClass(NoOpInvokable.class); JobGraph jg = new JobGraph("test job", jobVertex); // submit the job http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java index d1909fe..22a68b1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java @@ -67,6 +67,7 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testutils.StoppableInvokable; import org.apache.flink.util.NetUtils; import org.apache.flink.util.SerializedValue; @@ -1099,25 +1100,25 @@ public class TaskManagerTest extends TestLogger { // Single blocking task final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor( - new JobID(), - new AllocationID(), - "Job", - new JobVertexID(), - new ExecutionAttemptID(), - new SerializedValue<>(new ExecutionConfig()), - "Task", - 1, - 0, - 1, - 0, - new Configuration(), - new Configuration(), - Tasks.BlockingNoOpInvokable.class.getName(), - Collections.<ResultPartitionDeploymentDescriptor>emptyList(), - Collections.<InputGateDeploymentDescriptor>emptyList(), - Collections.<BlobKey>emptyList(), - Collections.<URL>emptyList(), - 0); + new JobID(), + new AllocationID(), + "Job", + new JobVertexID(), + new ExecutionAttemptID(), + new SerializedValue<>(new ExecutionConfig()), + "Task", + 1, + 0, + 1, + 0, + new Configuration(), + new Configuration(), + BlockingNoOpInvokable.class.getName(), + Collections.<ResultPartitionDeploymentDescriptor>emptyList(), + Collections.<InputGateDeploymentDescriptor>emptyList(), + Collections.<BlobKey>emptyList(), + Collections.<URL>emptyList(), + 0); // Submit the task new Within(d) { @@ -1220,7 +1221,7 @@ public class TaskManagerTest extends TestLogger { // Look for BlockingNoOpInvokable#invoke for (StackTraceElement elem : trace) { if (elem.getClassName().equals( - Tasks.BlockingNoOpInvokable.class.getName())) { + BlockingNoOpInvokable.class.getName())) { assertEquals("invoke", elem.getMethodName()); success = true; http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java new file mode 100644 index 0000000..c9adba8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/BlockingNoOpInvokable.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testtasks; + +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; + +/** + * A task that does nothing but blocks indefinitely, until the executing thread is interrupted. + */ +public class BlockingNoOpInvokable extends AbstractInvokable { + + @Override + public void invoke() throws Exception { + final Object o = new Object(); + //noinspection SynchronizationOnLocalVariableOrMethodParameter + synchronized (o) { + //noinspection InfiniteLoopStatement + while (true) { + o.wait(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java new file mode 100644 index 0000000..fa9949a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/NoOpInvokable.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testtasks; + +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; + +/** + * A simple task that does nothing and finishes immediately. + */ +public class NoOpInvokable extends AbstractInvokable { + + @Override + public void invoke() {} +} http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java new file mode 100644 index 0000000..de7d59a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testtasks/WaitingNoOpInvokable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.testtasks; + +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; + +/** + * A simple task that does nothing and finishes after a short delay of 100 milliseconds. + */ +public class WaitingNoOpInvokable extends AbstractInvokable { + + private static final long waitingTime = 100L; + + @Override + public void invoke() throws Exception { + Thread.sleep(waitingTime); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala index c30d244..ff0286d 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/executiongraph/TaskManagerLossFailsTasksTest.scala @@ -27,6 +27,7 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertex} import org.apache.flink.runtime.jobmanager.Tasks import org.apache.flink.runtime.jobmanager.scheduler.Scheduler import org.apache.flink.runtime.testingUtils.TestingUtils +import org.apache.flink.runtime.testtasks.NoOpInvokable import org.apache.flink.util.SerializedValue import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -48,7 +49,7 @@ class TaskManagerLossFailsTasksTest extends WordSpecLike with Matchers { scheduler.newInstanceAvailable(instance2) val sender = new JobVertex("Task") - sender.setInvokableClass(classOf[Tasks.NoOpInvokable]) + sender.setInvokableClass(classOf[NoOpInvokable]) sender.setParallelism(20) val jobGraph = new JobGraph("Pointwise job", sender) http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index 0569297..4aa0565 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -33,6 +33,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableExcepti import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._ import org.apache.flink.runtime.testingUtils.{ScalaTestingUtils, TestingUtils} +import org.apache.flink.runtime.testtasks._ import org.junit.runner.RunWith import org.mockito.Mockito import org.mockito.Mockito._ http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala index 87c123a..fabd66b 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala @@ -25,26 +25,6 @@ import org.apache.flink.types.IntValue object Tasks { - class BlockingNoOpInvokable extends AbstractInvokable { - override def invoke(): Unit = { - val o = new Object() - o.synchronized{ - o.wait() - } - } - } - - class NoOpInvokable extends AbstractInvokable{ - override def invoke(): Unit = {} - } - - class WaitingNoOpInvokable extends AbstractInvokable{ - val waitingTime = 100L - - override def invoke(): Unit = { - Thread.sleep(waitingTime) - } - } class Sender extends AbstractInvokable{ http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java index 178656d..256b1ae 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/failingPrograms/JobSubmissionFailsITCase.java @@ -25,9 +25,9 @@ import org.apache.flink.runtime.client.JobExecutionException; import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -63,7 +63,7 @@ public class JobSubmissionFailsITCase { cluster.start(); final JobVertex jobVertex = new JobVertex("Working job vertex."); - jobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + jobVertex.setInvokableClass(NoOpInvokable.class); workingJobGraph = new JobGraph("Working testing job", jobVertex); } catch (Exception e) { @@ -113,7 +113,7 @@ public class JobSubmissionFailsITCase { public void testExceptionInInitializeOnMaster() { try { final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex"); - failingJobVertex.setInvokableClass(Tasks.NoOpInvokable.class); + failingJobVertex.setInvokableClass(NoOpInvokable.class); final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex); http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java index 49eaeb7..92b90da 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java @@ -31,10 +31,10 @@ import org.apache.flink.runtime.instance.AkkaActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; import org.apache.flink.runtime.testutils.JobManagerProcess; @@ -372,7 +372,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger { // BLocking JobGraph JobVertex blockingVertex = new JobVertex("Blocking vertex"); - blockingVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class); + blockingVertex.setInvokableClass(BlockingNoOpInvokable.class); JobGraph jobGraph = new JobGraph(blockingVertex); // Submit the job in detached mode http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java index bf39c4b..236e922 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAJobGraphRecoveryITCase.java @@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.SubmittedJobGraph; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.leaderelection.TestingListener; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; @@ -48,6 +47,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.JobManagerActorTestUtils; import org.apache.flink.runtime.testutils.JobManagerProcess; @@ -461,7 +461,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger { JobGraph jobGraph = new JobGraph("Blocking program"); JobVertex jobVertex = new JobVertex("Blocking Vertex"); - jobVertex.setInvokableClass(Tasks.BlockingNoOpInvokable.class); + jobVertex.setInvokableClass(BlockingNoOpInvokable.class); jobGraph.addVertex(jobVertex); http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala index 1b2838d..258282e 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/jobmanager/JobManagerFailsITCase.scala @@ -23,7 +23,7 @@ import akka.testkit.{ImplicitSender, TestKit} import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.jobgraph.{JobGraph, JobVertex} -import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, NoOpInvokable} +import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable} import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.Messages.Acknowledge import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered http://git-wip-us.apache.org/repos/asf/flink/blob/da16b0a7/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala index 3b39b3f..e141cc2 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/taskmanager/TaskManagerFailsITCase.scala @@ -25,7 +25,8 @@ import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex} -import org.apache.flink.runtime.jobmanager.Tasks.{BlockingNoOpInvokable, BlockingReceiver, NoOpInvokable, Sender} +import org.apache.flink.runtime.jobmanager.Tasks.{BlockingReceiver, Sender} +import org.apache.flink.runtime.testtasks.{BlockingNoOpInvokable, NoOpInvokable} import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.{NotifyWhenRegisteredAtJobManager, RegisteredAtJobManager} import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages._