This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit b4f1b0b0e52f017a189d1a98627f13dba8d79d0c Author: Jiangjie (Becket) Qin <[email protected]> AuthorDate: Sat Oct 10 00:32:24 2020 +0800 [hotfix][util] Add a util class to help closing components with timeout. --- .../apache/flink/util/ComponentClosingUtils.java | 117 +++++++++++++++++++++ 1 file changed, 117 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java b/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java new file mode 100644 index 0000000..65c83ae --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/ComponentClosingUtils.java @@ -0,0 +1,117 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.util; + +import org.apache.flink.util.function.ThrowingRunnable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * A util class to help with a clean component shutdown. + */ +public class ComponentClosingUtils { + private static final Logger LOG = LoggerFactory.getLogger(ComponentClosingUtils.class); + // A shared watchdog executor to handle the timeout closing. + private static final ScheduledExecutorService watchDog = + Executors.newSingleThreadScheduledExecutor((ThreadFactory) r -> { + Thread t = new Thread(r, "ComponentClosingUtil"); + t.setUncaughtExceptionHandler((thread, exception) -> { + LOG.error("FATAL: The component closing util thread caught exception ", exception); + System.exit(-17); + }); + return t; + }); + + private ComponentClosingUtils() {} + + /** + * Close a component with a timeout. + * + * @param componentName the name of the component. + * @param closingSequence the closing logic which is a callable that can throw exceptions. + * @param closeTimeoutMs the timeout in milliseconds to waif for the component to close. + * @return An optional throwable which is non-empty if an error occurred when closing the component. + */ + public static CompletableFuture<Void> closeAsyncWithTimeout( + String componentName, + ThrowingRunnable<Exception> closingSequence, + long closeTimeoutMs) { + return closeAsyncWithTimeout( + componentName, + (Runnable) () -> { + try { + closingSequence.run(); + } catch (Exception e) { + throw new ClosingException(componentName, e); + } + }, closeTimeoutMs); + } + + /** + * Close a component with a timeout. + * + * @param componentName the name of the component. + * @param closingSequence the closing logic. + * @param closeTimeoutMs the timeout in milliseconds to waif for the component to close. + * @return An optional throwable which is non-empty if an error occurred when closing the component. + */ + public static CompletableFuture<Void> closeAsyncWithTimeout( + String componentName, + Runnable closingSequence, + long closeTimeoutMs) { + final CompletableFuture<Void> future = new CompletableFuture<>(); + // Start a dedicate thread to close the component. + Thread t = new Thread(() -> { + closingSequence.run(); + future.complete(null); + }); + // Use uncaught exception handler to handle exceptions during closing. + t.setUncaughtExceptionHandler((thread, error) -> future.completeExceptionally(error)); + t.start(); + // Schedule a watch dog job to the watching executor to detect timeout when + // closing the component. + watchDog.schedule(() -> { + if (t.isAlive()) { + t.interrupt(); + future.completeExceptionally(new TimeoutException( + String.format("Failed to close the %s before timeout of %d milliseconds", + componentName, closeTimeoutMs))); + } + }, closeTimeoutMs, TimeUnit.MILLISECONDS); + return future; + } + + // --------------------------- + + private static class ClosingException extends RuntimeException { + private static final long serialVersionUID = 2527474477287706295L; + + private ClosingException(String componentName, Exception e) { + super(String.format("Caught exception when closing %s", componentName), e); + } + } +}
