XComp commented on a change in pull request #18637:
URL: https://github.com/apache/flink/pull/18637#discussion_r801690629



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/DefaultResourceCleaner.java
##########
@@ -114,32 +137,37 @@ private DefaultResourceCleaner(
             Executor cleanupExecutor,
             CleanupFn<T> cleanupFn,
             Collection<T> prioritizedCleanup,
-            Collection<T> regularCleanup) {
+            Collection<T> regularCleanup,
+            RetryStrategy retryStrategy) {
         this.mainThreadExecutor = mainThreadExecutor;
         this.cleanupExecutor = cleanupExecutor;
         this.cleanupFn = cleanupFn;
         this.prioritizedCleanup = prioritizedCleanup;
         this.regularCleanup = regularCleanup;
+        this.retryStrategy = retryStrategy;
     }
 
     @Override
     public CompletableFuture<Void> cleanupAsync(JobID jobId) {
         mainThreadExecutor.assertRunningInMainThread();
+
         CompletableFuture<Void> cleanupFuture = FutureUtils.completedVoidFuture();
         for (T cleanup : prioritizedCleanup) {
-            cleanupFuture =
-                    cleanupFuture.thenCompose(
-                            ignoredValue ->
-                                    cleanupFn.cleanupAsync(cleanup, jobId, cleanupExecutor));
+            cleanupFuture = cleanupFuture.thenCompose(ignoredValue -> withRetry(jobId, cleanup));
         }
+
         return cleanupFuture.thenCompose(
                 ignoredValue ->
                         FutureUtils.completeAll(
                                 regularCleanup.stream()
-                                        .map(
-                                                cleanup ->
-                                                        cleanupFn.cleanupAsync(
-                                                                cleanup, jobId, cleanupExecutor))
+                                        .map(cleanup -> withRetry(jobId, cleanup))
                                         .collect(Collectors.toList())));
     }
+
+    private CompletableFuture<Void> withRetry(JobID jobId, T cleanup) {

Review comment:
       We're already passing the cleanup as a parameter on every call, e.g. `.map(cleanup -> withRetry(jobId, cleanup))`. I feel like it doesn't add any value, considering that the operation is passed via the `cleanup` parameter and it's a private method anyway.
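
For readers without the full file at hand, the shape of the hunk above (prioritized cleanups chained one after another, regular cleanups started together, each call wrapped in a retry) can be sketched with plain JDK futures. This is only an illustration under assumptions: `CleanupRetrySketch`, its `Cleanup` interface, the `String` job ID, and the immediate fixed-count retry are all hypothetical stand-ins, not Flink's `CleanupFn`, `JobID`, `RetryStrategy`, or `FutureUtils`, and `CompletableFuture.allOf` is swapped in for `FutureUtils.completeAll`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CleanupRetrySketch {

    /** Hypothetical stand-in for one cleanup step; not Flink's CleanupFn. */
    interface Cleanup {
        CompletableFuture<Void> cleanupAsync(String jobId);
    }

    /** Runs the cleanup, retrying immediately until it succeeds or maxAttempts is used up. */
    static CompletableFuture<Void> withRetry(String jobId, Cleanup cleanup, int maxAttempts) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        attempt(jobId, cleanup, maxAttempts, result);
        return result;
    }

    private static void attempt(
            String jobId, Cleanup cleanup, int attemptsLeft, CompletableFuture<Void> result) {
        cleanup.cleanupAsync(jobId)
                .whenComplete(
                        (ignored, failure) -> {
                            if (failure == null) {
                                result.complete(null);
                            } else if (attemptsLeft > 1) {
                                // Assumption: retry right away; a real strategy would add a delay.
                                attempt(jobId, cleanup, attemptsLeft - 1, result);
                            } else {
                                result.completeExceptionally(failure);
                            }
                        });
    }

    /** Prioritized cleanups run sequentially; regular cleanups are all started afterwards. */
    static CompletableFuture<Void> cleanupAll(
            String jobId, List<Cleanup> prioritized, List<Cleanup> regular, int maxAttempts) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (Cleanup cleanup : prioritized) {
            chain = chain.thenCompose(ignored -> withRetry(jobId, cleanup, maxAttempts));
        }
        return chain.thenCompose(
                ignored ->
                        CompletableFuture.allOf(
                                regular.stream()
                                        .map(cleanup -> withRetry(jobId, cleanup, maxAttempts))
                                        .toArray(CompletableFuture[]::new)));
    }

    public static void main(String[] args) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        Cleanup first = jobId -> { log.add("first"); return CompletableFuture.completedFuture(null); };
        Cleanup second = jobId -> { log.add("second"); return CompletableFuture.completedFuture(null); };
        Cleanup third = jobId -> { log.add("third"); return CompletableFuture.completedFuture(null); };
        cleanupAll("job-1", Arrays.asList(first, second), Collections.singletonList(third), 3).join();
        System.out.println(log);
    }
}
```

In this sketch the retry helper takes the cleanup as an argument at each call site, which mirrors the `withRetry(jobId, cleanup)` calls in the diff.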




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.