This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new d89baa3  [FLINK-18421][checkpointing][tests] Fix logging of 
RejectedExecutionException during CheckpointCoordinator shutdown
d89baa3 is described below

commit d89baa37dd85f99153c3aa39f0989db883c0e798
Author: Roman Khachatryan <[email protected]>
AuthorDate: Wed Jul 22 18:01:17 2020 +0200

    [FLINK-18421][checkpointing][tests] Fix logging of 
RejectedExecutionException during CheckpointCoordinator shutdown
---
 .../org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index d44a382..76f9efd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -78,6 +78,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
+import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -587,7 +588,7 @@ public class CheckpointCoordinator {
                                        .exceptionally(error -> {
                                                if (!isShutdown()) {
                                                        throw new 
CompletionException(error);
-                                               } else if (error instanceof 
RejectedExecutionException) {
+                                               } else if (findThrowable(error, 
RejectedExecutionException.class).isPresent()) {
                                                        LOG.debug("Execution 
rejected during shutdown");
                                                } else {
                                                        LOG.warn("Error 
encountered during shutdown", error);
@@ -1745,7 +1746,7 @@ public class CheckpointCoordinator {
                CheckpointFailureReason defaultReason, Throwable throwable) {
 
                final Optional<CheckpointException> checkpointExceptionOptional 
=
-                       ExceptionUtils.findThrowable(throwable, 
CheckpointException.class);
+                       findThrowable(throwable, CheckpointException.class);
                return checkpointExceptionOptional
                        .orElseGet(() -> new CheckpointException(defaultReason, 
throwable));
        }

Reply via email to