[hotfix] Introduce ShutdownHookUtil to avoid code duplication

(Un)registering shotdown hooks for cleanups is a very common concern in Flink.
Many places in the code essentially duplicate all the code for doing this.
This commit introduces a utils class and deduplicates the code.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4d19b1c2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4d19b1c2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4d19b1c2

Branch: refs/heads/master
Commit: 4d19b1c2c276d85eb78a138da5edb25eaac5c088
Parents: 4e7f03e
Author: Stefan Richter <[email protected]>
Authored: Fri Feb 23 12:11:30 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Sun Feb 25 15:10:28 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ShutdownHookUtil.java | 100 +++++++++++++++++++
 .../flink/api/java/RemoteEnvironment.java       |  40 +-------
 .../api/streaming/data/PythonStreamer.java      |  21 ++--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  16 +--
 .../webmonitor/history/HistoryServer.java       |  33 ++----
 .../flink/runtime/blob/AbstractBlobCache.java   |  15 +--
 .../apache/flink/runtime/blob/BlobServer.java   |  18 +---
 .../apache/flink/runtime/blob/BlobUtils.java    |  36 -------
 .../FileArchivedExecutionGraphStore.java        |  19 +---
 .../flink/runtime/filecache/FileCache.java      |  46 ++-------
 .../io/disk/iomanager/IOManagerAsync.java       |  35 +------
 .../runtime/util/JvmShutdownSafeguard.java      |  13 +--
 .../flink/runtime/testutils/TestJvmProcess.java |  15 +--
 .../yarn/AbstractYarnClusterDescriptor.java     |   7 +-
 14 files changed, 152 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java
