Repository: incubator-zeppelin Updated Branches: refs/heads/master 0a68c0b11 -> f8aeee11e
[ZEPPELIN-535] "Scheduler already terminated" occurs when RemoteInterpreter.close() doesn't succeed ### What is this PR for? Fix the exception "Scheduler already terminated" when remote interpreter close() fails ### What type of PR is it? Bug Fix ### Is there a relevant Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-535 ### How should this be tested? Modify any interpreter to throw an exception on the close() call, then try to use it after restarting the interpreter. ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? no * Are there breaking changes for older versions? no * Does this need documentation? no Author: Lee moon soo <[email protected]> Closes #574 from Leemoonsoo/ZEPPELIN-535 and squashes the following commits: 66d7c09 [Lee moon soo] Remove unnecessary check in test 9b7e8d5 [Lee moon soo] Remove interpreterGroupReference Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/f8aeee11 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/f8aeee11 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/f8aeee11 Branch: refs/heads/master Commit: f8aeee11e3bb447ee392cfb004b0acbe2e6bc435 Parents: 0a68c0b Author: Lee moon soo <[email protected]> Authored: Sun Dec 27 19:55:28 2015 -0800 Committer: Lee moon soo <[email protected]> Committed: Mon Dec 28 17:34:54 2015 -0800 ---------------------------------------------------------------------- .../zeppelin/interpreter/InterpreterGroup.java | 17 ++++ .../interpreter/remote/RemoteInterpreter.java | 81 +++++--------------- .../remote/RemoteInterpreterTest.java | 6 -- 3 files changed, 36 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f8aeee11/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java 
---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index 216663a..9256bcd 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -24,6 +24,7 @@ import java.util.Random; import org.apache.log4j.Logger; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; /** * InterpreterGroup is list of interpreters in the same group. @@ -33,6 +34,7 @@ public class InterpreterGroup extends LinkedList<Interpreter>{ String id; AngularObjectRegistry angularObjectRegistry; + RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process public InterpreterGroup(String id) { this.id = id; @@ -72,6 +74,14 @@ public class InterpreterGroup extends LinkedList<Interpreter>{ this.angularObjectRegistry = angularObjectRegistry; } + public RemoteInterpreterProcess getRemoteInterpreterProcess() { + return remoteInterpreterProcess; + } + + public void setRemoteInterpreterProcess(RemoteInterpreterProcess remoteInterpreterProcess) { + this.remoteInterpreterProcess = remoteInterpreterProcess; + } + public void close() { List<Thread> closeThreads = new LinkedList<Thread>(); @@ -118,5 +128,12 @@ public class InterpreterGroup extends LinkedList<Interpreter>{ logger.error("Can't close interpreter", e); } } + + // make sure remote interpreter process terminates + if (remoteInterpreterProcess != null) { + while (remoteInterpreterProcess.referenceCount() > 0) { + remoteInterpreterProcess.dereference(); + } + } } } 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f8aeee11/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 3ac5121..c72aa7c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -56,8 +56,6 @@ public class RemoteInterpreter extends Interpreter { FormType formType; boolean initialized; private Map<String, String> env; - static Map<String, RemoteInterpreterProcess> interpreterGroupReference - = new HashMap<String, RemoteInterpreterProcess>(); private int connectTimeout; @@ -96,19 +94,21 @@ public class RemoteInterpreter extends Interpreter { } public RemoteInterpreterProcess getInterpreterProcess() { - synchronized (interpreterGroupReference) { - if (interpreterGroupReference.containsKey(getInterpreterGroupKey(getInterpreterGroup()))) { - RemoteInterpreterProcess interpreterProcess = interpreterGroupReference - .get(getInterpreterGroupKey(getInterpreterGroup())); - try { - return interpreterProcess; - } catch (Exception e) { - throw new InterpreterException(e); - } - } else { - // closed or not opened yet - return null; + InterpreterGroup intpGroup = getInterpreterGroup(); + if (intpGroup == null) { + return null; + } + + synchronized (intpGroup) { + if (intpGroup.getRemoteInterpreterProcess() == null) { + // create new remote process + RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess( + interpreterRunner, interpreterPath, env, connectTimeout); + + intpGroup.setRemoteInterpreterProcess(remoteProcess); } + + return 
intpGroup.getRemoteInterpreterProcess(); } } @@ -117,17 +117,7 @@ public class RemoteInterpreter extends Interpreter { return; } - RemoteInterpreterProcess interpreterProcess = null; - - synchronized (interpreterGroupReference) { - if (interpreterGroupReference.containsKey(getInterpreterGroupKey(getInterpreterGroup()))) { - interpreterProcess = interpreterGroupReference - .get(getInterpreterGroupKey(getInterpreterGroup())); - } else { - throw new InterpreterException("Unexpected error"); - } - } - + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); int rc = interpreterProcess.reference(getInterpreterGroup()); synchronized (interpreterProcess) { @@ -170,24 +160,14 @@ public class RemoteInterpreter extends Interpreter { Client client = null; try { client = interpreterProcess.getClient(); + client.close(className); } catch (Exception e1) { throw new InterpreterException(e1); - } - - try { - client.close(className); - } catch (TException e) { - throw new InterpreterException(e); } finally { - interpreterProcess.releaseClient(client); - } - - int r = interpreterProcess.dereference(); - if (r == 0) { - synchronized (interpreterGroupReference) { - InterpreterGroup intpGroup = getInterpreterGroup(); - interpreterGroupReference.remove(getInterpreterGroupKey(intpGroup)); + if (client != null) { + interpreterProcess.releaseClient(client); } + getInterpreterProcess().dereference(); } } @@ -339,29 +319,6 @@ public class RemoteInterpreter extends Interpreter { } } - - @Override - public void setInterpreterGroup(InterpreterGroup interpreterGroup) { - super.setInterpreterGroup(interpreterGroup); - - synchronized (interpreterGroupReference) { - RemoteInterpreterProcess intpProcess = interpreterGroupReference - .get(getInterpreterGroupKey(interpreterGroup)); - - // when interpreter process is not created or terminated - if (intpProcess == null || (!intpProcess.isRunning() && intpProcess.getPort() > 0) - || (!intpProcess.isRunning() && intpProcess.getPort() == 
-1)) { - interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup), - new RemoteInterpreterProcess(interpreterRunner, - interpreterPath, env, connectTimeout)); - - logger.info("setInterpreterGroup = " - + getInterpreterGroupKey(interpreterGroup) + " class=" + className - + ", path=" + interpreterPath); - } - } - } - private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) { return interpreterGroup.getId(); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/f8aeee11/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index bbda252..c938ff3 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -219,9 +219,6 @@ public class RemoteInterpreterTest { intpA.close(); intpB.close(); - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertNull(process); } @Test @@ -337,9 +334,6 @@ public class RemoteInterpreterTest { intpA.close(); intpB.close(); - - RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertNull(process); } @Test
