Repository: zeppelin Updated Branches: refs/heads/master 30f60248c -> 86f387e94
ZEPPELIN-2197. Interpreter Lifecycle Manager ### What is this PR for? This PR implement the lifecycle manager. There're 2 implementions. * NullLifecycleManager. Nothing is done as before. User has to start/stop interpreter explicitly in UI. * TimeoutLifecycleManager. Interpreter will be closed after idle for one threshold of time. By default it is 1 hour. ### What type of PR is it? [ Feature] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-2197 ### How should this be tested? Unit test is added. ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <[email protected]> Closes #2631 from zjffdu/ZEPPELIN-2197 and squashes the following commits: 8b4c6ce [Jeff Zhang] Add one more test 00d0183 [Jeff Zhang] ZEPPELIN-2197. Interpreter Lifecycle Manager Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/86f387e9 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/86f387e9 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/86f387e9 Branch: refs/heads/master Commit: 86f387e940eacb6dac7531a4ed2cadadaa381bbe Parents: 30f6024 Author: Jeff Zhang <[email protected]> Authored: Thu Oct 26 10:01:27 2017 +0800 Committer: Jeff Zhang <[email protected]> Committed: Sat Oct 28 06:55:04 2017 +0800 ---------------------------------------------------------------------- conf/zeppelin-site.xml.template | 19 +++ .../zeppelin/conf/ZeppelinConfiguration.java | 12 +- .../zeppelin/interpreter/InterpreterGroup.java | 19 +++ .../apache/zeppelin/socket/NotebookServer.java | 19 +-- .../interpreter/InterpreterSetting.java | 22 +++- .../interpreter/InterpreterSettingManager.java | 15 ++- .../zeppelin/interpreter/LifecycleManager.java | 35 ++++++ .../interpreter/ManagedInterpreterGroup.java | 5 +- .../lifecycle/NullLifecycleManager.java | 49 ++++++++ .../lifecycle/TimeoutLifecycleManager.java | 75 +++++++++++ .../interpreter/remote/RemoteInterpreter.java | 15 ++- .../java/org/apache/zeppelin/notebook/Note.java | 13 +- .../org/apache/zeppelin/notebook/Notebook.java | 63 +++++----- .../interpreter/AbstractInterpreterTest.java | 3 +- .../InterpreterSettingManagerTest.java | 8 +- .../lifecycle/TimeoutLifecycleManagerTest.java | 126 +++++++++++++++++++ .../interpreter/test/interpreter-setting.json | 15 ++- 17 files changed, 455 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 4c31669..8a2a60e 100755 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -411,6 +411,25 @@ <description>Enable directory listings on server.</description> </property> +<property> + <name>zeppelin.interpreter.lifecyclemanager.class</name> + <value>org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager</value> + <description>LifecycleManager class for managing the lifecycle of interpreters, by default interpreter will + be closed after timeout</description> +</property> + +<property> + <name>zeppelin.interpreter.lifecyclemanager.timeout.checkinterval</name> + <value>60000</value> + <description>milliseconds of the interval to checking whether interpreter is time out</description> +</property> + +<property> + <name>zeppelin.interpreter.lifecyclemanager.timeout.threshold</name> + <value>3600000</value> + <description>milliseconds of the interpreter timeout threshold, by default it is 1 hour</description> +</property> + <!-- <property> <name>zeppelin.server.jetty.name</name> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 720d6ec..cb249d8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -534,6 +534,9 @@ public class ZeppelinConfiguration extends XMLConfiguration { return getString(ConfVars.ZEPPELIN_SERVER_STRICT_TRANSPORT); } + public String getLifecycleManagerClass() { + return getString(ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS); + } public Map<String, String> dumpConfigurations(ZeppelinConfiguration conf, ConfigurationKeyPredicate predicate) { @@ -701,7 +704,14 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_SERVER_KERBEROS_KEYTAB("zeppelin.server.kerberos.keytab", ""), ZEPPELIN_SERVER_KERBEROS_PRINCIPAL("zeppelin.server.kerberos.principal", ""), - ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"); + ZEPPELIN_INTERPRETER_CALLBACK_PORTRANGE("zeppelin.interpreter.callback.portRange", ":"), + + ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS("zeppelin.interpreter.lifecyclemanager.class", + "org.apache.zeppelin.interpreter.lifecycle.TimeoutLifecycleManager"), + ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL( + "zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", 6000L), + ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD( + "zeppelin.interpreter.lifecyclemanager.timeout.threshold", 360000L); private String varName; @SuppressWarnings("rawtypes") http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index 6acd601..9f88901 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -142,4 +142,23 @@ public class InterpreterGroup { public boolean isEmpty() { return sessions.isEmpty(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof InterpreterGroup)) { + return false; + } + + InterpreterGroup that = (InterpreterGroup) o; + + return id != null ? id.equals(that.id) : that.id == null; + } + + @Override + public int hashCode() { + return id != null ? id.hashCode() : 0; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 8c13e21..f9a8ba1 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -1376,13 +1376,13 @@ public class NotebookServer extends WebSocketServlet List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId()); for (InterpreterSetting setting : settings) { - if (setting.getOrCreateInterpreterGroup(user, note.getId()) == null) { + if (setting.getInterpreterGroup(user, note.getId()) == null) { continue; } - if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, note.getId()) + if (interpreterGroupId.equals(setting.getInterpreterGroup(user, note.getId()) .getId())) { AngularObjectRegistry angularObjectRegistry = - setting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); + setting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); // first trying to get local registry ao = angularObjectRegistry.get(varName, noteId, paragraphId); @@ -1419,13 +1419,13 @@ public class NotebookServer extends WebSocketServlet List<InterpreterSetting> settings = notebook.getInterpreterSettingManager().getInterpreterSettings(note.getId()); for (InterpreterSetting setting : settings) { - if (setting.getOrCreateInterpreterGroup(user, n.getId()) == null) { + if (setting.getInterpreterGroup(user, n.getId()) == null) { continue; } - if (interpreterGroupId.equals(setting.getOrCreateInterpreterGroup(user, n.getId()) + if (interpreterGroupId.equals(setting.getInterpreterGroup(user, n.getId()) .getId())) { AngularObjectRegistry angularObjectRegistry = - setting.getOrCreateInterpreterGroup(user, n.getId()).getAngularObjectRegistry(); + setting.getInterpreterGroup(user, n.getId()).getAngularObjectRegistry(); this.broadcastExcept(n.getId(), new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", ao) .put("interpreterGroupId", interpreterGroupId).put("noteId", n.getId()) @@ -2297,14 +2297,17 @@ public class NotebookServer extends WebSocketServlet } for (InterpreterSetting intpSetting : settings) { + if (intpSetting.getInterpreterGroup(user, note.getId()) == null) { + continue; + } AngularObjectRegistry registry = - intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); + intpSetting.getInterpreterGroup(user, note.getId()).getAngularObjectRegistry(); List<AngularObject> objects = registry.getAllWithGlobal(note.getId()); for (AngularObject object : objects) { conn.send(serializeMessage( new Message(OP.ANGULAR_OBJECT_UPDATE).put("angularObject", object) .put("interpreterGroupId", - intpSetting.getOrCreateInterpreterGroup(user, note.getId()).getId()) + intpSetting.getInterpreterGroup(user, note.getId()).getId()) .put("noteId", note.getId()).put("paragraphId", object.getParagraphId()))); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 93481be..5b88c12 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -37,6 +37,7 @@ import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext; import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher; import org.apache.zeppelin.interpreter.launcher.ShellScriptLauncher; import org.apache.zeppelin.interpreter.launcher.SparkInterpreterLauncher; +import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventPoller; @@ -139,6 +140,7 @@ public class InterpreterSetting { private transient InterpreterLauncher launcher; /////////////////////////////////////////////////////////////////////////////////////////// + private transient LifecycleManager lifecycleManager; /** * Builder class for InterpreterSetting @@ -233,6 +235,11 @@ public class InterpreterSetting { return this; } + public Builder setLifecycleManager(LifecycleManager lifecycleManager) { + interpreterSetting.lifecycleManager = lifecycleManager; + return this; + } + public InterpreterSetting create() { // post processing interpreterSetting.postProcessing(); @@ -249,6 +256,9 @@ public class InterpreterSetting { void postProcessing() { this.status = Status.READY; + if (this.lifecycleManager == null) { + this.lifecycleManager = new NullLifecycleManager(conf); + } } /** @@ -321,6 +331,14 @@ public class InterpreterSetting { this.interpreterSettingManager = interpreterSettingManager; } + public void setLifecycleManager(LifecycleManager lifecycleManager) { + this.lifecycleManager = lifecycleManager; + } + + public LifecycleManager getLifecycleManager() { + return lifecycleManager; + } + public String getId() { return id; } @@ -384,7 +402,7 @@ public class InterpreterSetting { this.interpreterGroups.remove(groupId); } - ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) { + public ManagedInterpreterGroup getInterpreterGroup(String user, String noteId) { String groupId = getInterpreterGroupId(user, noteId); try { interpreterGroupReadLock.lock(); @@ -628,7 +646,7 @@ public class InterpreterSetting { for (InterpreterInfo info : interpreterInfos) { Interpreter interpreter = null; interpreter = new RemoteInterpreter(getJavaProperties(), sessionId, - info.getClassName(), user); + info.getClassName(), user, lifecycleManager); if (info.isDefaultInterpreter()) { interpreters.add(0, interpreter); } else { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index abaf634..f27f83d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -51,6 +51,7 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.net.URL; @@ -116,7 +117,7 @@ public class InterpreterSettingManager { private RemoteInterpreterProcessListener remoteInterpreterProcessListener; private ApplicationEventListener appEventListener; private DependencyResolver dependencyResolver; - + private LifecycleManager lifecycleManager; public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration, AngularObjectRegistryListener angularObjectRegistryListener, @@ -153,6 +154,14 @@ public class InterpreterSettingManager { this.angularObjectRegistryListener = angularObjectRegistryListener; this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; this.appEventListener = appEventListener; + try { + this.lifecycleManager = (LifecycleManager) + Class.forName(conf.getLifecycleManagerClass()).getConstructor(ZeppelinConfiguration.class) + .newInstance(conf); + } catch (Exception e) { + throw new IOException("Fail to create LifecycleManager", e); + } + init(); } @@ -177,6 +186,7 @@ public class InterpreterSettingManager { remoteInterpreterProcessListener); savedInterpreterSetting.setAppEventListener(appEventListener); savedInterpreterSetting.setDependencyResolver(dependencyResolver); + savedInterpreterSetting.setLifecycleManager(lifecycleManager); savedInterpreterSetting.setProperties(InterpreterSetting.convertInterpreterProperties( savedInterpreterSetting.getProperties() )); @@ -372,6 +382,7 @@ public class InterpreterSettingManager { interpreterSetting.setAppEventListener(appEventListener); interpreterSetting.setDependencyResolver(dependencyResolver); interpreterSetting.setInterpreterSettingManager(this); + interpreterSetting.setLifecycleManager(lifecycleManager); interpreterSetting.postProcessing(); interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); } @@ -633,6 +644,7 @@ public class InterpreterSettingManager { setting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener); setting.setDependencyResolver(dependencyResolver); setting.setAngularObjectRegistryListener(angularObjectRegistryListener); + setting.setLifecycleManager(lifecycleManager); setting.setInterpreterSettingManager(this); setting.postProcessing(); interpreterSettings.put(setting.getId(), setting); @@ -645,6 +657,7 @@ public class InterpreterSettingManager { interpreterSettingTemplates.put(interpreterSetting.getName(), interpreterSetting); interpreterSetting.setAppEventListener(appEventListener); interpreterSetting.setDependencyResolver(dependencyResolver); + interpreterSetting.setLifecycleManager(lifecycleManager); interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener); interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener); interpreterSetting.setInterpreterSettingManager(this); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/LifecycleManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/LifecycleManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/LifecycleManager.java new file mode 100644 index 0000000..fc2a7bd --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/LifecycleManager.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.interpreter; + + +/** + * Interface for managing the lifecycle of interpreters + */ +public interface LifecycleManager { + + void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup); + + void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup, + String sessionId); + + void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, + String sessionId); + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java index 96f0195..a8ae338 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -47,6 +47,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup { ManagedInterpreterGroup(String id, InterpreterSetting interpreterSetting) { super(id); this.interpreterSetting = interpreterSetting; + interpreterSetting.getLifecycleManager().onInterpreterGroupCreated(this); } public InterpreterSetting getInterpreterSetting() { @@ -88,7 +89,7 @@ public class ManagedInterpreterGroup extends InterpreterGroup { LOGGER.info("Remove this InterpreterGroup: {} as all the sessions are closed", id); interpreterSetting.removeInterpreterGroup(id); if (remoteInterpreterProcess != null) { - LOGGER.info("Kill RemoteIntetrpreterProcess"); + LOGGER.info("Kill RemoteInterpreterProcess"); remoteInterpreterProcess.stop(); remoteInterpreterProcess = null; } @@ -134,8 +135,10 @@ public class ManagedInterpreterGroup extends InterpreterGroup { interpreter.setInterpreterGroup(this); } LOGGER.info("Create Session: {} in InterpreterGroup: {} for user: {}", sessionId, id, user); + interpreterSetting.getLifecycleManager().onInterpreterSessionCreated(this, sessionId); sessions.put(sessionId, interpreters); return interpreters; } } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java new file mode 100644 index 0000000..ce633c6 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.interpreter.lifecycle; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.LifecycleManager; +import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; + +/** + * Do nothing for the lifecycle of interpreter. User need to explicitly start/stop interpreter. + */ +public class NullLifecycleManager implements LifecycleManager { + + public NullLifecycleManager(ZeppelinConfiguration zConf) { + + } + + @Override + public void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup) { + + } + + @Override + public void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup, + String sessionId) { + + } + + @Override + public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) { + + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java new file mode 100644 index 0000000..7042060 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java @@ -0,0 +1,75 @@ +package org.apache.zeppelin.interpreter.lifecycle; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.interpreter.LifecycleManager; +import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; + + +/** + * This lifecycle manager would close interpreter after it is timeout. By default, it is timeout + * after no using in 1 hour. + * + * For now, this class only manage the lifecycle of interpreter group (will close interpreter + * process after timeout). Managing the lifecycle of interpreter session could be done in future + * if necessary. + */ +public class TimeoutLifecycleManager implements LifecycleManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutLifecycleManager.class); + + // ManagerInterpreterGroup -> LastTimeUsing timestamp + private Map<ManagedInterpreterGroup, Long> interpreterGroups = new ConcurrentHashMap<>(); + + private long checkInterval; + private long timeoutThreshold; + + private Timer checkTimer; + + public TimeoutLifecycleManager(ZeppelinConfiguration zConf) { + this.checkInterval = zConf.getLong(ZeppelinConfiguration.ConfVars + .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL); + this.timeoutThreshold = zConf.getLong( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD); + this.checkTimer = new Timer(true); + this.checkTimer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + long now = System.currentTimeMillis(); + for (Map.Entry<ManagedInterpreterGroup, Long> entry : interpreterGroups.entrySet()) { + ManagedInterpreterGroup interpreterGroup = entry.getKey(); + Long lastTimeUsing = entry.getValue(); + if ((now - lastTimeUsing) > timeoutThreshold ) { + LOGGER.info("InterpreterGroup {} is timeout.", interpreterGroup.getId()); + interpreterGroup.close(); + interpreterGroups.remove(entry.getKey()); + } + } + } + }, checkInterval, checkInterval); + LOGGER.info("TimeoutLifecycleManager is started with checkinterval: " + checkInterval + + ", timeoutThreshold: " + timeoutThreshold); + } + + @Override + public void onInterpreterGroupCreated(ManagedInterpreterGroup interpreterGroup) { + interpreterGroups.put(interpreterGroup, System.currentTimeMillis()); + } + + @Override + public void onInterpreterSessionCreated(ManagedInterpreterGroup interpreterGroup, + String sessionId) { + + } + + @Override + public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) { + interpreterGroups.put(interpreterGroup, System.currentTimeMillis()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 338210d..1ab459e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -30,6 +30,7 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; @@ -66,17 +67,21 @@ public class RemoteInterpreter extends Interpreter { private volatile boolean isOpened = false; private volatile boolean isCreated = false; + private LifecycleManager lifecycleManager; + /** * Remote interpreter and manage interpreter process */ public RemoteInterpreter(Properties properties, String sessionId, String className, - String userName) { + String userName, + LifecycleManager lifecycleManager) { super(properties); this.sessionId = sessionId; this.className = className; this.userName = userName; + this.lifecycleManager = lifecycleManager; } public boolean isOpened() { @@ -149,6 +154,7 @@ public class RemoteInterpreter extends Interpreter { } }); isOpened = true; + this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); } } } @@ -189,6 +195,7 @@ public class RemoteInterpreter extends Interpreter { } }); isOpened = false; + this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); } else { LOGGER.warn("close is called when RemoterInterpreter is not opened for " + className); } @@ -218,6 +225,7 @@ public class RemoteInterpreter extends Interpreter { interpreterContextRunnerPool.clear(noteId); interpreterContextRunnerPool.addAll(noteId, runners); } + this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); return interpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<InterpreterResult>() { @Override @@ -266,6 +274,7 @@ public class RemoteInterpreter extends Interpreter { } catch (IOException e) { throw new InterpreterException(e); } + this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { @Override public Void call(Client client) throws Exception { @@ -293,6 +302,7 @@ public class RemoteInterpreter extends Interpreter { } catch (IOException e) { throw new InterpreterException(e); } + this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); FormType type = interpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<FormType>() { @Override @@ -317,6 +327,7 @@ public class RemoteInterpreter extends Interpreter { } catch (IOException e) { throw new InterpreterException(e); } + this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); return interpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<Integer>() { @Override @@ -341,6 +352,7 @@ public class RemoteInterpreter extends Interpreter { } catch (IOException e) { throw new InterpreterException(e); } + this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); return interpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() { @Override @@ -362,6 +374,7 @@ public class RemoteInterpreter extends Interpreter { } catch (IOException e) { throw new RuntimeException(e); } + this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); return interpreterProcess.callRemoteFunction( new RemoteInterpreterProcess.RemoteFunction<String>() { @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index b5dda67..c89eee5 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -685,9 +685,11 @@ public class Note implements ParagraphJobListener, JsonSerializable { } for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id); - AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); - angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id)); + InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); + if (intpGroup != null) { + AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); + angularObjects.put(intpGroup.getId(), registry.getAllWithGlobal(id)); + } } } @@ -700,7 +702,10 @@ public class Note implements ParagraphJobListener, JsonSerializable { } for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(user, id); + if (setting.getInterpreterGroup(user, id) == null) { + continue; + } + InterpreterGroup intpGroup = setting.getInterpreterGroup(user, id); AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); if (registry instanceof RemoteAngularObjectRegistry) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 77fd04c..c1dc46c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -334,39 +334,41 @@ public class Notebook implements NoteEventListener { // remove from all interpreter instance's angular object registry for (InterpreterSetting settings : interpreterSettingManager.get()) { - AngularObjectRegistry registry = - settings.getOrCreateInterpreterGroup(subject.getUser(), id).getAngularObjectRegistry(); - if (registry instanceof RemoteAngularObjectRegistry) { - // remove paragraph scope object - for (Paragraph p : note.getParagraphs()) { - ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId()); - - // remove app scope object - List<ApplicationState> appStates = p.getAllApplicationStates(); - if (appStates != null) { - for (ApplicationState app : appStates) { - ((RemoteAngularObjectRegistry) registry) - .removeAllAndNotifyRemoteProcess(id, app.getId()); + InterpreterGroup interpreterGroup = settings.getInterpreterGroup(subject.getUser(), id); + if (interpreterGroup != null) { + AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); + if (registry instanceof RemoteAngularObjectRegistry) { + // remove paragraph scope object + for (Paragraph p : note.getParagraphs()) { + ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId()); + + // remove app scope object + List<ApplicationState> appStates = p.getAllApplicationStates(); + if (appStates != null) { + for (ApplicationState app : appStates) { + ((RemoteAngularObjectRegistry) registry) + .removeAllAndNotifyRemoteProcess(id, app.getId()); + } } } - } - // remove note scope object - ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null); - } else { - // remove paragraph scope object - for (Paragraph p : note.getParagraphs()) { - registry.removeAll(id, p.getId()); - - // remove app scope object - List<ApplicationState> appStates = p.getAllApplicationStates(); - if (appStates != null) { - for (ApplicationState app : appStates) { - registry.removeAll(id, app.getId()); + // remove note scope object + ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null); + } else { + // remove paragraph scope object + for (Paragraph p : note.getParagraphs()) { + registry.removeAll(id, p.getId()); + + // remove app scope object + List<ApplicationState> appStates = p.getAllApplicationStates(); + if (appStates != null) { + for (ApplicationState app : appStates) { + registry.removeAll(id, app.getId()); + } } } + // remove note scope object + registry.removeAll(id, null); } - // remove note scope object - registry.removeAll(id, null); } } @@ -517,9 +519,8 @@ public class Notebook implements NoteEventListener { SnapshotAngularObject snapshot = angularObjectSnapshot.get(name); List<InterpreterSetting> settings = interpreterSettingManager.get(); for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getOrCreateInterpreterGroup(subject.getUser(), - note.getId()); - if (intpGroup.getId().equals(snapshot.getIntpGroupId())) { + InterpreterGroup intpGroup = setting.getInterpreterGroup(subject.getUser(), note.getId()); + if (intpGroup != null && intpGroup.getId().equals(snapshot.getIntpGroupId())) { AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); String noteId = snapshot.getAngularObject().getNoteId(); String paragraphId = snapshot.getAngularObject().getParagraphId(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java index ad3dd99..6ba8a49 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java @@ -32,7 +32,7 @@ public abstract class AbstractInterpreterTest { protected File interpreterDir; protected File confDir; protected File notebookDir; - protected ZeppelinConfiguration conf; + protected ZeppelinConfiguration conf = new ZeppelinConfiguration(); @Before public void setUp() throws Exception { @@ -55,7 +55,6 @@ public abstract class AbstractInterpreterTest { System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DIR.getVarName(), interpreterDir.getAbsolutePath()); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); - conf = new ZeppelinConfiguration(); conf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName(), "test,mock1,mock2,mock_resource_pool"); interpreterSettingManager = new InterpreterSettingManager(conf, mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class)); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java index 19f16f5..ec79ada 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingManagerTest.java @@ -50,9 +50,9 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest { InterpreterSetting interpreterSetting = interpreterSettingManager.getByName("test"); assertEquals("test", interpreterSetting.getName()); assertEquals("test", interpreterSetting.getGroup()); - assertEquals(2, interpreterSetting.getInterpreterInfos().size()); + assertEquals(3, interpreterSetting.getInterpreterInfos().size()); // 3 other builtin properties: - // * zeppelin.interpeter.output.limit + // * zeppelin.interpreter.output.limit // * zeppelin.interpreter.localRepo // * zeppelin.interpreter.max.poolsize assertEquals(6, interpreterSetting.getJavaProperties().size()); @@ -67,7 +67,6 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest { assertNotNull(interpreterSetting.getAppEventListener()); assertNotNull(interpreterSetting.getDependencyResolver()); assertNotNull(interpreterSetting.getInterpreterSettingManager()); - assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath()); List<RemoteRepository> repositories = interpreterSettingManager.getRepositories(); assertEquals(2, repositories.size()); @@ -80,14 +79,13 @@ public class InterpreterSettingManagerTest extends AbstractInterpreterTest { interpreterSetting = interpreterSettingManager2.getByName("test"); assertEquals("test", interpreterSetting.getName()); assertEquals("test", interpreterSetting.getGroup()); - assertEquals(2, interpreterSetting.getInterpreterInfos().size()); + assertEquals(3, interpreterSetting.getInterpreterInfos().size()); assertEquals(6, interpreterSetting.getJavaProperties().size()); assertEquals("value_1", interpreterSetting.getJavaProperties().getProperty("property_1")); assertEquals("new_value_2", interpreterSetting.getJavaProperties().getProperty("property_2")); assertEquals("value_3", interpreterSetting.getJavaProperties().getProperty("property_3")); assertEquals("shared", interpreterSetting.getOption().perNote); assertEquals("shared", interpreterSetting.getOption().perUser); - assertEquals("linux_runner", interpreterSetting.getInterpreterRunner().getPath()); assertEquals(0, interpreterSetting.getDependencies().size()); repositories = interpreterSettingManager2.getRepositories(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java new file mode 100644 index 0000000..971f376 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManagerTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.lifecycle; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.interpreter.AbstractInterpreterTest; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.user.AuthenticationInfo; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TimeoutLifecycleManagerTest extends AbstractInterpreterTest { + + @Override + public void setUp() throws Exception { + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_CLASS.getVarName(), + TimeoutLifecycleManager.class.getName()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL.getVarName(), "1000"); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD.getVarName(), "10000"); + super.setUp(); + } + + @Test + public void testTimeout_1() throws InterpreterException, InterruptedException, IOException { + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds()); + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.echo") instanceof RemoteInterpreter); + RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.echo"); + InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), + null, null, new ArrayList<InterpreterContextRunner>(), null); + remoteInterpreter.interpret("hello world", context); + assertTrue(remoteInterpreter.isOpened()); + InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + + Thread.sleep(15 * 1000); + // interpreterGroup is timeout, so is removed. + assertEquals(0, interpreterSetting.getAllInterpreterGroups().size()); + assertFalse(remoteInterpreter.isOpened()); + } + + @Test + public void testTimeout_2() throws InterpreterException, InterruptedException, IOException { + interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getSettingIds()); + assertTrue(interpreterFactory.getInterpreter("user1", "note1", "test.sleep") instanceof RemoteInterpreter); + final RemoteInterpreter remoteInterpreter = (RemoteInterpreter) interpreterFactory.getInterpreter("user1", "note1", "test.sleep"); + + // simulate how zeppelin submit paragraph + remoteInterpreter.getScheduler().submit(new Job("test-job", null) { + @Override + public Object getReturn() { + return null; + } + + @Override + public int progress() { + return 0; + } + + @Override + public Map<String, Object> info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + InterpreterContext context = new InterpreterContext("noteId", "paragraphId", "repl", + "title", "text", AuthenticationInfo.ANONYMOUS, new HashMap<String, Object>(), new GUI(), + null, null, new ArrayList<InterpreterContextRunner>(), null); + return remoteInterpreter.interpret("100000", context); + } + + @Override + protected boolean jobAbort() { + return false; + } + + @Override + public void setResult(Object results) { + + } + }); + + while(!remoteInterpreter.isOpened()) { + Thread.sleep(1000); + LOGGER.info("Wait for interpreter to be started"); + } + + InterpreterSetting interpreterSetting = interpreterSettingManager.getInterpreterSettingByName("test"); + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + + Thread.sleep(15 * 1000); + // interpreterGroup is not timeout because getStatus is called periodically. + assertEquals(1, interpreterSetting.getAllInterpreterGroups().size()); + assertTrue(remoteInterpreter.isOpened()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/86f387e9/zeppelin-zengine/src/test/resources/interpreter/test/interpreter-setting.json ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/resources/interpreter/test/interpreter-setting.json b/zeppelin-zengine/src/test/resources/interpreter/test/interpreter-setting.json index 596fc91..99e980b 100644 --- a/zeppelin-zengine/src/test/resources/interpreter/test/interpreter-setting.json +++ b/zeppelin-zengine/src/test/resources/interpreter/test/interpreter-setting.json @@ -42,8 +42,19 @@ "description": "desc_2" } }, - "runner": { - "linux": "linux_runner" + "editor": { + "language": "java", + "editOnDblClick": false + } + }, + + { + "group": "test", + "name": "sleep", + "defaultInterpreter": false, + "className": "org.apache.zeppelin.interpreter.SleepInterpreter", + "properties": { + }, "editor": { "language": "java",
