rkhachatryan commented on a change in pull request #18976:
URL: https://github.com/apache/flink/pull/18976#discussion_r821943902
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -170,16 +189,24 @@ private RetriableTask(
@Override
public void run() {
- LOG.debug("starting attempt {}", current);
+ LOG.debug("starting attempt {}", attemptNumber);
if (!actionCompleted.get()) {
Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
try {
- runnable.run();
+ Result result = action.tryExecute();
if (actionCompleted.compareAndSet(false, true)) {
- LOG.debug("succeeded with {} attempts", current);
- attemptsPerTaskHistogram.update(current);
+ LOG.debug("succeeded with {} attempts", attemptNumber);
+ action.completeWithResult(result);
+ attemptsPerTaskHistogram.update(attemptNumber);
+ } else {
+ LOG.debug(
+ "discard unnecessarily uploaded state, attempt {}", attemptNumber);
+ try {
+ action.discardResult(result);
+ } catch (Exception e) {
+ LOG.warn("unable to discard execution attempt result", e);
+ }
}
- attemptCompleted.set(true);
} catch (Exception e) {
handleError(e);
Review comment:
If `action.completeWithResult(result)` throws, the error handler will exit
early because `actionCompleted` is already set, and `timeoutFuture` will be
cancelled.
The action basically builds result objects and completes futures with them.
It can fail in case of a programming error, which would manifest itself as
checkpoints not completing.
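
To illustrate the flow described above, here is a minimal, self-contained sketch. It is not the actual `RetryingExecutor` code: the class `CompletionGuardSketch`, the `events` list, the `timeoutCancelled` flag, and the exact shape of `handleError` are assumptions made for illustration. Only the names `actionCompleted`, `tryExecute`, `completeWithResult`, and `discardResult` come from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified model of the hunk above; not Flink's RetryingExecutor. */
final class CompletionGuardSketch {

    /** Stand-in for the action type in the diff (method names taken from the diff). */
    interface Action<R> {
        R tryExecute() throws Exception;

        void completeWithResult(R result);

        void discardResult(R result) throws Exception;
    }

    final AtomicBoolean actionCompleted = new AtomicBoolean(false);
    // Assumption: records what happened, in place of logging and rescheduling.
    final List<String> events = new ArrayList<>();
    // Assumption: models cancelling timeoutFuture once the attempt finishes.
    boolean timeoutCancelled = false;

    <R> void runAttempt(Action<R> action) {
        if (actionCompleted.get()) {
            return;
        }
        try {
            R result = action.tryExecute();
            if (actionCompleted.compareAndSet(false, true)) {
                // May throw on a programming error; actionCompleted is already true here.
                action.completeWithResult(result);
                events.add("completed");
            } else {
                // Another attempt won the race, so this result is not needed.
                action.discardResult(result);
                events.add("discarded");
            }
        } catch (Exception e) {
            handleError(e);
        } finally {
            timeoutCancelled = true;
        }
    }

    /** Assumed shape of the error handler: it exits early once the action is completed. */
    void handleError(Exception e) {
        if (actionCompleted.get()) {
            events.add("error ignored: " + e.getMessage());
            return;
        }
        events.add("retry scheduled: " + e.getMessage());
    }
}
```

In this sketch, an exception thrown from `completeWithResult` reaches `handleError` only after `actionCompleted` has been set, so no retry is scheduled and the failure shows up only as futures that never complete.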