This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 57f6632 [ZEPPELIN-4281] Fixed unusable after cluster mode restarts interpreter
57f6632 is described below
commit 57f663297cb4d6c39e594b3fb06604196bcb3af0
Author: Xun Liu <[email protected]>
AuthorDate: Sat Jul 27 09:57:24 2019 +0800
[ZEPPELIN-4281] Fixed unusable after cluster mode restarts interpreter
### What is this PR for?
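In cluster mode, after the interpreter is restarted, the cluster metadata marks
the interpreter process as offline, and because of an incorrect status check the
interpreter could not be executed again.
This change deletes the interpreter's cluster metadata when the interpreter
process shuts down, and the launcher now reuses an existing interpreter process
only when its metadata is online and its Thrift endpoint is reachable; otherwise
it launches a new interpreter process.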
### What type of PR is it?
Bug Fix
### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-4281
### How should this be tested?
[CI Pass](https://travis-ci.org/liuxunorg/zeppelin/builds/577750017), and the
following 4 test cases pass:
1. ClusterInterpreterLauncherTest::testConnectExistOnlineIntpProcess()
2. ClusterInterpreterLauncherTest::testConnectExistOfflineIntpProcess()
3. ClusterInterpreterLauncherTest::testCreateIntpProcessDockerMode()
4. ClusterInterpreterLauncherTest::testCreateIntpProcessLocalMode()
### Screenshots (if appropriate)
### Questions:
* Do the license files need to be updated?
* Are there breaking changes for older versions?
* Does this need documentation?
Author: Xun Liu <[email protected]>
Closes #3434 from liuxunorg/ZEPPELIN-4281 and squashes the following commits:
4909e32d9 [Xun Liu] [ZEPPELIN-4281] Fixed unusable after cluster mode restarts interpreter
---
.../apache/zeppelin/cluster/ClusterCallback.java | 26 +++
.../apache/zeppelin/cluster/ClusterManager.java | 63 +++++++
.../zeppelin/cluster/ClusterManagerServer.java | 5 +-
.../remote/RemoteInterpreterServer.java | 22 ++-
.../launcher/ClusterInterpreterCheckThread.java | 65 +++----
.../launcher/ClusterInterpreterLauncher.java | 199 +++++++++++----------
.../launcher/ClusterInterpreterLauncherTest.java | 7 +-
.../interpreter/launcher/ClusterMockTest.java | 26 ++-
.../launcher/DockerInterpreterProcess.java | 3 +
9 files changed, 274 insertions(+), 142 deletions(-)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterCallback.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterCallback.java
new file mode 100644
index 0000000..fa7899d
--- /dev/null
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.cluster;
+
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
+
+public interface ClusterCallback<T> {
+ InterpreterClient online(T result);
+
+ void offline();
+}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
index 6389b7c..02135b4 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -86,6 +86,7 @@ import org.apache.zeppelin.cluster.meta.ClusterMetaType;
import org.apache.zeppelin.cluster.protocol.LocalRaftProtocolFactory;
import org.apache.zeppelin.cluster.protocol.RaftClientMessagingProtocol;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,9 +113,14 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import static io.atomix.primitive.operation.PrimitiveOperation.operation;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.STATUS;
import static
org.apache.zeppelin.cluster.meta.ClusterMetaOperation.DELETE_OPERATION;
import static
org.apache.zeppelin.cluster.meta.ClusterMetaOperation.PUT_OPERATION;
import static
org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATION;
+import static
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
/**
* The base class for cluster management, including the following
implementations
@@ -432,6 +438,63 @@ public abstract class ClusterManager {
return clusterMeta;
}
+ public InterpreterClient getIntpProcessStatus(String intpName,
+ int timeout,
+
ClusterCallback<HashMap<String, Object>> callback) {
+ final int CHECK_META_INTERVAL = 1000;
+ int MAX_RETRY_GET_META = timeout / CHECK_META_INTERVAL;
+ int retryGetMeta = 0;
+ while (retryGetMeta++ < MAX_RETRY_GET_META) {
+ HashMap<String, Object> intpMeta = getClusterMeta(INTP_PROCESS_META,
intpName).get(intpName);
+ if (interpreterMetaOnline(intpMeta)) {
+ // connect exist Interpreter Process
+ String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
+ int intpTSrvPort = (int) intpMeta.get(INTP_TSERVER_PORT);
+ LOGGER.info("interpreter thrift {}:{} service is online!",
intpTSrvHost, intpTSrvPort);
+
+ // Check if the interpreter thrift service is available
+ boolean remoteIntpAccessible =
+
RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(intpTSrvHost,
intpTSrvPort);
+ if (remoteIntpAccessible) {
+ LOGGER.info("interpreter thrift {}:{} accessible!", intpTSrvHost,
intpTSrvPort);
+ return callback.online(intpMeta);
+ } else {
+ LOGGER.error("interpreter thrift {}:{} service is not available!",
+ intpTSrvHost, intpTSrvPort);
+ try {
+ Thread.sleep(CHECK_META_INTERVAL);
+ LOGGER.warn("retry {} times to get {} meta!", retryGetMeta,
intpName);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ } else {
+ try {
+ Thread.sleep(CHECK_META_INTERVAL);
+ } catch (InterruptedException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ LOGGER.error("retry {} times not get {} meta!", retryGetMeta, intpName);
+ callback.offline();
+ return null;
+ }
+
+ // Check if the interpreter is online
+ private boolean interpreterMetaOnline(HashMap<String, Object> intpProcMeta) {
+ if (null != intpProcMeta
+ && intpProcMeta.containsKey(INTP_TSERVER_HOST)
+ && intpProcMeta.containsKey(INTP_TSERVER_PORT)
+ && intpProcMeta.containsKey(STATUS)
+ && StringUtils.equals((String) intpProcMeta.get(STATUS),
ONLINE_STATUS)) {
+ return true;
+ }
+
+ return false;
+ }
+
protected static final Serializer protocolSerializer =
Serializer.using(Namespace.builder()
.register(OpenSessionRequest.class)
.register(OpenSessionResponse.class)
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
index 4cd370d..869188a 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -285,7 +285,8 @@ public class ClusterManagerServer extends ClusterManager {
}
public void unicastClusterEvent(String host, int port, String topic, String
msg) {
- LOGGER.info("send unicastClusterEvent message {}", msg);
+ LOGGER.info("send unicastClusterEvent host:{} port:{} topic:{} message:{}",
+ host, port, topic, msg);
Address address = Address.from(host, port);
CompletableFuture<byte[]> response =
messagingService.sendAndReceive(address,
@@ -293,8 +294,6 @@ public class ClusterManagerServer extends ClusterManager {
response.whenComplete((r, e) -> {
if (null == e) {
LOGGER.error(e.getMessage(), e);
- } else {
- LOGGER.info("unicastClusterEvent success! {}", msg);
}
});
}
diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index d7e946b..a143cd3 100644
--- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -31,7 +31,6 @@ import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.cluster.ClusterManagerClient;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
-import org.apache.zeppelin.cluster.meta.ClusterMetaType;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.dep.DependencyResolver;
import org.apache.zeppelin.display.AngularObject;
@@ -100,6 +99,8 @@ import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import static
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+
/**
* Entry point for Interpreter process.
* Accepting thrift connections from ZeppelinServer.
@@ -238,6 +239,9 @@ public class RemoteInterpreterServer extends Thread
@Override
public void shutdown() throws TException {
logger.info("Shutting down...");
+ // delete interpreter cluster meta
+ deleteClusterMeta();
+
if (interpreterGroup != null) {
synchronized (interpreterGroup) {
for (List<Interpreter> session : interpreterGroup.values()) {
@@ -340,7 +344,21 @@ public class RemoteInterpreterServer extends Thread
meta.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
meta.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
- clusterManagerClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META,
interpreterGroupId, meta);
+ clusterManagerClient.putClusterMeta(INTP_PROCESS_META, interpreterGroupId,
meta);
+ }
+
+ private void deleteClusterMeta() {
+ if (!zconf.isClusterMode()){
+ return;
+ }
+
+ try {
+ // delete interpreter cluster meta
+ clusterManagerClient.deleteClusterMeta(INTP_PROCESS_META,
interpreterGroupId);
+ Thread.sleep(300);
+ } catch (InterruptedException e) {
+ logger.error(e.getMessage(), e);
+ }
}
@Override
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
index a389135..099bad7 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
@@ -17,6 +17,7 @@
package org.apache.zeppelin.interpreter.launcher;
+import org.apache.zeppelin.cluster.ClusterCallback;
import org.apache.zeppelin.cluster.ClusterManagerServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,7 +26,6 @@ import java.util.HashMap;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
-import static
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
// Metadata registered in the cluster by the interpreter process,
// Keep the interpreter process started
@@ -51,45 +51,30 @@ public class ClusterInterpreterCheckThread extends Thread {
ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
- HashMap<String, Object> intpMeta = clusterServer
- .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-
- int MAX_RETRY_GET_META = connectTimeout /
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
- int retryGetMeta = 0;
- while ((retryGetMeta++ < MAX_RETRY_GET_META)
- && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
- || !intpMeta.containsKey(INTP_TSERVER_PORT))) {
- try {
- Thread.sleep(ClusterInterpreterLauncher.CHECK_META_INTERVAL);
- intpMeta = clusterServer
- .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
- LOGGER.info("retry {} times to get {} meta!", retryGetMeta,
intpGroupId);
- } catch (InterruptedException e) {
- LOGGER.error(e.getMessage(), e);
- }
-
- if (null != intpMeta && intpMeta.containsKey(INTP_TSERVER_HOST)
- && intpMeta.containsKey(INTP_TSERVER_PORT)) {
- String intpHost = (String) intpMeta.get(INTP_TSERVER_HOST);
- int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT);
- LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort);
-
- if (intpProcess instanceof DockerInterpreterProcess) {
- ((DockerInterpreterProcess) intpProcess).processStarted(intpPort,
intpHost);
- } else if (intpProcess instanceof ClusterInterpreterProcess) {
- ((ClusterInterpreterProcess) intpProcess).processStarted(intpPort,
intpHost);
- } else {
- LOGGER.error("Unknown type !");
- }
-
- break;
- }
- }
-
- if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
- || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
- LOGGER.error("Can not found interpreter meta!");
- }
+ clusterServer.getIntpProcessStatus(intpGroupId, connectTimeout,
+ new ClusterCallback<HashMap<String, Object>>() {
+ @Override
+ public InterpreterClient online(HashMap<String, Object> result) {
+ String intpTSrvHost = (String) result.get(INTP_TSERVER_HOST);
+ int intpTSrvPort = (int) result.get(INTP_TSERVER_PORT);
+ LOGGER.info("Found cluster interpreter {}:{}", intpTSrvHost,
intpTSrvPort);
+
+ if (intpProcess instanceof DockerInterpreterProcess) {
+ ((DockerInterpreterProcess)
intpProcess).processStarted(intpTSrvPort, intpTSrvHost);
+ } else if (intpProcess instanceof ClusterInterpreterProcess) {
+ ((ClusterInterpreterProcess)
intpProcess).processStarted(intpTSrvPort, intpTSrvHost);
+ } else {
+ LOGGER.error("Unknown type !");
+ }
+
+ return null;
+ }
+
+ @Override
+ public void offline() {
+ LOGGER.error("Can not found cluster interpreter!");
+ }
+ });
LOGGER.info("ClusterInterpreterCheckThread run() <<<");
}
diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index 14bbce2..31d7184 100644
--- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -19,7 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
-import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.cluster.ClusterCallback;
import org.apache.zeppelin.cluster.ClusterManagerServer;
import org.apache.zeppelin.cluster.event.ClusterEvent;
import org.apache.zeppelin.cluster.event.ClusterEventListener;
@@ -40,11 +40,8 @@ import java.util.Map;
import static
org.apache.zeppelin.cluster.event.ClusterEvent.CREATE_INTP_PROCESS;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_HOST;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_PORT;
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.STATUS;
-import static
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
/**
* Interpreter Launcher which use cluster to launch the interpreter process.
@@ -53,7 +50,6 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
implements ClusterEventListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterInterpreterLauncher.class);
- public static final int CHECK_META_INTERVAL = 2000; // ms
private InterpreterLaunchContext context;
private ClusterManagerServer clusterServer =
ClusterManagerServer.getInstance();
@@ -72,35 +68,59 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
int connectTimeout = getConnectTimeout();
String intpGroupId = context.getInterpreterGroupId();
- HashMap<String, Object> intpProcMeta = clusterServer
- .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
- if (null != intpProcMeta && intpProcMeta.containsKey(INTP_TSERVER_HOST)
- && intpProcMeta.containsKey(INTP_TSERVER_PORT) &&
intpProcMeta.containsKey(STATUS)
- && StringUtils.equals((String) intpProcMeta.get(STATUS),
ONLINE_STATUS)) {
- // connect exist Interpreter Process
- String intpTserverHost = (String) intpProcMeta.get(INTP_TSERVER_HOST);
- int intpTserverPort = (int) intpProcMeta.get(INTP_TSERVER_PORT);
- return new RemoteInterpreterRunningProcess(
- context.getInterpreterSettingName(),
- connectTimeout,
- intpTserverHost,
- intpTserverPort);
- } else {
- // No process was found for the InterpreterGroup ID
- HashMap<String, Object> meta = clusterServer.getIdleNodeMeta();
- if (null == meta) {
- LOGGER.error("Don't get idle node meta, launch interpreter on local.");
- return createInterpreterProcess(context);
- }
+ // connect exist Interpreter Process
+ InterpreterClient intpClient = clusterServer.getIntpProcessStatus(
+ intpGroupId, 3000, new ClusterCallback<HashMap<String, Object>>() {
+ @Override
+ public InterpreterClient online(HashMap<String, Object> result) {
+ String intpTserverHost = (String) result.get(INTP_TSERVER_HOST);
+ int intpTserverPort = (int) result.get(INTP_TSERVER_PORT);
- String srvHost = (String) meta.get(SERVER_HOST);
+ return new RemoteInterpreterRunningProcess(
+ context.getInterpreterSettingName(),
+ connectTimeout,
+ intpTserverHost,
+ intpTserverPort);
+ }
+
+ @Override
+ public void offline() {
+ LOGGER.info("interpreter {} is not exist!", intpGroupId);
+ }
+ });
+ if (null != intpClient) {
+ return intpClient;
+ }
+
+ // No process was found for the InterpreterGroup ID
+ String srvHost = null;
+ int srvPort = 0;
+ HashMap<String, Object> meta = clusterServer.getIdleNodeMeta();
+ if (null == meta) {
+ LOGGER.error("Don't get idle node meta, launch interpreter on local.");
+ InterpreterClient clusterIntpProcess = createInterpreterProcess(context);
+ try {
+ clusterIntpProcess.start(context.getUserName());
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ return clusterIntpProcess;
+ }
+ } else {
+ srvHost = (String) meta.get(SERVER_HOST);
String localhost = RemoteInterpreterUtils.findAvailableHostAddress();
if (localhost.equalsIgnoreCase(srvHost)) {
- // launch interpreter on local
- return createInterpreterProcess(context);
+ LOGGER.info("launch interpreter on local");
+ InterpreterClient clusterIntpProcess =
createInterpreterProcess(context);
+ try {
+ clusterIntpProcess.start(context.getUserName());
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ return clusterIntpProcess;
+ }
} else {
- int srvPort = (int) meta.get(SERVER_PORT);
+ // launch interpreter on cluster
+ srvPort = (int) meta.get(SERVER_PORT);
Gson gson = new Gson();
String sContext = gson.toJson(context);
@@ -112,44 +132,41 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
// Notify other server in the cluster that the resource is idle to
create an interpreter
clusterServer.unicastClusterEvent(
srvHost, srvPort, ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC,
strEvent);
+ }
+ }
- // Find the ip and port of thrift registered by the remote interpreter
process
- // through the cluster metadata
- HashMap<String, Object> intpMeta = clusterServer
- .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-
- int MAX_RETRY_GET_META = connectTimeout /
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
- int retryGetMeta = 0;
- while ((retryGetMeta++ < MAX_RETRY_GET_META)
- && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
- || !intpMeta.containsKey(INTP_TSERVER_PORT)) ) {
- try {
- Thread.sleep(CHECK_META_INTERVAL);
- intpMeta = clusterServer
- .getClusterMeta(INTP_PROCESS_META,
intpGroupId).get(intpGroupId);
- LOGGER.warn("retry {} times to get {} meta!", retryGetMeta,
intpGroupId);
- } catch (InterruptedException e) {
- LOGGER.error(e.getMessage(), e);
+ // Find the ip and port of thrift registered by the remote interpreter
process
+ // through the cluster metadata
+ String finalSrvHost = srvHost;
+ int finalSrvPort = srvPort;
+ intpClient = clusterServer.getIntpProcessStatus(intpGroupId,
connectTimeout,
+ new ClusterCallback<HashMap<String, Object>>() {
+ @Override
+ public InterpreterClient online(HashMap<String, Object> result) {
+ // connect exist Interpreter Process
+ String intpTserverHost = (String) result.get(INTP_TSERVER_HOST);
+ int intpTserverPort = (int) result.get(INTP_TSERVER_PORT);
+
+ return new RemoteInterpreterRunningProcess(
+ context.getInterpreterSettingName(),
+ connectTimeout,
+ intpTserverHost,
+ intpTserverPort);
}
- }
- // Check if the remote creation process is successful
- if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
- || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
- String errorInfo = String.format("Creating process %s failed on
remote server %s:%d",
- intpGroupId, srvHost, srvPort);
- throw new IOException(errorInfo);
- } else {
- // connnect remote interpreter process
- String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
- int intpTSrvPort = (int) intpMeta.get(INTP_TSERVER_PORT);
- return new RemoteInterpreterRunningProcess(
- context.getInterpreterSettingName(),
- connectTimeout,
- intpTSrvHost,
- intpTSrvPort);
- }
- }
+ @Override
+ public void offline() {
+ String errorInfo = String.format("Creating process %s failed on
remote server %s:%d",
+ intpGroupId, finalSrvHost, finalSrvPort);
+ LOGGER.error(errorInfo);
+ }
+ });
+ if (null == intpClient) {
+ String errorInfo = String.format("Creating process %s failed on remote
server %s:%d",
+ intpGroupId, srvHost, srvPort);
+ throw new IOException(errorInfo);
+ } else {
+ return intpClient;
}
}
@@ -162,7 +179,8 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
try {
Gson gson = new Gson();
Map<String, Object> mapEvent = gson.fromJson(msg,
- new TypeToken<Map<String, Object>>(){}.getType());
+ new TypeToken<Map<String, Object>>() {
+ }.getType());
String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
@@ -175,9 +193,10 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
// using this remote interpreter process
String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG);
InterpreterLaunchContext context = gson.fromJson(
- eventMsg, new TypeToken<InterpreterLaunchContext>()
{}.getType());
- InterpreterClient clusterOrDockerIntpProcess =
createInterpreterProcess(context);
- clusterOrDockerIntpProcess.start(context.getUserName());
+ eventMsg, new TypeToken<InterpreterLaunchContext>() {
+ }.getType());
+ InterpreterClient intpProcess = createInterpreterProcess(context);
+ intpProcess.start(context.getUserName());
break;
default:
LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
@@ -188,6 +207,28 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
}
}
+ private InterpreterClient createInterpreterProcess(InterpreterLaunchContext
context)
+ throws IOException {
+ this.context = context;
+ this.properties = context.getProperties();
+
+ InterpreterClient intpProcess = null;
+ if (isRunningOnDocker(zConf)) {
+ DockerInterpreterLauncher dockerIntpLauncher = new
DockerInterpreterLauncher(zConf, null);
+ dockerIntpLauncher.setProperties(context.getProperties());
+ intpProcess = dockerIntpLauncher.launch(context);
+ } else {
+ intpProcess = createClusterIntpProcess();
+ }
+
+ // must first step start check interpreter thread
+ ClusterInterpreterCheckThread intpCheckThread = new
ClusterInterpreterCheckThread(
+ intpProcess, context.getInterpreterGroupId(), getConnectTimeout());
+ intpCheckThread.start();
+
+ return intpProcess;
+ }
+
private RemoteInterpreterProcess createClusterIntpProcess() {
ClusterInterpreterProcess clusterIntpProcess = null;
try {
@@ -218,28 +259,6 @@ public class ClusterInterpreterLauncher extends StandardInterpreterLauncher
return clusterIntpProcess;
}
- private InterpreterClient createInterpreterProcess(InterpreterLaunchContext
context)
- throws IOException {
- this.context = context;
- this.properties = context.getProperties();
- int connectTimeout = getConnectTimeout();
-
- InterpreterClient remoteIntpProcess = null;
- if (isRunningOnDocker(zConf)) {
- DockerInterpreterLauncher dockerIntpLauncher = new
DockerInterpreterLauncher(zConf, null);
- dockerIntpLauncher.setProperties(context.getProperties());
- remoteIntpProcess = dockerIntpLauncher.launch(context);
- } else {
- remoteIntpProcess = createClusterIntpProcess();
- }
-
- ClusterInterpreterCheckThread intpCheckThread = new
ClusterInterpreterCheckThread(
- remoteIntpProcess, context.getInterpreterGroupId(), connectTimeout);
- intpCheckThread.start();
-
- return remoteIntpProcess;
- }
-
private boolean isRunningOnDocker(ZeppelinConfiguration zconf) {
return zconf.getRunMode() == ZeppelinConfiguration.RUN_MODE.DOCKER;
}
diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
index 7d83f07..bd8bfbf 100644
--- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
@@ -23,6 +23,8 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Properties;
@@ -31,6 +33,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class ClusterInterpreterLauncherTest extends ClusterMockTest {
+ private static Logger LOGGER =
LoggerFactory.getLogger(ClusterInterpreterLauncherTest.class);
@BeforeClass
public static void startTest() throws IOException, InterruptedException {
@@ -63,12 +66,12 @@ public class ClusterInterpreterLauncherTest extends ClusterMockTest {
InterpreterLaunchContext context = new
InterpreterLaunchContext(properties, option, null,
"user1", "intpGroupId", "groupId",
"groupName", "name", 0, "host");
+
InterpreterClient client = launcher.launch(context);
assertTrue(client instanceof RemoteInterpreterRunningProcess);
RemoteInterpreterRunningProcess interpreterProcess =
(RemoteInterpreterRunningProcess) client;
- assertEquals("INTP_TSERVER_HOST", interpreterProcess.getHost());
- assertEquals(0, interpreterProcess.getPort());
+ assertEquals("127.0.0.1", interpreterProcess.getHost());
assertEquals("name", interpreterProcess.getInterpreterSettingName());
assertEquals(5000, interpreterProcess.getConnectTimeout());
}
diff --git a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
index a7b6dee..285103f 100644
--- a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
+++ b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.zeppelin.interpreter.launcher;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
import org.apache.zeppelin.cluster.ClusterManagerClient;
import org.apache.zeppelin.cluster.ClusterManagerServer;
import org.apache.zeppelin.cluster.meta.ClusterMeta;
@@ -26,6 +28,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.time.LocalDateTime;
import java.util.HashMap;
import static org.apache.zeppelin.cluster.meta.ClusterMeta.OFFLINE_STATUS;
@@ -45,6 +48,8 @@ public class ClusterMockTest {
static int zServerPort;
static final String metaKey = "ClusterMockKey";
+ static TServerSocket tSocket = null;
+
public static void startCluster() throws IOException, InterruptedException {
LOGGER.info("startCluster >>>");
@@ -80,6 +85,12 @@ public class ClusterMockTest {
}
assertEquals(true, clusterServer.isClusterLeader());
+ try {
+ tSocket = new TServerSocket(0);
+ } catch (TTransportException e) {
+ throw new IOException("Fail to create TServerSocket", e);
+ }
+
LOGGER.info("startCluster <<<");
}
@@ -91,6 +102,9 @@ public class ClusterMockTest {
if (null != clusterClient) {
clusterServer.shutdown();
}
+
+ tSocket.close();
+
LOGGER.info("stopCluster <<<");
}
@@ -119,14 +133,16 @@ public class ClusterMockTest {
public void mockIntpProcessMeta(String metaKey, boolean online) {
// mock IntpProcess Meta
HashMap<String, Object> meta = new HashMap<>();
- meta.put(ClusterMeta.SERVER_HOST, "SERVER_HOST");
- meta.put(ClusterMeta.SERVER_PORT, 0);
- meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
- meta.put(ClusterMeta.INTP_TSERVER_PORT, 0);
+ meta.put(ClusterMeta.SERVER_HOST, "127.0.0.1");
+ meta.put(ClusterMeta.SERVER_PORT, 6000);
+ meta.put(ClusterMeta.INTP_TSERVER_HOST, "127.0.0.1");
+ meta.put(ClusterMeta.INTP_TSERVER_PORT,
tSocket.getServerSocket().getLocalPort());
meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY");
meta.put(ClusterMeta.CPU_USED, "CPU_USED");
meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY");
meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
+ meta.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
+
if (online) {
meta.put(ClusterMeta.STATUS, ONLINE_STATUS);
} else {
@@ -143,6 +159,6 @@ public class ClusterMockTest {
assertNotNull(check);
assertNotNull(check.get(metaKey));
- assertEquals(true, check.get(metaKey).size() == 9);
+ assertEquals(true, check.get(metaKey).size() == 10);
}
}
diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
index 2af0de9..17bb093 100644
--- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
@@ -223,8 +223,10 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
execInContainer(containerId, dockerCommand, false);
} catch (DockerException e) {
LOGGER.error(e.getMessage(), e);
+ throw new IOException(e.getMessage());
} catch (InterruptedException e) {
LOGGER.error(e.getMessage(), e);
+ throw new IOException(e.getMessage());
}
long startTime = System.currentTimeMillis();
@@ -236,6 +238,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess {
dockerStarted.wait(getConnectTimeout());
} catch (InterruptedException e) {
LOGGER.error("Remote interpreter is not accessible");
+ throw new IOException(e.getMessage());
}
}
}