metaswirl commented on a change in pull request #18637:
URL: https://github.com/apache/flink/pull/18637#discussion_r800795884



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleanerTest.java
##########
@@ -181,33 +212,97 @@ public void 
testMediumPriorityCleanupBlocksAllLowerPrioritizedCleanups() {
         assertThat(noPriorityCleanup1.isDone()).isTrue();
     }
 
+    @Test
+    public void testCleanupWithRetries() {
+        final Collection<JobID> actualJobIds = new ArrayList<>();
+        final CleanupCallback cleanupWithRetries = 
cleanupWithRetry(actualJobIds, 2);
+        final SingleCallCleanup oneRunCleanup = 
SingleCallCleanup.withCompletionOnCleanup();
+
+        final CompletableFuture<Void> compositeCleanupResult =
+                createTestInstanceBuilder(new FixedRetryStrategy(2, 
Duration.ZERO))
+                        .withRegularCleanup(cleanupWithRetries)
+                        .withRegularCleanup(oneRunCleanup)
+                        .build()
+                        .cleanupAsync(JOB_ID);
+
+        
assertThat(compositeCleanupResult).succeedsWithin(Duration.ofMillis(100));
+
+        assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
+        assertThat(oneRunCleanup.isDone()).isTrue();
+        assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID, JOB_ID);
+    }
+
+    @Test
+    public void testCleanupWithSingleRetryInHighPriorityTask() {
+        final Collection<JobID> actualJobIds = new ArrayList<>();
+        final CleanupCallback cleanupWithRetry = 
cleanupWithRetry(actualJobIds, 1);
+        final SingleCallCleanup oneRunCleanup = 
SingleCallCleanup.withCompletionOnCleanup();
+
+        final CompletableFuture<Void> compositeCleanupResult =
+                createTestInstanceBuilder(new FixedRetryStrategy(1, 
Duration.ZERO))
+                        .withPrioritizedCleanup(cleanupWithRetry)
+                        .withRegularCleanup(oneRunCleanup)
+                        .build()
+                        .cleanupAsync(JOB_ID);
+
+        
assertThat(compositeCleanupResult).succeedsWithin(Duration.ofMillis(100));
+
+        assertThat(oneRunCleanup.getProcessedJobId()).isEqualTo(JOB_ID);
+        assertThat(oneRunCleanup.isDone()).isTrue();
+        assertThat(actualJobIds).containsExactly(JOB_ID, JOB_ID);
+    }
+
     private static DefaultResourceCleaner.Builder<CleanupCallback> 
createTestInstanceBuilder() {
+        return createTestInstanceBuilder(new FixedRetryStrategy(0, 
Duration.ZERO));
+    }
+
+    private static DefaultResourceCleaner.Builder<CleanupCallback> 
createTestInstanceBuilder(
+            RetryStrategy retryStrategy) {
         return DefaultResourceCleaner.forCleanableResources(
                 ComponentMainThreadExecutorServiceAdapter.forMainThread(),
                 EXECUTOR,
-                CleanupCallback::cleanup);
+                CleanupCallback::apply,
+                retryStrategy);
+    }
+
+    private static CleanupCallback cleanupWithRetry(
+            Collection<JobID> actualJobIds, int numberOfFailureRuns) {
+        final AtomicInteger failureRunCount = new 
AtomicInteger(numberOfFailureRuns);
+        return (actualJobId, executor) -> {
+            actualJobIds.add(actualJobId);
+            if (failureRunCount.getAndDecrement() > 0) {
+                return FutureUtils.completedExceptionally(
+                        new RuntimeException("Expected RuntimeException"));
+            }
+
+            return FutureUtils.completedVoidFuture();
+        };
+    }
+
+    private interface CleanupCallback extends BiFunction<JobID, Executor, 
CompletableFuture<Void>> {
+        // empty interface to remove necessity use generics all the time
     }
 
-    private static class CleanupCallback {
+    private static class SingleCallCleanup implements CleanupCallback {
 
         private final CompletableFuture<Void> resultFuture = new 
CompletableFuture<>();
         private JobID jobId;
 
         private final Consumer<CompletableFuture<Void>> internalFunction;
 
-        public static CleanupCallback withCompletionOnCleanup() {
-            return new CleanupCallback(resultFuture -> 
resultFuture.complete(null));
+        public static SingleCallCleanup withCompletionOnCleanup() {
+            return new SingleCallCleanup(resultFuture -> 
resultFuture.complete(null));
         }
 
-        public static CleanupCallback withoutCompletionOnCleanup() {
-            return new CleanupCallback(ignoredResultFuture -> {});
+        public static SingleCallCleanup withoutCompletionOnCleanup() {
+            return new SingleCallCleanup(ignoredResultFuture -> {});
         }
 
-        private CleanupCallback(Consumer<CompletableFuture<Void>> 
internalFunction) {
+        private SingleCallCleanup(Consumer<CompletableFuture<Void>> 
internalFunction) {
             this.internalFunction = internalFunction;
         }
 
-        public CompletableFuture<Void> cleanup(JobID jobId, Executor executor) 
{
+        public CompletableFuture<Void> apply(JobID jobId, Executor executor) {
             Preconditions.checkState(this.jobId == null);
             this.jobId = jobId;

Review comment:
       Would it make sense to wrap this in a synchronized, to avoid a race 
condition?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to