This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 32a7bb01cdcc92571cab453d9fc530cf26d1c2f6
Author: Stephan Ewen <[email protected]>
AuthorDate: Sun Nov 22 15:57:16 2020 +0100

    [hotfix][runtime] Minor reorg in ComponentClosingUtils to avoid some wrapping.
---
 .../coordination/ComponentClosingUtils.java        | 34 +++++++---------------
 1 file changed, 11 insertions(+), 23 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
index 3249fba..29df511 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
@@ -44,17 +44,12 @@ public class ComponentClosingUtils {
         */
        public static CompletableFuture<Void> closeAsyncWithTimeout(
                        String componentName,
-                       ThrowingRunnable<Exception> closingSequence,
+                       Runnable closingSequence,
                        Duration closeTimeout) {
                return closeAsyncWithTimeout(
                                componentName,
-                               (Runnable) () -> {
-                                       try {
-                                               closingSequence.run();
-                                       } catch (Exception e) {
-                                               throw new ClosingException(componentName, e);
-                                       }
-                               }, closeTimeout);
+                               (ThrowingRunnable<Exception>) closingSequence::run,
+                               closeTimeout);
        }
 
        /**
@@ -67,16 +62,19 @@ public class ComponentClosingUtils {
         */
        public static CompletableFuture<Void> closeAsyncWithTimeout(
                        String componentName,
-                       Runnable closingSequence,
+                       ThrowingRunnable<Exception> closingSequence,
                        Duration closeTimeout) {
+
                final CompletableFuture<Void> future = new CompletableFuture<>();
                // Start a dedicate thread to close the component.
                final Thread t = new Thread(() -> {
-                       closingSequence.run();
-                       future.complete(null);
+                       try {
+                               closingSequence.run();
+                               future.complete(null);
+                       } catch (Throwable error) {
+                               future.completeExceptionally(error);
+                       }
                });
-               // Use uncaught exception handler to handle exceptions during closing.
-               t.setUncaughtExceptionHandler((thread, error) -> future.completeExceptionally(error));
                t.start();
 
                // if the future fails due to a timeout, we interrupt the thread
@@ -98,14 +96,4 @@ public class ComponentClosingUtils {
                // the abortion strategy is pretty simple here...
                t.interrupt();
        }
-
-       // ---------------------------
-
-       private static class ClosingException extends RuntimeException {
-               private static final long serialVersionUID = 2527474477287706295L;
-
-               private ClosingException(String componentName, Exception e) {
-                       super(String.format("Caught exception when closing %s", componentName), e);
-               }
-       }
 }

Reply via email to