metaswirl commented on a change in pull request #18637:
URL: https://github.com/apache/flink/pull/18637#discussion_r801395392
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
##########
@@ -114,32 +137,37 @@ private DefaultResourceCleaner(
Executor cleanupExecutor,
CleanupFn<T> cleanupFn,
Collection<T> prioritizedCleanup,
- Collection<T> regularCleanup) {
+ Collection<T> regularCleanup,
+ RetryStrategy retryStrategy) {
this.mainThreadExecutor = mainThreadExecutor;
this.cleanupExecutor = cleanupExecutor;
this.cleanupFn = cleanupFn;
this.prioritizedCleanup = prioritizedCleanup;
this.regularCleanup = regularCleanup;
+ this.retryStrategy = retryStrategy;
}
@Override
public CompletableFuture<Void> cleanupAsync(JobID jobId) {
mainThreadExecutor.assertRunningInMainThread();
+
CompletableFuture<Void> cleanupFuture =
FutureUtils.completedVoidFuture();
for (T cleanup : prioritizedCleanup) {
- cleanupFuture =
- cleanupFuture.thenCompose(
- ignoredValue ->
- cleanupFn.cleanupAsync(cleanup, jobId,
cleanupExecutor));
+ cleanupFuture = cleanupFuture.thenCompose(ignoredValue ->
withRetry(jobId, cleanup));
}
+
return cleanupFuture.thenCompose(
ignoredValue ->
FutureUtils.completeAll(
regularCleanup.stream()
- .map(
- cleanup ->
- cleanupFn.cleanupAsync(
- cleanup,
jobId, cleanupExecutor))
+ .map(cleanup -> withRetry(jobId,
cleanup))
.collect(Collectors.toList())));
}
+
+ private CompletableFuture<Void> withRetry(JobID jobId, T cleanup) {
Review comment:
minor: Shouldn't this be called `cleanupWithRetry`?
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
##########
@@ -173,41 +205,108 @@ public void
testMediumPriorityCleanupBlocksAllLowerPrioritizedCleanups() {
lowerThanHighPriorityCleanup.completeCleanup();
-
assertThat(overallCleanupResult).succeedsWithin(Duration.ofMillis(100));
+ assertThat(overallCleanupResult).succeedsWithin(Duration.ZERO);
assertThat(highPriorityCleanup.isDone()).isTrue();
assertThat(lowerThanHighPriorityCleanup.isDone()).isTrue();
assertThat(noPriorityCleanup0.isDone()).isTrue();
assertThat(noPriorityCleanup1.isDone()).isTrue();
}
+ @Test
+ public void testCleanupWithRetries() {
+ final Collection<JobID> actualJobIds = new ArrayList<>();
+ final CleanupCallback cleanupWithRetries =
cleanupWithRetry(actualJobIds, 2);
+ final SingleCallCleanup oneRunCleanup =
SingleCallCleanup.withCompletionOnCleanup();
+
+ final CompletableFuture<Void> compositeCleanupResult =
+
createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(2))
+ .withRegularCleanup(cleanupWithRetries)
+ .withRegularCleanup(oneRunCleanup)
+ .build()
+ .cleanupAsync(JOB_ID);
+
+
assertThat(compositeCleanupResult).succeedsWithin(TIMEOUT_FOR_RUNS_WITH_RETRY);
+
+ assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
+ assertThat(oneRunCleanup.isDone()).isTrue();
+ assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID, JOB_ID);
+ }
+
+ @Test
+ public void testCleanupWithSingleRetryInHighPriorityTask() {
+ final Collection<JobID> actualJobIds = new ArrayList<>();
+ final CleanupCallback cleanupWithRetry =
cleanupWithRetry(actualJobIds, 1);
+ final CleanupCallback oneRunHigherPriorityCleanup =
+ SingleCallCleanup.withCompletionOnCleanup();
+ final SingleCallCleanup oneRunCleanup =
SingleCallCleanup.withCompletionOnCleanup();
+
+ final CompletableFuture<Void> compositeCleanupResult =
+
createTestInstanceBuilder(TestingRetryStrategies.createWithNumberOfRetries(1))
+ .withPrioritizedCleanup(cleanupWithRetry)
+ .withPrioritizedCleanup(oneRunHigherPriorityCleanup)
+ .withRegularCleanup(oneRunCleanup)
+ .build()
+ .cleanupAsync(JOB_ID);
+
+
assertThat(compositeCleanupResult).succeedsWithin(TIMEOUT_FOR_RUNS_WITH_RETRY);
+
+ assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
+ assertThat(oneRunCleanup.isDone()).isTrue();
+ assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID);
+ }
+
private static DefaultResourceCleaner.Builder<CleanupCallback>
createTestInstanceBuilder() {
+ return
createTestInstanceBuilder(TestingRetryStrategies.NO_RETRY_STRATEGY);
+ }
+
+ private static DefaultResourceCleaner.Builder<CleanupCallback>
createTestInstanceBuilder(
+ RetryStrategy retryStrategy) {
return DefaultResourceCleaner.forCleanableResources(
ComponentMainThreadExecutorServiceAdapter.forMainThread(),
EXECUTOR,
- CleanupCallback::cleanup);
+ CleanupCallback::apply,
+ retryStrategy);
+ }
+
+ private static CleanupCallback cleanupWithRetry(
Review comment:
This code is not responsible for the retries, right? So maybe we could
name it `cleanupWithInitialFailures`, `cleanupWithMultipleFailures`, or
`cleanupWithFailuresBeforeSuccess`.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
##########
@@ -110,70 +113,117 @@ public void tearDown() {
}
@Test
- public void
testRecoverFromCheckpointAfterJobGraphRemovalOfTerminatedJobFailed()
- throws Exception {
+ public void testCleanupThroughRetries() throws Exception {
final JobGraph jobGraph = createJobGraph();
final JobID jobId = jobGraph.getJobID();
- // Construct job graph store.
- final Error temporaryError = new Error("Unable to remove job graph.");
- final AtomicReference<? extends Error> temporaryErrorRef =
- new AtomicReference<>(temporaryError);
- final TestingJobGraphStore jobGraphStore =
- TestingJobGraphStore.newBuilder()
- .setGlobalCleanupFunction(
- (ignoredJobId, ignoredExecutor) -> {
- final Error error =
temporaryErrorRef.getAndSet(null);
- if (error != null) {
- throw error;
- }
+ // JobGraphStore
+ final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
+ final OneShotLatch successfulCleanupLatch = new OneShotLatch();
+ final int numberOfErrors = 5;
+ final RuntimeException temporaryError =
+ new RuntimeException("Expected RuntimeException: Unable to
remove job graph.");
+ final JobGraphStore jobGraphStore =
+ createAndStartJobGraphStoreWithCleanupFailures(
+ numberOfErrors,
+ temporaryError,
+ actualGlobalCleanupCallCount,
+ successfulCleanupLatch);
+ haServices.setJobGraphStore(jobGraphStore);
- return FutureUtils.completedVoidFuture();
- })
+ // Construct leader election service.
+ final TestingLeaderElectionService leaderElectionService =
+ new TestingLeaderElectionService();
+ haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
+
+ // start the dispatcher with enough retries on cleanup
+ final JobManagerRunnerRegistry jobManagerRunnerRegistry =
+ new DefaultJobManagerRunnerRegistry(2);
+ final Dispatcher dispatcher =
+ createTestingDispatcherBuilder()
+ .setResourceCleanerFactory(
+ new DispatcherResourceCleanerFactory(
+ ForkJoinPool.commonPool(),
+
TestingRetryStrategies.createWithNumberOfRetries(
+ numberOfErrors),
+ jobManagerRunnerRegistry,
+ haServices.getJobGraphStore(),
+ blobServer,
+ haServices,
+ UnregisteredMetricGroups
+
.createUnregisteredJobManagerMetricGroup()))
.build();
- jobGraphStore.start(null);
+ dispatcher.start();
+
+ toTerminate.add(dispatcher);
+ leaderElectionService.isLeader(UUID.randomUUID());
+ final DispatcherGateway dispatcherGateway =
+ dispatcher.getSelfGateway(DispatcherGateway.class);
+ dispatcherGateway.submitJob(jobGraph, TIMEOUT).get();
+
+ waitForJobToFinish(leaderElectionService, dispatcherGateway, jobId);
+
+ successfulCleanupLatch.await();
+
+ assertThat(actualGlobalCleanupCallCount.get(), equalTo(numberOfErrors
+ 1));
+
+ assertThat(
+ "The JobGraph should be removed from JobGraphStore.",
+ haServices.getJobGraphStore().getJobIds(),
+ IsEmptyCollection.empty());
+ assertThat(
+ "The JobResultStore has this job marked as clean.",
+ haServices.getJobResultStore().hasCleanJobResultEntry(jobId),
+ CoreMatchers.is(true));
+ }
+
+ @Test
+ public void testCleanupAfterLeadershipChange() throws Exception {
+ final JobGraph jobGraph = createJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ // Construct job graph store.
+ final AtomicInteger actualGlobalCleanupCallCount = new AtomicInteger();
+ final OneShotLatch successfulCleanupLatch = new OneShotLatch();
+ final RuntimeException temporaryError = new RuntimeException("Unable
to remove job graph.");
+ final JobGraphStore jobGraphStore =
+ createAndStartJobGraphStoreWithCleanupFailures(
+ 1, temporaryError, actualGlobalCleanupCallCount,
successfulCleanupLatch);
haServices.setJobGraphStore(jobGraphStore);
// Construct leader election service.
final TestingLeaderElectionService leaderElectionService =
new TestingLeaderElectionService();
haServices.setJobMasterLeaderElectionService(jobId,
leaderElectionService);
- // Start the first dispatcher and submit the job.
+ // start the dispatcher with no retries on cleanup
final CountDownLatch jobGraphRemovalErrorReceived = new
CountDownLatch(1);
final Dispatcher dispatcher =
- createRecoveredDispatcher(
- throwable -> {
- final Optional<Error> maybeError =
- ExceptionUtils.findThrowable(throwable,
Error.class);
- if (maybeError.isPresent() &&
temporaryError.equals(maybeError.get())) {
- jobGraphRemovalErrorReceived.countDown();
- } else {
- testingFatalErrorHandlerResource
- .getFatalErrorHandler()
- .onFatalError(throwable);
- }
- });
+ createTestingDispatcherBuilder()
+ .setFatalErrorHandler(
+ throwable -> {
+ final Optional<Throwable> maybeError =
+ ExceptionUtils.findThrowable(
+ throwable,
temporaryError::equals);
+ if (maybeError.isPresent()
+ &&
temporaryError.equals(maybeError.get())) {
Review comment:
```suggestion
ExceptionUtils.findThrowable(
throwable,
temporaryError::equals);
if (maybeError.isPresent()) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]