This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 7eb665e [ZEPPELIN-4209] Fixed the ClusterInterpreterLauncher can not
listen cluster events
7eb665e is described below
commit 7eb665e1d4048690f209b4132c5f76bc89357cd3
Author: Xun Liu <[email protected]>
AuthorDate: Fri Jul 5 12:57:16 2019 +0800
[ZEPPELIN-4209] Fixed the ClusterInterpreterLauncher can not listen cluster
events
### What is this PR for?
1. **ClusterInterpreterLauncher cannot listen for cluster events**
In cluster mode, the `ClusterInterpreterLauncher` needs to listen for
cluster events so that it can accept requests from other Zeppelin servers
to create interpreter processes.
Because the `ClusterInterpreterLauncher` is created lazily (dynamically,
on first use), it was not listening for events until then. In cluster mode,
a `ClusterInterpreterLauncher` object is now created when the Zeppelin server
starts, so that the `ClusterInterpreterLauncher` listens for cluster events
from startup.
2. **Replace `RemoteInterpreterManagedProcess` with
`ClusterInterpreterProcess` for local interpreter process creation**
This is needed because, in cluster mode, `RemoteInterpreterServer` performs
service discovery by registering its Thrift `ip` and `port` in the cluster
metadata, rather than through the Thrift `callback` interface.
### What type of PR is it?
[Bug Fix]
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4209
### How should this be tested?
* [CI Pass](https://travis-ci.org/liuxunorg/zeppelin/builds/552320897)
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update?
* Is there breaking changes for older versions?
* Does this needs documentation?
Author: Xun Liu <[email protected]>
Closes #3389 from liuxunorg/ZEPPELIN-4209 and squashes the following
commits:
85464c092 [Xun Liu] fixed travis faild
71a64b373 [Xun Liu] Check if the interpreter process is offline
1b52d2343 [Xun Liu] ClusterInterpreterLauncher initialization in
zeppelinserver
b1cdcaba7 [Xun Liu] [ZEPPELIN-4209] Fixed the ClusterInterpreterLauncher
can not listen to cluster events
---
.../launcher/ClusterInterpreterCheckThread.java | 86 +++++++++++++++++++++
.../launcher/ClusterInterpreterLauncher.java | 83 +++++++++++---------
.../launcher/ClusterInterpreterProcess.java | 88 +---------------------
.../launcher/ClusterInterpreterLauncherTest.java | 15 ++--
.../interpreter/launcher/ClusterMockTest.java | 12 ++-
.../org/apache/zeppelin/server/ZeppelinServer.java | 19 +++++
.../zeppelin/interpreter/InterpreterSetting.java | 4 +-
7 files changed, 176 insertions(+), 131 deletions(-)
diff --git
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
new file mode 100644
index 0000000..fb7e41f
--- /dev/null
+++
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+
+// Metadata registered in the cluster by the interpreter process,
+// Keep the interpreter process started
+public class ClusterInterpreterCheckThread extends Thread {
+ private static final Logger LOGGER
+ = LoggerFactory.getLogger(ClusterInterpreterCheckThread.class);
+
+ private ClusterInterpreterProcess intpProcess;
+
+ ClusterInterpreterCheckThread(ClusterInterpreterProcess intpProcess) {
+ this.intpProcess = intpProcess;
+ }
+
+ @Override
+ public void run() {
+ LOGGER.info("ClusterInterpreterCheckThread run() >>>");
+
+ ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
+
+ String intpGroupId = intpProcess.getInterpreterGroupId();
+
+ HashMap<String, Object> intpMeta = clusterServer
+ .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+ int connectTimeout = intpProcess.getConnectTimeout();
+
+ int MAX_RETRY_GET_META = connectTimeout /
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
+ int retryGetMeta = 0;
+ while ((retryGetMeta++ < MAX_RETRY_GET_META)
+ && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+ || !intpMeta.containsKey(INTP_TSERVER_PORT))) {
+ try {
+ Thread.sleep(ClusterInterpreterLauncher.CHECK_META_INTERVAL);
+ intpMeta = clusterServer
+ .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+ LOGGER.info("retry {} times to get {} meta!", retryGetMeta,
intpGroupId);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+
+ if (null != intpMeta && intpMeta.containsKey(INTP_TSERVER_HOST)
+ && intpMeta.containsKey(INTP_TSERVER_PORT)) {
+ String intpHost = (String) intpMeta.get(INTP_TSERVER_HOST);
+ int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT);
+ LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort);
+
+ intpProcess.processStarted(intpPort, intpHost);
+ break;
+ }
+ }
+
+ if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+ || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
+ LOGGER.error("Can not found interpreter meta!");
+ }
+
+ LOGGER.info("ClusterInterpreterCheckThread run() <<<");
+ }
+}
diff --git
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index 1fe77a03..83f4232 100644
---
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang.StringUtils;
import org.apache.zeppelin.cluster.ClusterManagerServer;
import org.apache.zeppelin.cluster.event.ClusterEvent;
import org.apache.zeppelin.cluster.event.ClusterEventListener;
@@ -38,8 +39,10 @@ import java.util.Map;
import static
org.apache.zeppelin.cluster.event.ClusterEvent.CREATE_INTP_PROCESS;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_HOST;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.STATUS;
import static
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
/**
@@ -71,7 +74,8 @@ public class ClusterInterpreterLauncher extends
StandardInterpreterLauncher
HashMap<String, Object> intpProcMeta = clusterServer
.getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
if (null != intpProcMeta && intpProcMeta.containsKey(INTP_TSERVER_HOST)
- && intpProcMeta.containsKey(INTP_TSERVER_PORT)) {
+ && intpProcMeta.containsKey(INTP_TSERVER_PORT) &&
intpProcMeta.containsKey(STATUS)
+ && StringUtils.equals((String) intpProcMeta.get(STATUS),
ONLINE_STATUS)) {
// connect exist Interpreter Process
String intpTserverHost = (String) intpProcMeta.get(INTP_TSERVER_HOST);
int intpTserverPort = (int) intpProcMeta.get(INTP_TSERVER_PORT);
@@ -91,9 +95,9 @@ public class ClusterInterpreterLauncher extends
StandardInterpreterLauncher
String srvHost = (String) meta.get(SERVER_HOST);
String localhost = RemoteInterpreterUtils.findAvailableHostAddress();
- if (localhost.equalsIgnoreCase(srvHost) && false) {
+ if (localhost.equalsIgnoreCase(srvHost)) {
// launch interpreter on local
- return super.launch(context);
+ return createInterpreterProcess(context);
} else {
int srvPort = (int) meta.get(SERVER_PORT);
@@ -104,15 +108,20 @@ public class ClusterInterpreterLauncher extends
StandardInterpreterLauncher
mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS);
mapEvent.put(CLUSTER_EVENT_MSG, sContext);
String strEvent = gson.toJson(mapEvent);
+ // Notify other server in the cluster that the resource is idle to
create an interpreter
clusterServer.unicastClusterEvent(
srvHost, srvPort, ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC,
strEvent);
+ // Find the ip and port of thrift registered by the remote interpreter
process
+ // through the cluster metadata
HashMap<String, Object> intpMeta = clusterServer
.getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
- int retryGetMeta = connectTimeout / CHECK_META_INTERVAL;
- while ((retryGetMeta-- > 0) &
- (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
- || !intpMeta.containsKey(INTP_TSERVER_PORT)) ) {
+
+ int MAX_RETRY_GET_META = connectTimeout /
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
+ int retryGetMeta = 0;
+ while ((retryGetMeta++ < MAX_RETRY_GET_META)
+ && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+ || !intpMeta.containsKey(INTP_TSERVER_PORT)) ) {
try {
Thread.sleep(CHECK_META_INTERVAL);
intpMeta = clusterServer
@@ -126,11 +135,9 @@ public class ClusterInterpreterLauncher extends
StandardInterpreterLauncher
// Check if the remote creation process is successful
if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
|| !intpMeta.containsKey(INTP_TSERVER_PORT)) {
- LOGGER.error("Creating process {} failed on remote server {}:{}",
+ String errorInfo = String.format("Creating process %s failed on
remote server %s:%d",
intpGroupId, srvHost, srvPort);
-
- // launch interpreter on local
- return super.launch(context);
+ throw new IOException(errorInfo);
} else {
// connnect remote interpreter process
String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
@@ -151,29 +158,38 @@ public class ClusterInterpreterLauncher extends
StandardInterpreterLauncher
LOGGER.debug(msg);
}
- Gson gson = new Gson();
- Map<String, Object> mapEvent = gson.fromJson(msg,
- new TypeToken<Map<String, Object>>(){}.getType());
- String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
- ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
-
- switch (clusterEvent) {
- case CREATE_INTP_PROCESS:
- onCreateIntpProcess(mapEvent);
- break;
- default:
- LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
- break;
+ try {
+ Gson gson = new Gson();
+ Map<String, Object> mapEvent = gson.fromJson(msg,
+ new TypeToken<Map<String, Object>>(){}.getType());
+ String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
+ ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
+
+ switch (clusterEvent) {
+ case CREATE_INTP_PROCESS:
+ // 1)Other zeppelin servers in the cluster send requests to create
an interpreter process
+ // 2)After the interpreter process is created, and the interpreter
is started,
+ // the interpreter registers the thrift ip and port into the
cluster metadata.
+ // 3)Other servers connect through the IP and port of thrift in the
cluster metadata,
+ // using this remote interpreter process
+ String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG);
+ InterpreterLaunchContext context = gson.fromJson(
+ eventMsg, new TypeToken<InterpreterLaunchContext>()
{}.getType());
+ ClusterInterpreterProcess clusterInterpreterProcess =
createInterpreterProcess(context);
+ clusterInterpreterProcess.start(context.getUserName());
+ break;
+ default:
+ LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
+ break;
+ }
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
}
}
- private void onCreateIntpProcess(Map<String, Object> mapEvent) {
- String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG);
+ private ClusterInterpreterProcess
createInterpreterProcess(InterpreterLaunchContext context) {
+ ClusterInterpreterProcess clusterInterpreterProcess = null;
try {
- Gson gson = new Gson();
- InterpreterLaunchContext context = gson.fromJson(
- eventMsg, new TypeToken<InterpreterLaunchContext>() {}.getType());
-
this.properties = context.getProperties();
InterpreterOption option = context.getOption();
InterpreterRunner runner = context.getRunner();
@@ -183,8 +199,7 @@ public class ClusterInterpreterLauncher extends
StandardInterpreterLauncher
String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
+ context.getInterpreterSettingId();
- ClusterInterpreterProcess clusterInterpreterProcess
- = new ClusterInterpreterProcess(
+ clusterInterpreterProcess = new ClusterInterpreterProcess(
runner != null ? runner.getPath() :
zConf.getInterpreterRemoteRunnerPath(),
context.getZeppelinServerRPCPort(),
context.getZeppelinServerHost(),
@@ -196,10 +211,10 @@ public class ClusterInterpreterLauncher extends
StandardInterpreterLauncher
intpSetName,
context.getInterpreterGroupId(),
option.isUserImpersonate());
-
- clusterInterpreterProcess.start(context.getUserName());
} catch (IOException e) {
LOGGER.error(e.getMessage(), e);
}
+
+ return clusterInterpreterProcess;
}
}
diff --git
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
index 986c2ed..8f0fcc7 100644
---
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
+++
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -1,28 +1,12 @@
package org.apache.zeppelin.interpreter.launcher;
import java.io.IOException;
-import java.util.HashMap;
import java.util.Map;
-import org.apache.zeppelin.cluster.ClusterManagerServer;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
-import static
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
-
public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess
{
- private static final Logger LOGGER
- = LoggerFactory.getLogger(ClusterInterpreterProcess.class);
-
- private String intpHost = "";
- private int intpPort = 0;
-
- private ClusterManagerServer clusterServer =
ClusterManagerServer.getInstance();
public ClusterInterpreterProcess(
String intpRunner,
@@ -52,31 +36,13 @@ public class ClusterInterpreterProcess extends
RemoteInterpreterManagedProcess {
@Override
public void start(String userName) throws IOException {
- CheckIntpRunStatusThread checkIntpRunStatusThread = new
CheckIntpRunStatusThread(this);
- checkIntpRunStatusThread.start();
+ ClusterInterpreterCheckThread interpreterCheckThread = new
ClusterInterpreterCheckThread(this);
+ interpreterCheckThread.start();
super.start(userName);
}
@Override
- public void processStarted(int port, String host) {
- // Cluster mode, discovering interpreter processes through metadata
registration
- this.intpHost = host;
- this.intpPort = port;
- super.processStarted(port, host);
- }
-
- @Override
- public String getHost() {
- return intpHost;
- }
-
- @Override
- public int getPort() {
- return intpPort;
- }
-
- @Override
public boolean isRunning() {
if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(),
getPort())) {
return true;
@@ -88,54 +54,4 @@ public class ClusterInterpreterProcess extends
RemoteInterpreterManagedProcess {
public String getErrorMessage() {
return null;
}
-
- // Metadata registered in the cluster by the interpreter process,
- // Keep the interpreter process started
- private class CheckIntpRunStatusThread extends Thread {
- private ClusterInterpreterProcess intpProcess;
-
- CheckIntpRunStatusThread(ClusterInterpreterProcess intpProcess) {
- this.intpProcess = intpProcess;
- }
-
- @Override
- public void run() {
- LOGGER.info("checkIntpRunStatusThread run() >>>");
-
- String intpGroupId = getInterpreterGroupId();
-
- HashMap<String, Object> intpMeta = clusterServer
- .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
- int connectTimeout = intpProcess.getConnectTimeout();
- int retryGetMeta = connectTimeout /
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
- while ((retryGetMeta-- > 0)
- && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
- || !intpMeta.containsKey(INTP_TSERVER_PORT))) {
- try {
- Thread.sleep(ClusterInterpreterLauncher.CHECK_META_INTERVAL);
- intpMeta = clusterServer
- .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
- LOGGER.info("retry {} times to get {} meta!", retryGetMeta,
intpGroupId);
- } catch (InterruptedException e) {
- LOGGER.error(e.getMessage(), e);
- }
-
- if (null != intpMeta && intpMeta.containsKey(INTP_TSERVER_HOST)
- && intpMeta.containsKey(INTP_TSERVER_PORT)) {
- String intpHost = (String) intpMeta.get(INTP_TSERVER_HOST);
- int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT);
- LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort);
-
- intpProcess.processStarted(intpPort, intpHost);
- }
- }
-
- if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
- || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
- LOGGER.error("Can not found interpreter meta!");
- }
-
- LOGGER.info("checkIntpRunStatusThread run() <<<");
- }
- }
}
diff --git
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
index da4bf5e..2b2b172 100644
---
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
+++
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
@@ -18,7 +18,6 @@ package org.apache.zeppelin.interpreter.launcher;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
import org.junit.AfterClass;
import org.junit.Before;
@@ -51,8 +50,8 @@ public class ClusterInterpreterLauncherTest extends
ClusterMockTest {
}
@Test
- public void testConnectExistIntpProcess() throws IOException {
- mockIntpProcessMeta("intpGroupId");
+ public void testConnectExistOnlineIntpProcess() throws IOException {
+ mockIntpProcessMeta("intpGroupId", true);
ClusterInterpreterLauncher launcher
= new ClusterInterpreterLauncher(ClusterMockTest.zconf, null);
@@ -75,7 +74,9 @@ public class ClusterInterpreterLauncherTest extends
ClusterMockTest {
}
@Test
- public void testCreateIntpProcess() throws IOException {
+ public void testConnectExistOfflineIntpProcess() throws IOException {
+ mockIntpProcessMeta("intpGroupId2", false);
+
ClusterInterpreterLauncher launcher
= new ClusterInterpreterLauncher(ClusterMockTest.zconf, null);
Properties properties = new Properties();
@@ -84,12 +85,12 @@ public class ClusterInterpreterLauncherTest extends
ClusterMockTest {
InterpreterOption option = new InterpreterOption();
option.setUserImpersonate(true);
InterpreterLaunchContext context = new
InterpreterLaunchContext(properties, option, null,
- "user1", "intpGroupId", "groupId",
+ "user1", "intpGroupId2", "groupId",
"groupName", "name", 0, "host");
InterpreterClient client = launcher.launch(context);
- assertTrue(client instanceof RemoteInterpreterManagedProcess);
- RemoteInterpreterManagedProcess interpreterProcess =
(RemoteInterpreterManagedProcess) client;
+ assertTrue(client instanceof ClusterInterpreterProcess);
+ ClusterInterpreterProcess interpreterProcess = (ClusterInterpreterProcess)
client;
assertEquals("name", interpreterProcess.getInterpreterSettingName());
assertEquals(".//interpreter/groupName",
interpreterProcess.getInterpreterDir());
assertEquals(".//local-repo/groupId",
interpreterProcess.getLocalRepoDir());
diff --git
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
index 72b51b4..a7b6dee 100644
---
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
+++
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.HashMap;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.OFFLINE_STATUS;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -114,7 +116,7 @@ public class ClusterMockTest {
LOGGER.info("serverMeta <<<");
}
- public void mockIntpProcessMeta(String metaKey) {
+ public void mockIntpProcessMeta(String metaKey, boolean online) {
// mock IntpProcess Meta
HashMap<String, Object> meta = new HashMap<>();
meta.put(ClusterMeta.SERVER_HOST, "SERVER_HOST");
@@ -125,7 +127,11 @@ public class ClusterMockTest {
meta.put(ClusterMeta.CPU_USED, "CPU_USED");
meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY");
meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
-
+ if (online) {
+ meta.put(ClusterMeta.STATUS, ONLINE_STATUS);
+ } else {
+ meta.put(ClusterMeta.STATUS, OFFLINE_STATUS);
+ }
// put IntpProcess Meta
clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey,
meta);
@@ -137,6 +143,6 @@ public class ClusterMockTest {
assertNotNull(check);
assertNotNull(check.get(metaKey));
- assertEquals(true, check.get(metaKey).size() == 8);
+ assertEquals(true, check.get(metaKey).size() == 9);
}
}
diff --git
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 10ee180..04b2b71 100644
---
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -41,8 +41,10 @@ import org.apache.zeppelin.helium.HeliumApplicationFactory;
import org.apache.zeppelin.helium.HeliumBundleFactory;
import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
import org.apache.zeppelin.notebook.NoteEventListener;
import org.apache.zeppelin.notebook.Notebook;
@@ -60,6 +62,7 @@ import org.apache.zeppelin.service.*;
import org.apache.zeppelin.service.AuthenticationService;
import org.apache.zeppelin.socket.NotebookServer;
import org.apache.zeppelin.user.Credentials;
+import org.apache.zeppelin.util.ReflectionUtils;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.jmx.ConnectorServer;
import org.eclipse.jetty.jmx.MBeanContainer;
@@ -358,6 +361,22 @@ public class ZeppelinServer extends ResourceConfig {
AuthorizationService authorizationService =
serviceLocator.getService(AuthorizationService.class);
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC,
notebookServer);
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC,
authorizationService);
+
+ // Since the ClusterInterpreterLauncher is lazy, dynamically generated,
So in cluster mode,
+ // when the zeppelin service starts, Create a ClusterInterpreterLauncher
object,
+ // This allows the ClusterInterpreterLauncher to listen for cluster
events.
+ try {
+ InterpreterSettingManager intpSettingManager =
sharedServiceLocator.getService(InterpreterSettingManager.class);
+ RecoveryStorage recoveryStorage = ReflectionUtils.createClazzInstance(
+ conf.getRecoveryStorageClass(),
+ new Class[] {ZeppelinConfiguration.class,
InterpreterSettingManager.class},
+ new Object[] {conf, intpSettingManager});
+ recoveryStorage.init();
+
PluginManager.get().loadInterpreterLauncher(InterpreterSetting.CLUSTER_INTERPRETER_LAUNCHER_NAME,
recoveryStorage);
+ } catch (IOException e) {
+ LOG.error(e.getMessage(), e);
+ }
+
clusterManagerServer.start();
}
}
diff --git
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index fb15e74..0afd7b0 100644
---
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -131,6 +131,8 @@ public class InterpreterSetting {
private transient LifecycleManager lifecycleManager;
private transient RecoveryStorage recoveryStorage;
private transient RemoteInterpreterEventServer interpreterEventServer;
+
+ public static final String CLUSTER_INTERPRETER_LAUNCHER_NAME =
"ClusterInterpreterLauncher";
///////////////////////////////////////////////////////////////////////////////////////////
/**
@@ -671,7 +673,7 @@ public class InterpreterSetting {
if (isRunningOnKubernetes()) {
return "K8sStandardInterpreterLauncher";
} else if (isRunningOnCluster()) {
- return "ClusterInterpreterLauncher";
+ return InterpreterSetting.CLUSTER_INTERPRETER_LAUNCHER_NAME;
} if (isRunningOnDocker()) {
return "DockerInterpreterLauncher";
} else {