This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eb665e  [ZEPPELIN-4209] Fixed the ClusterInterpreterLauncher can not 
listen cluster events
7eb665e is described below

commit 7eb665e1d4048690f209b4132c5f76bc89357cd3
Author: Xun Liu <[email protected]>
AuthorDate: Fri Jul 5 12:57:16 2019 +0800

    [ZEPPELIN-4209] Fixed the ClusterInterpreterLauncher can not listen cluster 
events
    
    ### What is this PR for?
    1. **ClusterInterpreterLauncher can not listen cluster events**
    In Cluster mode, The `ClusterInterpreterLauncher` needs to listen for 
cluster events, Accept requests from other zeppelin servers to create 
interpreter processes.
    Since the `ClusterInterpreterLauncher` is lazy, dynamically generated, So 
in cluster mode, when the zeppelin service starts, Create a 
`ClusterInterpreterLauncher` object, This allows the 
`ClusterInterpreterLauncher` to listen for cluster events.
    
    2. **Replace `RemoteInterpreterManagedProcess` with 
`ClusterInterpreterProcess` for local interpreter process creation**
    Because in cluster mode, `RemoteInterpreterServer` performs service 
discovery by registering thrift's `ip` and `port` into the cluster metadata, 
not through the thrift interface `callback`.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4209
    
    ### How should this be tested?
    * [CI Pass](https://travis-ci.org/liuxunorg/zeppelin/builds/552320897)
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update?
    * Is there breaking changes for older versions?
    * Does this needs documentation?
    
    Author: Xun Liu <[email protected]>
    
    Closes #3389 from liuxunorg/ZEPPELIN-4209 and squashes the following 
commits:
    
    85464c092 [Xun Liu] fixed travis faild
    71a64b373 [Xun Liu] Check if the interpreter process is offline
    1b52d2343 [Xun Liu] ClusterInterpreterLauncher initialization in 
zeppelinserver
    b1cdcaba7 [Xun Liu] [ZEPPELIN-4209] Fixed the ClusterInterpreterLauncher 
can not listen to cluster events
---
 .../launcher/ClusterInterpreterCheckThread.java    | 86 +++++++++++++++++++++
 .../launcher/ClusterInterpreterLauncher.java       | 83 +++++++++++---------
 .../launcher/ClusterInterpreterProcess.java        | 88 +---------------------
 .../launcher/ClusterInterpreterLauncherTest.java   | 15 ++--
 .../interpreter/launcher/ClusterMockTest.java      | 12 ++-
 .../org/apache/zeppelin/server/ZeppelinServer.java | 19 +++++
 .../zeppelin/interpreter/InterpreterSetting.java   |  4 +-
 7 files changed, 176 insertions(+), 131 deletions(-)

diff --git 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
new file mode 100644
index 0000000..fb7e41f
--- /dev/null
+++ 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterCheckThread.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.launcher;
+
+import org.apache.zeppelin.cluster.ClusterManagerServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static 
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
+
+// Metadata registered in the cluster by the interpreter process,
+// Keep the interpreter process started
+public class ClusterInterpreterCheckThread extends Thread {
+  private static final Logger LOGGER
+      = LoggerFactory.getLogger(ClusterInterpreterCheckThread.class);
+
+  private ClusterInterpreterProcess intpProcess;
+
+  ClusterInterpreterCheckThread(ClusterInterpreterProcess intpProcess) {
+    this.intpProcess = intpProcess;
+  }
+
+  @Override
+  public void run() {
+    LOGGER.info("ClusterInterpreterCheckThread run() >>>");
+
+    ClusterManagerServer clusterServer = ClusterManagerServer.getInstance();
+
+    String intpGroupId = intpProcess.getInterpreterGroupId();
+
+    HashMap<String, Object> intpMeta = clusterServer
+        .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+    int connectTimeout = intpProcess.getConnectTimeout();
+
+    int MAX_RETRY_GET_META = connectTimeout / 
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
+    int retryGetMeta = 0;
+    while ((retryGetMeta++ < MAX_RETRY_GET_META)
+        && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+        || !intpMeta.containsKey(INTP_TSERVER_PORT))) {
+      try {
+        Thread.sleep(ClusterInterpreterLauncher.CHECK_META_INTERVAL);
+        intpMeta = clusterServer
+            .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
+        LOGGER.info("retry {} times to get {} meta!", retryGetMeta, 
intpGroupId);
+      } catch (InterruptedException e) {
+        LOGGER.error(e.getMessage(), e);
+      }
+
+      if (null != intpMeta && intpMeta.containsKey(INTP_TSERVER_HOST)
+          && intpMeta.containsKey(INTP_TSERVER_PORT)) {
+        String intpHost = (String) intpMeta.get(INTP_TSERVER_HOST);
+        int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT);
+        LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort);
+
+        intpProcess.processStarted(intpPort, intpHost);
+        break;
+      }
+    }
+
+    if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+        || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
+      LOGGER.error("Can not found interpreter meta!");
+    }
+
+    LOGGER.info("ClusterInterpreterCheckThread run() <<<");
+  }
+}
diff --git 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
index 1fe77a03..83f4232 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncher.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.interpreter.launcher;
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang.StringUtils;
 import org.apache.zeppelin.cluster.ClusterManagerServer;
 import org.apache.zeppelin.cluster.event.ClusterEvent;
 import org.apache.zeppelin.cluster.event.ClusterEventListener;
