This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 32a7bb01cdcc92571cab453d9fc530cf26d1c2f6 Author: Stephan Ewen <[email protected]> AuthorDate: Sun Nov 22 15:57:16 2020 +0100 [hotfix][runtime] Minor reorg in ComponentClosingUtils to avoid some wrapping. --- .../coordination/ComponentClosingUtils.java | 34 +++++++--------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java index 3249fba..29df511 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java @@ -44,17 +44,12 @@ public class ComponentClosingUtils { */ public static CompletableFuture<Void> closeAsyncWithTimeout( String componentName, - ThrowingRunnable<Exception> closingSequence, + Runnable closingSequence, Duration closeTimeout) { return closeAsyncWithTimeout( componentName, - (Runnable) () -> { - try { - closingSequence.run(); - } catch (Exception e) { - throw new ClosingException(componentName, e); - } - }, closeTimeout); + (ThrowingRunnable<Exception>) closingSequence::run, + closeTimeout); } /** @@ -67,16 +62,19 @@ public class ComponentClosingUtils { */ public static CompletableFuture<Void> closeAsyncWithTimeout( String componentName, - Runnable closingSequence, + ThrowingRunnable<Exception> closingSequence, Duration closeTimeout) { + final CompletableFuture<Void> future = new CompletableFuture<>(); // Start a dedicate thread to close the component. final Thread t = new Thread(() -> { - closingSequence.run(); - future.complete(null); + try { + closingSequence.run(); + future.complete(null); + } catch (Throwable error) { + future.completeExceptionally(error); + } }); - // Use uncaught exception handler to handle exceptions during closing. - t.setUncaughtExceptionHandler((thread, error) -> future.completeExceptionally(error)); t.start(); // if the future fails due to a timeout, we interrupt the thread @@ -98,14 +96,4 @@ public class ComponentClosingUtils { // the abortion strategy is pretty simple here... t.interrupt(); } - - // --------------------------- - - private static class ClosingException extends RuntimeException { - private static final long serialVersionUID = 2527474477287706295L; - - private ClosingException(String componentName, Exception e) { - super(String.format("Caught exception when closing %s", componentName), e); - } - } }
