tillrohrmann commented on a change in pull request #7647: 
[FLINK-11352][tests]Port JobManagerHACheckpointRecoveryITCase to new code base
URL: https://github.com/apache/flink/pull/7647#discussion_r253529750
 
 

 ##########
 File path: 
flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureRecoveryITCase.java
 ##########
 @@ -220,6 +221,187 @@ public void flatMap(Long value, Collector<Long> out) throws Exception {
                env.execute();
        }
 
+       /**
+        * Tests that the JobManager logs failures during recovery properly.
+        *
+        * @see <a href="https://issues.apache.org/jira/browse/FLINK-3185">FLINK-3185</a>
+        */
+       @Test
+       public void testJobManagerRecoveryFailureLog() throws Exception {
+               final Time timeout = Time.seconds(30L);
+               final File zookeeperStoragePath = temporaryFolder.newFolder();
+
+               // Config
+               final int numberOfJobManagers = 2;
+               final int numberOfTaskManagers = 2;
+               final int numberOfSlotsPerTaskManager = 2;
+
+               assertEquals(PARALLELISM, numberOfTaskManagers * numberOfSlotsPerTaskManager);
+
+               // Job managers
+               final DispatcherProcess[] dispatcherProcesses = new DispatcherProcess[numberOfJobManagers];
+
+               // Task managers
+               TaskManagerRunner[] taskManagerRunners = new TaskManagerRunner[numberOfTaskManagers];
+
+               HighAvailabilityServices highAvailabilityServices = null;
+
+               LeaderRetrievalService leaderRetrievalService = null;
+
+               // Coordination between the processes goes through a directory
+               File coordinateTempDir = null;
+
+               // Cluster config
+               Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
+                       zooKeeper.getConnectString(), zookeeperStoragePath.getPath());
+               // Task manager configuration
+               config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE, "4m");
+               config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 100);
+               config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 2);
+
+               final RpcService rpcService = AkkaRpcServiceUtils.createRpcService("localhost", 0, config);
+
+               try {
+                       final Deadline deadline = TestTimeOut.fromNow();
+
+                       // Coordination directory
+                       coordinateTempDir = temporaryFolder.newFolder();
+
+                       // Start first process
+                       dispatcherProcesses[0] = new DispatcherProcess(0, config);
+                       dispatcherProcesses[0].startProcess();
+
+                       highAvailabilityServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+                               config,
+                               TestingUtils.defaultExecutor());
+
+                       // Start the task manager process
+                       for (int i = 0; i < numberOfTaskManagers; i++) {
+                               taskManagerRunners[i] = new TaskManagerRunner(config, ResourceID.generate());
+                               taskManagerRunners[i].start();
+                       }
+
+                       // Leader listener
+                       TestingListener leaderListener = new TestingListener();
+                       leaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever();
+                       leaderRetrievalService.start(leaderListener);
+
+                       // Initial submission
+                       leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
+
+                       String leaderAddress = leaderListener.getAddress();
+                       UUID leaderId = leaderListener.getLeaderSessionID();
+
+                       final CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = rpcService.connect(
+                               leaderAddress,
+                               DispatcherId.fromUuid(leaderId),
+                               DispatcherGateway.class);
+                       final DispatcherGateway dispatcherGateway = dispatcherGatewayFuture.get();
+
+                       // Wait for all task managers to connect to the leading job manager
+                       waitForTaskManagers(numberOfTaskManagers, dispatcherGateway, deadline.timeLeft());
+
+                       final File coordinateDirClosure = coordinateTempDir;
+                       final Throwable[] errorRef = new Throwable[1];
+
+                       // we trigger program execution in a separate thread
+                       Thread programTrigger = new Thread("Program Trigger") {
+                               @Override
+                               public void run() {
+                                       try {
+                                               testJobManagerFailure(zooKeeper.getConnectString(), coordinateDirClosure, zookeeperStoragePath);
+                                       }
+                                       catch (Throwable t) {
+                                               t.printStackTrace();
+                                               errorRef[0] = t;
+                                       }
+                               }
+                       };
+
+                       //start the test program
+                       programTrigger.start();
+
+                       // wait until all marker files are in place, indicating that all tasks have started
+                       AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles(coordinateTempDir,
+                               READY_MARKER_FILE_PREFIX, PARALLELISM, deadline.timeLeft().toMillis());
+
+                       // Delete the coordinateTempDir.
+                       FileUtils.deleteDirectory(coordinateTempDir);
 
 Review comment:
   How does this directory affect the recovery? In the old test, we deleted the directory which contained the checkpoint data.
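The question hinges on what the coordination directory holds. In this test it appears to serve only as a signalling channel: each task drops a marker file named with `READY_MARKER_FILE_PREFIX`, and the driver polls until `PARALLELISM` markers exist. Deleting it would then leave the JobManager's recovery data untouched; with the ZooKeeper HA configuration, that data is presumably written below `zookeeperStoragePath`. The Java sketch below illustrates this kind of marker-file coordination under stated assumptions: markers are empty files named prefix plus task index, and `MarkerFileCoordination`, `signalReady`, `countMarkers`, `waitForMarkers` and `deleteRecursively` are hypothetical helpers, not Flink's `AbstractTaskManagerProcessFailureRecoveryTest.waitForMarkerFiles`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

/** Hypothetical sketch of marker-file coordination; not Flink's implementation. */
final class MarkerFileCoordination {

    private MarkerFileCoordination() {}

    /** Assumed task-side signal: create an empty file named prefix + taskIndex. */
    static void signalReady(Path coordinationDir, String prefix, int taskIndex) throws IOException {
        Files.createFile(coordinationDir.resolve(prefix + taskIndex));
    }

    /** Counts entries in coordinationDir whose file names match the glob prefix*. */
    static int countMarkers(Path coordinationDir, String prefix) throws IOException {
        int count = 0;
        try (DirectoryStream<Path> markers = Files.newDirectoryStream(coordinationDir, prefix + "*")) {
            for (Path ignored : markers) {
                count++;
            }
        }
        return count;
    }

    /** Driver side: polls every 10 ms until `expected` markers exist, or throws on timeout. */
    static void waitForMarkers(Path coordinationDir, String prefix, int expected, long timeoutMillis)
            throws IOException, InterruptedException, TimeoutException {
        final long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
        int found;
        while ((found = countMarkers(coordinationDir, prefix)) < expected) {
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("Found " + found + " of " + expected + " marker files");
            }
            Thread.sleep(10L);
        }
    }

    /** Deletes a directory tree; reverse path order removes children before their parents. */
    static void deleteRecursively(Path dir) throws IOException {
        try (Stream<Path> paths = Files.walk(dir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```

In such a setup each task would call `signalReady` and the driver `waitForMarkers`; to reproduce what the old test did, a deletion like `deleteRecursively` would have to target the directory containing the checkpoint data rather than `coordinateTempDir`.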

----------------------------------------------------------------
This is an automated message from the Apache Git Service.