@@ -38,8 +39,10 @@ import java.util.Map;
 import static 
org.apache.zeppelin.cluster.event.ClusterEvent.CREATE_INTP_PROCESS;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_HOST;
 import static org.apache.zeppelin.cluster.meta.ClusterMeta.SERVER_PORT;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.STATUS;
 import static 
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
 
 /**
@@ -71,7 +74,8 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
     HashMap<String, Object> intpProcMeta = clusterServer
         .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
     if (null != intpProcMeta && intpProcMeta.containsKey(INTP_TSERVER_HOST)
-        && intpProcMeta.containsKey(INTP_TSERVER_PORT)) {
+        && intpProcMeta.containsKey(INTP_TSERVER_PORT) && 
intpProcMeta.containsKey(STATUS)
+        && StringUtils.equals((String) intpProcMeta.get(STATUS), 
ONLINE_STATUS)) {
       // connect exist Interpreter Process
       String intpTserverHost = (String) intpProcMeta.get(INTP_TSERVER_HOST);
       int intpTserverPort = (int) intpProcMeta.get(INTP_TSERVER_PORT);
@@ -91,9 +95,9 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
       String srvHost = (String) meta.get(SERVER_HOST);
       String localhost = RemoteInterpreterUtils.findAvailableHostAddress();
 
-      if (localhost.equalsIgnoreCase(srvHost) && false) {
+      if (localhost.equalsIgnoreCase(srvHost)) {
         // launch interpreter on local
-        return super.launch(context);
+        return createInterpreterProcess(context);
       } else {
         int srvPort = (int) meta.get(SERVER_PORT);
 
@@ -104,15 +108,20 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
         mapEvent.put(CLUSTER_EVENT, CREATE_INTP_PROCESS);
         mapEvent.put(CLUSTER_EVENT_MSG, sContext);
         String strEvent = gson.toJson(mapEvent);
+        // Notify other server in the cluster that the resource is idle to 
create an interpreter
         clusterServer.unicastClusterEvent(
             srvHost, srvPort, ClusterManagerServer.CLUSTER_INTP_EVENT_TOPIC, 
strEvent);
 
+        // Find the ip and port of thrift registered by the remote interpreter 
process
+        // through the cluster metadata
         HashMap<String, Object> intpMeta = clusterServer
             .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-        int retryGetMeta = connectTimeout / CHECK_META_INTERVAL;
-        while ((retryGetMeta-- > 0) &
-            (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
-                || !intpMeta.containsKey(INTP_TSERVER_PORT)) ) {
+
+        int MAX_RETRY_GET_META = connectTimeout / 
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
+        int retryGetMeta = 0;
+        while ((retryGetMeta++ < MAX_RETRY_GET_META)
+            && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
+            || !intpMeta.containsKey(INTP_TSERVER_PORT)) ) {
           try {
             Thread.sleep(CHECK_META_INTERVAL);
             intpMeta = clusterServer
@@ -126,11 +135,9 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
         // Check if the remote creation process is successful
         if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
             || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
-          LOGGER.error("Creating process {} failed on remote server {}:{}",
+          String errorInfo = String.format("Creating process %s failed on 
remote server %s:%d",
               intpGroupId, srvHost, srvPort);
-
-          // launch interpreter on local
-          return super.launch(context);
+          throw new IOException(errorInfo);
         } else {
           // connnect remote interpreter process
           String intpTSrvHost = (String) intpMeta.get(INTP_TSERVER_HOST);
@@ -151,29 +158,38 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
       LOGGER.debug(msg);
     }
 
-    Gson gson = new Gson();
-    Map<String, Object> mapEvent = gson.fromJson(msg,
-        new TypeToken<Map<String, Object>>(){}.getType());
-    String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
-    ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
-
-    switch (clusterEvent) {
-      case CREATE_INTP_PROCESS:
-        onCreateIntpProcess(mapEvent);
-        break;
-      default:
-        LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
-        break;
+    try {
+      Gson gson = new Gson();
+      Map<String, Object> mapEvent = gson.fromJson(msg,
+          new TypeToken<Map<String, Object>>(){}.getType());
+      String sEvent = (String) mapEvent.get(CLUSTER_EVENT);
+      ClusterEvent clusterEvent = ClusterEvent.valueOf(sEvent);
+
+      switch (clusterEvent) {
+        case CREATE_INTP_PROCESS:
+          // 1)Other zeppelin servers in the cluster send requests to create 
an interpreter process
+          // 2)After the interpreter process is created, and the interpreter 
is started,
+          //    the interpreter registers the thrift ip and port into the 
cluster metadata.
+          // 3)Other servers connect through the IP and port of thrift in the 
cluster metadata,
+          //    using this remote interpreter process
+          String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG);
+          InterpreterLaunchContext context = gson.fromJson(
+              eventMsg, new TypeToken<InterpreterLaunchContext>() 
{}.getType());
+          ClusterInterpreterProcess clusterInterpreterProcess = 
createInterpreterProcess(context);
+          clusterInterpreterProcess.start(context.getUserName());
+          break;
+        default:
+          LOGGER.error("Unknown clusterEvent:{}, msg:{} ", clusterEvent, msg);
+          break;
+      }
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage(), e);
     }
   }
 
-  private void onCreateIntpProcess(Map<String, Object> mapEvent) {
-    String eventMsg = (String) mapEvent.get(CLUSTER_EVENT_MSG);
+  private ClusterInterpreterProcess 
createInterpreterProcess(InterpreterLaunchContext context) {
+    ClusterInterpreterProcess clusterInterpreterProcess = null;
     try {
-      Gson gson = new Gson();
-      InterpreterLaunchContext context = gson.fromJson(
-          eventMsg, new TypeToken<InterpreterLaunchContext>() {}.getType());
-
       this.properties = context.getProperties();
       InterpreterOption option = context.getOption();
       InterpreterRunner runner = context.getRunner();
@@ -183,8 +199,7 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
       String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/"
           + context.getInterpreterSettingId();
 
-      ClusterInterpreterProcess clusterInterpreterProcess
-          = new ClusterInterpreterProcess(
+      clusterInterpreterProcess = new ClusterInterpreterProcess(
           runner != null ? runner.getPath() : 
zConf.getInterpreterRemoteRunnerPath(),
           context.getZeppelinServerRPCPort(),
           context.getZeppelinServerHost(),
@@ -196,10 +211,10 @@ public class ClusterInterpreterLauncher extends 
StandardInterpreterLauncher
           intpSetName,
           context.getInterpreterGroupId(),
           option.isUserImpersonate());
-
-      clusterInterpreterProcess.start(context.getUserName());
     } catch (IOException e) {
       LOGGER.error(e.getMessage(), e);
     }
+
+    return clusterInterpreterProcess;
   }
 }
diff --git 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
index 986c2ed..8f0fcc7 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java
@@ -1,28 +1,12 @@
 package org.apache.zeppelin.interpreter.launcher;
 
 import java.io.IOException;
-import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.zeppelin.cluster.ClusterManagerServer;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_HOST;
-import static org.apache.zeppelin.cluster.meta.ClusterMeta.INTP_TSERVER_PORT;
-import static 
org.apache.zeppelin.cluster.meta.ClusterMetaType.INTP_PROCESS_META;
-
 
 public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess 
{
-  private static final Logger LOGGER
-      = LoggerFactory.getLogger(ClusterInterpreterProcess.class);
-
-  private String intpHost = "";
-  private int intpPort = 0;
-
-  private ClusterManagerServer clusterServer = 
ClusterManagerServer.getInstance();
 
   public ClusterInterpreterProcess(
       String intpRunner,
@@ -52,31 +36,13 @@ public class ClusterInterpreterProcess extends 
RemoteInterpreterManagedProcess {
 
   @Override
   public void start(String userName) throws IOException {
-    CheckIntpRunStatusThread checkIntpRunStatusThread = new 
CheckIntpRunStatusThread(this);
-    checkIntpRunStatusThread.start();
+    ClusterInterpreterCheckThread interpreterCheckThread = new 
ClusterInterpreterCheckThread(this);
+    interpreterCheckThread.start();
 
     super.start(userName);
   }
 
   @Override
-  public void processStarted(int port, String host) {
-    // Cluster mode, discovering interpreter processes through metadata 
registration
-    this.intpHost = host;
-    this.intpPort = port;
-    super.processStarted(port, host);
-  }
-
-  @Override
-  public String getHost() {
-    return intpHost;
-  }
-
-  @Override
-  public int getPort() {
-    return intpPort;
-  }
-
-  @Override
   public boolean isRunning() {
     if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), 
getPort())) {
       return true;
@@ -88,54 +54,4 @@ public class ClusterInterpreterProcess extends 
RemoteInterpreterManagedProcess {
   public String getErrorMessage() {
     return null;
   }
-
-  // Metadata registered in the cluster by the interpreter process,
-  // Keep the interpreter process started
-  private class CheckIntpRunStatusThread extends Thread {
-    private ClusterInterpreterProcess intpProcess;
-
-    CheckIntpRunStatusThread(ClusterInterpreterProcess intpProcess) {
-      this.intpProcess = intpProcess;
-    }
-
-    @Override
-    public void run() {
-      LOGGER.info("checkIntpRunStatusThread run() >>>");
-
-      String intpGroupId = getInterpreterGroupId();
-
-      HashMap<String, Object> intpMeta = clusterServer
-          .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-      int connectTimeout = intpProcess.getConnectTimeout();
-      int retryGetMeta = connectTimeout / 
ClusterInterpreterLauncher.CHECK_META_INTERVAL;
-      while ((retryGetMeta-- > 0)
-          && (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
-          || !intpMeta.containsKey(INTP_TSERVER_PORT))) {
-        try {
-          Thread.sleep(ClusterInterpreterLauncher.CHECK_META_INTERVAL);
-          intpMeta = clusterServer
-              .getClusterMeta(INTP_PROCESS_META, intpGroupId).get(intpGroupId);
-          LOGGER.info("retry {} times to get {} meta!", retryGetMeta, 
intpGroupId);
-        } catch (InterruptedException e) {
-          LOGGER.error(e.getMessage(), e);
-        }
-
-        if (null != intpMeta && intpMeta.containsKey(INTP_TSERVER_HOST)
-            && intpMeta.containsKey(INTP_TSERVER_PORT)) {
-          String intpHost = (String) intpMeta.get(INTP_TSERVER_HOST);
-          int intpPort = (int) intpMeta.get(INTP_TSERVER_PORT);
-          LOGGER.info("Found cluster interpreter {}:{}", intpHost, intpPort);
-
-          intpProcess.processStarted(intpPort, intpHost);
-        }
-      }
-
-      if (null == intpMeta || !intpMeta.containsKey(INTP_TSERVER_HOST)
-          || !intpMeta.containsKey(INTP_TSERVER_PORT)) {
-        LOGGER.error("Can not found interpreter meta!");
-      }
-
-      LOGGER.info("checkIntpRunStatusThread run() <<<");
-    }
-  }
 }
diff --git 
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
 
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
index da4bf5e..2b2b172 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterLauncherTest.java
@@ -18,7 +18,6 @@ package org.apache.zeppelin.interpreter.launcher;
 
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
-import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -51,8 +50,8 @@ public class ClusterInterpreterLauncherTest extends 
ClusterMockTest {
   }
 
   @Test
-  public void testConnectExistIntpProcess() throws IOException {
-    mockIntpProcessMeta("intpGroupId");
+  public void testConnectExistOnlineIntpProcess() throws IOException {
+    mockIntpProcessMeta("intpGroupId", true);
 
     ClusterInterpreterLauncher launcher
         = new ClusterInterpreterLauncher(ClusterMockTest.zconf, null);
@@ -75,7 +74,9 @@ public class ClusterInterpreterLauncherTest extends 
ClusterMockTest {
   }
 
   @Test
-  public void testCreateIntpProcess() throws IOException {
+  public void testConnectExistOfflineIntpProcess() throws IOException {
+    mockIntpProcessMeta("intpGroupId2", false);
+
     ClusterInterpreterLauncher launcher
         = new ClusterInterpreterLauncher(ClusterMockTest.zconf, null);
     Properties properties = new Properties();
@@ -84,12 +85,12 @@ public class ClusterInterpreterLauncherTest extends 
ClusterMockTest {
     InterpreterOption option = new InterpreterOption();
     option.setUserImpersonate(true);
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null,
-        "user1", "intpGroupId", "groupId",
+        "user1", "intpGroupId2", "groupId",
         "groupName", "name", 0, "host");
     InterpreterClient client = launcher.launch(context);
 
-    assertTrue(client instanceof RemoteInterpreterManagedProcess);
-    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertTrue(client instanceof ClusterInterpreterProcess);
+    ClusterInterpreterProcess interpreterProcess = (ClusterInterpreterProcess) 
client;
     assertEquals("name", interpreterProcess.getInterpreterSettingName());
     assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir());
     assertEquals(".//local-repo/groupId", 
interpreterProcess.getLocalRepoDir());
diff --git 
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
 
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
index 72b51b4..a7b6dee 100644
--- 
a/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
+++ 
b/zeppelin-plugins/launcher/cluster/src/test/java/org/apache/zeppelin/interpreter/launcher/ClusterMockTest.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashMap;
 
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.OFFLINE_STATUS;
+import static org.apache.zeppelin.cluster.meta.ClusterMeta.ONLINE_STATUS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
@@ -114,7 +116,7 @@ public class ClusterMockTest {
     LOGGER.info("serverMeta <<<");
   }
 
-  public void mockIntpProcessMeta(String metaKey) {
+  public void mockIntpProcessMeta(String metaKey, boolean online) {
     // mock IntpProcess Meta
     HashMap<String, Object> meta = new HashMap<>();
     meta.put(ClusterMeta.SERVER_HOST, "SERVER_HOST");
@@ -125,7 +127,11 @@ public class ClusterMockTest {
     meta.put(ClusterMeta.CPU_USED, "CPU_USED");
     meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY");
     meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
-
+    if (online) {
+      meta.put(ClusterMeta.STATUS, ONLINE_STATUS);
+    } else {
+      meta.put(ClusterMeta.STATUS, OFFLINE_STATUS);
+    }
     // put IntpProcess Meta
     clusterClient.putClusterMeta(ClusterMetaType.INTP_PROCESS_META, metaKey, 
meta);
 
@@ -137,6 +143,6 @@ public class ClusterMockTest {
 
     assertNotNull(check);
     assertNotNull(check.get(metaKey));
-    assertEquals(true, check.get(metaKey).size() == 8);
+    assertEquals(true, check.get(metaKey).size() == 9);
   }
 }
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 10ee180..04b2b71 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -41,8 +41,10 @@ import org.apache.zeppelin.helium.HeliumApplicationFactory;
 import org.apache.zeppelin.helium.HeliumBundleFactory;
 import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage;
+import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
 import org.apache.zeppelin.notebook.NoteEventListener;
 import org.apache.zeppelin.notebook.Notebook;
@@ -60,6 +62,7 @@ import org.apache.zeppelin.service.*;
 import org.apache.zeppelin.service.AuthenticationService;
 import org.apache.zeppelin.socket.NotebookServer;
 import org.apache.zeppelin.user.Credentials;
+import org.apache.zeppelin.util.ReflectionUtils;
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.jmx.ConnectorServer;
 import org.eclipse.jetty.jmx.MBeanContainer;
@@ -358,6 +361,22 @@ public class ZeppelinServer extends ResourceConfig {
       AuthorizationService authorizationService = 
serviceLocator.getService(AuthorizationService.class);
       
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_NOTE_EVENT_TOPIC,
 notebookServer);
       
clusterManagerServer.addClusterEventListeners(ClusterManagerServer.CLUSTER_AUTH_EVENT_TOPIC,
 authorizationService);
+
+      // Since the ClusterInterpreterLauncher is lazy, dynamically generated, 
So in cluster mode,
+      // when the zeppelin service starts, Create a ClusterInterpreterLauncher 
object,
+      // This allows the ClusterInterpreterLauncher to listen for cluster 
events.
+      try {
+        InterpreterSettingManager intpSettingManager = 
sharedServiceLocator.getService(InterpreterSettingManager.class);
+        RecoveryStorage recoveryStorage = ReflectionUtils.createClazzInstance(
+                conf.getRecoveryStorageClass(),
+                new Class[] {ZeppelinConfiguration.class, 
InterpreterSettingManager.class},
+                new Object[] {conf, intpSettingManager});
+        recoveryStorage.init();
+        
PluginManager.get().loadInterpreterLauncher(InterpreterSetting.CLUSTER_INTERPRETER_LAUNCHER_NAME,
 recoveryStorage);
+      } catch (IOException e) {
+        LOG.error(e.getMessage(), e);
+      }
+
       clusterManagerServer.start();
     }
   }
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
index fb15e74..0afd7b0 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java
@@ -131,6 +131,8 @@ public class InterpreterSetting {
   private transient LifecycleManager lifecycleManager;
   private transient RecoveryStorage recoveryStorage;
   private transient RemoteInterpreterEventServer interpreterEventServer;
+
+  public static final String CLUSTER_INTERPRETER_LAUNCHER_NAME = 
"ClusterInterpreterLauncher";
   
///////////////////////////////////////////////////////////////////////////////////////////
 
   /**
@@ -671,7 +673,7 @@ public class InterpreterSetting {
     if (isRunningOnKubernetes()) {
       return "K8sStandardInterpreterLauncher";
     } else if (isRunningOnCluster()) {
-      return "ClusterInterpreterLauncher";
+      return InterpreterSetting.CLUSTER_INTERPRETER_LAUNCHER_NAME;
     } if (isRunningOnDocker()) {
       return "DockerInterpreterLauncher";
     } else {

Reply via email to