This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f85dfbb [FLINK-18290][checkpointing] Don't System.exit on CheckpointCoordinator failure if it is shut down
f85dfbb is described below
commit f85dfbb12dd7845a0e87a1aef23edefc20365058
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jun 17 12:32:12 2020 +0200
[FLINK-18290][checkpointing] Don't System.exit on CheckpointCoordinator failure if it is shut down
---
.../flink/runtime/checkpoint/CheckpointCoordinator.java | 13 ++++++++++++-
1 file changed, 12 insertions(+), 1 deletion(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d535591..d44a382 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -71,6 +71,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
@@ -582,7 +583,17 @@ public class CheckpointCoordinator {
return null;
},
- timer));
+ timer)
+ .exceptionally(error -> {
+ if (!isShutdown()) {
+ throw new
CompletionException(error);
+ } else if (error instanceof
RejectedExecutionException) {
+ LOG.debug("Execution
rejected during shutdown");
+ } else {
+ LOG.warn("Error
encountered during shutdown", error);
+ }
+ return null;
+ }));
} catch (Throwable throwable) {
onTriggerFailure(request, throwable);
}