rkhachatryan commented on a change in pull request #18976:
URL: https://github.com/apache/flink/pull/18976#discussion_r821943902



##########
File path: flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -170,16 +189,24 @@ private RetriableTask(
 
         @Override
         public void run() {
-            LOG.debug("starting attempt {}", current);
+            LOG.debug("starting attempt {}", attemptNumber);
             if (!actionCompleted.get()) {
                 Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
                 try {
-                    runnable.run();
+                    Result result = action.tryExecute();
                     if (actionCompleted.compareAndSet(false, true)) {
-                        LOG.debug("succeeded with {} attempts", current);
-                        attemptsPerTaskHistogram.update(current);
+                        LOG.debug("succeeded with {} attempts", attemptNumber);
+                        action.completeWithResult(result);
+                        attemptsPerTaskHistogram.update(attemptNumber);
+                    } else {
+                        LOG.debug(
+                                "discard unnecessarily uploaded state, attempt {}", attemptNumber);
+                        try {
+                            action.discardResult(result);
+                        } catch (Exception e) {
+                            LOG.warn("unable to discard execution attempt result", e);
+                        }
                     }
-                    attemptCompleted.set(true);
                 } catch (Exception e) {
                     handleError(e);

Review comment:
   If the exception is thrown after `actionCompleted` has been set (for example by `action.completeWithResult`), the error handler will exit early because `actionCompleted` is already set, and `timeoutFuture` will be cancelled.
   The action basically builds result objects and completes futures with them. This can fail in case of a programming error, which would manifest itself as checkpoints not completing.
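
To make the scenario above concrete, here is a minimal, self-contained sketch of the same control flow. It is not the actual `RetryingExecutor`: the `Action` interface, the `log` list, and the class name are hypothetical stand-ins, timeout handling is omitted, and the early exit in `handleError` is an assumption taken from the comment above rather than from the real method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class CompleteOnceSketch {

    /** Hypothetical stand-in for the action type in the diff; not Flink's actual interface. */
    interface Action<R> {
        R tryExecute() throws Exception;

        void completeWithResult(R result);

        void discardResult(R result) throws Exception;
    }

    /** One attempt, following the control flow of run() in the diff (timeout handling omitted). */
    static <R> void runAttempt(Action<R> action, AtomicBoolean actionCompleted, List<String> log) {
        if (actionCompleted.get()) {
            log.add("skipped");
            return;
        }
        try {
            R result = action.tryExecute();
            if (actionCompleted.compareAndSet(false, true)) {
                // The flag is already set here, so a failure below is never retried.
                action.completeWithResult(result);
                log.add("completed");
            } else {
                try {
                    action.discardResult(result);
                    log.add("discarded");
                } catch (Exception e) {
                    log.add("discard failed");
                }
            }
        } catch (Exception e) {
            handleError(e, actionCompleted, log);
        }
    }

    /** Assumed behaviour, inferred from the review comment: exit early once the action completed. */
    static void handleError(Exception e, AtomicBoolean actionCompleted, List<String> log) {
        if (actionCompleted.get()) {
            log.add("error ignored: " + e.getMessage());
            return;
        }
        log.add("retry scheduled: " + e.getMessage());
    }

    /** Runs two attempts of an action whose completeWithResult contains a bug. */
    static List<String> demoFailingCompletion() {
        Action<String> buggy = new Action<String>() {
            @Override
            public String tryExecute() {
                return "uploaded";
            }

            @Override
            public void completeWithResult(String result) {
                throw new IllegalStateException("bug");
            }

            @Override
            public void discardResult(String result) {}
        };
        AtomicBoolean actionCompleted = new AtomicBoolean(false);
        List<String> log = new ArrayList<>();
        runAttempt(buggy, actionCompleted, log);
        runAttempt(buggy, actionCompleted, log);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demoFailingCompletion());
    }
}
```

Under these assumptions, running `main` prints `[error ignored: bug, skipped]`: the first attempt sets the flag, fails while completing, and is not retried; the second attempt sees the flag and does nothing. In the sketch nothing ever completes the result, which matches the symptom described above.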



