This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 57f6632  [ZEPPELIN-4281] Fixed unusable after cluster mode restarts interpreter
57f6632 is described below

commit 57f663297cb4d6c39e594b3fb06604196bcb3af0
Author: Xun Liu <[email protected]>
AuthorDate: Sat Jul 27 09:57:24 2019 +0800

    [ZEPPELIN-4281] Fixed unusable after cluster mode restarts interpreter
    
    ### What is this PR for?
    In cluster mode, after the interpreter is restarted, the cluster metadata
    sets the process state of the interpreter to offline.
    The interpreter could then not be executed again because of an incorrect status check.
    
    ### What type of PR is it?
    Bug Fix
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/ZEPPELIN-4281
    
    ### How should this be tested?
    [CI passes](https://travis-ci.org/liuxunorg/zeppelin/builds/577750017), and these 4 test cases were completed:
    
    1. ClusterInterpreterLauncherTest::testConnectExistOnlineIntpProcess()
    2. ClusterInterpreterLauncherTest::testConnectExistOfflineIntpProcess()
    3. ClusterInterpreterLauncherTest::testCreateIntpProcessDockerMode()
    4. ClusterInterpreterLauncherTest::testCreateIntpProcessLocalMode()
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update?
    * Are there breaking changes for older versions?
    * Does this need documentation?
    
    Author: Xun Liu <[email protected]>
    
    Closes #3434 from liuxunorg/ZEPPELIN-4281 and squashes the following commits:
    
    4909e32d9 [Xun Liu] [ZEPPELIN-4281] Fixed unusable after cluster mode restarts interpreter
---
 .../apache/zeppelin/cluster/ClusterCallback.java   |  26 +++
 .../apache/zeppelin/cluster/ClusterManager.java    |  63 +++++++
 .../zeppelin/cluster/ClusterManagerServer.java     |   5 +-
 .../remote/RemoteInterpreterServer.java            |  22 ++-
 .../launcher/ClusterInterpreterCheckThread.java    |  65 +++----
 .../launcher/ClusterInterpreterLauncher.java       | 199 +++++++++++----------
 .../launcher/ClusterInterpreterLauncherTest.java   |   7 +-
 .../interpreter/launcher/ClusterMockTest.java      |  26 ++-
 .../launcher/DockerInterpreterProcess.java         |   3 +
 9 files changed, 274 insertions(+), 142 deletions(-)

diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterCallback.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterCallback.java
new file mode 100644
index 0000000..fa7899d
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.cluster;
+
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
+
+public interface ClusterCallback<T> {
+  InterpreterClient online(T result);
+
+  void offline();
+}
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
index 6389b7c..02135b4 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -86,6 +86,7 @@ import org.apache.zeppelin.cluster.meta.ClusterMetaType;
 import org.apache.zeppelin.cluster.protocol.LocalRaftProtocolFactory;
 import org.apache.zeppelin.cluster.protocol.RaftClientMessagingProtocol;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.launcher.InterpreterClient;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -112,9 +113,14 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
 import static io.atomix.primitive.operation.PrimitiveOperation.operation;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.STATUS;
 import static 
org.apache.zeppelin.cluster.meta.ClusterMetaOperation.DELETE_OPERATION;
 import static 
org.apache.zeppelin.cluster.meta.ClusterMetaOperation.PUT_OPERATION;
 import static 
org.apache.zeppelin.cluster.meta.ClusterMetaOperation.GET_OPERATION;
+import static 
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 
 /**
  * The base class for cluster management, including the following 
implementations
@@ -432,6 +438,63 @@ public abstract class ClusterManager {
     return clusterMeta;
   }
 
+  public InterpreterClient getIntpProcessStatus(String intpName,
+                                                int timeout,
+                                                
ClusterCallback<HashMap<String, Object>> callback) {
+    final int CHECK_META_INTERVAL = 1000;
+    int MAX_RETRY_GET_META = timeout / CHECK_META_INTERVAL;
+    int retryGetMeta = 0;
+    while (retryGetMeta++ < MAX_RETRY_GET_META) {
+      HashMap<String, Object> intpMeta = getClusterMeta(INTP_PROCESS_META, 
intpName).get(intpName);
+      if (interpreterMetaOnline(intpMeta)) {
+        // connect exist Interpreter Process
+        String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
+        int intpTSrvPort = (int) intpMeta.get(INTP_TSERVER_PORT);
+        LOGGER.info("interpreter thrift {}:{} service is online!", 
intpTSrvHost, intpTSrvPort);
+
+        // Check if the interpreter thrift service is available
+        boolean remoteIntpAccessible =
+            
RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(intpTSrvHost, 
intpTSrvPort);
+        if (remoteIntpAccessible) {
+          LOGGER.info("interpreter thrift {}:{} accessible!", intpTSrvHost, 
intpTSrvPort);
+          return callback.online(intpMeta);
+        } else {
+          LOGGER.error("interpreter thrift {}:{} service is not available!",
+              intpTSrvHost, intpTSrvPort);
+          try {
+            Thread.sleep(CHECK_META_INTERVAL);
+            LOGGER.warn("retry {} times to get {} meta!", retryGetMeta, 
intpName);
+          } catch (InterruptedException e) {
+            LOGGER.error(e.getMessage(), e);
+          }
+        }
+      } else {
+        try {
+          Thread.sleep(CHECK_META_INTERVAL);
+        } catch (InterruptedException e) {
+          LOGGER.error(e.getMessage(), e);
+        }
+      }
+    }
+
+    LOGGER.error("retry {} times not get {} meta!", retryGetMeta, intpName);
+    callback.offline();
+    return null;
+  }
+
+  // Check if the interpreter is online
+  private boolean interpreterMetaOnline(HashMap<String, Object> intpProcMeta) {
+    if (null != intpProcMeta
+        && intpProcMeta.containsKey(INTP_TSERVER_HOST)
+        && intpProcMeta.containsKey(INTP_TSERVER_PORT)
+        && intpProcMeta.containsKey(STATUS)
+        && StringUtils.equals((String) intpProcMeta.get(STATUS), 
ONLINE_STATUS)) {
+      return true;
+    }
+
+    return false;
+  }
+
   protected static final Serializer protocolSerializer = 
Serializer.using(Namespace.builder()
       .register(OpenSessionRequest.class)
       .register(OpenSessionResponse.class)
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
index 4cd370d..869188a 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -285,7 +285,8 @@ public class ClusterManagerServer extends ClusterManager {
   }
 
   public void unicastClusterEvent(String host, int port, String topic, String 
msg) {
-    LOGGER.info("send unicastClusterEvent message {}", msg);
+    LOGGER.info("send unicastClusterEvent host:{} port:{} topic:{} message:{}",
+        host, port, topic, msg);
 
     Address address = Address.from(host, port);
     CompletableFuture<byte[]> response = 
messagingService.sendAndReceive(address,
@@ -293,8 +294,6 @@ public class ClusterManagerServer extends ClusterManager {
     response.whenComplete((r, e) -> {
       if (null == e) {
         LOGGER.error(e.getMessage(), e);
-      } else {
-        LOGGER.info("unicastClusterEvent success! {}", msg);
       }
     });
   }
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
index d7e946b..a143cd3 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java
@@ -31,7 +31,6 @@ import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
 import org.apache.zeppelin.cluster.ClusterManagerClient;
 import org.apache.zeppelin.cluster.meta.ClusterMeta;
-import org.apache.zeppelin.cluster.meta.ClusterMetaType;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObject;
@@ -100,6 +99,8 @@ import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import static 
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+
 /**
  * Entry point for Interpreter process.
  * Accepting thrift connections from ZeppelinServer.
@@ -238,6 +239,9 @@ public class RemoteInterpreterServer extends Thread
   @Override
   public void shutdown() throws TException {
     logger.info("Shutting down...");
+    // delete interpreter cluster meta
+    deleteClusterMeta();
+
     if (interpreterGroup != null) {
       synchronized (interpreterGroup) {
         for (List<Interpreter> session : interpreterGroup.values()) {
@@ -340,7 +344,21 @@ public class RemoteInterpreterServer extends Thread
     meta.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
     meta.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
 
-    clusterManagerClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, 
interpreterGroupId, meta);
+    clusterManagerClient.putClusterMeta(INTP_PROCESS_META, interpreterGroupId, 
meta);
+  }
+
+  private void deleteClusterMeta() {
+    if (!zconf.isClusterMode()){
+      return;
+    }
+
+    try {
+      // delete interpreter cluster meta
+      clusterManagerClient.deleteClusterMeta(INTP_PROCESS_META, 
interpreterGroupId);
+      Thread.sleep(300);
+    } catch (InterruptedException e) {
+      logger.error(e.getMessage(), e);
+    }
   }
 
   @Override
diff --git 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
index a389135..099bad7 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
@@ -17,6 +17,7 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import org.apache.zeppelin.cluster.ClusterCallback;
 import org.apache.zeppelin.cluster.ClusterManagerServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -25,7 +26,6 @@ import java.util.HashMap;
 
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
-import static 
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 
 // Metadata registered in the cluster by the interpreter process,
 // Keep the interpreter process started
@@ -51,45 +51,30 @@ public class ClusterInterpreterCheckThread extends Thread {
 
     ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
 
-    HashMap<String, Object> intpMeta = clusterServer
-        .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-
-    int MAX_RETRY_GET_META = connectTimeout / 
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
-    int retryGetMeta = 0;
-    while ((retryGetMeta++ < MAX_RETRY_GET_META)
-        && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
-        || !intpMeta.containsKey(INTP_TSERVER_PORT))) {
-      try {
-        Thread.sleep(ClusterInterpreterLauncher.CHECK_META_INTERVAL);
-        intpMeta = clusterServer
-            .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-        LOGGER.info("retry {} times to get {} meta!", retryGetMeta, 
intpGroupId);
-      } catch (InterruptedException e) {
-        LOGGER.error(e.getMessage(), e);
-      }
-
-      if (null != intpMeta && intpMeta.containsKey(INTP_TSERVER_HOST)
-          && intpMeta.containsKey(INTP_TSERVER_PORT)) {
-        String intpHost = (String) intpMeta.get(INTP_TSERVER_HOST);
-        int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT);
-        LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort);
-
-        if (intpProcess instanceof DockerInterpreterProcess) {
-          ((DockerInterpreterProcess) intpProcess).processStarted(intpPort, 
intpHost);
-        } else if (intpProcess instanceof ClusterInterpreterProcess) {
-          ((ClusterInterpreterProcess) intpProcess).processStarted(intpPort, 
intpHost);
-        } else {
-          LOGGER.error("Unknown type !");
-        }
-
-        break;
-      }
-    }
-
-    if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
-        || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
-      LOGGER.error("Can not found interpreter meta!");
-    }
+    clusterServer.getIntpProcessStatus(intpGroupId, connectTimeout,
+        new ClusterCallback<HashMap<String, Object>>() {
+          @Override
+          public InterpreterClient online(HashMap<String, Object> result) {
+            String intpTSrvHost = (String) result.get(INTP_TSERVER_HOST);
+            int intpTSrvPort = (int) result.get(INTP_TSERVER_PORT);
+            LOGGER.info("Found cluster interpreter {}:{}", intpTSrvHost, 
intpTSrvPort);
+
+            if (intpProcess instanceof DockerInterpreterProcess) {
+              ((DockerInterpreterProcess) 
intpProcess).processStarted(intpTSrvPort, intpTSrvHost);
+            } else if (intpProcess instanceof ClusterInterpreterProcess) {
+              ((ClusterInterpreterProcess) 
intpProcess).processStarted(intpTSrvPort, intpTSrvHost);
+            } else {
+              LOGGER.error("Unknown type !");
+            }
+
+            return null;
+          }
+
+          @Override
+          public void offline() {
+            LOGGER.error("Can not found cluster interpreter!");
+          }
+        });
 
     LOGGER.info("ClusterInterpreterCheckThread run() <<<");
   }
diff --git 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index 14bbce2..31d7184 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -19,7 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
-import org.apache.commons.lang.StringUtils;
+import org.apache.zeppelin.cluster.ClusterCallback;
 import org.apache.zeppelin.cluster.ClusterManagerServer;
 import org.apache.zeppelin.cluster.event.ClusterEvent;
 import org.apache.zeppelin.cluster.event.ClusterEventListener;
@@ -40,11 +40,8 @@ import java.util.Map;
 import static 
org.apache.zeppelin.cluster.event.ClusterEvent.CREATE_INTP_PROCESS;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_HOST;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_PORT;
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.STATUS;
-import static 
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 
 /**
  * Interpreter Launcher which use cluster to launch the interpreter process.
@@ -53,7 +50,6 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
     implements ClusterEventListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterInterpreterLauncher.class);
 
-  public static final int CHECK_META_INTERVAL = 2000; // ms
   private InterpreterLaunchContext context;
   private ClusterManagerServer clusterServer = 
ClusterManagerServer.getInstance();
 
@@ -72,35 +68,59 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
     int connectTimeout = getConnectTimeout();
     String intpGroupId = context.getInterpreterGroupId();
 
-    HashMap<String, Object> intpProcMeta = clusterServer
-        .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-    if (null != intpProcMeta && intpProcMeta.containsKey(INTP_TSERVER_HOST)
-        && intpProcMeta.containsKey(INTP_TSERVER_PORT) && 
intpProcMeta.containsKey(STATUS)
-        && StringUtils.equals((String) intpProcMeta.get(STATUS), 
ONLINE_STATUS)) {
-      // connect exist Interpreter Process
-      String intpTserverHost = (String) intpProcMeta.get(INTP_TSERVER_HOST);
-      int intpTserverPort = (int) intpProcMeta.get(INTP_TSERVER_PORT);
-      return new RemoteInterpreterRunningProcess(
-          context.getInterpreterSettingName(),
-          connectTimeout,
-          intpTserverHost,
-          intpTserverPort);
-    } else {
-      // No process was found for the InterpreterGroup ID
-      HashMap<String, Object> meta = clusterServer.getIdleNodeMeta();
-      if (null == meta) {
-        LOGGER.error("Don't get idle node meta, launch interpreter on local.");
-        return createInterpreterProcess(context);
-      }
+    // connect exist Interpreter Process
+    InterpreterClient intpClient = clusterServer.getIntpProcessStatus(
+        intpGroupId, 3000, new ClusterCallback<HashMap<String, Object>>() {
+          @Override
+          public InterpreterClient online(HashMap<String, Object> result) {
+            String intpTserverHost = (String) result.get(INTP_TSERVER_HOST);
+            int intpTserverPort = (int) result.get(INTP_TSERVER_PORT);
 
-      String srvHost = (String) meta.get(SERVER_HOST);
+            return new RemoteInterpreterRunningProcess(
+                context.getInterpreterSettingName(),
+                connectTimeout,
+                intpTserverHost,
+                intpTserverPort);
+          }
+
+          @Override
+          public void offline() {
+            LOGGER.info("interpreter {} is not exist!", intpGroupId);
+          }
+        });
+    if (null != intpClient) {
+      return intpClient;
+    }
+
+    // No process was found for the InterpreterGroup ID
+    String srvHost = null;
+    int srvPort = 0;
+    HashMap<String, Object> meta = clusterServer.getIdleNodeMeta();
+    if (null == meta) {
+      LOGGER.error("Don't get idle node meta, launch interpreter on local.");
+      InterpreterClient clusterIntpProcess = createInterpreterProcess(context);
+      try {
+        clusterIntpProcess.start(context.getUserName());
+      } catch (IOException e) {
+        LOGGER.error(e.getMessage(), e);
+        return clusterIntpProcess;
+      }
+    } else {
+      srvHost = (String) meta.get(SERVER_HOST);
       String localhost = RemoteInterpreterUtils.findAvailableHostAddress();
 
       if (localhost.equalsIgnoreCase(srvHost)) {
-        // launch interpreter on local
-        return createInterpreterProcess(context);
+        LOGGER.info("launch interpreter on local");
+        InterpreterClient clusterIntpProcess = 
createInterpreterProcess(context);
+        try {
+          clusterIntpProcess.start(context.getUserName());
+        } catch (IOException e) {
+          LOGGER.error(e.getMessage(), e);
+          return clusterIntpProcess;
+        }
       } else {
-        int srvPort = (int) meta.get(SERVER_PORT);
+        // launch interpreter on cluster
+        srvPort = (int) meta.get(SERVER_PORT);
 
         Gson gson = new Gson();
         String sContext = gson.toJson(context);
@@ -112,44 +132,41 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
         // Notify other server in the cluster that the resource is idle to 
create an interpreter
         clusterServer.unicastClusterEvent(
             srvHost, srvPort, ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, 
strEvent);
+      }
+    }
 
-        // Find the ip and port of thrift registered by the remote interpreter 
process
-        // through the cluster metadata
-        HashMap<String, Object> intpMeta = clusterServer
-            .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-
-        int MAX_RETRY_GET_META = connectTimeout / 
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
-        int retryGetMeta = 0;
-        while ((retryGetMeta++ < MAX_RETRY_GET_META)
-            && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
-            || !intpMeta.containsKey(INTP_TSERVER_PORT)) ) {
-          try {
-            Thread.sleep(CHECK_META_INTERVAL);
-            intpMeta = clusterServer
-                .getClusterMeta(INTP_PROCESS_META, 
intpGroupId).get(intpGroupId);
-            LOGGER.warn("retry {} times to get {} meta!", retryGetMeta, 
intpGroupId);
-          } catch (InterruptedException e) {
-            LOGGER.error(e.getMessage(), e);
+    // Find the ip and port of thrift registered by the remote interpreter 
process
+    // through the cluster metadata
+    String finalSrvHost = srvHost;
+    int finalSrvPort = srvPort;
+    intpClient = clusterServer.getIntpProcessStatus(intpGroupId, 
connectTimeout,
+        new ClusterCallback<HashMap<String, Object>>() {
+          @Override
+          public InterpreterClient online(HashMap<String, Object> result) {
+            // connect exist Interpreter Process
+            String intpTserverHost = (String) result.get(INTP_TSERVER_HOST);
+            int intpTserverPort = (int) result.get(INTP_TSERVER_PORT);
+
+            return new RemoteInterpreterRunningProcess(
+                context.getInterpreterSettingName(),
+                connectTimeout,
+                intpTserverHost,
+                intpTserverPort);
           }
-        }
 
-        // Check if the remote creation process is successful
-        if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
-            || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
-          String errorInfo = String.format("Creating process %s failed on 
remote server %s:%d",
-              intpGroupId, srvHost, srvPort);
-          throw new IOException(errorInfo);
-        } else {
-          // connnect remote interpreter process
-          String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
-          int intpTSrvPort = (int) intpMeta.get(INTP_TSERVER_PORT);
-          return new RemoteInterpreterRunningProcess(
-              context.getInterpreterSettingName(),
-              connectTimeout,
-              intpTSrvHost,
-              intpTSrvPort);
-        }
-      }
+          @Override
+          public void offline() {
+            String errorInfo = String.format("Creating process %s failed on 
remote server %s:%d",
+                intpGroupId, finalSrvHost, finalSrvPort);
+            LOGGER.error(errorInfo);
+          }
+        });
+    if (null == intpClient) {
+      String errorInfo = String.format("Creating process %s failed on remote 
server %s:%d",
+          intpGroupId, srvHost, srvPort);
+      throw new IOException(errorInfo);
+    } else {
+      return intpClient;
     }
   }
 
@@ -162,7 +179,8 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
     try {
       Gson gson = new Gson();
       Map<String, Object> mapEvent = gson.fromJson(msg,
-          new TypeToken<Map<String, Object>>(){}.getType());
+          new TypeToken<Map<String, Object>>() {
+          }.getType());
       String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
       ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
 
@@ -175,9 +193,10 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
           //    using this remote interpreter process
           String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG);
           InterpreterLaunchContext context = gson.fromJson(
-              eventMsg, new TypeToken<InterpreterLaunchContext>() 
{}.getType());
-          InterpreterClient clusterOrDockerIntpProcess = 
createInterpreterProcess(context);
-          clusterOrDockerIntpProcess.start(context.getUserName());
+              eventMsg, new TypeToken<InterpreterLaunchContext>() {
+              }.getType());
+          InterpreterClient intpProcess = createInterpreterProcess(context);
+          intpProcess.start(context.getUserName());
           break;
         default:
           LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
@@ -188,6 +207,28 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
     }
   }
 
+  private InterpreterClient createInterpreterProcess(InterpreterLaunchContext 
context)
+      throws IOException {
+    this.context = context;
+    this.properties = context.getProperties();
+
+    InterpreterClient intpProcess = null;
+    if (isRunningOnDocker(zConf)) {
+      DockerInterpreterLauncher dockerIntpLauncher = new 
DockerInterpreterLauncher(zConf, null);
+      dockerIntpLauncher.setProperties(context.getProperties());
+      intpProcess = dockerIntpLauncher.launch(context);
+    } else {
+      intpProcess = createClusterIntpProcess();
+    }
+
+    // must first step start check interpreter thread
+    ClusterInterpreterCheckThread intpCheckThread = new 
ClusterInterpreterCheckThread(
+        intpProcess, context.getInterpreterGroupId(), getConnectTimeout());
+    intpCheckThread.start();
+
+    return intpProcess;
+  }
+
   private RemoteInterpreterProcess createClusterIntpProcess() {
     ClusterInterpreterProcess clusterIntpProcess = null;
     try {
@@ -218,28 +259,6 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
     return clusterIntpProcess;
   }
 
-  private InterpreterClient createInterpreterProcess(InterpreterLaunchContext 
context)
-      throws IOException {
-    this.context = context;
-    this.properties = context.getProperties();
-    int connectTimeout = getConnectTimeout();
-
-    InterpreterClient remoteIntpProcess = null;
-    if (isRunningOnDocker(zConf)) {
-      DockerInterpreterLauncher dockerIntpLauncher = new 
DockerInterpreterLauncher(zConf, null);
-      dockerIntpLauncher.setProperties(context.getProperties());
-      remoteIntpProcess = dockerIntpLauncher.launch(context);
-    } else {
-      remoteIntpProcess = createClusterIntpProcess();
-    }
-
-    ClusterInterpreterCheckThread intpCheckThread = new 
ClusterInterpreterCheckThread(
-        remoteIntpProcess, context.getInterpreterGroupId(), connectTimeout);
-    intpCheckThread.start();
-
-    return remoteIntpProcess;
-  }
-
   private boolean isRunningOnDocker(ZeppelinConfiguration zconf) {
     return zconf.getRunMode() == ZeppelinConfiguration.RUN_MODE.DOCKER;
   }
diff --git 
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
 
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
index 7d83f07..bd8bfbf 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
@@ -23,6 +23,8 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -31,6 +33,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class ClusterInterpreterLauncherTest extends ClusterMockTest {
+  private static Logger LOGGER = 
LoggerFactory.getLogger(ClusterInterpreterLauncherTest.class);
 
   @BeforeClass
   public static void startTest() throws IOException, InterruptedException {
@@ -63,12 +66,12 @@ public class ClusterInterpreterLauncherTest extends 
ClusterMockTest {
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null,
         "user1", "intpGroupId", "groupId",
         "groupName", "name", 0, "host");
+
     InterpreterClient client = launcher.launch(context);
 
     assertTrue(client instanceof RemoteInterpreterRunningProcess);
     RemoteInterpreterRunningProcess interpreterProcess = 
(RemoteInterpreterRunningProcess) client;
-    assertEquals("INTP_TSERVER_HOST", interpreterProcess.getHost());
-    assertEquals(0, interpreterProcess.getPort());
+    assertEquals("127.0.0.1", interpreterProcess.getHost());
     assertEquals("name", interpreterProcess.getInterpreterSettingName());
     assertEquals(5000, interpreterProcess.getConnectTimeout());
   }
diff --git 
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
 
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
index a7b6dee..285103f 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.zeppelin.interpreter.launcher;
 
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TTransportException;
 import org.apache.zeppelin.cluster.ClusterManagerClient;
 import org.apache.zeppelin.cluster.ClusterManagerServer;
 import org.apache.zeppelin.cluster.meta.ClusterMeta;
@@ -26,6 +28,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.LocalDateTime;
 import java.util.HashMap;
 
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.OFFLINE_STATUS;
@@ -45,6 +48,8 @@ public class ClusterMockTest {
   static int zServerPort;
   static final String metaKey = "ClusterMockKey";
 
+  static TServerSocket tSocket = null;
+
   public static void startCluster() throws IOException, InterruptedException {
     LOGGER.info("startCluster >>>");
 
@@ -80,6 +85,12 @@ public class ClusterMockTest {
     }
     assertEquals(true, clusterServer.isClusterLeader());
 
+    try {
+      tSocket = new TServerSocket(0);
+    } catch (TTransportException e) {
+      throw new IOException("Fail to create TServerSocket", e);
+    }
+
     LOGGER.info("startCluster <<<");
   }
 
@@ -91,6 +102,9 @@ public class ClusterMockTest {
     if (null != clusterClient) {
       clusterServer.shutdown();
     }
+
+    tSocket.close();
+
     LOGGER.info("stopCluster <<<");
   }
 
@@ -119,14 +133,16 @@ public class ClusterMockTest {
   public void mockIntpProcessMeta(String metaKey, boolean online) {
     // mock IntpProcess Meta
     HashMap<String, Object> meta = new HashMap<>();
-    meta.put(ClusterMeta.SERVER_HOST, "SERVER_HOST");
-    meta.put(ClusterMeta.SERVER_PORT, 0);
-    meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
-    meta.put(ClusterMeta.INTP_TSERVER_PORT, 0);
+    meta.put(ClusterMeta.SERVER_HOST, "127.0.0.1");
+    meta.put(ClusterMeta.SERVER_PORT, 6000);
+    meta.put(ClusterMeta.INTP_TSERVER_HOST, "127.0.0.1");
+    meta.put(ClusterMeta.INTP_TSERVER_PORT, 
tSocket.getServerSocket().getLocalPort());
     meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY");
     meta.put(ClusterMeta.CPU_USED, "CPU_USED");
     meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY");
     meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
+    meta.put(ClusterMeta.LATEST_HEARTBEAT, LocalDateTime.now());
+
     if (online) {
       meta.put(ClusterMeta.STATUS, ONLINE_STATUS);
     } else {
@@ -143,6 +159,6 @@ public class ClusterMockTest {
 
     assertNotNull(check);
     assertNotNull(check.get(metaKey));
-    assertEquals(true, check.get(metaKey).size() == 9);
+    assertEquals(true, check.get(metaKey).size() == 10);
   }
 }
diff --git 
a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
 
b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
index 2af0de9..17bb093 100644
--- 
a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
+++ 
b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java
@@ -223,8 +223,10 @@ public class DockerInterpreterProcess extends 
RemoteInterpreterProcess {
       execInContainer(containerId, dockerCommand, false);
     } catch (DockerException e) {
       LOGGER.error(e.getMessage(), e);
+      throw new IOException(e.getMessage());
     } catch (InterruptedException e) {
       LOGGER.error(e.getMessage(), e);
+      throw new IOException(e.getMessage());
     }
 
     long startTime = System.currentTimeMillis();
@@ -236,6 +238,7 @@ public class DockerInterpreterProcess extends 
RemoteInterpreterProcess {
           dockerStarted.wait(getConnectTimeout());
         } catch (InterruptedException e) {
           LOGGER.error("Remote interpreter is not accessible");
+          throw new IOException(e.getMessage());
         }
       }
     }

Reply via email to