new file mode 100644
index 0000000..7526f65
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/ShutdownHookUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utils class for dealing with JVM shutdown hooks.
+ */
+public class ShutdownHookUtil {
+
+       /**
+        * Adds a shutdown hook to the JVM and returns the Thread, which has 
been registered.
+        */
+       public static Thread addShutdownHook(
+               final AutoCloseable service,
+               final String serviceName,
+               final Logger logger) {
+
+               checkNotNull(service);
+               checkNotNull(logger);
+
+               final Thread shutdownHook = new Thread(() -> {
+                       try {
+                               service.close();
+                       } catch (Throwable t) {
+                               logger.error("Error during shutdown of {} via 
JVM shutdown hook.", serviceName, t);
+                       }
+               }, serviceName + " shutdown hook");
+
+               return addShutdownHookThread(shutdownHook, serviceName, logger) 
? shutdownHook : null;
+       }
+
+       /**
+        * Adds a shutdown hook to the JVM and returns the Thread, which has 
been registered.
+        */
+       public static boolean addShutdownHookThread(
+               final Thread shutdownHook,
+               final String serviceName,
+               final Logger logger) {
+
+               checkNotNull(shutdownHook);
+               checkNotNull(logger);
+
+               try {
+                       // Add JVM shutdown hook to call shutdown of service
+                       Runtime.getRuntime().addShutdownHook(shutdownHook);
+                       return true;
+               } catch (IllegalStateException e) {
+                       // JVM is already shutting down. no need to do our work
+               } catch (Throwable t) {
+                       logger.error("Cannot register shutdown hook that 
cleanly terminates {}.", serviceName, t);
+               }
+               return false;
+       }
+
+       /**
+        * Removes a shutdown hook from the JVM.
+        */
+       public static void removeShutdownHook(final Thread shutdownHook, final 
String serviceName, final Logger logger) {
+
+               // Do not run if this is invoked by the shutdown hook itself
+               if (shutdownHook == null || shutdownHook == 
Thread.currentThread()) {
+                       return;
+               }
+
+               checkNotNull(logger);
+
+               try {
+                       Runtime.getRuntime().removeShutdownHook(shutdownHook);
+               } catch (IllegalStateException e) {
+                       // race, JVM is in shutdown already, we can safely 
ignore this
+                       logger.debug("Unable to remove shutdown hook for {}, 
shutdown already in progress", serviceName, e);
+               } catch (Throwable t) {
+                       logger.warn("Exception while un-registering {}'s 
shutdown hook.", serviceName, t);
+               }
+       }
+
+       private ShutdownHookUtil() {
+               throw new AssertionError();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
index fa223bd..c50f79c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.PlanExecutor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import java.io.File;
 import java.net.MalformedURLException;
@@ -224,19 +225,8 @@ public class RemoteEnvironment extends 
ExecutionEnvironment {
        // 
------------------------------------------------------------------------
 
        protected void dispose() {
-               // Remove shutdown hook to prevent resource leaks, unless this 
is invoked by the
-               // shutdown hook itself
-               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                       try {
-                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                       }
-                       catch (IllegalStateException e) {
-                               // race, JVM is in shutdown already, we can 
safely ignore this
-                       }
-                       catch (Throwable t) {
-                               LOG.warn("Exception while unregistering the 
cleanup shutdown hook.");
-                       }
-               }
+               // Remove shutdown hook to prevent resource leaks
+               ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
 
                try {
                        PlanExecutor executor = this.executor;
@@ -262,29 +252,7 @@ public class RemoteEnvironment extends 
ExecutionEnvironment {
 
        private void installShutdownHook() {
                if (shutdownHook == null) {
-                       Thread shutdownHook = new Thread(new Runnable() {
-                               @Override
-                               public void run() {
-                                       try {
-                                               dispose();
-                                       }
-                                       catch (Throwable t) {
-                                               LOG.error("Error in cleanup of 
RemoteEnvironment during JVM shutdown: " + t.getMessage(), t);
-                                       }
-                               }
-                       });
-
-                       try {
-                               // Add JVM shutdown hook to call shutdown of 
service
-                               
Runtime.getRuntime().addShutdownHook(shutdownHook);
-                               this.shutdownHook = shutdownHook;
-                       }
-                       catch (IllegalStateException e) {
-                               // JVM is already shutting down. no need or a 
shutdown hook
-                       }
-                       catch (Throwable t) {
-                               LOG.error("Cannot register shutdown hook that 
cleanly terminates the BLOB service.");
-                       }
+                       this.shutdownHook = 
ShutdownHookUtil.addShutdownHook(this::dispose, getClass().getSimpleName(), 
LOG);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 864ea30..e28b8db 100644
--- 
a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ 
b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -21,6 +21,7 @@ import 
org.apache.flink.python.api.streaming.util.SerializationUtils.IntSerializ
 import 
org.apache.flink.python.api.streaming.util.SerializationUtils.StringSerializer;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -124,18 +125,10 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
                errorPrinter = new Thread(new 
StreamPrinter(process.getErrorStream(), msg));
                errorPrinter.start();
 
-               shutdownThread = new Thread() {
-                       @Override
-                       public void run() {
-                               try {
-                                       destroyProcess(process);
-                               } catch (IOException ioException) {
-                                       LOG.warn("Could not destroy python 
process.", ioException);
-                               }
-                       }
-               };
-
-               Runtime.getRuntime().addShutdownHook(shutdownThread);
+               shutdownThread = ShutdownHookUtil.addShutdownHook(
+                       () -> destroyProcess(process),
+                       getClass().getSimpleName(),
+                       LOG);
 
                OutputStream processOutput = process.getOutputStream();
                
processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
@@ -207,9 +200,7 @@ public class PythonStreamer<S extends PythonSender, OUT> 
implements Serializable
                        throwable = ExceptionUtils.firstOrSuppressed(t, 
throwable);
                }
 
-               if (shutdownThread != null) {
-                       Runtime.getRuntime().removeShutdownHook(shutdownThread);
-               }
+               ShutdownHookUtil.removeShutdownHook(shutdownThread, 
getClass().getSimpleName(), LOG);
 
                ExceptionUtils.tryRethrowIOException(throwable);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 6abe0e7..f27ae00 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -84,6 +84,7 @@ import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
@@ -382,20 +383,7 @@ public class WebRuntimeMonitor implements WebMonitor {
                        webRootDir));
 
                // add shutdown hook for deleting the directories and remaining 
temp files on shutdown
-               try {
-                       Runtime.getRuntime().addShutdownHook(new Thread() {
-                               @Override
-                               public void run() {
-                                       cleanup();
-                               }
-                       });
-               } catch (IllegalStateException e) {
-                       // race, JVM is in shutdown already, we can safely 
ignore this
-                       LOG.debug("Unable to add shutdown hook, shutdown 
already in progress", e);
-               } catch (Throwable t) {
-                       // these errors usually happen when the shutdown is 
already in progress
-                       LOG.warn("Error while adding shutdown hook", t);
-               }
+               ShutdownHookUtil.addShutdownHook(this::cleanup, 
getClass().getSimpleName(), LOG);
 
                this.netty = new WebFrontendBootstrap(router, LOG, uploadDir, 
serverSSLContext, configuredAddress, configuredPort, config);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 9c3b51e..d361934 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 
@@ -181,22 +182,10 @@ public class HistoryServer {
                long refreshIntervalMillis = 
config.getLong(HistoryServerOptions.HISTORY_SERVER_ARCHIVE_REFRESH_INTERVAL);
                archiveFetcher = new 
HistoryServerArchiveFetcher(refreshIntervalMillis, refreshDirs, webDir, 
numFinishedPolls);
 
-               this.shutdownHook = new Thread() {
-                       @Override
-                       public void run() {
-                               HistoryServer.this.stop();
-                       }
-               };
-               // add shutdown hook for deleting the directories and remaining 
temp files on shutdown
-               try {
-                       Runtime.getRuntime().addShutdownHook(shutdownHook);
-               } catch (IllegalStateException e) {
-                       // race, JVM is in shutdown already, we can safely 
ignore this
-                       LOG.debug("Unable to add shutdown hook, shutdown 
already in progress", e);
-               } catch (Throwable t) {
-                       // these errors usually happen when the shutdown is 
already in progress
-                       LOG.warn("Error while adding shutdown hook", t);
-               }
+               this.shutdownHook = ShutdownHookUtil.addShutdownHook(
+                       HistoryServer.this::stop,
+                       HistoryServer.class.getSimpleName(),
+                       LOG);
        }
 
        @VisibleForTesting
@@ -263,16 +252,8 @@ public class HistoryServer {
 
                                LOG.info("Stopped history server.");
 
-                               // Remove shutdown hook to prevent resource 
leaks, unless this is invoked by the shutdown hook itself
-                               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                                       try {
-                                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                                       } catch (IllegalStateException ignored) 
{
-                                               // race, JVM is in shutdown 
already, we can safely ignore this
-                                       } catch (Throwable t) {
-                                               LOG.warn("Exception while 
unregistering HistoryServer cleanup shutdown hook.");
-                                       }
-                               }
+                               // Remove shutdown hook to prevent resource 
leaks
+                               
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), 
LOG);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
index ebcb42e..ce12898 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/AbstractBlobCache.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 
@@ -116,7 +117,7 @@ public abstract class AbstractBlobCache implements 
Closeable {
                }
 
                // Add shutdown hook to delete storage directory
-               shutdownHook = BlobUtils.addShutdownHook(this, log);
+               shutdownHook = ShutdownHookUtil.addShutdownHook(this, 
getClass().getSimpleName(), log);
 
                this.serverAddress = serverAddress;
        }
@@ -249,16 +250,8 @@ public abstract class AbstractBlobCache implements 
Closeable {
                        try {
                                FileUtils.deleteDirectory(storageDir);
                        } finally {
-                               // Remove shutdown hook to prevent resource 
leaks, unless this is invoked by the shutdown hook itself
-                               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                                       try {
-                                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                                       } catch (IllegalStateException e) {
-                                               // race, JVM is in shutdown 
already, we can safely ignore this
-                                       } catch (Throwable t) {
-                                               log.warn("Exception while 
unregistering BLOB cache's cleanup shutdown hook.");
-                                       }
-                               }
+                               // Remove shutdown hook to prevent resource 
leaks
+                               
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), 
log);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
index 1213a31..92d1135 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -169,7 +170,7 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
                        .schedule(new TransientBlobCleanupTask(blobExpiryTimes, 
readWriteLock.writeLock(),
                                storageDir, LOG), cleanupInterval, 
cleanupInterval);
 
-               this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+               this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, 
getClass().getSimpleName(), LOG);
 
                if (config.getBoolean(BlobServerOptions.SSL_ENABLED)) {
                        try {
@@ -345,19 +346,8 @@ public class BlobServer extends Thread implements 
BlobService, BlobWriter, Perma
                                exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
                        }
 
-                       // Remove shutdown hook to prevent resource leaks, 
unless this is invoked by the
-                       // shutdown hook itself
-                       if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                               try {
-                                       
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                               }
-                               catch (IllegalStateException e) {
-                                       // race, JVM is in shutdown already, we 
can safely ignore this
-                               }
-                               catch (Throwable t) {
-                                       LOG.warn("Exception while unregistering 
BLOB server's cleanup shutdown hook.", t);
-                               }
-                       }
+                       // Remove shutdown hook to prevent resource leaks
+                       ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
 
                        if (LOG.isInfoEnabled()) {
                                LOG.info("Stopped BLOB server at {}:{}", 
serverSocket.getInetAddress().getHostAddress(), getPort());

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
index 3273e1c..a21c7d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobUtils.java
@@ -33,7 +33,6 @@ import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 
-import java.io.Closeable;
 import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
@@ -47,7 +46,6 @@ import java.security.NoSuchAlgorithmException;
 import java.util.Random;
 import java.util.UUID;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.StringUtils.isNullOrWhitespaceOnly;
 
 /**
@@ -304,40 +302,6 @@ public class BlobUtils {
        }
 
        /**
-        * Adds a shutdown hook to the JVM and returns the Thread, which has 
been registered.
-        */
-       public static Thread addShutdownHook(final Closeable service, final 
Logger logger) {
-               checkNotNull(service);
-               checkNotNull(logger);
-
-               final Thread shutdownHook = new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       service.close();
-                               }
-                               catch (Throwable t) {
-                                       logger.error("Error during shutdown of 
blob service via JVM shutdown hook.", t);
-                               }
-                       }
-               });
-
-               try {
-                       // Add JVM shutdown hook to call shutdown of service
-                       Runtime.getRuntime().addShutdownHook(shutdownHook);
-                       return shutdownHook;
-               }
-               catch (IllegalStateException e) {
-                       // JVM is already shutting down. no need to do our work
-                       return null;
-               }
-               catch (Throwable t) {
-                       logger.error("Cannot register shutdown hook that 
cleanly terminates the BLOB service.");
-                       return null;
-               }
-       }
-
-       /**
         * Auxiliary method to write the length of an upcoming data chunk to an
         * output stream.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
index d2dbeb5..6526072 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/FileArchivedExecutionGraphStore.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.dispatcher;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.blob.BlobUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -31,6 +30,7 @@ import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.flink.shaded.guava18.com.google.common.base.Ticker;
 import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
@@ -122,7 +122,7 @@ public class FileArchivedExecutionGraphStore implements 
ArchivedExecutionGraphSt
                        expirationTime.toMilliseconds(),
                        TimeUnit.MILLISECONDS);
 
-               this.shutdownHook = BlobUtils.addShutdownHook(this, LOG);
+               this.shutdownHook = ShutdownHookUtil.addShutdownHook(this, 
getClass().getSimpleName(), LOG);
 
                this.numFinishedJobs = 0;
                this.numFailedJobs = 0;
@@ -206,19 +206,8 @@ public class FileArchivedExecutionGraphStore implements 
ArchivedExecutionGraphSt
                // clean up the storage directory
                FileUtils.deleteFileOrDirectory(storageDir);
 
-               // Remove shutdown hook to prevent resource leaks, unless this 
is invoked by the
-               // shutdown hook itself
-               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                       try {
-                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                       }
-                       catch (IllegalStateException e) {
-                               // race, JVM is in shutdown already, we can 
safely ignore this
-                       }
-                       catch (Throwable t) {
-                               LOG.warn("Exception while unregistering 
FileArchivedExecutionGraphStore's cleanup shutdown hook.", t);
-                       }
-               }
+               // Remove shutdown hook to prevent resource leaks
+               ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
        }
 
        // --------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
index d78801d..d9d021a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java
@@ -29,6 +29,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -132,19 +133,8 @@ public class FileCache {
                                }
                        }
 
-                       // Remove shutdown hook to prevent resource leaks, 
unless this is invoked by the
-                       // shutdown hook itself
-                       if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                               try {
-                                       
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                               }
-                               catch (IllegalStateException e) {
-                                       // race, JVM is in shutdown already, we 
can safely ignore this
-                               }
-                               catch (Throwable t) {
-                                       LOG.warn("Exception while unregistering 
file cache's cleanup shutdown hook.");
-                               }
-                       }
+                       // Remove shutdown hook to prevent resource leaks
+                       ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
                }
        }
 
@@ -265,31 +255,11 @@ public class FileCache {
 
        private static Thread createShutdownHook(final FileCache cache, final 
Logger logger) {
 
-               Thread shutdownHook = new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       cache.shutdown();
-                               }
-                               catch (Throwable t) {
-                                       logger.error("Error during shutdown of 
file cache via JVM shutdown hook: " + t.getMessage(), t);
-                               }
-                       }
-               });
-
-               try {
-                       // Add JVM shutdown hook to call shutdown of service
-                       Runtime.getRuntime().addShutdownHook(shutdownHook);
-                       return shutdownHook;
-               }
-               catch (IllegalStateException e) {
-                       // JVM is already shutting down. no need to do our work
-                       return null;
-               }
-               catch (Throwable t) {
-                       logger.error("Cannot register shutdown hook that 
cleanly terminates the file cache service.");
-                       return null;
-               }
+               return ShutdownHookUtil.addShutdownHook(
+                       cache::shutdown,
+                       FileCache.class.getSimpleName(),
+                       logger
+               );
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index e2a3a6f..2b8e7f3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.disk.iomanager;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.ShutdownHookUtil;
 
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
@@ -99,22 +100,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
                }
 
                // install a shutdown hook that makes sure the temp directories 
get deleted
-               this.shutdownHook = new Thread("I/O manager shutdown hook") {
-                       @Override
-                       public void run() {
-                               shutdown();
-                       }
-               };
-               try {
-                       Runtime.getRuntime().addShutdownHook(this.shutdownHook);
-               }
-               catch (IllegalStateException e) {
-                       // race, JVM is in shutdown already, we can safely 
ignore this
-                       LOG.debug("Unable to add shutdown hook, shutdown 
already in progress", e);
-               }
-               catch (Throwable t) {
-                       LOG.warn("Error while adding shutdown hook for 
IOManager", t);
-               }
+               this.shutdownHook = 
ShutdownHookUtil.addShutdownHook(this::shutdown, getClass().getSimpleName(), 
LOG);
        }
 
        /**
@@ -129,20 +115,9 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
                        return;
                }
 
-               // Remove shutdown hook to prevent resource leaks, unless this 
is invoked by the shutdown hook itself
-               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                       try {
-                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                       }
-                       catch (IllegalStateException e) {
-                               // race, JVM is in shutdown already, we can 
safely ignore this
-                               LOG.debug("Unable to remove shutdown hook, 
shutdown already in progress", e);
-                       }
-                       catch (Throwable t) {
-                               LOG.warn("Exception while unregistering 
IOManager's shutdown hook.", t);
-                       }
-               }
-               
+               // Remove shutdown hook to prevent resource leaks
+               ShutdownHookUtil.removeShutdownHook(shutdownHook, 
getClass().getSimpleName(), LOG);
+
                try {
                        if (LOG.isDebugEnabled()) {
                                LOG.debug("Shutting down I/O manager.");

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
index e8e378e..579cdf4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.util.ShutdownHookUtil;
+
 import org.slf4j.Logger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -112,15 +114,6 @@ public class JvmShutdownSafeguard extends Thread {
 
                // install the blocking shutdown hook
                Thread shutdownHook = new JvmShutdownSafeguard(delayMillis);
-               try {
-                       // Add JVM shutdown hook to call shutdown of service
-                       Runtime.getRuntime().addShutdownHook(shutdownHook);
-               }
-               catch (IllegalStateException ignored) {
-                       // JVM is already shutting down. No need to do this.
-               }
-               catch (Throwable t) {
-                       logger.error("Cannot install JVM Shutdown Safeguard 
against blocked shutdown hooks");
-               }
+               ShutdownHookUtil.addShutdownHookThread(shutdownHook, 
JvmShutdownSafeguard.class.getSimpleName(), logger);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 4578edf..885ea8b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.testutils;
 
+import org.apache.flink.util.ShutdownHookUtil;
+
 import org.apache.commons.lang3.ArrayUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -229,18 +231,7 @@ public abstract class TestJvmProcess {
                        }
                        finally {
                                destroyed = true;
-
-                               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                                       try {
-                                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                                       }
-                                       catch (IllegalStateException ignored) {
-                                               // JVM is in shutdown already, 
we can safely ignore this.
-                                       }
-                                       catch (Throwable t) {
-                                               LOG.warn("Exception while 
unregistering process cleanup shutdown hook.");
-                                       }
-                               }
+                               
ShutdownHookUtil.removeShutdownHook(shutdownHook, getClass().getSimpleName(), 
LOG);
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/4d19b1c2/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 2d6926a..e6c36f6 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ShutdownHookUtil;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -1036,11 +1037,7 @@ public abstract class AbstractYarnClusterDescriptor 
implements ClusterDescriptor
                                        "temporary files of the YARN session in 
the home directoy will not be removed.");
                }
                // since deployment was successful, remove the hook
-               try {
-                       
Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
-               } catch (IllegalStateException e) {
-                       // we're already in the shut down hook.
-               }
+               ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, 
getClass().getSimpleName(), LOG);
                return report;
        }
 

Reply via email to