Repository: flink
Updated Branches:
  refs/heads/master 4b1a9c72e -> 5066125f9


[FLINK-4625] [core] Add a safety net to forcibly terminate the JVM if clean 
shutdown freezes.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5066125f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5066125f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5066125f

Branch: refs/heads/master
Commit: 5066125f9a377d232f77f6fbcac3c22ebea66b39
Parents: 4b1a9c7
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Sep 15 19:27:06 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Sep 19 20:00:53 2016 +0200

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |   2 +
 .../MesosTaskManagerRunner.java                 |   5 +-
 .../runtime/util/JvmShutdownSafeguard.java      | 126 ++++++++++
 .../flink/runtime/jobmanager/JobManager.scala   |   1 +
 .../flink/runtime/taskmanager/TaskManager.scala |   1 +
 .../runtime/testutils/CommonTestUtils.java      |  10 +-
 .../flink/runtime/testutils/TestJvmProcess.java | 163 +++++++++----
 .../runtime/util/BlockingShutdownTest.java      | 229 +++++++++++++++++++
 .../cassandra/CassandraConnectorITCase.java     |  19 +-
 .../flink/core/testutils/CommonTestUtils.java   |  26 +++
 .../flink/yarn/TestingApplicationMaster.java    |   2 +
 .../flink/yarn/YarnApplicationMasterRunner.java |   2 +
 .../flink/yarn/YarnTaskManagerRunner.java       |   5 +-
 13 files changed, 525 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 9916a87..8fb6af4 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -45,6 +45,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -109,6 +110,7 @@ public class MesosApplicationMasterRunner {
        public static void main(String[] args) {
                EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos 
AppMaster", args);
                SignalHandler.register(LOG);
+               JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
                // run and exit with the proper return code
                int returnCode = new MesosApplicationMasterRunner().run(args);

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
index 94a9e99..ddc2097 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosTaskManagerRunner.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -49,7 +51,8 @@ public class MesosTaskManagerRunner {
 
        public static void runTaskManager(String[] args, final Class<? extends 
TaskManager> taskManager) throws IOException {
                EnvironmentInformation.logEnvironmentInfo(LOG, 
taskManager.getSimpleName(), args);
-               org.apache.flink.runtime.util.SignalHandler.register(LOG);
+               SignalHandler.register(LOG);
+               JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
                // try to parse the command line arguments
                final Configuration configuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
new file mode 100644
index 0000000..e8e378e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/JvmShutdownSafeguard.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.slf4j.Logger;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A utility that guards against shutdown hooks that block JVM shutdown.
+ *
+ * <p>When the JVM shuts down cleanly (<i>SIGTERM</i> or {@link System#exit(int)}) it runs
+ * all installed shutdown hooks. It is possible that any of the shutdown hooks blocks,
+ * which causes the JVM to get stuck and not exit at all.
+ *
+ * <p>This utility installs a shutdown hook that forcibly terminates the JVM if it is still alive
+ * a certain time after clean shutdown was initiated. Even if some shutdown hooks block, the JVM will
+ * terminate within a certain time, via {@link Runtime#halt(int)}.
+ */
+public class JvmShutdownSafeguard extends Thread {
+
+       /** Default delay to wait after clean shutdown was started, before forcibly terminating the JVM */
+       private static final long DEFAULT_DELAY = 5000L;
+
+       /** The exit code returned by the JVM process if it is killed by the safeguard */
+       private static final int EXIT_CODE = -17;
+
+       /** The thread that actually does the termination */
+       private final Thread terminator;
+
+       private JvmShutdownSafeguard(long delayMillis) {
+               setName("JVM Terminator Launcher");
+
+               this.terminator = new Thread(new DelayedTerminator(delayMillis), "Jvm Terminator");
+               this.terminator.setDaemon(true);
+       }
+
+       @Override
+       public void run() {
+               // Because this thread is registered as a shutdown hook, we cannot
+               // wait here and then call for termination. That would always delay the JVM shutdown.
+               // Instead, we spawn a non shutdown hook thread from here.
+               // That thread is a daemon, so it does not keep the JVM alive.
+               terminator.start();
+       }
+
+       // ------------------------------------------------------------------------
+       //  The actual Shutdown thread
+       // ------------------------------------------------------------------------
+
+       private static class DelayedTerminator implements Runnable {
+
+               private final long delayMillis;
+
+               private DelayedTerminator(long delayMillis) {
+                       this.delayMillis = delayMillis;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               Thread.sleep(delayMillis);
+                       }
+                       catch (Throwable t) {
+                               // catch all (interruption, thread death, ...); a cut-short wait still halts below
+                       }
+
+                       Runtime.getRuntime().halt(EXIT_CODE);
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Installing as a shutdown hook
+       // ------------------------------------------------------------------------
+
+       /**
+        * Installs the safeguard shutdown hook. The maximum time that the JVM is allowed to spend
+        * on shutdown before being killed is five seconds.
+        *
+        * @param logger The logger to log errors to.
+        */
+       public static void installAsShutdownHook(Logger logger) {
+               installAsShutdownHook(logger, DEFAULT_DELAY);
+       }
+
+       /**
+        * Installs the safeguard shutdown hook. The maximum time that the JVM is allowed to spend
+        * on shutdown before being killed is the given number of milliseconds.
+        *
+        * @param logger      The logger to log errors to.
+        * @param delayMillis The delay (in milliseconds, must be >= 0) to wait after clean shutdown was started,
+        *                    before forcibly terminating the JVM.
+        */
+       public static void installAsShutdownHook(Logger logger, long delayMillis) {
+               checkArgument(delayMillis >= 0, "delay must be >= 0");
+
+               // the hook itself returns immediately; it only starts the delayed terminator thread
+               Thread shutdownHook = new JvmShutdownSafeguard(delayMillis);
+               try {
+                       // register the hook so that it fires once a clean JVM shutdown begins
+                       Runtime.getRuntime().addShutdownHook(shutdownHook);
+               }
+               catch (IllegalStateException ignored) {
+                       // JVM is already shutting down. No need to do this.
+               }
+               catch (Throwable t) {
+                       logger.error("Cannot install JVM Shutdown Safeguard against blocked shutdown hooks", t);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1c68874..9c844ba 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2019,6 +2019,7 @@ object JobManager {
     // startup checks and logging
     EnvironmentInformation.logEnvironmentInfo(LOG.logger, "JobManager", args)
     SignalHandler.register(LOG.logger)
+    JvmShutdownSafeguard.installAsShutdownHook(LOG.logger)
 
     // parsing the command line arguments
     val (configuration: Configuration,

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index c882631..63a64a0 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1497,6 +1497,7 @@ object TaskManager {
     // startup checks and logging
     EnvironmentInformation.logEnvironmentInfo(LOG.logger, "TaskManager", args)
     SignalHandler.register(LOG.logger)
+    JvmShutdownSafeguard.installAsShutdownHook(LOG.logger)
 
     val maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit()
     if (maxOpenFileHandles != -1) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 59c37b7..2a787f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -71,8 +71,8 @@ public class CommonTestUtils {
         * Create a temporary log4j configuration for the test.
         */
        public static File createTemporaryLog4JProperties() throws IOException {
-               File log4jProps = 
File.createTempFile(FileUtils.getRandomFilename(""), "-log4j" +
-                               ".properties");
+               File log4jProps = File.createTempFile(
+                               FileUtils.getRandomFilename(""), 
"-log4j.properties");
                log4jProps.deleteOnExit();
                CommonTestUtils.printLog4jDebugConfig(log4jProps);
 
@@ -137,9 +137,7 @@ public class CommonTestUtils {
        }
 
        public static void printLog4jDebugConfig(File file) throws IOException {
-               try (FileWriter fw = new FileWriter(file)) {
-                       PrintWriter writer = new PrintWriter(fw);
-
+               try (PrintWriter writer = new PrintWriter(new 
FileWriter(file))) {
                        writer.println("log4j.rootLogger=DEBUG, console");
                        
writer.println("log4j.appender.console=org.apache.log4j.ConsoleAppender");
                        writer.println("log4j.appender.console.target = 
System.err");
@@ -147,9 +145,7 @@ public class CommonTestUtils {
                        
writer.println("log4j.appender.console.layout.ConversionPattern=%-4r [%t] %-5p 
%c %x - %m%n");
                        
writer.println("log4j.logger.org.eclipse.jetty.util.log=OFF");
                        writer.println("log4j.logger.org.apache.zookeeper=OFF");
-
                        writer.flush();
-                       writer.close();
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
index 73a0088..5954ee5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/TestJvmProcess.java
@@ -26,6 +26,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.util.Arrays;
 
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.createTemporaryLog4JProperties;
@@ -33,6 +35,7 @@ import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getCurrentClass
 import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getJavaCommandPath;
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.Assert.fail;
 
 /**
@@ -58,11 +61,15 @@ public abstract class TestJvmProcess {
        private int jvmMemoryInMb = 80;
 
        /** The JVM process */
-       private Process process;
+       private volatile Process process;
 
        /** Writer for the process output */
        private volatile StringWriter processOutput;
 
+       /** flag to mark the process as already destroyed */
+       private volatile boolean destroyed;
+
+
        public TestJvmProcess() throws Exception {
                this(getJavaCommandPath(), 
createTemporaryLog4JProperties().getPath());
        }
@@ -111,7 +118,9 @@ public abstract class TestJvmProcess {
         * @param jvmMemoryInMb Amount of memory in Megabytes for the JVM (>= 
80).
         */
        public void setJVMMemory(int jvmMemoryInMb) {
-               checkArgument(jvmMemoryInMb >= 80, "JobManager JVM Requires at 
least 80 MBs of memory.");
+               checkArgument(jvmMemoryInMb >= 80, "Process JVM Requires at 
least 80 MBs of memory.");
+               checkState(process == null, "Cannot set memory after process 
was started");
+
                this.jvmMemoryInMb = jvmMemoryInMb;
        }
 
@@ -139,35 +148,30 @@ public abstract class TestJvmProcess {
                }
 
                synchronized (createDestroyLock) {
-                       if (process == null) {
-                               LOG.debug("Running command '{}'.", 
Arrays.toString(cmd));
-                               this.process = new ProcessBuilder(cmd).start();
+                       checkState(process == null, "process already started");
 
-                               // Forward output
-                               this.processOutput = new StringWriter();
-                               new 
CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput);
+                       LOG.debug("Running command '{}'.", 
Arrays.toString(cmd));
+                       this.process = new ProcessBuilder(cmd).start();
 
-                               try {
-                                       // Add JVM shutdown hook to call 
shutdown of service
-                                       
Runtime.getRuntime().addShutdownHook(shutdownHook);
-                               }
-                               catch (IllegalStateException ignored) {
-                                       // JVM is already shutting down. No 
need to do this.
-                               }
-                               catch (Throwable t) {
-                                       LOG.error("Cannot register process 
cleanup shutdown hook.", t);
-                               }
+                       // Forward output
+                       this.processOutput = new StringWriter();
+                       new 
CommonTestUtils.PipeForwarder(process.getErrorStream(), processOutput);
+
+                       try {
+                               // Add JVM shutdown hook to call shutdown of 
service
+                               
Runtime.getRuntime().addShutdownHook(shutdownHook);
                        }
-                       else {
-                               throw new IllegalStateException("Already 
running.");
+                       catch (IllegalStateException ignored) {
+                               // JVM is already shutting down. No need to do 
this.
+                       }
+                       catch (Throwable t) {
+                               LOG.error("Cannot register process cleanup 
shutdown hook.", t);
                        }
                }
        }
 
        public void printProcessLog() {
-               if (processOutput == null) {
-                       throw new IllegalStateException("Not started");
-               }
+               checkState(processOutput != null, "not started");
 
                System.out.println("-----------------------------------------");
                System.out.println(" BEGIN SPAWNED PROCESS LOG FOR " + 
getName());
@@ -188,28 +192,53 @@ public abstract class TestJvmProcess {
 
        public void destroy() {
                synchronized (createDestroyLock) {
-                       if (process != null) {
-                               LOG.debug("Destroying " + getName() + " 
process.");
+                       checkState(process != null, "process not started");
+
+                       if (destroyed) {
+                               // already done
+                               return;
+                       }
+
+                       LOG.info("Destroying " + getName() + " process.");
 
+                       try {
+                               // try to call "destroyForcibly()" on Java 8
+                               boolean destroyed = false;
                                try {
-                                       process.destroy();
+                                       Method m = 
process.getClass().getMethod("destroyForcibly");
+                                       m.setAccessible(true);
+                                       m.invoke(process);
+                                       destroyed = true;
+                               }
+                               catch (NoSuchMethodException ignored) {
+                                       // happens on Java 7
                                }
                                catch (Throwable t) {
-                                       LOG.error("Error while trying to 
destroy process.", t);
+                                       LOG.error("Failed to forcibly destroy 
process", t);
+                               }
+
+                               // if it was not destroyed, call the regular 
destroy method
+                               if (!destroyed) {
+                                       try {
+                                               process.destroy();
+                                       }
+                                       catch (Throwable t) {
+                                               LOG.error("Error while trying 
to destroy process.", t);
+                                       }
                                }
-                               finally {
-                                       process = null;
-
-                                       if (shutdownHook != null && 
shutdownHook != Thread.currentThread()) {
-                                               try {
-                                                       
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                                               }
-                                               catch (IllegalStateException 
ignored) {
-                                                       // JVM is in shutdown 
already, we can safely ignore this.
-                                               }
-                                               catch (Throwable t) {
-                                                       LOG.warn("Exception 
while unregistering prcess cleanup shutdown hook.");
-                                               }
+                       }
+                       finally {
+                               destroyed = true;
+
+                               if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
+                                       try {
+                                               
Runtime.getRuntime().removeShutdownHook(shutdownHook);
+                                       }
+                                       catch (IllegalStateException ignored) {
+                                               // JVM is in shutdown already, 
we can safely ignore this.
+                                       }
+                                       catch (Throwable t) {
+                                               LOG.warn("Exception while 
unregistering process cleanup shutdown hook.");
                                        }
                                }
                        }
@@ -225,6 +254,47 @@ public abstract class TestJvmProcess {
                }
        }
 
+       /**
+        * Gets the process ID, if possible. This currently only works when the process is a
+        * {@code java.lang.UNIXProcess} (UNIX-based systems); otherwise it returns {@code -1}.
+        *
+        * @return The process ID, or -1, if the ID cannot be determined.
+        * @throws IllegalStateException if the process has not been started.
+        */
+       public long getProcessId() {
+               checkState(process != null, "process not started");
+               try {
+                       Class<? extends Process> clazz = process.getClass();
+                       if (clazz.getName().equals("java.lang.UNIXProcess")) {
+                               Field pidField = clazz.getDeclaredField("pid");
+                               pidField.setAccessible(true);
+                               return pidField.getLong(process);
+                       }
+                       return -1;
+               }
+               catch (Throwable ignored) {
+                       // best effort only: the field may be missing or inaccessible on this JVM
+                       return -1;
+               }
+       }
+
+       /** Returns false once {@link #destroy()} was called or the process exited; requires a started process. */
+       public boolean isAlive() {
+               checkState(process != null, "process not started");
+               if (destroyed) {
+                       return false;
+               }
+               try {
+                       // exitValue() throws as long as the process has not terminated
+                       process.exitValue();
+                       return false;
+               }
+               catch (IllegalThreadStateException ignored) {
+                       // no exit value available yet, so the process is still running
+                       return true;
+               }
+       }
+
        // 
---------------------------------------------------------------------------------------------
        // File based synchronization utilities
        // 
---------------------------------------------------------------------------------------------
@@ -238,6 +308,19 @@ public abstract class TestJvmProcess {
                }
        }
 
+       /** Polls every 10 ms until the given file exists; fails the test if it does not appear within the timeout. */
+       public static void waitForMarkerFile(File file, long timeoutMillis) throws InterruptedException {
+               // nanoTime values must be compared by difference to stay correct across numeric overflow
+               final long deadline = System.nanoTime() + timeoutMillis * 1_000_000;
+               boolean exists;
+               while (!(exists = file.exists()) && deadline - System.nanoTime() > 0) {
+                       Thread.sleep(10);
+               }
+               if (!exists) {
+                       fail("The marker file was not found within " + timeoutMillis + " msecs");
+               }
+       }
+       
        public static void waitForMarkerFiles(File basedir, String prefix, int 
num, long timeout) {
                long now = System.currentTimeMillis();
                final long deadline = now + timeout;

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
new file mode 100644
index 0000000..f22f42f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockingShutdownTest.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.TestJvmProcess;
+import org.apache.flink.util.OperatingSystem;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
+
+/**
+ * Test that verifies the behavior of blocking shutdown hooks and of the
+ * {@link JvmShutdownSafeguard} that guards against it.
+ */
+public class BlockingShutdownTest {
+
+       @Test
+       public void testProcessShutdownBlocking() throws Exception {
+               // this test works only on linux
+               assumeTrue(OperatingSystem.isLinux());
+
+               // this test leaves remaining processes if not executed with Java 8
+               CommonTestUtils.assumeJava8();
+
+               final File markerFile = new File(
+                               EnvironmentInformation.getTemporaryFileDirectory(), UUID.randomUUID() + ".marker");
+
+               final BlockingShutdownProcess blockingProcess =
+                               new BlockingShutdownProcess(markerFile.getAbsolutePath(), 0, false);
+
+               try {
+                       blockingProcess.startProcess();
+                       long pid = blockingProcess.getProcessId();
+                       assertTrue("Cannot determine process ID", pid != -1);
+
+                       // wait for the marker file to appear, which means the process is up properly
+                       TestJvmProcess.waitForMarkerFile(markerFile, 30000);
+
+                       // send it a regular kill command (SIG_TERM)
+                       Process kill = Runtime.getRuntime().exec("kill " + pid);
+                       kill.waitFor();
+                       assertEquals("failed to send SIG_TERM to process", 0, kill.exitValue());
+
+                       // minimal delay until the Java process object notices that the process is gone.
+                       // this does not make the test fail deterministically if the process does go away,
+                       // but it would make it fail frequently. Not ideal, but the best we can do without
+                       // severely prolonging the test
+                       Thread.sleep(50);
+
+                       // the process should not go away by itself
+                       assertTrue("Test broken, process shutdown blocking does not work", blockingProcess.isAlive());
+               }
+               finally {
+                       blockingProcess.destroy();
+
+                       //noinspection ResultOfMethodCallIgnored
+                       markerFile.delete();
+               }
+       }
+
+       @Test
+       public void testProcessExitsDespiteBlockingShutdownHook() throws Exception {
+               // this test works only on linux
+               assumeTrue(OperatingSystem.isLinux());
+
+               final File markerFile = new File(
+                               EnvironmentInformation.getTemporaryFileDirectory(), UUID.randomUUID() + ".marker");
+
+               final BlockingShutdownProcess blockingProcess =
+                               new BlockingShutdownProcess(markerFile.getAbsolutePath(), 100, true);
+
+               try {
+                       blockingProcess.startProcess();
+                       long pid = blockingProcess.getProcessId();
+                       assertTrue("Cannot determine process ID", pid != -1);
+
+                       // wait for the marker file to appear, which means the process is up properly
+                       TestJvmProcess.waitForMarkerFile(markerFile, 30000);
+
+                       // send it a regular kill command (SIG_TERM)
+                       Process kill = Runtime.getRuntime().exec("kill " + pid);
+                       kill.waitFor();
+                       assertEquals("failed to send SIG_TERM to process", 0, kill.exitValue());
+
+                       // the process should eventually go away
+                       final long deadline = System.nanoTime() + 30_000_000_000L; // 30 secs in nanos
+                       while (blockingProcess.isAlive() && System.nanoTime() < deadline) {
+                               Thread.sleep(50);
+                       }
+
+                       assertFalse("shutdown blocking process does not properly terminate itself", blockingProcess.isAlive());
+               }
+               finally {
+                       blockingProcess.destroy();
+
+                       //noinspection ResultOfMethodCallIgnored
+                       markerFile.delete();
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Utilities
+       // ------------------------------------------------------------------------
+
+       // a method that blocks indefinitely
+       static void parkForever() {
+               // park this forever; interrupts are swallowed so the wait never ends
+               final Object lock = new Object();
+               //noinspection InfiniteLoopStatement
+               while (true) {
+                       try {
+                               //noinspection SynchronizationOnLocalVariableOrMethodParameter
+                               synchronized (lock) {
+                                       lock.wait();
+                               }
+                       } catch (InterruptedException ignored) {}
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       //  Blocking Process Implementation
+       // ------------------------------------------------------------------------
+
+       private static final class BlockingShutdownProcess extends TestJvmProcess {
+
+               private final String tempFilePath;
+               private final long selfKillDelay;
+               private final boolean installSignalHandler;
+
+               public BlockingShutdownProcess(String tempFilePath, long selfKillDelay, boolean installSignalHandler)
+                               throws Exception {
+
+                       this.tempFilePath = tempFilePath;
+                       this.selfKillDelay = selfKillDelay;
+                       this.installSignalHandler = installSignalHandler;
+               }
+
+               @Override
+               public String getName() {
+                       return "BlockingShutdownProcess";
+               }
+
+               @Override
+               public String[] getJvmArgs() {
+                       return new String[] { tempFilePath, String.valueOf(installSignalHandler), String.valueOf(selfKillDelay) };
+               }
+
+               @Override
+               public String getEntryPointClassName() {
+                       return BlockingShutdownProcessEntryPoint.class.getName();
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       public static final class BlockingShutdownProcessEntryPoint {
+
+               private static final Logger LOG = LoggerFactory.getLogger(BlockingShutdownProcessEntryPoint.class);
+
+               public static void main(String[] args) throws Exception {
+                       File touchFile = new File(args[0]);
+                       boolean installHandler = Boolean.parseBoolean(args[1]);
+                       long killDelay = Long.parseLong(args[2]);
+
+                       // install the blocking shutdown hook
+                       Thread shutdownHook = new Thread(new BlockingRunnable(), "Blocking ShutdownHook");
+                       try {
+                               // register the hook; once JVM shutdown starts it parks forever and blocks exit
+                               Runtime.getRuntime().addShutdownHook(shutdownHook);
+                       }
+                       catch (IllegalStateException ignored) {
+                               // JVM is already shutting down. No need to do this.
+                       }
+                       catch (Throwable t) {
+                               System.err.println("Cannot register process cleanup shutdown hook.");
+                               t.printStackTrace();
+                       }
+
+                       // install the jvm terminator, if we want it
+                       if (installHandler) {
+                               JvmShutdownSafeguard.installAsShutdownHook(LOG, killDelay);
+                       }
+
+                       System.err.println("signaling process started");
+                       TestJvmProcess.touchFile(touchFile);
+
+                       System.err.println("parking the main thread");
+                       parkForever();
+               }
+       }
+
+       // ------------------------------------------------------------------------
+
+       static final class BlockingRunnable implements Runnable {
+
+               @Override
+               public void run() {
+                       System.err.println("starting shutdown hook");
+                       parkForever();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index cc4a527..d58b0af 100644
--- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -47,18 +47,16 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.TestEnvironment;
+
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.internal.AssumptionViolatedException;
 import org.junit.runner.RunWith;
 
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
@@ -144,20 +142,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
        public static void startCassandra() throws IOException {
 
                // check if we should run this test, current Cassandra version requires Java >= 1.8
-               try {
-                       String javaVersionString = System.getProperty("java.runtime.version").substring(0, 3);
-                       float javaVersion = Float.parseFloat(javaVersionString);
-                       Assume.assumeTrue(javaVersion >= 1.8f);
-               }
-               catch (AssumptionViolatedException e) {
-                       System.out.println("Skipping CassandraConnectorITCase, because the JDK is < Java 8+");
-                       throw e;
-               }
-               catch (Exception e) {
-                       LOG.error("Cannot determine Java version", e);
-                       e.printStackTrace();
-                       fail("Cannot determine Java version");
-               }
+               org.apache.flink.core.testutils.CommonTestUtils.assumeJava8();
 
                // generate temporary files
                tmpDir = CommonTestUtils.createTempDirectory();

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
index a50a106..d318a3c 100644
--- a/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
+++ b/flink-test-utils-parent/flink-test-utils-junit/src/main/java/org/apache/flink/core/testutils/CommonTestUtils.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.core.testutils;
 
+import org.junit.Assume;
+import org.junit.internal.AssumptionViolatedException;
+
 import java.io.BufferedWriter;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -27,6 +30,8 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 
+import static org.junit.Assert.fail;
+
 /**
  * This class contains reusable utility methods for unit tests.
  */
@@ -88,4 +93,25 @@ public class CommonTestUtils {
                }
                return f.toURI().toString();
        }
+
+       /**
+        * Checks whether this code runs in a Java 8 (or newer) JVM. If not, this throws an
+        * {@link AssumptionViolatedException}, which causes JUnit to skip the test that
+        * called this method. Handles both "1.8"-style and "9"/"11"-style spec versions.
+        */
+       public static void assumeJava8() {
+               try {
+                       String specVersion = System.getProperty("java.specification.version");
+                       int majorVersion = Integer.parseInt((specVersion.startsWith("1.") ? specVersion.substring(2) : specVersion).split("\\.")[0]);
+                       Assume.assumeTrue(majorVersion >= 8);
+               }
+               catch (AssumptionViolatedException e) {
+                       System.out.println("Skipping test, because the JDK is < Java 8");
+                       throw e;
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail("Cannot determine Java version: " + e.getMessage());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
index b0757f5..785dff9 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/TestingApplicationMaster.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
 import org.apache.flink.runtime.testutils.TestingResourceManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 
 /**
@@ -55,6 +56,7 @@ public class TestingApplicationMaster extends YarnApplicationMasterRunner {
        public static void main(String[] args) {
                EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
                SignalHandler.register(LOG);
+               JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
                // run and exit with the proper return code
                int returnCode = new TestingApplicationMaster().run(args);

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 7453344..6619633 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.process.ProcessReaper;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -107,6 +108,7 @@ public class YarnApplicationMasterRunner {
        public static void main(String[] args) {
                EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args);
                SignalHandler.register(LOG);
+               JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
                // run and exit with the proper return code
                int returnCode = new YarnApplicationMasterRunner().run(args);

http://git-wip-us.apache.org/repos/asf/flink/blob/5066125f/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index 6839bb5..9638137 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.SignalHandler;
 
 import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -46,7 +48,8 @@ public class YarnTaskManagerRunner {
 
        public static void runYarnTaskManager(String[] args, final Class<? extends YarnTaskManager> taskManager) throws IOException {
                EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskManager", args);
-               org.apache.flink.runtime.util.SignalHandler.register(LOG);
+               SignalHandler.register(LOG);
+               JvmShutdownSafeguard.installAsShutdownHook(LOG);
 
                // try to parse the command line arguments
                final Configuration configuration;

Reply via email to