Repository: incubator-zeppelin Updated Branches: refs/heads/branch-0.5 6ed67913d -> e6df65771
ZEPPELIN-79 Zeppelin does not kill some interpreters when server is stopped https://issues.apache.org/jira/browse/ZEPPELIN-79 Zeppelin sometimes left interpreter process after it is stopped. This pr solve the problem by increase timeout for graceful shutdown Author: Lee moon soo <[email protected]> Closes #135 from Leemoonsoo/ZEPPELIN-79 and squashes the following commits: d2b1fa6 [Lee moon soo] Close and destroy interpreters in parallel 4558417 [Lee moon soo] Increase graceful shutdown timeout (cherry picked from commit 12e5abf2803e4c5015998672b10642fc72aac0da) Signed-off-by: Lee moon soo <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/e6df6577 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/e6df6577 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/e6df6577 Branch: refs/heads/branch-0.5 Commit: e6df657714ada918fe345048d87acd04b80af28a Parents: 6ed6791 Author: Lee moon soo <[email protected]> Authored: Thu Jul 2 12:05:23 2015 -0700 Committer: Lee moon soo <[email protected]> Committed: Sun Jul 5 10:45:11 2015 -0700 ---------------------------------------------------------------------- bin/zeppelin-daemon.sh | 20 ++++++--- .../zeppelin/interpreter/InterpreterGroup.java | 46 ++++++++++++++++++-- .../remote/RemoteInterpreterProcess.java | 4 +- .../interpreter/InterpreterFactory.java | 25 ++++++++--- 4 files changed, 77 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e6df6577/bin/zeppelin-daemon.sh ---------------------------------------------------------------------- diff --git a/bin/zeppelin-daemon.sh b/bin/zeppelin-daemon.sh index 2440c12..a386f27 100755 --- a/bin/zeppelin-daemon.sh +++ b/bin/zeppelin-daemon.sh @@ -101,19 +101,27 @@ function wait_for_zeppelin_to_die() { local pid local count pid=$1 + timeout=$2 count=0 - while [[ "${count}" -lt 10 ]]; do + timeoutTime=$(date "+%s") + let "timeoutTime+=$timeout" + currentTime=$(date "+%s") + forceKill=1 + + while [[ $currentTime -lt $timeoutTime ]]; do $(kill ${pid} > /dev/null 2> /dev/null) if kill -0 ${pid} > /dev/null 2>&1; then sleep 3 - let "count+=1" else + forceKill=0 break fi - if [[ "${count}" == "5" ]]; then + currentTime=$(date "+%s") + done + + if [[ forceKill -ne 0 ]]; then $(kill -9 ${pid} > /dev/null 2> /dev/null) fi - done } function wait_zeppelin_is_up_for_ci() { @@ -187,7 +195,7 @@ function stop() { if [[ -z "${pid}" ]]; then echo "${ZEPPELIN_NAME} is not running" else - wait_for_zeppelin_to_die $pid + wait_for_zeppelin_to_die $pid 40 $(rm -f ${ZEPPELIN_PID}) action_msg "${ZEPPELIN_NAME} stop" "${SET_OK}" fi @@ -200,7 +208,7 @@ function stop() { fi pid=$(cat ${f}) - wait_for_zeppelin_to_die $pid + wait_for_zeppelin_to_die $pid 20 $(rm -f ${f}) done http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e6df6577/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index 9baaef3..216663a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -18,9 +18,11 @@ package org.apache.zeppelin.interpreter; import java.util.LinkedList; +import java.util.List; import java.util.Properties; import java.util.Random; +import org.apache.log4j.Logger; import org.apache.zeppelin.display.AngularObjectRegistry; /** @@ -71,14 +73,50 @@ public class InterpreterGroup extends LinkedList<Interpreter>{ } public void close() { - for (Interpreter intp : this) { - intp.close(); + List<Thread> closeThreads = new LinkedList<Thread>(); + + for (final Interpreter intp : this) { + Thread t = new Thread() { + public void run() { + intp.close(); + } + }; + + t.start(); + closeThreads.add(t); + } + + for (Thread t : closeThreads) { + try { + t.join(); + } catch (InterruptedException e) { + Logger logger = Logger.getLogger(InterpreterGroup.class); + logger.error("Can't close interpreter", e); + } } } public void destroy() { - for (Interpreter intp : this) { - intp.destroy(); + List<Thread> destroyThreads = new LinkedList<Thread>(); + + for (final Interpreter intp : this) { + Thread t = new Thread() { + public void run() { + intp.destroy(); + } + }; + + t.start(); + destroyThreads.add(t); + } + + for (Thread t : destroyThreads) { + try { + t.join(); + } catch (InterruptedException e) { + Logger logger = Logger.getLogger(InterpreterGroup.class); + logger.error("Can't close interpreter", e); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e6df6577/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index f917eb9..91edd41 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -163,10 +163,10 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { clientPool.clear(); clientPool.close(); - // wait for 3 sec and force kill + // wait for some time (connectTimeout) and force kill // remote process server.serve() loop is not always finishing gracefully long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < 3 * 1000) { + while (System.currentTimeMillis() - startTime < connectTimeout) { if (this.isRunning()) { try { Thread.sleep(500); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/e6df6577/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 77df7c5..57e2b7a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -546,13 +546,26 @@ public class InterpreterFactory { public void close() { + List<Thread> closeThreads = new LinkedList<Thread>(); synchronized (interpreterSettings) { - synchronized (interpreterSettings) { - Collection<InterpreterSetting> intpsettings = interpreterSettings.values(); - for (InterpreterSetting intpsetting : intpsettings) { - intpsetting.getInterpreterGroup().close(); - intpsetting.getInterpreterGroup().destroy(); - } + Collection<InterpreterSetting> intpsettings = interpreterSettings.values(); + for (final InterpreterSetting intpsetting : intpsettings) { + Thread t = new Thread() { + public void run() { + intpsetting.getInterpreterGroup().close(); + intpsetting.getInterpreterGroup().destroy(); + } + }; + t.start(); + closeThreads.add(t); + } + } + + for (Thread t : closeThreads) { + try { + t.join(); + } catch (InterruptedException e) { + logger.error("Can't close interpreterGroup", e); } } }
