metaswirl commented on a change in pull request #18637:
URL: https://github.com/apache/flink/pull/18637#discussion_r801502009



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -110,70 +113,117 @@ public void tearDown() {
     }
 
     @Test
-    public void 
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
-            throws Exception {
+    public void testCleanupThroughRetries() throws Exception {
         final JobGraph jobGraph = createJobGraph();
         final JobID jobId = jobGraph.getJobID();
 
-        // Construct job graph store.
-        final Error temporaryError = new Error("Unable to remove job graph.");
-        final AtomicReference<? extends Error> temporaryErrorRef =
-                new AtomicReference<>(temporaryError);
-        final TestingJobGraphStore jobGraphStore =
-                TestingJobGraphStore.newBuilder()
-                        .setGlobalCleanupFunction(
-                                (ignoredJobId, ignoredExecutor) -> {
-                                    final Error error = 
temporaryErrorRef.getAndSet(null);
-                                    if (error != null) {
-                                        throw error;
-                                    }
+        // JobGraphStore
+        final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
+        final OneShotLatch successfulCleanupLatch = new OneShotLatch();
+        final int numberOfErrors = 5;
+        final RuntimeException temporaryError =
+                new RuntimeException("Expected RuntimeException: Unable to 
remove job graph.");
+        final JobGraphStore jobGraphStore =
+                createAndStartJobGraphStoreWithCleanupFailures(
+                        numberOfErrors,
+                        temporaryError,
+                        actualGlobalCleanupCallCount,
+                        successfulCleanupLatch);
+        haServices.setJobGraphStore(jobGraphStore);
 
-                                    return FutureUtils.completedVoidFuture();
-                                })
+        // Construct leader election service.
+        final TestingLeaderElectionService leaderElectionService =
+                new TestingLeaderElectionService();
+        haServices.setJobMasterLeaderElectionService(jobId, 
leaderElectionService);
+
+        // start the dispatcher with enough retries on cleanup
+        final JobManagerRunnerRegistry jobManagerRunnerRegistry =
+                new DefaultJobManagerRunnerRegistry(2);
+        final Dispatcher dispatcher =
+                createTestingDispatcherBuilder()
+                        .setResourceCleanerFactory(
+                                new DispatcherResourceCleanerFactory(
+                                        ForkJoinPool.commonPool(),
+                                        
TestingRetryStrategies.createWithNumberOfRetries(
+                                                numberOfErrors),
+                                        jobManagerRunnerRegistry,
+                                        haServices.getJobGraphStore(),
+                                        blobServer,
+                                        haServices,
+                                        UnregisteredMetricGroups
+                                                
.createUnregisteredJobManagerMetricGroup()))
                         .build();
-        jobGraphStore.start(null);
+        dispatcher.start();
+
+        toTerminate.add(dispatcher);
+        leaderElectionService.isLeader(UUID.randomUUID());
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+        dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+        waitForJobToFinish(leaderElectionService, dispatcherGateway, jobId);
+
+        successfulCleanupLatch.await();
+
+        assertThat(actualGlobalCleanupCallCount.get(), equalTo(numberOfErrors 
+ 1));
+
+        assertThat(
+                "The JobGraph should be removed from JobGraphStore.",
+                haServices.getJobGraphStore().getJobIds(),
+                IsEmptyCollection.empty());
+        assertThat(
+                "The JobResultStore has this job marked as clean.",
+                haServices.getJobResultStore().hasCleanJobResultEntry(jobId),
+                CoreMatchers.is(true));
+    }
+
+    @Test
+    public void testCleanupAfterLeadershipChange() throws Exception {
+        final JobGraph jobGraph = createJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        // Construct job graph store.
+        final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
+        final OneShotLatch successfulCleanupLatch = new OneShotLatch();
+        final RuntimeException temporaryError = new RuntimeException("Unable 
to remove job graph.");
+        final JobGraphStore jobGraphStore =
+                createAndStartJobGraphStoreWithCleanupFailures(
+                        1, temporaryError, actualGlobalCleanupCallCount, 
successfulCleanupLatch);
         haServices.setJobGraphStore(jobGraphStore);
 
         // Construct leader election service.
         final TestingLeaderElectionService leaderElectionService =
                 new TestingLeaderElectionService();
         haServices.setJobMasterLeaderElectionService(jobId, 
leaderElectionService);
 
-        // Start the first dispatcher and submit the job.
+        // start the dispatcher with no retries on cleanup
         final CountDownLatch jobGraphRemovalErrorReceived = new 
CountDownLatch(1);
         final Dispatcher dispatcher =
-                createRecoveredDispatcher(
-                        throwable -> {
-                            final Optional<Error> maybeError =
-                                    ExceptionUtils.findThrowable(throwable, 
Error.class);
-                            if (maybeError.isPresent() && 
temporaryError.equals(maybeError.get())) {
-                                jobGraphRemovalErrorReceived.countDown();
-                            } else {
-                                testingFatalErrorHandlerResource
-                                        .getFatalErrorHandler()
-                                        .onFatalError(throwable);
-                            }
-                        });
+                createTestingDispatcherBuilder()
+                        .setFatalErrorHandler(
+                                throwable -> {
+                                    final Optional<Throwable> maybeError =
+                                            ExceptionUtils.findThrowable(
+                                                    throwable, 
temporaryError::equals);
+                                    if (maybeError.isPresent()
+                                            && 
temporaryError.equals(maybeError.get())) {

Review comment:
       ```suggestion
                                               ExceptionUtils.findThrowable(
                                                       throwable, temporaryError::equals);
                                       if (maybeError.isPresent()) {
   ```
   
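   AFAIK this checks the exception twice: `ExceptionUtils.findThrowable` is already called with the predicate `temporaryError::equals`, so the additional `temporaryError.equals(maybeError.get())` condition is redundant and `maybeError.isPresent()` alone is sufficient.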




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to