This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f85dfbb  [FLINK-18290][checkpointing] Don't System.exit on CheckpointCoordinator failure if it is shut down
f85dfbb is described below

commit f85dfbb12dd7845a0e87a1aef23edefc20365058
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jun 17 12:32:12 2020 +0200

    [FLINK-18290][checkpointing] Don't System.exit on CheckpointCoordinator failure if it is shut down
---
 .../flink/runtime/checkpoint/CheckpointCoordinator.java     | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d535591..d44a382 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -71,6 +71,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -582,7 +583,17 @@ public class CheckpointCoordinator {
 
                                                        return null;
                                                },
-                                               timer));
+                                               timer)
+                                       .exceptionally(error -> {
+                                               if (!isShutdown()) {
+                                                       throw new CompletionException(error);
+                                               } else if (error instanceof RejectedExecutionException) {
+                                                       LOG.debug("Execution rejected during shutdown");
+                                               } else {
+                                                       LOG.warn("Error encountered during shutdown", error);
+                                               }
+                                               return null;
+                                       }));
                } catch (Throwable throwable) {
                        onTriggerFailure(request, throwable);
                }

Reply via email to