tillrohrmann commented on a change in pull request #15561:
URL: https://github.com/apache/flink/pull/15561#discussion_r629951907



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -770,6 +770,14 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
                         jobId,
                         e);
             }
+
+            try {
+                highAvailabilityServices.cleanupJobData(jobId);
+            } catch (Exception e) {
+                log.warn(
+                        "Could not properly clean data for job {} stored by ha services", jobId, e);
+            }

Review comment:
       I think we mustn't clean up the job data if we failed to delete the `JobGraph` from the `jobGraphWriter`. The problem is that we might lose pointers to some checkpoints while still trying to resume a `Job` in case of a failover.
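
Roughly the ordering I have in mind, as a sketch (the wrapper class and the `jobGraphRemoved` flag are made up for illustration; the point is to guard the HA cleanup on a successful `JobGraph` removal):

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical wrapper illustrating the guarded cleanup ordering. */
final class GuardedJobDataCleanup {

    private static final Logger log = LoggerFactory.getLogger(GuardedJobDataCleanup.class);

    private final JobGraphWriter jobGraphWriter;
    private final HighAvailabilityServices highAvailabilityServices;

    GuardedJobDataCleanup(
            JobGraphWriter jobGraphWriter, HighAvailabilityServices highAvailabilityServices) {
        this.jobGraphWriter = jobGraphWriter;
        this.highAvailabilityServices = highAvailabilityServices;
    }

    void cleanUpJobData(JobID jobId, boolean cleanupHA) {
        boolean jobGraphRemoved = false;
        try {
            // The JobGraph holds the pointers (e.g. to checkpoints) needed to resume the job.
            jobGraphWriter.removeJobGraph(jobId);
            jobGraphRemoved = true;
        } catch (Exception e) {
            log.warn("Could not remove the JobGraph for job {}.", jobId, e);
        }

        // Only drop the HA data once the JobGraph is gone; otherwise a failover could
        // still try to resume the job from pointers that we just deleted.
        if (cleanupHA && jobGraphRemoved) {
            try {
                highAvailabilityServices.cleanupJobData(jobId);
            } catch (Exception e) {
                log.warn(
                        "Could not properly clean data for job {} stored by ha services",
                        jobId,
                        e);
            }
        }
    }
}
```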

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -313,6 +314,15 @@ public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFac
         return new ZooKeeperLeaderElectionDriverFactory(client, latchPath, leaderPath);
     }
 
+    public static List<String> getLeaderPathsForJob(
+            final Configuration configuration, final String pathSuffix) {
+        return Arrays.asList(
+                configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH)
+                        + pathSuffix,
+                configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
+                        + pathSuffix);

Review comment:
       What about the checkpoints ZNodes under `HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH/jobId`?
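
For illustration, a hypothetical variant of `getLeaderPathsForJob` that also returns the checkpoints path, assuming the checkpoint ZNodes follow the same `pathSuffix` (`/<jobId>`) layout:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;

final class ZooKeeperJobPaths {

    private ZooKeeperJobPaths() {}

    /**
     * Sketch of a variant of getLeaderPathsForJob that additionally returns the
     * per-job checkpoints path, so the checkpoint ZNodes get cleaned up as well.
     */
    static List<String> getJobPaths(final Configuration configuration, final String pathSuffix) {
        return Arrays.asList(
                configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH)
                        + pathSuffix,
                configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH)
                        + pathSuffix,
                // The checkpoint metadata for the job lives under this path.
                configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH)
                        + pathSuffix);
    }
}
```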

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -456,6 +459,31 @@ public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exceptio
         assertThatHABlobsHaveBeenRemoved();
     }
 
+    @Test
+    public void testHaDataCleanupWhenJobFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        finishJob(jobManagerRunner);
+        JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
+        assertThat(jobID, is(this.jobId));
+    }
+
+    @Test
+    public void testHaDataCleanupWhenJobNotFinished() throws Exception {
+        TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+        TestingJobManagerRunner jobManagerRunner =
+                jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+        jobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
+        try {
+            cleanupJobHADataFuture.get(50L, TimeUnit.MILLISECONDS);

Review comment:
       ```suggestion
               cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
       ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
##########
@@ -106,6 +112,29 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
         assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
     }
 
+    @Test
+    public void testCleanupJobData() throws Exception {
+        final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+        final TestingBlobStoreService testingBlobStoreService =
+                new TestingBlobStoreService(closeOperations);
+
+        JobID jobID = new JobID();
+        CompletableFuture<JobID> jobCleanupFuture = new CompletableFuture<>();
+
+        final TestingHaServices haServices =
+                new TestingHaServices(
+                        new Configuration(),
+                        Executors.directExecutor(),
+                        testingBlobStoreService,
+                        closeOperations,
+                        () -> {},
+                        jobCleanupFuture::complete);
+
+        haServices.cleanupJobData(jobID);
+        JobID jobIDCleaned = jobCleanupFuture.get(2000, TimeUnit.MILLISECONDS);

Review comment:
       ```suggestion
           JobID jobIDCleaned = jobCleanupFuture.get();
       ```

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
##########
@@ -148,6 +149,46 @@ public void testCloseAndCleanupAllDataWithUncle() throws Exception {
         assertThat(client.checkExists().forPath(unclePath), is(notNullValue()));
     }
 
+    /** Tests that the ZooKeeperHaServices clean up the job-specific paths. */
+    @Test
+    public void testCleanupJobData() throws Exception {
+        String rootPath = "/foo/bar/flink";
+        final Configuration configuration = createConfiguration(rootPath);
+        String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+        String latchFullPath =
+                rootPath
+                        + namespace
+                        + configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH);
+        String leaderFullPath =
+                rootPath
+                        + namespace
+                        + configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH);
+
+        final TestingBlobStoreService blobStoreService = new TestingBlobStoreService();
+
+        JobID jobID = new JobID();
+        runCleanupTestWithJob(
+                configuration,
+                blobStoreService,
+                jobID,
+                haServices -> {
+                    List<String> latchChildrenBefore = client.getChildren().forPath(latchFullPath);
+                    List<String> leaderChildrenBefore =
+                            client.getChildren().forPath(leaderFullPath);
+
+                    haServices.cleanupJobData(jobID);
+
+                    List<String> latchChildrenAfter = client.getChildren().forPath(latchFullPath);
+                    List<String> leaderChildrenAfter = client.getChildren().forPath(leaderFullPath);
+
+                    latchChildrenBefore.removeAll(latchChildrenAfter);
+                    leaderChildrenBefore.removeAll(leaderChildrenAfter);
+
+                    assertThat(latchChildrenBefore, contains(jobID.toString()));
+                    assertThat(leaderChildrenBefore, contains(jobID.toString()));

Review comment:
       I needed a moment to understand this logic. Could we make it a bit more explicit, e.g. by asserting that `*ChildrenBefore` contains the `JobID` and `*ChildrenAfter` does not?
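
Concretely, something like the following inside the test's callback (a sketch assuming Hamcrest's `hasItem`/`not` matchers; the variables come from the test above):

```java
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.not;

// Before the cleanup, both the latch and leader paths should have a child for the job.
List<String> latchChildrenBefore = client.getChildren().forPath(latchFullPath);
List<String> leaderChildrenBefore = client.getChildren().forPath(leaderFullPath);
assertThat(latchChildrenBefore, hasItem(jobID.toString()));
assertThat(leaderChildrenBefore, hasItem(jobID.toString()));

haServices.cleanupJobData(jobID);

// After the cleanup, the job-specific children should be gone.
List<String> latchChildrenAfter = client.getChildren().forPath(latchFullPath);
List<String> leaderChildrenAfter = client.getChildren().forPath(leaderFullPath);
assertThat(latchChildrenAfter, not(hasItem(jobID.toString())));
assertThat(leaderChildrenAfter, not(hasItem(jobID.toString())));
```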



