Repository: spark Updated Branches: refs/heads/branch-2.0 1932bb683 -> 972106dd3
[SPARK-16182][CORE] Utils.scala -- terminateProcess() should call Process.destroyForcibly() if and only if Process.destroy() fails ## What changes were proposed in this pull request? Utils.terminateProcess should `destroy()` first and only fall back to `destroyForcibly()` if it fails. It's kind of bad that we're force-killing executors -- and only in Java 8. See JIRA for an example of the impact: no shutdown While here: `Utils.waitForProcess` should use the Java 8 method if available instead of a custom implementation. ## How was this patch tested? Existing tests, which cover the force-kill case, and Amplab tests, which will cover both Java 7 and Java 8 eventually. However I tested locally on Java 8 and the PR builder will try Java 7 here. Author: Sean Owen <[email protected]> Closes #13973 from srowen/SPARK-16182. (cherry picked from commit 2075bf8ef6035fd7606bcf20dc2cd7d7b9cda446) Signed-off-by: Sean Owen <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/972106dd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/972106dd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/972106dd Branch: refs/heads/branch-2.0 Commit: 972106dd3bdc40b0980949a09783d6d460e8d268 Parents: 1932bb6 Author: Sean Owen <[email protected]> Authored: Fri Jul 1 09:22:27 2016 +0100 Committer: Sean Owen <[email protected]> Committed: Fri Jul 1 09:22:36 2016 +0100 ---------------------------------------------------------------------- .../scala/org/apache/spark/util/Utils.scala | 76 ++++++++++++-------- .../org/apache/spark/util/UtilsSuite.scala | 2 +- 2 files changed, 47 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/972106dd/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index f77cc2f..0c23f3c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1772,50 +1772,66 @@ private[spark] object Utils extends Logging { } /** - * Terminates a process waiting for at most the specified duration. Returns whether - * the process terminated. + * Terminates a process waiting for at most the specified duration. + * + * @return the process exit value if it was successfully terminated, else None */ def terminateProcess(process: Process, timeoutMs: Long): Option[Int] = { - try { - // Java8 added a new API which will more forcibly kill the process. Use that if available. - val destroyMethod = process.getClass().getMethod("destroyForcibly"); - destroyMethod.setAccessible(true) - destroyMethod.invoke(process) - } catch { - case NonFatal(e) => - if (!e.isInstanceOf[NoSuchMethodException]) { - logWarning("Exception when attempting to kill process", e) - } - process.destroy() - } + // Politely destroy first + process.destroy() + if (waitForProcess(process, timeoutMs)) { + // Successful exit Option(process.exitValue()) } else { - None + // Java 8 added a new API which will more forcibly kill the process. Use that if available. + try { + classOf[Process].getMethod("destroyForcibly").invoke(process) + } catch { + case _: NoSuchMethodException => return None // Not available; give up + case NonFatal(e) => logWarning("Exception when attempting to kill process", e) + } + // Wait, again, although this really should return almost immediately + if (waitForProcess(process, timeoutMs)) { + Option(process.exitValue()) + } else { + logWarning("Timed out waiting to forcibly kill process") + None + } } } /** * Wait for a process to terminate for at most the specified duration. - * Return whether the process actually terminated after the given timeout. + * + * @return whether the process actually terminated before the given timeout. */ def waitForProcess(process: Process, timeoutMs: Long): Boolean = { - var terminated = false - val startTime = System.currentTimeMillis - while (!terminated) { - try { - process.exitValue() - terminated = true - } catch { - case e: IllegalThreadStateException => - // Process not terminated yet - if (System.currentTimeMillis - startTime > timeoutMs) { - return false + try { + // Use Java 8 method if available + classOf[Process].getMethod("waitFor", java.lang.Long.TYPE, classOf[TimeUnit]) + .invoke(process, timeoutMs.asInstanceOf[java.lang.Long], TimeUnit.MILLISECONDS) + .asInstanceOf[Boolean] + } catch { + case _: NoSuchMethodError => + // Otherwise implement it manually + var terminated = false + val startTime = System.currentTimeMillis + while (!terminated) { + try { + process.exitValue() + terminated = true + } catch { + case e: IllegalThreadStateException => + // Process not terminated yet + if (System.currentTimeMillis - startTime > timeoutMs) { + return false + } + Thread.sleep(100) } - Thread.sleep(100) - } + } + true } - true } /** http://git-wip-us.apache.org/repos/asf/spark/blob/972106dd/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index df279b5..f5d0fb0 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -863,7 +863,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(terminated.isDefined) Utils.waitForProcess(process, 5000) val duration = System.currentTimeMillis() - start - assert(duration < 5000) + assert(duration < 6000) // add a little extra time to allow a force kill to finish assert(!pidExists(pid)) } finally { signal(pid, "SIGKILL") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
