Repository: flink Updated Branches: refs/heads/master 094b747a3 -> ba2d007e5
[FLINK-4972] Fix CoordinatorShutdownTest Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99f1dc3e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99f1dc3e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99f1dc3e Branch: refs/heads/master Commit: 99f1dc3e24ec529852ce38bcb9c46ffaf749333d Parents: 094b747 Author: zentol <[email protected]> Authored: Mon Oct 31 14:15:38 2016 +0100 Committer: zentol <[email protected]> Committed: Mon Oct 31 15:12:01 2016 +0100 ---------------------------------------------------------------------- .../checkpoint/CoordinatorShutdownTest.java | 79 +++++++++++++++++--- 1 file changed, 68 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/99f1dc3e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java index 0b2f4f3..777ba9b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java @@ -26,9 +26,9 @@ import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; -import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -51,14 +51,15 @@ public class CoordinatorShutdownTest { public void testCoordinatorShutsDownOnFailure() { LocalFlinkMiniCluster cluster = null; try { - Configuration noTaskManagerConfig = new Configuration(); - noTaskManagerConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 0); - cluster = new LocalFlinkMiniCluster(noTaskManagerConfig, true); + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + cluster = new LocalFlinkMiniCluster(config, true); cluster.start(); // build a test graph with snapshotting enabled JobVertex vertex = new JobVertex("Test Vertex"); - vertex.setInvokableClass(Tasks.NoOpInvokable.class); + vertex.setInvokableClass(FailingBlockingInvokable.class); List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); JobGraph testGraph = new JobGraph("test job", vertex); @@ -72,11 +73,11 @@ public class CoordinatorShutdownTest { testGraph, ListeningBehaviour.EXECUTION_RESULT); - // submit is successful, but then the job dies because no TaskManager / slot is available + // submit is successful, but then the job blocks due to the invokable Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout); Await.result(submitFuture, timeout); - // get the execution graph and make sure the coordinator is properly shut down + // get the execution graph and store the ExecutionGraph reference Future<Object> jobRequestFuture = jmGateway.ask( new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout); @@ -84,8 +85,12 @@ public class CoordinatorShutdownTest { ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph(); assertNotNull(graph); + + FailingBlockingInvokable.unblock(); + graph.waitUntilFinished(); + // verify that the coordinator was shut down CheckpointCoordinator coord = graph.getCheckpointCoordinator(); assertTrue(coord == null || coord.isShutdown()); } @@ -105,12 +110,15 @@ public class CoordinatorShutdownTest { public void testCoordinatorShutsDownOnSuccess() { LocalFlinkMiniCluster cluster = null; try { - cluster = new LocalFlinkMiniCluster(new Configuration(), true); + Configuration config = new Configuration(); + config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1); + config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1); + cluster = new LocalFlinkMiniCluster(config, true); cluster.start(); // build a test graph with snapshotting enabled JobVertex vertex = new JobVertex("Test Vertex"); - vertex.setInvokableClass(Tasks.NoOpInvokable.class); + vertex.setInvokableClass(BlockingInvokable.class); List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID()); JobGraph testGraph = new JobGraph("test job", vertex); @@ -124,11 +132,11 @@ public class CoordinatorShutdownTest { testGraph, ListeningBehaviour.EXECUTION_RESULT); - // submit is successful, but then the job dies because no TaskManager / slot is available + // submit is successful, but then the job blocks due to the invokable Future<Object> submitFuture = jmGateway.ask(submitMessage, timeout); Await.result(submitFuture, timeout); - // get the execution graph and make sure the coordinator is properly shut down + // get the execution graph and store the ExecutionGraph reference Future<Object> jobRequestFuture = jmGateway.ask( new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout); @@ -136,8 +144,12 @@ public class CoordinatorShutdownTest { ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph(); assertNotNull(graph); + + BlockingInvokable.unblock(); + graph.waitUntilFinished(); + // verify that the coordinator was shut down CheckpointCoordinator coord = graph.getCheckpointCoordinator(); assertTrue(coord == null || coord.isShutdown()); } @@ -152,4 +164,49 @@ public class CoordinatorShutdownTest { } } } + + public static class BlockingInvokable extends AbstractInvokable { + private static boolean blocking = true; + private static final Object lock = new Object(); + + @Override + public void invoke() throws Exception { + while (blocking) { + synchronized (lock) { + lock.wait(); + } + } + } + + public static void unblock() { + blocking = false; + + synchronized (lock) { + lock.notifyAll(); + } + } + } + + public static class FailingBlockingInvokable extends AbstractInvokable { + private static boolean blocking = true; + private static final Object lock = new Object(); + + @Override + public void invoke() throws Exception { + while (blocking) { + synchronized (lock) { + lock.wait(); + } + } + throw new RuntimeException("This exception is expected."); + } + + public static void unblock() { + blocking = false; + + synchronized (lock) { + lock.notifyAll(); + } + } + } }
