[hotfix] Introduce ShutdownHookUtil to avoid code duplication (Un)registering shutdown hooks for cleanups is a very common concern in Flink. Many places in the code essentially duplicate all the code for doing this. This commit introduces a utils class and deduplicates the code.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d19b1c2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d19b1c2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d19b1c2 Branch: refs/heads/master Commit: 4d19b1c2c276d85eb78a138da5edb25eaac5c088 Parents: 4e7f03e Author: Stefan Richter <[email protected]> Authored: Fri Feb 23 12:11:30 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Sun Feb 25 15:10:28 2018 +0100 ---------------------------------------------------------------------- .../org/apache/flink/util/ShutdownHookUtil.java | 100 +++++++++++++++++++ .../flink/api/java/RemoteEnvironment.java | 40 +------- .../api/streaming/data/PythonStreamer.java | 21 ++-- .../runtime/webmonitor/WebRuntimeMonitor.java | 16 +-- .../webmonitor/history/HistoryServer.java | 33 ++---- .../flink/runtime/blob/AbstractBlobCache.java | 15 +-- .../apache/flink/runtime/blob/BlobServer.java | 18 +--- .../apache/flink/runtime/blob/BlobUtils.java | 36 ------- .../FileArchivedExecutionGraphStore.java | 19 +--- .../flink/runtime/filecache/FileCache.java | 46 ++------- .../io/disk/iomanager/IOManagerAsync.java | 35 +------ .../runtime/util/JvmShutdownSafeguard.java | 13 +-- .../flink/runtime/testutils/TestJvmProcess.java | 15 +-- .../yarn/AbstractYarnClusterDescriptor.java | 7 +- 14 files changed, 152 insertions(+), 262 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java b/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java new file mode 100644 index 0000000..7526f65 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java @@ -0,0 
+1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.util; + +import org.slf4j.Logger; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Utils class for dealing with JVM shutdown hooks. + */ +public class ShutdownHookUtil { + + /** + * Adds a shutdown hook to the JVM and returns the Thread, which has been registered. + */ + public static Thread addShutdownHook( + final AutoCloseable service, + final String serviceName, + final Logger logger) { + + checkNotNull(service); + checkNotNull(logger); + + final Thread shutdownHook = new Thread(() -> { + try { + service.close(); + } catch (Throwable t) { + logger.error("Error during shutdown of {} via JVM shutdown hook.", serviceName, t); + } + }, serviceName + " shutdown hook"); + + return addShutdownHookThread(shutdownHook, serviceName, logger) ? shutdownHook : null; + } + + /** + * Adds a shutdown hook to the JVM and returns the Thread, which has been registered. 
+ */ + public static boolean addShutdownHookThread( + final Thread shutdownHook, + final String serviceName, + final Logger logger) { + + checkNotNull(shutdownHook); + checkNotNull(logger); + + try { + // Add JVM shutdown hook to call shutdown of service + Runtime.getRuntime().addShutdownHook(shutdownHook); + return true; + } catch (IllegalStateException e) { + // JVM is already shutting down. no need to do our work + } catch (Throwable t) { + logger.error("Cannot register shutdown hook that cleanly terminates {}.", serviceName, t); + } + return false; + } + + /** + * Removes a shutdown hook from the JVM. + */ + public static void removeShutdownHook(final Thread shutdownHook, final String serviceName, final Logger logger) { + + // Do not run if this is invoked by the shutdown hook itself + if (shutdownHook == null || shutdownHook == Thread.currentThread()) { + return; + } + + checkNotNull(logger); + + try { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } catch (IllegalStateException e) { + // race, JVM is in shutdown already, we can safely ignore this + logger.debug("Unable to remove shutdown hook for {}, shutdown already in progress", serviceName, e); + } catch (Throwable t) { + logger.warn("Exception while un-registering {}'s shutdown hook.", serviceName, t); + } + } + + private ShutdownHookUtil() { + throw new AssertionError(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java index fa223bd..c50f79c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobID; import 
org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.ShutdownHookUtil; import java.io.File; import java.net.MalformedURLException; @@ -224,19 +225,8 @@ public class RemoteEnvironment extends ExecutionEnvironment { // ------------------------------------------------------------------------ protected void dispose() { - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the - // shutdown hook itself - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - } - catch (Throwable t) { - LOG.warn("Exception while unregistering the cleanup shutdown hook."); - } - } + // Remove shutdown hook to prevent resource leaks + ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); try { PlanExecutor executor = this.executor; @@ -262,29 +252,7 @@ public class RemoteEnvironment extends ExecutionEnvironment { private void installShutdownHook() { if (shutdownHook == null) { - Thread shutdownHook = new Thread(new Runnable() { - @Override - public void run() { - try { - dispose(); - } - catch (Throwable t) { - LOG.error("Error in cleanup of RemoteEnvironment during JVM shutdown: " + t.getMessage(), t); - } - } - }); - - try { - // Add JVM shutdown hook to call shutdown of service - Runtime.getRuntime().addShutdownHook(shutdownHook); - this.shutdownHook = shutdownHook; - } - catch (IllegalStateException e) { - // JVM is already shutting down. 
no need or a shutdown hook - } - catch (Throwable t) { - LOG.error("Cannot register shutdown hook that cleanly terminates the BLOB service."); - } + this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::dispose, getClass().getSimpleName(), LOG); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java index 864ea30..e28b8db 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -21,6 +21,7 @@ import org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializ import org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer; import org.apache.flink.python.api.streaming.util.StreamPrinter; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ShutdownHookUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -124,18 +125,10 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable errorPrinter = new Thread(new StreamPrinter(process.getErrorStream(), msg)); errorPrinter.start(); - shutdownThread = new Thread() { - @Override - public void run() { - try { - destroyProcess(process); - } catch (IOException ioException) { - LOG.warn("Could not destroy python process.", ioException); - } - } - }; - - Runtime.getRuntime().addShutdownHook(shutdownThread); + shutdownThread = ShutdownHookUtil.addShutdownHook( + () -> destroyProcess(process), + getClass().getSimpleName(), + LOG); OutputStream 
processOutput = process.getOutputStream(); processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET)); @@ -207,9 +200,7 @@ public class PythonStreamer<S extends PythonSender, OUT> implements Serializable throwable = ExceptionUtils.firstOrSuppressed(t, throwable); } - if (shutdownThread != null) { - Runtime.getRuntime().removeShutdownHook(shutdownThread); - } + ShutdownHookUtil.removeShutdownHook(shutdownThread, getClass().getSimpleName(), LOG); ExceptionUtils.tryRethrowIOException(throwable); } http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 6abe0e7..f27ae00 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -84,6 +84,7 @@ import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.util.FileUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router; @@ -382,20 +383,7 @@ public class WebRuntimeMonitor implements WebMonitor { webRootDir)); // add shutdown hook for deleting the directories and remaining temp files on shutdown - try { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - cleanup(); - } - }); - } catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can 
safely ignore this - LOG.debug("Unable to add shutdown hook, shutdown already in progress", e); - } catch (Throwable t) { - // these errors usually happen when the shutdown is already in progress - LOG.warn("Error while adding shutdown hook", t); - } + ShutdownHookUtil.addShutdownHook(this::cleanup, getClass().getSimpleName(), LOG); this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, serverSSLContext, configuredAddress, configuredPort, config); http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java index 9c3b51e..d361934 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; import org.apache.flink.util.FileUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router; @@ -181,22 +182,10 @@ public class HistoryServer { long refreshIntervalMillis = config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL); archiveFetcher = new HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, numFinishedPolls); - this.shutdownHook = new Thread() { - @Override - public void run() { - HistoryServer.this.stop(); - } - }; - // add shutdown hook for deleting the directories and remaining temp files on shutdown - try { - Runtime.getRuntime().addShutdownHook(shutdownHook); - 
} catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - LOG.debug("Unable to add shutdown hook, shutdown already in progress", e); - } catch (Throwable t) { - // these errors usually happen when the shutdown is already in progress - LOG.warn("Error while adding shutdown hook", t); - } + this.shutdownHook = ShutdownHookUtil.addShutdownHook( + HistoryServer.this::stop, + HistoryServer.class.getSimpleName(), + LOG); } @VisibleForTesting @@ -263,16 +252,8 @@ public class HistoryServer { LOG.info("Stopped history server."); - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } catch (IllegalStateException ignored) { - // race, JVM is in shutdown already, we can safely ignore this - } catch (Throwable t) { - LOG.warn("Exception while unregistering HistoryServer cleanup shutdown hook."); - } - } + // Remove shutdown hook to prevent resource leaks + ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java index ebcb42e..ce12898 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.FileUtils; +import 
org.apache.flink.util.ShutdownHookUtil; import org.slf4j.Logger; @@ -116,7 +117,7 @@ public abstract class AbstractBlobCache implements Closeable { } // Add shutdown hook to delete storage directory - shutdownHook = BlobUtils.addShutdownHook(this, log); + shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), log); this.serverAddress = serverAddress; } @@ -249,16 +250,8 @@ public abstract class AbstractBlobCache implements Closeable { try { FileUtils.deleteDirectory(storageDir); } finally { - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - } catch (Throwable t) { - log.warn("Exception while unregistering BLOB cache's cleanup shutdown hook."); - } - } + // Remove shutdown hook to prevent resource leaks + ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), log); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index 1213a31..92d1135 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.NetUtils; +import org.apache.flink.util.ShutdownHookUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -169,7 +170,7 @@ 
public class BlobServer extends Thread implements BlobService, BlobWriter, Perma .schedule(new TransientBlobCleanupTask(blobExpiryTimes, readWriteLock.writeLock(), storageDir, LOG), cleanupInterval, cleanupInterval); - this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); + this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG); if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) { try { @@ -345,19 +346,8 @@ public class BlobServer extends Thread implements BlobService, BlobWriter, Perma exception = ExceptionUtils.firstOrSuppressed(e, exception); } - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the - // shutdown hook itself - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - } - catch (Throwable t) { - LOG.warn("Exception while unregistering BLOB server's cleanup shutdown hook.", t); - } - } + // Remove shutdown hook to prevent resource leaks + ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); if (LOG.isInfoEnabled()) { LOG.info("Stopped BLOB server at {}:{}", serverSocket.getInetAddress().getHostAddress(), getPort()); http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java index 3273e1c..a21c7d6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java @@ -33,7 +33,6 @@ import org.slf4j.Logger; import javax.annotation.Nullable; -import java.io.Closeable; import 
java.io.EOFException; import java.io.File; import java.io.IOException; @@ -47,7 +46,6 @@ import java.security.NoSuchAlgorithmException; import java.util.Random; import java.util.UUID; -import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly; /** @@ -304,40 +302,6 @@ public class BlobUtils { } /** - * Adds a shutdown hook to the JVM and returns the Thread, which has been registered. - */ - public static Thread addShutdownHook(final Closeable service, final Logger logger) { - checkNotNull(service); - checkNotNull(logger); - - final Thread shutdownHook = new Thread(new Runnable() { - @Override - public void run() { - try { - service.close(); - } - catch (Throwable t) { - logger.error("Error during shutdown of blob service via JVM shutdown hook.", t); - } - } - }); - - try { - // Add JVM shutdown hook to call shutdown of service - Runtime.getRuntime().addShutdownHook(shutdownHook); - return shutdownHook; - } - catch (IllegalStateException e) { - // JVM is already shutting down. no need to do our work - return null; - } - catch (Throwable t) { - logger.error("Cannot register shutdown hook that cleanly terminates the BLOB service."); - return null; - } - } - - /** * Auxiliary method to write the length of an upcoming data chunk to an * output stream. 
* http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java index d2dbeb5..6526072 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java @@ -21,7 +21,6 @@ package org.apache.flink.runtime.dispatcher; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.blob.BlobUtils; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -31,6 +30,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; import org.apache.flink.shaded.guava18.com.google.common.base.Ticker; import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; @@ -122,7 +122,7 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt expirationTime.toMilliseconds(), TimeUnit.MILLISECONDS); - this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); + this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, getClass().getSimpleName(), LOG); this.numFinishedJobs = 0; this.numFailedJobs = 0; @@ -206,19 +206,8 @@ public class FileArchivedExecutionGraphStore implements ArchivedExecutionGraphSt // 
clean up the storage directory FileUtils.deleteFileOrDirectory(storageDir); - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the - // shutdown hook itself - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - } - catch (Throwable t) { - LOG.warn("Exception while unregistering FileArchivedExecutionGraphStore's cleanup shutdown hook.", t); - } - } + // Remove shutdown hook to prevent resource leaks + ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); } // -------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java index d78801d..d9d021a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java @@ -29,6 +29,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; import org.apache.commons.io.FileUtils; import org.slf4j.Logger; @@ -132,19 +133,8 @@ public class FileCache { } } - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the - // shutdown hook itself - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException e) { - 
// race, JVM is in shutdown already, we can safely ignore this - } - catch (Throwable t) { - LOG.warn("Exception while unregistering file cache's cleanup shutdown hook."); - } - } + // Remove shutdown hook to prevent resource leaks + ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); } } @@ -265,31 +255,11 @@ public class FileCache { private static Thread createShutdownHook(final FileCache cache, final Logger logger) { - Thread shutdownHook = new Thread(new Runnable() { - @Override - public void run() { - try { - cache.shutdown(); - } - catch (Throwable t) { - logger.error("Error during shutdown of file cache via JVM shutdown hook: " + t.getMessage(), t); - } - } - }); - - try { - // Add JVM shutdown hook to call shutdown of service - Runtime.getRuntime().addShutdownHook(shutdownHook); - return shutdownHook; - } - catch (IllegalStateException e) { - // JVM is already shutting down. no need to do our work - return null; - } - catch (Throwable t) { - logger.error("Cannot register shutdown hook that cleanly terminates the file cache service."); - return null; - } + return ShutdownHookUtil.addShutdownHook( + cache::shutdown, + FileCache.class.getSimpleName(), + logger + ); } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index e2a3a6f..2b8e7f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -21,6 +21,7 @@ package 
org.apache.flink.runtime.io.disk.iomanager; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.ShutdownHookUtil; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; @@ -99,22 +100,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle } // install a shutdown hook that makes sure the temp directories get deleted - this.shutdownHook = new Thread("I/O manager shutdown hook") { - @Override - public void run() { - shutdown(); - } - }; - try { - Runtime.getRuntime().addShutdownHook(this.shutdownHook); - } - catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - LOG.debug("Unable to add shutdown hook, shutdown already in progress", e); - } - catch (Throwable t) { - LOG.warn("Error while adding shutdown hook for IOManager", t); - } + this.shutdownHook = ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), LOG); } /** @@ -129,20 +115,9 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle return; } - // Remove shutdown hook to prevent resource leaks, unless this is invoked by the shutdown hook itself - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException e) { - // race, JVM is in shutdown already, we can safely ignore this - LOG.debug("Unable to remove shutdown hook, shutdown already in progress", e); - } - catch (Throwable t) { - LOG.warn("Exception while unregistering IOManager's shutdown hook.", t); - } - } - + // Remove shutdown hook to prevent resource leaks + ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); + try { if (LOG.isDebugEnabled()) { LOG.debug("Shutting down I/O manager."); 
http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java index e8e378e..579cdf4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.util; +import org.apache.flink.util.ShutdownHookUtil; + import org.slf4j.Logger; import static org.apache.flink.util.Preconditions.checkArgument; @@ -112,15 +114,6 @@ public class JvmShutdownSafeguard extends Thread { // install the blocking shutdown hook Thread shutdownHook = new JvmShutdownSafeguard(delayMillis); - try { - // Add JVM shutdown hook to call shutdown of service - Runtime.getRuntime().addShutdownHook(shutdownHook); - } - catch (IllegalStateException ignored) { - // JVM is already shutting down. No need to do this. 
- } - catch (Throwable t) { - logger.error("Cannot install JVM Shutdown Safeguard against blocked shutdown hooks"); - } + ShutdownHookUtil.addShutdownHookThread(shutdownHook, JvmShutdownSafeguard.class.getSimpleName(), logger); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java index 4578edf..885ea8b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.testutils; +import org.apache.flink.util.ShutdownHookUtil; + import org.apache.commons.lang3.ArrayUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -229,18 +231,7 @@ public abstract class TestJvmProcess { } finally { destroyed = true; - - if (shutdownHook != null && shutdownHook != Thread.currentThread()) { - try { - Runtime.getRuntime().removeShutdownHook(shutdownHook); - } - catch (IllegalStateException ignored) { - // JVM is in shutdown already, we can safely ignore this. 
- } - catch (Throwable t) { - LOG.warn("Exception while unregistering process cleanup shutdown hook."); - } - } + ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), LOG); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 2d6926a..e6c36f6 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.ShutdownHookUtil; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.fs.FileSystem; @@ -1036,11 +1037,7 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor "temporary files of the YARN session in the home directoy will not be removed."); } // since deployment was successful, remove the hook - try { - Runtime.getRuntime().removeShutdownHook(deploymentFailureHook); - } catch (IllegalStateException e) { - // we're already in the shut down hook. - } + ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG); return report; }
