tillrohrmann commented on a change in pull request #15561:
URL: https://github.com/apache/flink/pull/15561#discussion_r629951907
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -770,6 +770,14 @@ private void cleanUpJobData(JobID jobId, boolean cleanupHA) {
jobId,
e);
}
+
+ try {
+ highAvailabilityServices.cleanupJobData(jobId);
+ } catch (Exception e) {
+ log.warn(
+ "Could not properly clean data for job {} stored by ha services", jobId, e);
+ }
Review comment:
I think we mustn't clean up the job data if we failed to delete the
`JobGraph` from the `jobGraphWriter`. The problem is that we might lose
pointers to some checkpoints while still trying to resume a `Job` in case of a
failover.
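A minimal sketch of the guard being suggested here, using plain collections in place of Flink's actual `Dispatcher`/`JobGraphWriter`/HA-services types (all names below are illustrative, not Flink's API):

```java
import java.util.HashSet;
import java.util.Set;

public class GuardedCleanupSketch {
    // Stand-ins for the JobGraph store and the HA job data store.
    private static final Set<String> jobGraphStore = new HashSet<>();
    private static final Set<String> haJobData = new HashSet<>();

    /** Returns true iff the HA job data was cleaned up. */
    public static boolean cleanUpJobData(String jobId, boolean jobGraphRemovalFails) {
        jobGraphStore.add(jobId);
        haJobData.add(jobId);
        boolean jobGraphRemoved = false;
        try {
            if (jobGraphRemovalFails) {
                throw new RuntimeException("simulated jobGraphWriter failure");
            }
            jobGraphStore.remove(jobId);
            jobGraphRemoved = true;
        } catch (Exception e) {
            // Mirrors the log.warn in the PR: removal failed, keep going.
        }
        // The reviewer's point: if the JobGraph is still registered, a
        // failover may resume the job, so its checkpoint pointers in the
        // HA store must not be deleted.
        if (jobGraphRemoved) {
            haJobData.remove(jobId);
        }
        return !haJobData.contains(jobId);
    }

    public static void main(String[] args) {
        System.out.println(cleanUpJobData("job-1", false)); // true
        System.out.println(cleanUpJobData("job-2", true));  // false
    }
}
```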
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
##########
@@ -313,6 +314,15 @@ public static ZooKeeperLeaderElectionDriverFactory createLeaderElectionDriverFac
return new ZooKeeperLeaderElectionDriverFactory(client, latchPath, leaderPath);
}
+ public static List<String> getLeaderPathsForJob(
+ final Configuration configuration, final String pathSuffix) {
+ return Arrays.asList(
+ configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH) + pathSuffix,
+ configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH) + pathSuffix);
Review comment:
What about the checkpoints ZNodes under
`HighAvailabilityOptions.HA_ZOOKEEPER_CHECKPOINTS_PATH/jobId`?
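A hedged sketch of what the reviewer is asking for: the per-job path list extended to cover a checkpoints node as well. Path values and the helper name are made up for illustration; they are not the actual Flink configuration keys.

```java
import java.util.Arrays;
import java.util.List;

public class JobZkPathsSketch {
    /**
     * Builds the ZNode paths to delete for one job: latch, leader, and
     * (as the reviewer suggests) the checkpoints path as well.
     */
    public static List<String> getPathsForJob(
            String latchPath, String leaderPath, String checkpointsPath,
            String pathSuffix) {
        return Arrays.asList(
                latchPath + pathSuffix,
                leaderPath + pathSuffix,
                checkpointsPath + pathSuffix);
    }

    public static void main(String[] args) {
        // Example with illustrative path values.
        System.out.println(getPathsForJob(
                "/leaderlatch", "/leader", "/checkpoints", "/job-1"));
    }
}
```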
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest.java
##########
@@ -456,6 +459,31 @@ public void testDuplicateJobSubmissionDoesNotDeleteJobMetaData() throws Exceptio
assertThatHABlobsHaveBeenRemoved();
}
+ @Test
+ public void testHaDataCleanupWhenJobFinished() throws Exception {
+ TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+ TestingJobManagerRunner jobManagerRunner =
+ jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+ finishJob(jobManagerRunner);
+ JobID jobID = cleanupJobHADataFuture.get(2000, TimeUnit.MILLISECONDS);
+ assertThat(jobID, is(this.jobId));
+ }
+
+ @Test
+ public void testHaDataCleanupWhenJobNotFinished() throws Exception {
+ TestingJobManagerRunnerFactory jobManagerRunnerFactory = startDispatcherAndSubmitJob();
+ TestingJobManagerRunner jobManagerRunner =
+ jobManagerRunnerFactory.takeCreatedJobManagerRunner();
+ jobManagerRunner.completeResultFutureExceptionally(new JobNotFinishedException(jobId));
+ try {
+ cleanupJobHADataFuture.get(50L, TimeUnit.MILLISECONDS);
Review comment:
```suggestion
cleanupJobHADataFuture.get(10L, TimeUnit.MILLISECONDS);
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/AbstractHaServicesTest.java
##########
@@ -106,6 +112,29 @@ public void testCloseAndCleanupAllDataDoesNotDeleteBlobsIfCleaningUpHADataFails(
assertThat(closeOperations, contains(CloseOperations.HA_CLOSE, CloseOperations.BLOB_CLOSE));
}
+ @Test
+ public void testCleanupJobData() throws Exception {
+ final Queue<CloseOperations> closeOperations = new ArrayDeque<>(3);
+ final TestingBlobStoreService testingBlobStoreService =
+ new TestingBlobStoreService(closeOperations);
+
+ JobID jobID = new JobID();
+ CompletableFuture<JobID> jobCleanupFuture = new CompletableFuture<>();
+
+ final TestingHaServices haServices =
+ new TestingHaServices(
+ new Configuration(),
+ Executors.directExecutor(),
+ testingBlobStoreService,
+ closeOperations,
+ () -> {},
+ jobCleanupFuture::complete);
+
+ haServices.cleanupJobData(jobID);
+ JobID jobIDCleaned = jobCleanupFuture.get(2000, TimeUnit.MILLISECONDS);
Review comment:
```suggestion
JobID jobIDCleaned = jobCleanupFuture.get();
```
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServicesTest.java
##########
@@ -148,6 +149,46 @@ public void testCloseAndCleanupAllDataWithUncle() throws Exception {
assertThat(client.checkExists().forPath(unclePath), is(notNullValue()));
}
+ /** Tests that the ZooKeeperHaServices cleans up paths for job manager. */
+ @Test
+ public void testCleanupJobData() throws Exception {
+ String rootPath = "/foo/bar/flink";
+ final Configuration configuration = createConfiguration(rootPath);
String namespace = configuration.getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
+ String latchFullPath =
+ rootPath
+ + namespace
+ configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LATCH_PATH);
+ String leaderFullPath =
+ rootPath
+ + namespace
+ configuration.getString(HighAvailabilityOptions.HA_ZOOKEEPER_LEADER_PATH);
+
+ final TestingBlobStoreService blobStoreService = new TestingBlobStoreService();
+
+ JobID jobID = new JobID();
+ runCleanupTestWithJob(
+ configuration,
+ blobStoreService,
+ jobID,
+ haServices -> {
+ List<String> latchChildrenBefore = client.getChildren().forPath(latchFullPath);
+ List<String> leaderChildrenBefore =
+ client.getChildren().forPath(leaderFullPath);
+
+ haServices.cleanupJobData(jobID);
+
+ List<String> latchChildrenAfter = client.getChildren().forPath(latchFullPath);
+ List<String> leaderChildrenAfter = client.getChildren().forPath(leaderFullPath);
+
+ latchChildrenBefore.removeAll(latchChildrenAfter);
+ leaderChildrenBefore.removeAll(leaderChildrenAfter);
+
+ assertThat(latchChildrenBefore, contains(jobID.toString()));
+ assertThat(leaderChildrenBefore, contains(jobID.toString()));
Review comment:
I needed a bit of time to understand this logic. Could we make it a bit
more explicit, e.g. by asserting that `*ChildrenBefore` contains the
`JobID` and `*ChildrenAfter` does not contain the `JobID`?
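A small sketch of the more explicit assertion style being asked for, using plain collections instead of the ZooKeeper client (names and data are illustrative):

```java
import java.util.Arrays;
import java.util.List;

public class ExplicitChildrenAssertions {
    /**
     * Explicit form: the job node exists before cleanup and is gone
     * afterwards, rather than diffing the two lists with removeAll.
     */
    public static boolean removedExactly(
            List<String> childrenBefore, List<String> childrenAfter, String jobId) {
        return childrenBefore.contains(jobId) && !childrenAfter.contains(jobId);
    }

    public static void main(String[] args) {
        List<String> before = Arrays.asList("job-1", "unrelated");
        List<String> after = Arrays.asList("unrelated");
        System.out.println(removedExactly(before, after, "job-1")); // true
    }
}
```

In the test itself this would read as two direct assertions (before contains, after does not), which states the expected state change without the reader having to reason about `removeAll` semantics.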
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]