Repository: flink
Updated Branches:
  refs/heads/master 094b747a3 -> ba2d007e5


[FLINK-4972] Fix CoordinatorShutdownTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/99f1dc3e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/99f1dc3e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/99f1dc3e

Branch: refs/heads/master
Commit: 99f1dc3e24ec529852ce38bcb9c46ffaf749333d
Parents: 094b747
Author: zentol <[email protected]>
Authored: Mon Oct 31 14:15:38 2016 +0100
Committer: zentol <[email protected]>
Committed: Mon Oct 31 15:12:01 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CoordinatorShutdownTest.java     | 79 +++++++++++++++++---
 1 file changed, 68 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/99f1dc3e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 0b2f4f3..777ba9b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -26,9 +26,9 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -51,14 +51,15 @@ public class CoordinatorShutdownTest {
        public void testCoordinatorShutsDownOnFailure() {
                LocalFlinkMiniCluster cluster = null;
                try {
-                       Configuration noTaskManagerConfig = new Configuration();
-                       
noTaskManagerConfig.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 0);
-                       cluster = new 
LocalFlinkMiniCluster(noTaskManagerConfig, true);
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+                       cluster = new LocalFlinkMiniCluster(config, true);
                        cluster.start();
                        
                        // build a test graph with snapshotting enabled
                        JobVertex vertex = new JobVertex("Test Vertex");
-                       vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       
vertex.setInvokableClass(FailingBlockingInvokable.class);
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
                        
                        JobGraph testGraph = new JobGraph("test job", vertex);
@@ -72,11 +73,11 @@ public class CoordinatorShutdownTest {
                                        testGraph,
                                        ListeningBehaviour.EXECUTION_RESULT);
                        
-                       // submit is successful, but then the job dies because 
no TaskManager / slot is available
+                       // submit is successful, but then the job blocks due to 
the invokable
                        Future<Object> submitFuture = 
jmGateway.ask(submitMessage, timeout);
                        Await.result(submitFuture, timeout);
 
-                       // get the execution graph and make sure the 
coordinator is properly shut down
+                       // get the execution graph and store the ExecutionGraph 
reference
                        Future<Object> jobRequestFuture = jmGateway.ask(
                                        new 
JobManagerMessages.RequestJob(testGraph.getJobID()),
                                        timeout);
@@ -84,8 +85,12 @@ public class CoordinatorShutdownTest {
                        ExecutionGraph graph = 
(ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, 
timeout)).executionGraph();
                        
                        assertNotNull(graph);
+
+                       FailingBlockingInvokable.unblock();
+
                        graph.waitUntilFinished();
                        
+                       // verify that the coordinator was shut down
                        CheckpointCoordinator coord = 
graph.getCheckpointCoordinator();
                        assertTrue(coord == null || coord.isShutdown());
                }
@@ -105,12 +110,15 @@ public class CoordinatorShutdownTest {
        public void testCoordinatorShutsDownOnSuccess() {
                LocalFlinkMiniCluster cluster = null;
                try {
-                       cluster = new LocalFlinkMiniCluster(new 
Configuration(), true);
+                       Configuration config = new Configuration();
+                       
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+                       
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+                       cluster = new LocalFlinkMiniCluster(config, true);
                        cluster.start();
                        
                        // build a test graph with snapshotting enabled
                        JobVertex vertex = new JobVertex("Test Vertex");
-                       vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+                       vertex.setInvokableClass(BlockingInvokable.class);
                        List<JobVertexID> vertexIdList = 
Collections.singletonList(vertex.getID());
 
                        JobGraph testGraph = new JobGraph("test job", vertex);
@@ -124,11 +132,11 @@ public class CoordinatorShutdownTest {
                                        testGraph,
                                        ListeningBehaviour.EXECUTION_RESULT);
 
-                       // submit is successful, but then the job dies because 
no TaskManager / slot is available
+                       // submit is successful, but then the job blocks due to 
the invokable
                        Future<Object> submitFuture = 
jmGateway.ask(submitMessage, timeout);
                        Await.result(submitFuture, timeout);
 
-                       // get the execution graph and make sure the 
coordinator is properly shut down
+                       // get the execution graph and store the ExecutionGraph 
reference
                        Future<Object> jobRequestFuture = jmGateway.ask(
                                        new 
JobManagerMessages.RequestJob(testGraph.getJobID()),
                                        timeout);
@@ -136,8 +144,12 @@ public class CoordinatorShutdownTest {
                        ExecutionGraph graph = 
(ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, 
timeout)).executionGraph();
 
                        assertNotNull(graph);
+
+                       BlockingInvokable.unblock();
+                       
                        graph.waitUntilFinished();
 
+                       // verify that the coordinator was shut down
                        CheckpointCoordinator coord = 
graph.getCheckpointCoordinator();
                        assertTrue(coord == null || coord.isShutdown());
                }
@@ -152,4 +164,49 @@ public class CoordinatorShutdownTest {
                        }
                }
        }
+
+       public static class BlockingInvokable extends AbstractInvokable {
+               private static boolean blocking = true;
+               private static final Object lock = new Object();
+
+               @Override
+               public void invoke() throws Exception {
+                       while (blocking) {
+                               synchronized (lock) {
+                                       lock.wait();
+                               }
+                       }
+               }
+
+               public static void unblock() {
+                       blocking = false;
+
+                       synchronized (lock) {
+                               lock.notifyAll();
+                       }
+               }
+       }
+
+       public static class FailingBlockingInvokable extends AbstractInvokable {
+               private static boolean blocking = true;
+               private static final Object lock = new Object();
+
+               @Override
+               public void invoke() throws Exception {
+                       while (blocking) {
+                               synchronized (lock) {
+                                       lock.wait();
+                               }
+                       }
+                       throw new RuntimeException("This exception is 
expected.");
+               }
+
+               public static void unblock() {
+                       blocking = false;
+
+                       synchronized (lock) {
+                               lock.notifyAll();
+                       }
+               }
+       }
 }

Reply via email to