This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c00e25  [ZEPPELIN-3626] Cluster server and client module
5c00e25 is described below

commit 5c00e256afe95236dab3facfbebfb6d381c2394f
Author: liuxunorg <[email protected]>
AuthorDate: Tue Mar 19 16:32:52 2019 +0800

    [ZEPPELIN-3626] Cluster server and client module
    
    ### What is this PR for?
    The cluster management server uses the Raft algorithm library Atomix to 
form a service cluster with consistent service status in the Zeppelin cluster. 
The cluster management client connects to the cluster management server for 
metadata operations of services and processes. The cluster monitoring module 
checks if each Zeppelin-Server and Zeppelin Interpreter process in the cluster 
is active.
    
    [Include]
    
    * [x] add cluster manager server
    * [x] add cluster manager client
    * [x] add cluster monitor
    * [x] add cluster create Interpreter thrift interface
    * [x] add InterpreterFactoryInterface
    * [x] add the function call in the ZeppelinServer startup cluster
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-3626
    
    ### Screenshots
    ![create zeppelin 
cluster](https://user-images.githubusercontent.com/3677382/52460928-adfbbd80-2ba7-11e9-9b4e-226448eb2329.gif)
    
    ### How should this be tested?
    CI pass
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? [Design 
document](https://docs.google.com/document/d/1a8QLSyR3M5AhlG1GIYuDTj6bwazeuVDKCRRBm-Qa3Bw/edit#heading=h.qbcgqhd0wwh8)
    
    Author: liuxunorg <[email protected]>
    
    Closes #3203 from liuxunorg/ZEPPELIN-3626 and squashes the following 
commits:
    
    04f0da7b1 [liuxunorg] Fixing guava version causes travis to fail.
    7db687a82 [liuxunorg] [ZEPPELIN-3626] Cluster server and client module
---
 zeppelin-interpreter-integration/pom.xml           |  11 +
 zeppelin-interpreter/pom.xml                       |  11 -
 .../apache/zeppelin/cluster/ClusterManager.java    |  20 +-
 .../zeppelin/cluster/ClusterManagerClient.java     |  77 ++
 .../zeppelin/cluster/ClusterManagerServer.java     | 347 +++++++
 .../apache/zeppelin/cluster/ClusterMonitor.java    | 248 +++++
 .../apache/zeppelin/cluster/meta/ClusterMeta.java  |   4 +
 .../interpreter/InterpreterFactoryInterface.java   |  34 +
 .../interpreter/InterpreterNotFoundException.java  |   0
 .../thrift/ClusterIntpProcParameters.java          | 938 +++++++++++++++++++
 .../interpreter/thrift/ClusterManagerService.java  | 998 +++++++++++++++++++++
 .../src/main/thrift/ClusterManagerService.thrift   |  32 +
 zeppelin-interpreter/src/main/thrift/genthrift.sh  |   1 +
 .../zeppelin/cluster/ClusterManagerTest.java       | 135 +++
 .../src/test/resources/zeppelin-site.xml           |  38 +
 .../org/apache/zeppelin/server/ZeppelinServer.java |  16 +
 .../zeppelin/interpreter/InterpreterFactory.java   |   3 +-
 17 files changed, 2890 insertions(+), 23 deletions(-)

diff --git a/zeppelin-interpreter-integration/pom.xml 
b/zeppelin-interpreter-integration/pom.xml
index d66b279..faacd2b 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -52,6 +52,17 @@
       <groupId>org.apache.zeppelin</groupId>
       <artifactId>zeppelin-zengine</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>20.0</version>
     </dependency>
 
     <dependency>
diff --git a/zeppelin-interpreter/pom.xml b/zeppelin-interpreter/pom.xml
index bf624ce..3e1183e 100644
--- a/zeppelin-interpreter/pom.xml
+++ b/zeppelin-interpreter/pom.xml
@@ -46,7 +46,6 @@
     <jline.version>2.14.3</jline.version>
     <atomix.version>3.0.0-rc4</atomix.version>
     <commons-math3.version>3.1.1</commons-math3.version>
-    <guava.version>20.0</guava.version>
     <commons-lang3.version>3.7</commons-lang3.version>
 
     <!--plugin versions-->
@@ -67,10 +66,6 @@
           <groupId>org.apache.commons</groupId>
           <artifactId>commons-lang3</artifactId>
         </exclusion>
-        <exclusion>
-          <groupId>com.google.guava</groupId>
-          <artifactId>guava</artifactId>
-        </exclusion>
       </exclusions>
     </dependency>
 
@@ -99,12 +94,6 @@
     </dependency>
 
     <dependency>
-      <groupId>com.google.guava</groupId>
-      <artifactId>guava</artifactId>
-      <version>${guava.version}</version>
-    </dependency>
-
-    <dependency>
       <groupId>org.apache.thrift</groupId>
       <artifactId>libthrift</artifactId>
     </dependency>
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
index 683f068..9c70a2e 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManager.java
@@ -128,10 +128,6 @@ public abstract class ClusterManager {
 
   protected Collection<Node> clusterNodes = new ArrayList<>();
 
-  // raft
-  protected static String ZEPL_CLUSTER_ID = "ZEPL-CLUSTER";
-  protected static String ZEPL_CLIENT_ID = "ZEPL-CLIENT";
-
   protected int raftServerPort = 0;
 
   protected RaftClient raftClient = null;
@@ -139,7 +135,6 @@ public abstract class ClusterManager {
   protected Map<MemberId, Address> raftAddressMap = new ConcurrentHashMap<>();
   protected LocalRaftProtocolFactory protocolFactory
       = new LocalRaftProtocolFactory(protocolSerializer);
-  protected List<MessagingService> messagingServices = new ArrayList<>();
   protected List<MemberId> clusterMemberIds = new ArrayList<MemberId>();
 
   protected AtomicBoolean running = new AtomicBoolean(true);
@@ -151,6 +146,8 @@ public abstract class ClusterManager {
   // zeppelin server host & port
   protected String zeplServerHost = "";
 
+  protected ClusterMonitor clusterMonitor = null;
+
   public ClusterManager() {
     try {
       zeplServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
@@ -166,11 +163,12 @@ public abstract class ClusterManager {
             raftServerPort = clusterPort;
           }
 
-          Node node = Node.builder().withId(cluster[i])
-              .withAddress(Address.from(clusterHost, clusterPort)).build();
+          String memberId = clusterHost + ":" + clusterPort;
+          Address address = Address.from(clusterHost, clusterPort);
+          Node node = 
Node.builder().withId(memberId).withAddress(address).build();
           clusterNodes.add(node);
-          raftAddressMap.put(MemberId.from(cluster[i]), 
Address.from(clusterHost, clusterPort));
-          clusterMemberIds.add(MemberId.from(cluster[i]));
+          raftAddressMap.put(MemberId.from(memberId), address);
+          clusterMemberIds.add(MemberId.from(memberId));
         }
       }
     } catch (UnknownHostException e) {
@@ -218,7 +216,7 @@ public abstract class ClusterManager {
           LOGGER.error(e.getMessage());
         }
 
-        MemberId memberId = MemberId.from(ZEPL_CLIENT_ID + zeplServerHost + 
":" + raftClientPort);
+        MemberId memberId = MemberId.from(zeplServerHost + ":" + 
raftClientPort);
         Address address = Address.from(zeplServerHost, raftClientPort);
         raftAddressMap.put(memberId, address);
 
@@ -308,7 +306,7 @@ public abstract class ClusterManager {
     }
   }
 
-  public String getClusterName() {
+  public String getClusterNodeName() {
     return zeplServerHost + ":" + raftServerPort;
   }
 
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
new file mode 100644
index 0000000..c969bd6
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerClient.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.primitive.PrimitiveState;
+
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta;
+
+/**
+ * Cluster management client class instantiated in zeppelin-interperter
+ */
+public class ClusterManagerClient extends ClusterManager {
+  private static ClusterManagerClient instance = null;
+
+  public static ClusterManagerClient getInstance() {
+    synchronized (ClusterManagerClient.class) {
+      if (instance == null) {
+        instance = new ClusterManagerClient();
+      }
+      return instance;
+    }
+  }
+
+  public ClusterManagerClient() {
+    super();
+  }
+
+  @Override
+  public boolean raftInitialized() {
+    if (null != raftClient && null != raftSessionClient
+        && raftSessionClient.getState() == PrimitiveState.CONNECTED) {
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean isClusterLeader() {
+    return false;
+  }
+
+  // In the ClusterManagerClient metaKey equal interperterGroupId
+  public void start(String metaKey) {
+    if (!zconf.isClusterMode()) {
+      return;
+    }
+    super.start();
+
+    // Instantiated cluster monitoring class
+    clusterMonitor = new ClusterMonitor(this);
+    clusterMonitor.start(IntpProcessMeta, metaKey);
+  }
+
+  public void shutdown() {
+    if (!zconf.isClusterMode()) {
+      return;
+    }
+    clusterMonitor.shutdown();
+
+    super.shutdown();
+  }
+}
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
new file mode 100644
index 0000000..41e670a
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterManagerServer.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import io.atomix.cluster.*;
+import io.atomix.cluster.discovery.BootstrapDiscoveryProvider;
+import io.atomix.cluster.impl.DefaultClusterMembershipService;
+import io.atomix.cluster.impl.DefaultNodeDiscoveryService;
+import io.atomix.cluster.messaging.BroadcastService;
+import io.atomix.cluster.messaging.MessagingService;
+import io.atomix.cluster.messaging.impl.NettyMessagingService;
+import io.atomix.primitive.PrimitiveState;
+import io.atomix.protocols.raft.RaftServer;
+import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.storage.RaftStorage;
+import io.atomix.storage.StorageLevel;
+import io.atomix.utils.net.Address;
+import org.apache.commons.lang.StringUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.protocol.RaftServerMessagingProtocol;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.InterpreterFactoryInterface;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.apache.zeppelin.interpreter.thrift.ClusterIntpProcParameters;
+import org.apache.zeppelin.interpreter.thrift.ClusterManagerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.ServerMeta;
+
+/**
+ * Cluster management server class instantiated in zeppelin-server
+ * 1. Create a raft server
+ * 2. Remotely create interpreter's thrift service
+ */
+public class ClusterManagerServer extends ClusterManager
+    implements ClusterManagerService.Iface {
+  private static Logger LOGGER = 
LoggerFactory.getLogger(ClusterManagerServer.class);
+
+  private static ClusterManagerServer instance = null;
+
+  // raft server
+  protected RaftServer raftServer = null;
+
+  protected MessagingService messagingService = null;
+
+  // zeppelin cluster manager thrift service
+  private TThreadPoolServer clusterManagerTserver = null;
+  private ClusterManagerService.Processor<ClusterManagerServer> 
clusterManagerProcessor = null;
+
+  // Find interpreter by note
+  private InterpreterFactoryInterface interpreterFactory = null;
+
+  // Connect to the interpreter process that has been created
+  public static String CONNET_EXISTING_PROCESS = "CONNET_EXISTING_PROCESS";
+
+  private ClusterManagerServer() {
+    super();
+
+    clusterManagerProcessor = new ClusterManagerService.Processor<>(this);
+
+    deleteRaftSystemData();
+  }
+
+  public static ClusterManagerServer getInstance() {
+    synchronized (ClusterManagerServer.class) {
+      if (instance == null) {
+        instance = new ClusterManagerServer();
+      }
+      return instance;
+    }
+  }
+
+  public void start(InterpreterFactoryInterface interpreterFactory) {
+    if (!zconf.isClusterMode()) {
+      return;
+    }
+
+    this.interpreterFactory = interpreterFactory;
+
+    initThread();
+
+    // Instantiated raftServer monitoring class
+    String clusterName = getClusterNodeName();
+    clusterMonitor = new ClusterMonitor(this);
+    clusterMonitor.start(ServerMeta, clusterName);
+
+    super.start();
+  }
+
+  @Override
+  public boolean raftInitialized() {
+    if (null != raftServer && raftServer.isRunning()
+        && null != raftClient && null != raftSessionClient
+        && raftSessionClient.getState() == PrimitiveState.CONNECTED) {
+      return true;
+    }
+
+    return false;
+  }
+
+  @Override
+  public boolean isClusterLeader() {
+    if (null == raftServer
+        || !raftServer.isRunning()
+        || !raftServer.isLeader()) {
+      return false;
+    }
+
+    return true;
+  }
+
+  protected void deleteRaftSystemData() {
+    String zeppelinHome = zconf.getZeppelinHome();
+    Path directory = new File(zeppelinHome, ".data").toPath();
+    if (Files.exists(directory)) {
+      try {
+        Files.walkFileTree(directory, new SimpleFileVisitor<Path>() {
+          @Override
+          public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs)
+              throws IOException {
+            Files.delete(file);
+            return FileVisitResult.CONTINUE;
+          }
+
+          @Override
+          public FileVisitResult postVisitDirectory(Path dir, IOException exc)
+              throws IOException {
+            Files.delete(dir);
+            return FileVisitResult.CONTINUE;
+          }
+        });
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  private void initThread() {
+    // RaftServer Thread
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        LOGGER.info("RaftServer run() >>>");
+
+        Address address = Address.from(zeplServerHost, raftServerPort);
+        Member member = Member.builder(MemberId.from(zeplServerHost + ":" + 
raftServerPort))
+            .withAddress(address)
+            .build();
+        messagingService = NettyMessagingService.builder()
+            .withAddress(address).build().start().join();
+        RaftServerProtocol protocol = new RaftServerMessagingProtocol(
+            messagingService, ClusterManager.protocolSerializer, 
raftAddressMap::get);
+
+        BootstrapService bootstrapService = new BootstrapService() {
+          @Override
+          public MessagingService getMessagingService() {
+            return messagingService;
+          }
+
+          @Override
+          public BroadcastService getBroadcastService() {
+            return new BroadcastServiceAdapter();
+          }
+        };
+
+        ManagedClusterMembershipService clusterService = new 
DefaultClusterMembershipService(
+            member,
+            new DefaultNodeDiscoveryService(bootstrapService, member,
+                new BootstrapDiscoveryProvider(clusterNodes)),
+            bootstrapService,
+            new MembershipConfig());
+
+        RaftServer.Builder builder = RaftServer.builder(member.id())
+            .withMembershipService(clusterService)
+            .withProtocol(protocol)
+            .withStorage(RaftStorage.builder()
+                .withStorageLevel(StorageLevel.MEMORY)
+                .withSerializer(storageSerializer)
+                .withMaxSegmentSize(1024 * 1024)
+                .build());
+
+        raftServer = builder.build();
+        raftServer.bootstrap(clusterMemberIds);
+
+        LOGGER.info("RaftServer run() <<<");
+      }
+    }).start();
+
+    // Cluster manager thrift thread
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        LOGGER.info("TServerThread run() >>>");
+
+        ZeppelinConfiguration zconf = new ZeppelinConfiguration();
+        String portRange = zconf.getZeppelinServerRPCPortRange();
+
+        try {
+          TServerSocket serverTransport = 
RemoteInterpreterUtils.createTServerSocket(portRange);
+          int tserverPort = serverTransport.getServerSocket().getLocalPort();
+
+          clusterManagerTserver = new TThreadPoolServer(
+              new 
TThreadPoolServer.Args(serverTransport).processor(clusterManagerProcessor));
+          LOGGER.info("Starting raftServer manager Tserver on port {}", 
tserverPort);
+
+          String nodeName = getClusterNodeName();
+          HashMap<String, Object> meta = new HashMap<String, Object>();
+          meta.put(ClusterMeta.NODE_NAME, nodeName);
+          meta.put(ClusterMeta.SERVER_TSERVER_HOST, zeplServerHost);
+          meta.put(ClusterMeta.SERVER_TSERVER_PORT, tserverPort);
+          meta.put(ClusterMeta.SERVER_START_TIME, new Date());
+
+          putClusterMeta(ServerMeta, nodeName, meta);
+        } catch (UnknownHostException e) {
+          LOGGER.error(e.getMessage());
+        } catch (SocketException e) {
+          LOGGER.error(e.getMessage());
+        } catch (IOException e) {
+          LOGGER.error(e.getMessage());
+        }
+
+        clusterManagerTserver.serve();
+
+        LOGGER.info("TServerThread run() <<<");
+      }
+    }).start();
+  }
+
+  @Override
+  public void shutdown() {
+    if (!zconf.isClusterMode()) {
+      return;
+    }
+
+    try {
+      // delete local machine meta
+      deleteClusterMeta(ServerMeta, getClusterNodeName());
+      Thread.sleep(300);
+      clusterMonitor.shutdown();
+      // wait raft commit metadata
+      Thread.sleep(300);
+    } catch (InterruptedException e) {
+      LOGGER.error(e.getMessage());
+    }
+
+    if (null != raftServer && raftServer.isRunning()) {
+      try {
+        raftServer.shutdown().get(3, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {
+        LOGGER.error(e.getMessage());
+      } catch (ExecutionException e) {
+        LOGGER.error(e.getMessage());
+      } catch (TimeoutException e) {
+        LOGGER.error(e.getMessage());
+      }
+    }
+
+    clusterManagerTserver.stop();
+
+    super.shutdown();
+  }
+
+  public boolean openRemoteInterpreterProcess(
+      String host, int port, final ClusterIntpProcParameters 
clusterIntpProcParameters)
+      throws TException {
+    LOGGER.info("host: {}, port: {}, clusterIntpProcParameters: {}",
+        host, port, clusterIntpProcParameters);
+
+    try (TTransport transport = new TSocket(host, port)) {
+      transport.open();
+      TProtocol protocol = new TBinaryProtocol(transport);
+      ClusterManagerService.Client client = new 
ClusterManagerService.Client(protocol);
+
+      return client.createClusterInterpreterProcess(clusterIntpProcParameters);
+    }
+  }
+
+  @Override
+  public boolean createClusterInterpreterProcess(ClusterIntpProcParameters 
clusterIntpProcParameters) {
+    // TODO: ZEPPELIN-3623
+
+    return true;
+  }
+
+  // Obtain the server node whose resources are idle in the cluster
+  public HashMap<String, Object> getIdleNodeMeta() {
+    HashMap<String, Object> idleNodeMeta = null;
+    HashMap<String, HashMap<String, Object>> clusterMeta = 
getClusterMeta(ServerMeta, "");
+
+    long memoryIdle = 0;
+    for (Map.Entry<String, HashMap<String, Object>> entry : 
clusterMeta.entrySet()) {
+      HashMap<String, Object> meta = entry.getValue();
+      // Check if the service or process is offline
+      String status = (String) meta.get(ClusterMeta.STATUS);
+      if (null == status || StringUtils.isEmpty(status)
+          || status.equals(ClusterMeta.OFFLINE_STATUS)) {
+        continue;
+      }
+
+      long memoryCapacity  = (long) meta.get(ClusterMeta.MEMORY_CAPACITY);
+      long memoryUsed      = (long) meta.get(ClusterMeta.MEMORY_USED);
+      long idle = memoryCapacity - memoryUsed;
+      if (idle > memoryIdle) {
+        memoryIdle = idle;
+        idleNodeMeta = meta;
+      }
+    }
+
+    return idleNodeMeta;
+  }
+}
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
new file mode 100644
index 0000000..86fa6a7
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/ClusterMonitor.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.cluster;
+
+import com.sun.management.OperatingSystemMXBean;
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.IntpProcessMeta;
+import static org.apache.zeppelin.cluster.meta.ClusterMetaType.ServerMeta;
+
+/**
+ * cluster monitoring
+ * 1. cluster monitoring is also used for zeppelin-Server and zeppelin 
Interperter,
+ *    distinguish by member variable ClusterMetaType
+ * 2. Report the average of the server resource CPU and MEMORY usage in the
+ *    last few minutes to smooth the server's instantaneous peak
+ * 3. checks the heartbeat timeout of the zeppelin-server and interperter 
processes
+ */
+public class ClusterMonitor {
+  private static Logger LOGGER = LoggerFactory.getLogger(ClusterMonitor.class);
+
+  // Whether the thread has started
+  private static AtomicBoolean running = new AtomicBoolean(true);
+
+  private ClusterManager clusterManager = null;
+
+  // Save the CPU resource and MEmory usage of the server resources
+  // in the last few minutes through the queue, and then average them through 
the queue.
+  private Queue<UsageUtil> monitorUsageQueues = new LinkedList<>();
+  private final int USAGE_QUEUE_LIMIT = 100; // queue length
+  private int heartbeatInterval = 3000; // Heartbeat reporting 
interval(milliseconds)
+
+  // The zeppelin-server leader checks the heartbeat timeout of
+  // the zeppelin-server and zeppelin-interperter processes in the cluster 
metadata.
+  // If this time is exceeded, the zeppelin-server and interperter processes
+  // can have an exception and no heartbeat is reported.
+  private int heartbeatTimeout = 9000;
+
+  // Type of cluster monitoring object
+  private ClusterMetaType clusterMetaType;
+
+  // The key of the cluster monitoring object,
+  // the name of the cluster when monitoring the zeppelin-server,
+  // and the interperterGroupID when monitoring the interperter processes
+  private String metaKey;
+
+  public ClusterMonitor(ClusterManager clusterManagerServer) {
+    this.clusterManager = clusterManagerServer;
+
+    ZeppelinConfiguration zconf = new ZeppelinConfiguration();
+    heartbeatInterval = zconf.getClusterHeartbeatInterval();
+    heartbeatTimeout = zconf.getClusterHeartbeatTimeout();
+
+    if (heartbeatTimeout < heartbeatInterval) {
+      LOGGER.error("Heartbeat timeout must be greater than heartbeat period.");
+      heartbeatTimeout = heartbeatInterval * 3;
+      LOGGER.info("Heartbeat timeout is modified to 3 times the heartbeat 
period.");
+    }
+
+    if (heartbeatTimeout < heartbeatInterval * 3) {
+      LOGGER.warn("Heartbeat timeout recommended than 3 times the heartbeat 
period.");
+    }
+  }
+
+  //
+  public void start(ClusterMetaType clusterMetaType, String metaKey) {
+    this.clusterMetaType = clusterMetaType;
+    this.metaKey = metaKey;
+
+    new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (running.get()) {
+          switch (clusterMetaType) {
+            case ServerMeta:
+              sendMachineUsage();
+              checkHealthy();
+              break;
+            case IntpProcessMeta:
+              sendHeartbeat();
+              break;
+            default:
+              LOGGER.error("unknown cluster meta type:{}", clusterMetaType);
+              break;
+          }
+
+          try {
+            Thread.sleep(heartbeatInterval);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+      }
+    }).start();
+  }
+
+  public void shutdown() {
+    running.set(false);
+  }
+
+  // Check the healthy of each service and interperter instance
+  private void checkHealthy() {
+    // only leader check cluster healthy
+    if (!clusterManager.isClusterLeader()) {
+      return;
+    }
+
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("checkHealthy()");
+    }
+    Date now = new Date();
+    // check machine mate
+    for (ClusterMetaType metaType : ClusterMetaType.values()) {
+      Map<String, HashMap<String, Object>> clusterMeta
+          = clusterManager.getClusterMeta(metaType, "");
+
+      for (Map.Entry<String, HashMap<String, Object>> entry : 
clusterMeta.entrySet()) {
+        String key = entry.getKey();
+        Map<String, Object> meta = entry.getValue();
+
+        // Metadata that has been offline is not processed
+        String status = (String) meta.get(ClusterMeta.STATUS);
+        if (status.equals(ClusterMeta.OFFLINE_STATUS)) {
+          continue;
+        }
+
+        Object heartbeat = meta.get(ClusterMeta.HEARTBEAT);
+        if (heartbeat instanceof Date) {
+          Date dHeartbeat = (Date) heartbeat;
+          long diff = now.getTime() - dHeartbeat.getTime();
+          if (diff > heartbeatTimeout) {
+            // Set the metadata for the heartbeat timeout to offline
+            // Cannot delete metadata
+            HashMap<String, Object> mapValues = new HashMap<>();
+            mapValues.put(ClusterMeta.STATUS, ClusterMeta.OFFLINE_STATUS);
+            clusterManager.putClusterMeta(metaType, key, mapValues);
+            LOGGER.warn("offline heartbeat timeout[{}] meta[{}]", dHeartbeat, 
key);
+          }
+        } else {
+          LOGGER.error("wrong data type");
+        }
+      }
+    }
+  }
+
+  // The interpreter process sends a heartbeat to the cluster,
+  // indicating that the process is still active.
+  private void sendHeartbeat() {
+    HashMap<String, Object> mapMonitorUtil = new HashMap<>();
+    mapMonitorUtil.put(ClusterMeta.HEARTBEAT, new Date());
+    mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
+
+    clusterManager.putClusterMeta(IntpProcessMeta, metaKey, mapMonitorUtil);
+  }
+
+  // send the usage of each service
+  private void sendMachineUsage() {
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("sendMachineUsage >>>");
+    }
+
+    // Limit queue size
+    while (monitorUsageQueues.size() > USAGE_QUEUE_LIMIT) {
+      monitorUsageQueues.poll();
+    }
+    UsageUtil monitorUtil = getMachineUsage();
+    monitorUsageQueues.add(monitorUtil);
+
+    UsageUtil avgMonitorUtil = new UsageUtil();
+    for (UsageUtil monitor : monitorUsageQueues){
+      avgMonitorUtil.memoryUsed += monitor.memoryUsed;
+      avgMonitorUtil.memoryCapacity += monitor.memoryCapacity;
+      avgMonitorUtil.cpuUsed += monitor.cpuUsed;
+      avgMonitorUtil.cpuCapacity += monitor.cpuCapacity;
+    }
+
+    // Resource consumption average
+    int queueSize = monitorUsageQueues.size();
+    avgMonitorUtil.memoryUsed = avgMonitorUtil.memoryUsed / queueSize;
+    avgMonitorUtil.memoryCapacity = avgMonitorUtil.memoryCapacity / queueSize;
+    avgMonitorUtil.cpuUsed = avgMonitorUtil.cpuUsed / queueSize;
+    avgMonitorUtil.cpuCapacity = avgMonitorUtil.cpuCapacity / queueSize;
+
+    HashMap<String, Object> mapMonitorUtil = new HashMap<>();
+    mapMonitorUtil.put(ClusterMeta.MEMORY_USED, avgMonitorUtil.memoryUsed);
+    mapMonitorUtil.put(ClusterMeta.MEMORY_CAPACITY, 
avgMonitorUtil.memoryCapacity);
+    mapMonitorUtil.put(ClusterMeta.CPU_USED, avgMonitorUtil.cpuUsed);
+    mapMonitorUtil.put(ClusterMeta.CPU_CAPACITY, avgMonitorUtil.cpuCapacity);
+    mapMonitorUtil.put(ClusterMeta.HEARTBEAT, new Date());
+    mapMonitorUtil.put(ClusterMeta.STATUS, ClusterMeta.ONLINE_STATUS);
+
+    String clusterName = clusterManager.getClusterNodeName();
+    clusterManager.putClusterMeta(ServerMeta, clusterName, mapMonitorUtil);
+  }
+
+  private UsageUtil getMachineUsage() {
+    OperatingSystemMXBean operatingSystemMXBean
+        = ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
+
+    // Returns the amount of free physical memory in bytes.
+    long freePhysicalMemorySize = 
operatingSystemMXBean.getFreePhysicalMemorySize();
+
+    // Returns the total amount of physical memory in bytes.
+    long totalPhysicalMemorySize = 
operatingSystemMXBean.getTotalPhysicalMemorySize();
+
+    // Returns the "recent cpu usage" for the whole system.
+    double systemCpuLoad = operatingSystemMXBean.getSystemCpuLoad();
+
+    int process = Runtime.getRuntime().availableProcessors();
+
+    UsageUtil monitorUtil = new UsageUtil();
+    monitorUtil.memoryUsed = totalPhysicalMemorySize - freePhysicalMemorySize;
+    monitorUtil.memoryCapacity = totalPhysicalMemorySize;
+    monitorUtil.cpuUsed = (long) (process * systemCpuLoad * 100);
+    monitorUtil.cpuCapacity = process * 100;
+
+    return monitorUtil;
+  }
+
+  private class UsageUtil {
+    private long memoryUsed = 0;
+    private long memoryCapacity = 0;
+    private long cpuUsed = 0;
+    private long cpuCapacity = 0;
+  }
+}
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
index b96e32b..a26007c 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/cluster/meta/ClusterMeta.java
@@ -31,6 +31,9 @@ import java.util.Map;
 public class ClusterMeta implements Serializable {
   private static Logger logger = LoggerFactory.getLogger(ClusterMeta.class);
 
+  // The name of each server node in the cluster
+  public static String NODE_NAME            = "NODE_NAME";
+
   // zeppelin-server meta
   public static String SERVER_HOST          = "SERVER_HOST";
   public static String SERVER_PORT          = "SERVER_PORT";
@@ -39,6 +42,7 @@ public class ClusterMeta implements Serializable {
   public static String SERVER_START_TIME    = "SERVER_START_TIME";
 
   // interperter-process meta
+  public static String INTP_PROCESS_ID      = "INTP_PROCESS_ID";
   public static String INTP_TSERVER_HOST    = "INTP_TSERVER_HOST";
   public static String INTP_TSERVER_PORT    = "INTP_TSERVER_PORT";
   public static String INTP_START_TIME      = "INTP_START_TIME";
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactoryInterface.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactoryInterface.java
new file mode 100644
index 0000000..1381adc
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactoryInterface.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.interpreter;
+
/**
 * InterpreterFactory interface.
 *
 * Provides ClusterManagerServer with a way to look up an interpreter by
 * user, noteId and replName. InterpreterFactory lives in the zeppelin-zengine
 * module, so ClusterManagerServer (in the zeppelin-interpreter module) cannot
 * call InterpreterFactory#getInterpreter(...) directly; it goes through this
 * interface instead.
 */
public interface InterpreterFactoryInterface {
  /**
   * Looks up the interpreter bound to the given note and user for replName.
   * NOTE(review): presumably defaultInterpreterSetting is used as a fallback
   * when replName is empty — confirm against the zengine implementation.
   *
   * @param user                      the user requesting the interpreter
   * @param noteId                    id of the note the interpreter runs in
   * @param replName                  interpreter (repl) name to resolve
   * @param defaultInterpreterSetting name of the default interpreter setting
   * @return the resolved Interpreter
   * @throws InterpreterNotFoundException if no matching interpreter exists
   */
  Interpreter getInterpreter(String user,
                             String noteId,
                             String replName,
                             String defaultInterpreterSetting)
      throws InterpreterNotFoundException;
}
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterNotFoundException.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterNotFoundException.java
similarity index 100%
rename from 
zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterNotFoundException.java
rename to 
zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterNotFoundException.java
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java
new file mode 100644
index 0000000..546d235
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterIntpProcParameters.java
@@ -0,0 +1,938 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = 
"2018-9-28")
+public class ClusterIntpProcParameters implements 
org.apache.thrift.TBase<ClusterIntpProcParameters, 
ClusterIntpProcParameters._Fields>, java.io.Serializable, Cloneable, 
Comparable<ClusterIntpProcParameters> {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("ClusterIntpProcParameters");
+
+  private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new 
org.apache.thrift.protocol.TField("host", 
org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new 
org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, 
(short)2);
+  private static final org.apache.thrift.protocol.TField USER_NAME_FIELD_DESC 
= new org.apache.thrift.protocol.TField("userName", 
org.apache.thrift.protocol.TType.STRING, (short)3);
+  private static final org.apache.thrift.protocol.TField NOTE_ID_FIELD_DESC = 
new org.apache.thrift.protocol.TField("noteId", 
org.apache.thrift.protocol.TType.STRING, (short)4);
+  private static final org.apache.thrift.protocol.TField REPL_NAME_FIELD_DESC 
= new org.apache.thrift.protocol.TField("replName", 
org.apache.thrift.protocol.TType.STRING, (short)5);
+  private static final org.apache.thrift.protocol.TField 
DEFAULT_INTERPRETER_SETTING_FIELD_DESC = new 
org.apache.thrift.protocol.TField("defaultInterpreterSetting", 
org.apache.thrift.protocol.TType.STRING, (short)6);
+
+  private static final Map<Class<? extends IScheme>, SchemeFactory> schemes = 
new HashMap<Class<? extends IScheme>, SchemeFactory>();
+  static {
+    schemes.put(StandardScheme.class, new 
ClusterIntpProcParametersStandardSchemeFactory());
+    schemes.put(TupleScheme.class, new 
ClusterIntpProcParametersTupleSchemeFactory());
+  }
+
+  public String host; // required
+  public int port; // required
+  public String userName; // required
+  public String noteId; // required
+  public String replName; // required
+  public String defaultInterpreterSetting; // required
+
+  /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    HOST((short)1, "host"),
+    PORT((short)2, "port"),
+    USER_NAME((short)3, "userName"),
+    NOTE_ID((short)4, "noteId"),
+    REPL_NAME((short)5, "replName"),
+    DEFAULT_INTERPRETER_SETTING((short)6, "defaultInterpreterSetting");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not 
found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // HOST
+          return HOST;
+        case 2: // PORT
+          return PORT;
+        case 3: // USER_NAME
+          return USER_NAME;
+        case 4: // NOTE_ID
+          return NOTE_ID;
+        case 5: // REPL_NAME
+          return REPL_NAME;
+        case 6: // DEFAULT_INTERPRETER_SETTING
+          return DEFAULT_INTERPRETER_SETTING;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __PORT_ISSET_ID = 0;
+  private byte __isset_bitfield = 0;
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> 
metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.HOST, new 
org.apache.thrift.meta_data.FieldMetaData("host", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.PORT, new 
org.apache.thrift.meta_data.FieldMetaData("port", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    tmpMap.put(_Fields.USER_NAME, new 
org.apache.thrift.meta_data.FieldMetaData("userName", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.NOTE_ID, new 
org.apache.thrift.meta_data.FieldMetaData("noteId", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.REPL_NAME, new 
org.apache.thrift.meta_data.FieldMetaData("replName", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.DEFAULT_INTERPRETER_SETTING, new 
org.apache.thrift.meta_data.FieldMetaData("defaultInterpreterSetting", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+        new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterIntpProcParameters.class,
 metaDataMap);
+  }
+
+  public ClusterIntpProcParameters() {
+  }
+
+  public ClusterIntpProcParameters(
+    String host,
+    int port,
+    String userName,
+    String noteId,
+    String replName,
+    String defaultInterpreterSetting)
+  {
+    this();
+    this.host = host;
+    this.port = port;
+    setPortIsSet(true);
+    this.userName = userName;
+    this.noteId = noteId;
+    this.replName = replName;
+    this.defaultInterpreterSetting = defaultInterpreterSetting;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public ClusterIntpProcParameters(ClusterIntpProcParameters other) {
+    __isset_bitfield = other.__isset_bitfield;
+    if (other.isSetHost()) {
+      this.host = other.host;
+    }
+    this.port = other.port;
+    if (other.isSetUserName()) {
+      this.userName = other.userName;
+    }
+    if (other.isSetNoteId()) {
+      this.noteId = other.noteId;
+    }
+    if (other.isSetReplName()) {
+      this.replName = other.replName;
+    }
+    if (other.isSetDefaultInterpreterSetting()) {
+      this.defaultInterpreterSetting = other.defaultInterpreterSetting;
+    }
+  }
+
+  public ClusterIntpProcParameters deepCopy() {
+    return new ClusterIntpProcParameters(this);
+  }
+
+  @Override
+  public void clear() {
+    this.host = null;
+    setPortIsSet(false);
+    this.port = 0;
+    this.userName = null;
+    this.noteId = null;
+    this.replName = null;
+    this.defaultInterpreterSetting = null;
+  }
+
+  public String getHost() {
+    return this.host;
+  }
+
+  public ClusterIntpProcParameters setHost(String host) {
+    this.host = host;
+    return this;
+  }
+
+  public void unsetHost() {
+    this.host = null;
+  }
+
+  /** Returns true if field host is set (has been assigned a value) and false 
otherwise */
+  public boolean isSetHost() {
+    return this.host != null;
+  }
+
+  public void setHostIsSet(boolean value) {
+    if (!value) {
+      this.host = null;
+    }
+  }
+
+  public int getPort() {
+    return this.port;
+  }
+
+  public ClusterIntpProcParameters setPort(int port) {
+    this.port = port;
+    setPortIsSet(true);
+    return this;
+  }
+
+  public void unsetPort() {
+    __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, 
__PORT_ISSET_ID);
+  }
+
+  /** Returns true if field port is set (has been assigned a value) and false 
otherwise */
+  public boolean isSetPort() {
+    return EncodingUtils.testBit(__isset_bitfield, __PORT_ISSET_ID);
+  }
+
+  public void setPortIsSet(boolean value) {
+    __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __PORT_ISSET_ID, 
value);
+  }
+
+  public String getUserName() {
+    return this.userName;
+  }
+
+  public ClusterIntpProcParameters setUserName(String userName) {
+    this.userName = userName;
+    return this;
+  }
+
+  public void unsetUserName() {
+    this.userName = null;
+  }
+
+  /** Returns true if field userName is set (has been assigned a value) and 
false otherwise */
+  public boolean isSetUserName() {
+    return this.userName != null;
+  }
+
+  public void setUserNameIsSet(boolean value) {
+    if (!value) {
+      this.userName = null;
+    }
+  }
+
+  public String getNoteId() {
+    return this.noteId;
+  }
+
+  public ClusterIntpProcParameters setNoteId(String noteId) {
+    this.noteId = noteId;
+    return this;
+  }
+
+  public void unsetNoteId() {
+    this.noteId = null;
+  }
+
+  /** Returns true if field noteId is set (has been assigned a value) and 
false otherwise */
+  public boolean isSetNoteId() {
+    return this.noteId != null;
+  }
+
+  public void setNoteIdIsSet(boolean value) {
+    if (!value) {
+      this.noteId = null;
+    }
+  }
+
+  public String getReplName() {
+    return this.replName;
+  }
+
+  public ClusterIntpProcParameters setReplName(String replName) {
+    this.replName = replName;
+    return this;
+  }
+
+  public void unsetReplName() {
+    this.replName = null;
+  }
+
+  /** Returns true if field replName is set (has been assigned a value) and 
false otherwise */
+  public boolean isSetReplName() {
+    return this.replName != null;
+  }
+
+  public void setReplNameIsSet(boolean value) {
+    if (!value) {
+      this.replName = null;
+    }
+  }
+
+  public String getDefaultInterpreterSetting() {
+    return this.defaultInterpreterSetting;
+  }
+
+  public ClusterIntpProcParameters setDefaultInterpreterSetting(String 
defaultInterpreterSetting) {
+    this.defaultInterpreterSetting = defaultInterpreterSetting;
+    return this;
+  }
+
+  public void unsetDefaultInterpreterSetting() {
+    this.defaultInterpreterSetting = null;
+  }
+
+  /** Returns true if field defaultInterpreterSetting is set (has been 
assigned a value) and false otherwise */
+  public boolean isSetDefaultInterpreterSetting() {
+    return this.defaultInterpreterSetting != null;
+  }
+
+  public void setDefaultInterpreterSettingIsSet(boolean value) {
+    if (!value) {
+      this.defaultInterpreterSetting = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case HOST:
+      if (value == null) {
+        unsetHost();
+      } else {
+        setHost((String)value);
+      }
+      break;
+
+    case PORT:
+      if (value == null) {
+        unsetPort();
+      } else {
+        setPort((Integer)value);
+      }
+      break;
+
+    case USER_NAME:
+      if (value == null) {
+        unsetUserName();
+      } else {
+        setUserName((String)value);
+      }
+      break;
+
+    case NOTE_ID:
+      if (value == null) {
+        unsetNoteId();
+      } else {
+        setNoteId((String)value);
+      }
+      break;
+
+    case REPL_NAME:
+      if (value == null) {
+        unsetReplName();
+      } else {
+        setReplName((String)value);
+      }
+      break;
+
+    case DEFAULT_INTERPRETER_SETTING:
+      if (value == null) {
+        unsetDefaultInterpreterSetting();
+      } else {
+        setDefaultInterpreterSetting((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case HOST:
+      return getHost();
+
+    case PORT:
+      return Integer.valueOf(getPort());
+
+    case USER_NAME:
+      return getUserName();
+
+    case NOTE_ID:
+      return getNoteId();
+
+    case REPL_NAME:
+      return getReplName();
+
+    case DEFAULT_INTERPRETER_SETTING:
+      return getDefaultInterpreterSetting();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned 
a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case HOST:
+      return isSetHost();
+    case PORT:
+      return isSetPort();
+    case USER_NAME:
+      return isSetUserName();
+    case NOTE_ID:
+      return isSetNoteId();
+    case REPL_NAME:
+      return isSetReplName();
+    case DEFAULT_INTERPRETER_SETTING:
+      return isSetDefaultInterpreterSetting();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof ClusterIntpProcParameters)
+      return this.equals((ClusterIntpProcParameters)that);
+    return false;
+  }
+
+  public boolean equals(ClusterIntpProcParameters that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_host = true && this.isSetHost();
+    boolean that_present_host = true && that.isSetHost();
+    if (this_present_host || that_present_host) {
+      if (!(this_present_host && that_present_host))
+        return false;
+      if (!this.host.equals(that.host))
+        return false;
+    }
+
+    boolean this_present_port = true;
+    boolean that_present_port = true;
+    if (this_present_port || that_present_port) {
+      if (!(this_present_port && that_present_port))
+        return false;
+      if (this.port != that.port)
+        return false;
+    }
+
+    boolean this_present_userName = true && this.isSetUserName();
+    boolean that_present_userName = true && that.isSetUserName();
+    if (this_present_userName || that_present_userName) {
+      if (!(this_present_userName && that_present_userName))
+        return false;
+      if (!this.userName.equals(that.userName))
+        return false;
+    }
+
+    boolean this_present_noteId = true && this.isSetNoteId();
+    boolean that_present_noteId = true && that.isSetNoteId();
+    if (this_present_noteId || that_present_noteId) {
+      if (!(this_present_noteId && that_present_noteId))
+        return false;
+      if (!this.noteId.equals(that.noteId))
+        return false;
+    }
+
+    boolean this_present_replName = true && this.isSetReplName();
+    boolean that_present_replName = true && that.isSetReplName();
+    if (this_present_replName || that_present_replName) {
+      if (!(this_present_replName && that_present_replName))
+        return false;
+      if (!this.replName.equals(that.replName))
+        return false;
+    }
+
+    boolean this_present_defaultInterpreterSetting = true && 
this.isSetDefaultInterpreterSetting();
+    boolean that_present_defaultInterpreterSetting = true && 
that.isSetDefaultInterpreterSetting();
+    if (this_present_defaultInterpreterSetting || 
that_present_defaultInterpreterSetting) {
+      if (!(this_present_defaultInterpreterSetting && 
that_present_defaultInterpreterSetting))
+        return false;
+      if 
(!this.defaultInterpreterSetting.equals(that.defaultInterpreterSetting))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    List<Object> list = new ArrayList<Object>();
+
+    boolean present_host = true && (isSetHost());
+    list.add(present_host);
+    if (present_host)
+      list.add(host);
+
+    boolean present_port = true;
+    list.add(present_port);
+    if (present_port)
+      list.add(port);
+
+    boolean present_userName = true && (isSetUserName());
+    list.add(present_userName);
+    if (present_userName)
+      list.add(userName);
+
+    boolean present_noteId = true && (isSetNoteId());
+    list.add(present_noteId);
+    if (present_noteId)
+      list.add(noteId);
+
+    boolean present_replName = true && (isSetReplName());
+    list.add(present_replName);
+    if (present_replName)
+      list.add(replName);
+
+    boolean present_defaultInterpreterSetting = true && 
(isSetDefaultInterpreterSetting());
+    list.add(present_defaultInterpreterSetting);
+    if (present_defaultInterpreterSetting)
+      list.add(defaultInterpreterSetting);
+
+    return list.hashCode();
+  }
+
+  @Override
+  public int compareTo(ClusterIntpProcParameters other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+
+    lastComparison = Boolean.valueOf(isSetHost()).compareTo(other.isSetHost());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetHost()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, 
other.host);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(isSetPort()).compareTo(other.isSetPort());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetPort()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, 
other.port);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = 
Boolean.valueOf(isSetUserName()).compareTo(other.isSetUserName());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetUserName()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.userName, 
other.userName);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = 
Boolean.valueOf(isSetNoteId()).compareTo(other.isSetNoteId());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetNoteId()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.noteId, 
other.noteId);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = 
Boolean.valueOf(isSetReplName()).compareTo(other.isSetReplName());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetReplName()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.replName, 
other.replName);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = 
Boolean.valueOf(isSetDefaultInterpreterSetting()).compareTo(other.isSetDefaultInterpreterSetting());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetDefaultInterpreterSetting()) {
+      lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.defaultInterpreterSetting, 
other.defaultInterpreterSetting);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+    schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+    schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("ClusterIntpProcParameters(");
+    boolean first = true;
+
+    sb.append("host:");
+    if (this.host == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.host);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("port:");
+    sb.append(this.port);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("userName:");
+    if (this.userName == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.userName);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("noteId:");
+    if (this.noteId == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.noteId);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("replName:");
+    if (this.replName == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.replName);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("defaultInterpreterSetting:");
+    if (this.defaultInterpreterSetting == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.defaultInterpreterSetting);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    // check for sub-struct validity
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java 
serialization is wacky, and doesn't call the default constructor.
+      __isset_bitfield = 0;
+      read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private static class ClusterIntpProcParametersStandardSchemeFactory 
implements SchemeFactory {
+    public ClusterIntpProcParametersStandardScheme getScheme() {
+      return new ClusterIntpProcParametersStandardScheme();
+    }
+  }
+
+  private static class ClusterIntpProcParametersStandardScheme extends 
StandardScheme<ClusterIntpProcParameters> {
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot, 
ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
+      org.apache.thrift.protocol.TField schemeField;
+      iprot.readStructBegin();
+      while (true)
+      {
+        schemeField = iprot.readFieldBegin();
+        if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+          break;
+        }
+        switch (schemeField.id) {
+          case 1: // HOST
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.host = iprot.readString();
+              struct.setHostIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 2: // PORT
+            if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
+              struct.port = iprot.readI32();
+              struct.setPortIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 3: // USER_NAME
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.userName = iprot.readString();
+              struct.setUserNameIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 4: // NOTE_ID
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.noteId = iprot.readString();
+              struct.setNoteIdIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 5: // REPL_NAME
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.replName = iprot.readString();
+              struct.setReplNameIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          case 6: // DEFAULT_INTERPRETER_SETTING
+            if (schemeField.type == org.apache.thrift.protocol.TType.STRING) {
+              struct.defaultInterpreterSetting = iprot.readString();
+              struct.setDefaultInterpreterSettingIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+            }
+            break;
+          default:
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+        }
+        iprot.readFieldEnd();
+      }
+      iprot.readStructEnd();
+
+      // check for required fields of primitive type, which can't be checked 
in the validate method
+      struct.validate();
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot, 
ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
+      struct.validate();
+
+      oprot.writeStructBegin(STRUCT_DESC);
+      if (struct.host != null) {
+        oprot.writeFieldBegin(HOST_FIELD_DESC);
+        oprot.writeString(struct.host);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldBegin(PORT_FIELD_DESC);
+      oprot.writeI32(struct.port);
+      oprot.writeFieldEnd();
+      if (struct.userName != null) {
+        oprot.writeFieldBegin(USER_NAME_FIELD_DESC);
+        oprot.writeString(struct.userName);
+        oprot.writeFieldEnd();
+      }
+      if (struct.noteId != null) {
+        oprot.writeFieldBegin(NOTE_ID_FIELD_DESC);
+        oprot.writeString(struct.noteId);
+        oprot.writeFieldEnd();
+      }
+      if (struct.replName != null) {
+        oprot.writeFieldBegin(REPL_NAME_FIELD_DESC);
+        oprot.writeString(struct.replName);
+        oprot.writeFieldEnd();
+      }
+      if (struct.defaultInterpreterSetting != null) {
+        oprot.writeFieldBegin(DEFAULT_INTERPRETER_SETTING_FIELD_DESC);
+        oprot.writeString(struct.defaultInterpreterSetting);
+        oprot.writeFieldEnd();
+      }
+      oprot.writeFieldStop();
+      oprot.writeStructEnd();
+    }
+
+  }
+
+  private static class ClusterIntpProcParametersTupleSchemeFactory implements 
SchemeFactory {
+    public ClusterIntpProcParametersTupleScheme getScheme() {
+      return new ClusterIntpProcParametersTupleScheme();
+    }
+  }
+
+  private static class ClusterIntpProcParametersTupleScheme extends 
TupleScheme<ClusterIntpProcParameters> {
+
+    @Override
+    public void write(org.apache.thrift.protocol.TProtocol prot, 
ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
+      TTupleProtocol oprot = (TTupleProtocol) prot;
+      BitSet optionals = new BitSet();
+      if (struct.isSetHost()) {
+        optionals.set(0);
+      }
+      if (struct.isSetPort()) {
+        optionals.set(1);
+      }
+      if (struct.isSetUserName()) {
+        optionals.set(2);
+      }
+      if (struct.isSetNoteId()) {
+        optionals.set(3);
+      }
+      if (struct.isSetReplName()) {
+        optionals.set(4);
+      }
+      if (struct.isSetDefaultInterpreterSetting()) {
+        optionals.set(5);
+      }
+      oprot.writeBitSet(optionals, 6);
+      if (struct.isSetHost()) {
+        oprot.writeString(struct.host);
+      }
+      if (struct.isSetPort()) {
+        oprot.writeI32(struct.port);
+      }
+      if (struct.isSetUserName()) {
+        oprot.writeString(struct.userName);
+      }
+      if (struct.isSetNoteId()) {
+        oprot.writeString(struct.noteId);
+      }
+      if (struct.isSetReplName()) {
+        oprot.writeString(struct.replName);
+      }
+      if (struct.isSetDefaultInterpreterSetting()) {
+        oprot.writeString(struct.defaultInterpreterSetting);
+      }
+    }
+
+    @Override
+    public void read(org.apache.thrift.protocol.TProtocol prot, 
ClusterIntpProcParameters struct) throws org.apache.thrift.TException {
+      TTupleProtocol iprot = (TTupleProtocol) prot;
+      BitSet incoming = iprot.readBitSet(6);
+      if (incoming.get(0)) {
+        struct.host = iprot.readString();
+        struct.setHostIsSet(true);
+      }
+      if (incoming.get(1)) {
+        struct.port = iprot.readI32();
+        struct.setPortIsSet(true);
+      }
+      if (incoming.get(2)) {
+        struct.userName = iprot.readString();
+        struct.setUserNameIsSet(true);
+      }
+      if (incoming.get(3)) {
+        struct.noteId = iprot.readString();
+        struct.setNoteIdIsSet(true);
+      }
+      if (incoming.get(4)) {
+        struct.replName = iprot.readString();
+        struct.setReplNameIsSet(true);
+      }
+      if (incoming.get(5)) {
+        struct.defaultInterpreterSetting = iprot.readString();
+        struct.setDefaultInterpreterSettingIsSet(true);
+      }
+    }
+  }
+
+}
+
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java
new file mode 100644
index 0000000..1e508dd
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/ClusterManagerService.java
@@ -0,0 +1,998 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.zeppelin.interpreter.thrift;
+
+import org.apache.thrift.scheme.IScheme;
+import org.apache.thrift.scheme.SchemeFactory;
+import org.apache.thrift.scheme.StandardScheme;
+
+import org.apache.thrift.scheme.TupleScheme;
+import org.apache.thrift.protocol.TTupleProtocol;
+import org.apache.thrift.protocol.TProtocolException;
+import org.apache.thrift.EncodingUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.async.AsyncMethodCallback;
+import org.apache.thrift.server.AbstractNonblockingServer.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import javax.annotation.Generated;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = 
"2018-9-28")
+public class ClusterManagerService {
+
+  public interface Iface {
+
+    public boolean createClusterInterpreterProcess(ClusterIntpProcParameters 
intpProcParameters) throws org.apache.thrift.TException;
+
+  }
+
+  public interface AsyncIface {
+
+    public void createClusterInterpreterProcess(ClusterIntpProcParameters 
intpProcParameters, org.apache.thrift.async.AsyncMethodCallback resultHandler) 
throws org.apache.thrift.TException;
+
+  }
+
+  public static class Client extends org.apache.thrift.TServiceClient 
implements Iface {
+    public static class Factory implements 
org.apache.thrift.TServiceClientFactory<Client> {
+      public Factory() {}
+      public Client getClient(org.apache.thrift.protocol.TProtocol prot) {
+        return new Client(prot);
+      }
+      public Client getClient(org.apache.thrift.protocol.TProtocol iprot, 
org.apache.thrift.protocol.TProtocol oprot) {
+        return new Client(iprot, oprot);
+      }
+    }
+
+    public Client(org.apache.thrift.protocol.TProtocol prot)
+    {
+      super(prot, prot);
+    }
+
+    public Client(org.apache.thrift.protocol.TProtocol iprot, 
org.apache.thrift.protocol.TProtocol oprot) {
+      super(iprot, oprot);
+    }
+
+    public boolean createClusterInterpreterProcess(ClusterIntpProcParameters 
intpProcParameters) throws org.apache.thrift.TException
+    {
+      send_createClusterInterpreterProcess(intpProcParameters);
+      return recv_createClusterInterpreterProcess();
+    }
+
+    public void send_createClusterInterpreterProcess(ClusterIntpProcParameters 
intpProcParameters) throws org.apache.thrift.TException
+    {
+      createClusterInterpreterProcess_args args = new 
createClusterInterpreterProcess_args();
+      args.setIntpProcParameters(intpProcParameters);
+      sendBase("createClusterInterpreterProcess", args);
+    }
+
+    public boolean recv_createClusterInterpreterProcess() throws 
org.apache.thrift.TException
+    {
+      createClusterInterpreterProcess_result result = new 
createClusterInterpreterProcess_result();
+      receiveBase(result, "createClusterInterpreterProcess");
+      if (result.isSetSuccess()) {
+        return result.success;
+      }
+      throw new 
org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.MISSING_RESULT,
 "createClusterInterpreterProcess failed: unknown result");
+    }
+
+  }
+  public static class AsyncClient extends org.apache.thrift.async.TAsyncClient 
implements AsyncIface {
+    public static class Factory implements 
org.apache.thrift.async.TAsyncClientFactory<AsyncClient> {
+      private org.apache.thrift.async.TAsyncClientManager clientManager;
+      private org.apache.thrift.protocol.TProtocolFactory protocolFactory;
+      public Factory(org.apache.thrift.async.TAsyncClientManager 
clientManager, org.apache.thrift.protocol.TProtocolFactory protocolFactory) {
+        this.clientManager = clientManager;
+        this.protocolFactory = protocolFactory;
+      }
+      public AsyncClient 
getAsyncClient(org.apache.thrift.transport.TNonblockingTransport transport) {
+        return new AsyncClient(protocolFactory, clientManager, transport);
+      }
+    }
+
+    public AsyncClient(org.apache.thrift.protocol.TProtocolFactory 
protocolFactory, org.apache.thrift.async.TAsyncClientManager clientManager, 
org.apache.thrift.transport.TNonblockingTransport transport) {
+      super(protocolFactory, clientManager, transport);
+    }
+
+    public void createClusterInterpreterProcess(ClusterIntpProcParameters 
intpProcParameters, org.apache.thrift.async.AsyncMethodCallback resultHandler) 
throws org.apache.thrift.TException {
+      checkReady();
+      createClusterInterpreterProcess_call method_call = new 
createClusterInterpreterProcess_call(intpProcParameters, resultHandler, this, 
___protocolFactory, ___transport);
+      this.___currentMethod = method_call;
+      ___manager.call(method_call);
+    }
+
+    public static class createClusterInterpreterProcess_call extends 
org.apache.thrift.async.TAsyncMethodCall {
+      private ClusterIntpProcParameters intpProcParameters;
+      public createClusterInterpreterProcess_call(ClusterIntpProcParameters 
intpProcParameters, org.apache.thrift.async.AsyncMethodCallback resultHandler, 
org.apache.thrift.async.TAsyncClient client, 
org.apache.thrift.protocol.TProtocolFactory protocolFactory, 
org.apache.thrift.transport.TNonblockingTransport transport) throws 
org.apache.thrift.TException {
+        super(client, protocolFactory, transport, resultHandler, false);
+        this.intpProcParameters = intpProcParameters;
+      }
+
+      public void write_args(org.apache.thrift.protocol.TProtocol prot) throws 
org.apache.thrift.TException {
+        prot.writeMessageBegin(new 
org.apache.thrift.protocol.TMessage("createClusterInterpreterProcess", 
org.apache.thrift.protocol.TMessageType.CALL, 0));
+        createClusterInterpreterProcess_args args = new 
createClusterInterpreterProcess_args();
+        args.setIntpProcParameters(intpProcParameters);
+        args.write(prot);
+        prot.writeMessageEnd();
+      }
+
+      public boolean getResult() throws org.apache.thrift.TException {
+        if (getState() != 
org.apache.thrift.async.TAsyncMethodCall.State.RESPONSE_READ) {
+          throw new IllegalStateException("Method call not finished!");
+        }
+        org.apache.thrift.transport.TMemoryInputTransport memoryTransport = 
new org.apache.thrift.transport.TMemoryInputTransport(getFrameBuffer().array());
+        org.apache.thrift.protocol.TProtocol prot = 
client.getProtocolFactory().getProtocol(memoryTransport);
+        return (new Client(prot)).recv_createClusterInterpreterProcess();
+      }
+    }
+
+  }
+
+  public static class Processor<I extends Iface> extends 
org.apache.thrift.TBaseProcessor<I> implements org.apache.thrift.TProcessor {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(Processor.class.getName());
+    public Processor(I iface) {
+      super(iface, getProcessMap(new HashMap<String, 
org.apache.thrift.ProcessFunction<I, ? extends org.apache.thrift.TBase>>()));
+    }
+
+    protected Processor(I iface, Map<String,  
org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> 
processMap) {
+      super(iface, getProcessMap(processMap));
+    }
+
+    private static <I extends Iface> Map<String,  
org.apache.thrift.ProcessFunction<I, ? extends  org.apache.thrift.TBase>> 
getProcessMap(Map<String,  org.apache.thrift.ProcessFunction<I, ? extends  
org.apache.thrift.TBase>> processMap) {
+      processMap.put("createClusterInterpreterProcess", new 
createClusterInterpreterProcess());
+      return processMap;
+    }
+
+    public static class createClusterInterpreterProcess<I extends Iface> 
extends org.apache.thrift.ProcessFunction<I, 
createClusterInterpreterProcess_args> {
+      public createClusterInterpreterProcess() {
+        super("createClusterInterpreterProcess");
+      }
+
+      public createClusterInterpreterProcess_args getEmptyArgsInstance() {
+        return new createClusterInterpreterProcess_args();
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public createClusterInterpreterProcess_result getResult(I iface, 
createClusterInterpreterProcess_args args) throws org.apache.thrift.TException {
+        createClusterInterpreterProcess_result result = new 
createClusterInterpreterProcess_result();
+        result.success = 
iface.createClusterInterpreterProcess(args.intpProcParameters);
+        result.setSuccessIsSet(true);
+        return result;
+      }
+    }
+
+  }
+
+  public static class AsyncProcessor<I extends AsyncIface> extends 
org.apache.thrift.TBaseAsyncProcessor<I> {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(AsyncProcessor.class.getName());
+    public AsyncProcessor(I iface) {
+      super(iface, getProcessMap(new HashMap<String, 
org.apache.thrift.AsyncProcessFunction<I, ? extends org.apache.thrift.TBase, 
?>>()));
+    }
+
+    protected AsyncProcessor(I iface, Map<String,  
org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, 
?>> processMap) {
+      super(iface, getProcessMap(processMap));
+    }
+
+    private static <I extends AsyncIface> Map<String,  
org.apache.thrift.AsyncProcessFunction<I, ? extends  
org.apache.thrift.TBase,?>> getProcessMap(Map<String,  
org.apache.thrift.AsyncProcessFunction<I, ? extends  org.apache.thrift.TBase, 
?>> processMap) {
+      processMap.put("createClusterInterpreterProcess", new 
createClusterInterpreterProcess());
+      return processMap;
+    }
+
+    public static class createClusterInterpreterProcess<I extends AsyncIface> 
extends org.apache.thrift.AsyncProcessFunction<I, 
createClusterInterpreterProcess_args, Boolean> {
+      public createClusterInterpreterProcess() {
+        super("createClusterInterpreterProcess");
+      }
+
+      public createClusterInterpreterProcess_args getEmptyArgsInstance() {
+        return new createClusterInterpreterProcess_args();
+      }
+
+      public AsyncMethodCallback<Boolean> getResultHandler(final 
AsyncFrameBuffer fb, final int seqid) {
+        final org.apache.thrift.AsyncProcessFunction fcall = this;
+        return new AsyncMethodCallback<Boolean>() { 
+          public void onComplete(Boolean o) {
+            createClusterInterpreterProcess_result result = new 
createClusterInterpreterProcess_result();
+            result.success = o;
+            result.setSuccessIsSet(true);
+            try {
+              fcall.sendResponse(fb,result, 
org.apache.thrift.protocol.TMessageType.REPLY,seqid);
+              return;
+            } catch (Exception e) {
+              LOGGER.error("Exception writing to internal frame buffer", e);
+            }
+            fb.close();
+          }
+          public void onError(Exception e) {
+            byte msgType = org.apache.thrift.protocol.TMessageType.REPLY;
+            org.apache.thrift.TBase msg;
+            createClusterInterpreterProcess_result result = new 
createClusterInterpreterProcess_result();
+            {
+              msgType = org.apache.thrift.protocol.TMessageType.EXCEPTION;
+              msg = (org.apache.thrift.TBase)new 
org.apache.thrift.TApplicationException(org.apache.thrift.TApplicationException.INTERNAL_ERROR,
 e.getMessage());
+            }
+            try {
+              fcall.sendResponse(fb,msg,msgType,seqid);
+              return;
+            } catch (Exception ex) {
+              LOGGER.error("Exception writing to internal frame buffer", ex);
+            }
+            fb.close();
+          }
+        };
+      }
+
+      protected boolean isOneway() {
+        return false;
+      }
+
+      public void start(I iface, createClusterInterpreterProcess_args args, 
org.apache.thrift.async.AsyncMethodCallback<Boolean> resultHandler) throws 
TException {
+        
iface.createClusterInterpreterProcess(args.intpProcParameters,resultHandler);
+      }
+    }
+
+  }
+
+  public static class createClusterInterpreterProcess_args implements 
org.apache.thrift.TBase<createClusterInterpreterProcess_args, 
createClusterInterpreterProcess_args._Fields>, java.io.Serializable, Cloneable, 
Comparable<createClusterInterpreterProcess_args>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("createClusterInterpreterProcess_args");
+
+    private static final org.apache.thrift.protocol.TField 
INTP_PROC_PARAMETERS_FIELD_DESC = new 
org.apache.thrift.protocol.TField("intpProcParameters", 
org.apache.thrift.protocol.TType.STRUCT, (short)1);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes 
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new 
createClusterInterpreterProcess_argsStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new 
createClusterInterpreterProcess_argsTupleSchemeFactory());
+    }
+
+    public ClusterIntpProcParameters intpProcParameters; // required
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      INTP_PROC_PARAMETERS((short)1, "intpProcParameters");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 1: // INTP_PROC_PARAMETERS
+            return INTP_PROC_PARAMETERS;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    public static final Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.INTP_PROC_PARAMETERS, new 
org.apache.thrift.meta_data.FieldMetaData("intpProcParameters", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new 
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
 ClusterIntpProcParameters.class)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(createClusterInterpreterProcess_args.class,
 metaDataMap);
+    }
+
+    public createClusterInterpreterProcess_args() {
+    }
+
+    public createClusterInterpreterProcess_args(
+      ClusterIntpProcParameters intpProcParameters)
+    {
+      this();
+      this.intpProcParameters = intpProcParameters;
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public 
createClusterInterpreterProcess_args(createClusterInterpreterProcess_args 
other) {
+      if (other.isSetIntpProcParameters()) {
+        this.intpProcParameters = new 
ClusterIntpProcParameters(other.intpProcParameters);
+      }
+    }
+
+    public createClusterInterpreterProcess_args deepCopy() {
+      return new createClusterInterpreterProcess_args(this);
+    }
+
+    @Override
+    public void clear() {
+      this.intpProcParameters = null;
+    }
+
+    public ClusterIntpProcParameters getIntpProcParameters() {
+      return this.intpProcParameters;
+    }
+
+    public createClusterInterpreterProcess_args 
setIntpProcParameters(ClusterIntpProcParameters intpProcParameters) {
+      this.intpProcParameters = intpProcParameters;
+      return this;
+    }
+
+    public void unsetIntpProcParameters() {
+      this.intpProcParameters = null;
+    }
+
+    /** Returns true if field intpProcParameters is set (has been assigned a 
value) and false otherwise */
+    public boolean isSetIntpProcParameters() {
+      return this.intpProcParameters != null;
+    }
+
+    public void setIntpProcParametersIsSet(boolean value) {
+      if (!value) {
+        this.intpProcParameters = null;
+      }
+    }
+
+    public void setFieldValue(_Fields field, Object value) {
+      switch (field) {
+      case INTP_PROC_PARAMETERS:
+        if (value == null) {
+          unsetIntpProcParameters();
+        } else {
+          setIntpProcParameters((ClusterIntpProcParameters)value);
+        }
+        break;
+
+      }
+    }
+
+    public Object getFieldValue(_Fields field) {
+      switch (field) {
+      case INTP_PROC_PARAMETERS:
+        return getIntpProcParameters();
+
+      }
+      throw new IllegalStateException();
+    }
+
+    /** Returns true if field corresponding to fieldID is set (has been 
assigned a value) and false otherwise */
+    public boolean isSet(_Fields field) {
+      if (field == null) {
+        throw new IllegalArgumentException();
+      }
+
+      switch (field) {
+      case INTP_PROC_PARAMETERS:
+        return isSetIntpProcParameters();
+      }
+      throw new IllegalStateException();
+    }
+
+    @Override
+    public boolean equals(Object that) {
+      if (that == null)
+        return false;
+      if (that instanceof createClusterInterpreterProcess_args)
+        return this.equals((createClusterInterpreterProcess_args)that);
+      return false;
+    }
+
+    public boolean equals(createClusterInterpreterProcess_args that) {
+      if (that == null)
+        return false;
+
+      boolean this_present_intpProcParameters = true && 
this.isSetIntpProcParameters();
+      boolean that_present_intpProcParameters = true && 
that.isSetIntpProcParameters();
+      if (this_present_intpProcParameters || that_present_intpProcParameters) {
+        if (!(this_present_intpProcParameters && 
that_present_intpProcParameters))
+          return false;
+        if (!this.intpProcParameters.equals(that.intpProcParameters))
+          return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      List<Object> list = new ArrayList<Object>();
+
+      boolean present_intpProcParameters = true && (isSetIntpProcParameters());
+      list.add(present_intpProcParameters);
+      if (present_intpProcParameters)
+        list.add(intpProcParameters);
+
+      return list.hashCode();
+    }
+
+    @Override
+    public int compareTo(createClusterInterpreterProcess_args other) {
+      if (!getClass().equals(other.getClass())) {
+        return getClass().getName().compareTo(other.getClass().getName());
+      }
+
+      int lastComparison = 0;
+
+      lastComparison = 
Boolean.valueOf(isSetIntpProcParameters()).compareTo(other.isSetIntpProcParameters());
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+      if (isSetIntpProcParameters()) {
+        lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.intpProcParameters, 
other.intpProcParameters);
+        if (lastComparison != 0) {
+          return lastComparison;
+        }
+      }
+      return 0;
+    }
+
+    public _Fields fieldForId(int fieldId) {
+      return _Fields.findByThriftId(fieldId);
+    }
+
+    public void read(org.apache.thrift.protocol.TProtocol iprot) throws 
org.apache.thrift.TException {
+      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
+    }
+
+    public void write(org.apache.thrift.protocol.TProtocol oprot) throws 
org.apache.thrift.TException {
+      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new 
StringBuilder("createClusterInterpreterProcess_args(");
+      boolean first = true;
+
+      sb.append("intpProcParameters:");
+      if (this.intpProcParameters == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.intpProcParameters);
+      }
+      first = false;
+      sb.append(")");
+      return sb.toString();
+    }
+
+    public void validate() throws org.apache.thrift.TException {
+      // check for required fields
+      // check for sub-struct validity
+      if (intpProcParameters != null) {
+        intpProcParameters.validate();
+      }
+    }
+
+    private void writeObject(java.io.ObjectOutputStream out) throws 
java.io.IOException {
+      try {
+        write(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(out)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private void readObject(java.io.ObjectInputStream in) throws 
java.io.IOException, ClassNotFoundException {
+      try {
+        read(new org.apache.thrift.protocol.TCompactProtocol(new 
org.apache.thrift.transport.TIOStreamTransport(in)));
+      } catch (org.apache.thrift.TException te) {
+        throw new java.io.IOException(te);
+      }
+    }
+
+    private static class 
createClusterInterpreterProcess_argsStandardSchemeFactory implements 
SchemeFactory {
+      public createClusterInterpreterProcess_argsStandardScheme getScheme() {
+        return new createClusterInterpreterProcess_argsStandardScheme();
+      }
+    }
+
+    private static class createClusterInterpreterProcess_argsStandardScheme 
extends StandardScheme<createClusterInterpreterProcess_args> {
+
+      public void read(org.apache.thrift.protocol.TProtocol iprot, 
createClusterInterpreterProcess_args struct) throws 
org.apache.thrift.TException {
+        org.apache.thrift.protocol.TField schemeField;
+        iprot.readStructBegin();
+        while (true)
+        {
+          schemeField = iprot.readFieldBegin();
+          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { 
+            break;
+          }
+          switch (schemeField.id) {
+            case 1: // INTP_PROC_PARAMETERS
+              if (schemeField.type == org.apache.thrift.protocol.TType.STRUCT) 
{
+                struct.intpProcParameters = new ClusterIntpProcParameters();
+                struct.intpProcParameters.read(iprot);
+                struct.setIntpProcParametersIsSet(true);
+              } else { 
+                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+              }
+              break;
+            default:
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, 
schemeField.type);
+          }
+          iprot.readFieldEnd();
+        }
+        iprot.readStructEnd();
+
+        // check for required fields of primitive type, which can't be checked 
in the validate method
+        struct.validate();
+      }
+
+      public void write(org.apache.thrift.protocol.TProtocol oprot, 
createClusterInterpreterProcess_args struct) throws 
org.apache.thrift.TException {
+        struct.validate();
+
+        oprot.writeStructBegin(STRUCT_DESC);
+        if (struct.intpProcParameters != null) {
+          oprot.writeFieldBegin(INTP_PROC_PARAMETERS_FIELD_DESC);
+          struct.intpProcParameters.write(oprot);
+          oprot.writeFieldEnd();
+        }
+        oprot.writeFieldStop();
+        oprot.writeStructEnd();
+      }
+
+    }
+
+    private static class 
createClusterInterpreterProcess_argsTupleSchemeFactory implements SchemeFactory 
{
+      public createClusterInterpreterProcess_argsTupleScheme getScheme() {
+        return new createClusterInterpreterProcess_argsTupleScheme();
+      }
+    }
+
+    private static class createClusterInterpreterProcess_argsTupleScheme 
extends TupleScheme<createClusterInterpreterProcess_args> {
+
+      @Override
+      public void write(org.apache.thrift.protocol.TProtocol prot, 
createClusterInterpreterProcess_args struct) throws 
org.apache.thrift.TException {
+        TTupleProtocol oprot = (TTupleProtocol) prot;
+        BitSet optionals = new BitSet();
+        if (struct.isSetIntpProcParameters()) {
+          optionals.set(0);
+        }
+        oprot.writeBitSet(optionals, 1);
+        if (struct.isSetIntpProcParameters()) {
+          struct.intpProcParameters.write(oprot);
+        }
+      }
+
+      @Override
+      public void read(org.apache.thrift.protocol.TProtocol prot, 
createClusterInterpreterProcess_args struct) throws 
org.apache.thrift.TException {
+        TTupleProtocol iprot = (TTupleProtocol) prot;
+        BitSet incoming = iprot.readBitSet(1);
+        if (incoming.get(0)) {
+          struct.intpProcParameters = new ClusterIntpProcParameters();
+          struct.intpProcParameters.read(iprot);
+          struct.setIntpProcParametersIsSet(true);
+        }
+      }
+    }
+
+  }
+
+  public static class createClusterInterpreterProcess_result implements 
org.apache.thrift.TBase<createClusterInterpreterProcess_result, 
createClusterInterpreterProcess_result._Fields>, java.io.Serializable, 
Cloneable, Comparable<createClusterInterpreterProcess_result>   {
+    private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new 
org.apache.thrift.protocol.TStruct("createClusterInterpreterProcess_result");
+
+    private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC 
= new org.apache.thrift.protocol.TField("success", 
org.apache.thrift.protocol.TType.BOOL, (short)0);
+
+    private static final Map<Class<? extends IScheme>, SchemeFactory> schemes 
= new HashMap<Class<? extends IScheme>, SchemeFactory>();
+    static {
+      schemes.put(StandardScheme.class, new 
createClusterInterpreterProcess_resultStandardSchemeFactory());
+      schemes.put(TupleScheme.class, new 
createClusterInterpreterProcess_resultTupleSchemeFactory());
+    }
+
+    public boolean success; // required
+
+    /** The set of fields this struct contains, along with convenience methods 
for finding and manipulating them. */
+    public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+      SUCCESS((short)0, "success");
+
+      private static final Map<String, _Fields> byName = new HashMap<String, 
_Fields>();
+
+      static {
+        for (_Fields field : EnumSet.allOf(_Fields.class)) {
+          byName.put(field.getFieldName(), field);
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, or null if its not 
found.
+       */
+      public static _Fields findByThriftId(int fieldId) {
+        switch(fieldId) {
+          case 0: // SUCCESS
+            return SUCCESS;
+          default:
+            return null;
+        }
+      }
+
+      /**
+       * Find the _Fields constant that matches fieldId, throwing an exception
+       * if it is not found.
+       */
+      public static _Fields findByThriftIdOrThrow(int fieldId) {
+        _Fields fields = findByThriftId(fieldId);
+        if (fields == null) throw new IllegalArgumentException("Field " + 
fieldId + " doesn't exist!");
+        return fields;
+      }
+
+      /**
+       * Find the _Fields constant that matches name, or null if its not found.
+       */
+      public static _Fields findByName(String name) {
+        return byName.get(name);
+      }
+
+      private final short _thriftId;
+      private final String _fieldName;
+
+      _Fields(short thriftId, String fieldName) {
+        _thriftId = thriftId;
+        _fieldName = fieldName;
+      }
+
+      public short getThriftFieldId() {
+        return _thriftId;
+      }
+
+      public String getFieldName() {
+        return _fieldName;
+      }
+    }
+
+    // isset id assignments
+    private static final int __SUCCESS_ISSET_ID = 0;
+    private byte __isset_bitfield = 0;
+    public static final Map<_Fields, 
org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+    static {
+      Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new 
EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+      tmpMap.put(_Fields.SUCCESS, new 
org.apache.thrift.meta_data.FieldMetaData("success", 
org.apache.thrift.TFieldRequirementType.DEFAULT, 
+          new 
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+      metaDataMap = Collections.unmodifiableMap(tmpMap);
+      
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(createClusterInterpreterProcess_result.class,
 metaDataMap);
+    }
+
+    public createClusterInterpreterProcess_result() {
+    }
+
+    public createClusterInterpreterProcess_result(
+      boolean success)
+    {
+      this();
+      this.success = success;
+      setSuccessIsSet(true);
+    }
+
+    /**
+     * Performs a deep copy on <i>other</i>.
+     */
+    public 
createClusterInterpreterProcess_result(createClusterInterpreterProcess_result 
other) {
+      __isset_bitfield = other.__isset_bitfield;
+      this.success = other.success;
+    }
+
+    public createClusterInterpreterProcess_result deepCopy() {
+      return new createClusterInterpreterProcess_result(this);
+    }
+
+    @Override
+    public void clear() {
+      setSuccessIsSet(false);
+      this.success = false;
+    }
+
+    public boolean isSuccess() {
+      return this.success;
+    }
+
+    public createClusterInterpreterProcess_result setSuccess(boolean success) {
+      this.success = success;
+      setSuccessIsSet(true);
+      return this;
+    }
+
+    public void unsetSuccess() {
+      __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, 
__SUCCESS_ISSET_ID);
+    }
+
+    /** Returns true if field success is set (has been assigned a value) and 
false otherwise */
+    public boolean isSetSuccess() {
+      return EncodingUtils.testBit(__isset_bitfield, __SUCCESS_ISSET_ID);
+    }
+
+    public void setSuccessIsSet(boolean value) {
+      __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, 
__SUCCESS_ISSET_ID, value);
+    }
+
    /**
     * Generic, field-id-driven setter used by Thrift's reflective access.
     * A {@code null} value for SUCCESS unsets the field rather than storing null.
     */
    public void setFieldValue(_Fields field, Object value) {
      switch (field) {
      case SUCCESS:
        if (value == null) {
          unsetSuccess();
        } else {
          setSuccess((Boolean)value);
        }
        break;

      }
    }

    /** Generic, field-id-driven getter used by Thrift's reflective access. */
    public Object getFieldValue(_Fields field) {
      switch (field) {
      case SUCCESS:
        return Boolean.valueOf(isSuccess());

      }
      // Unreachable for valid _Fields values; guards against future enum additions.
      throw new IllegalStateException();
    }

    /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
    public boolean isSet(_Fields field) {
      if (field == null) {
        throw new IllegalArgumentException();
      }

      switch (field) {
      case SUCCESS:
        return isSetSuccess();
      }
      throw new IllegalStateException();
    }
+
    @Override
    public boolean equals(Object that) {
      if (that == null)
        return false;
      if (that instanceof createClusterInterpreterProcess_result)
        return this.equals((createClusterInterpreterProcess_result)that);
      return false;
    }

    /**
     * Typed equality: structs are equal when their {@code success} values match.
     * (The {@code this_present_*} flags are trivially true here because the
     * field is a primitive; the generator emits the same shape for all fields.)
     */
    public boolean equals(createClusterInterpreterProcess_result that) {
      if (that == null)
        return false;

      boolean this_present_success = true;
      boolean that_present_success = true;
      if (this_present_success || that_present_success) {
        if (!(this_present_success && that_present_success))
          return false;
        if (this.success != that.success)
          return false;
      }

      return true;
    }

    /** Hash derived from the same fields used by {@link #equals}, keeping the contract. */
    @Override
    public int hashCode() {
      List<Object> list = new ArrayList<Object>();

      boolean present_success = true;
      list.add(present_success);
      if (present_success)
        list.add(success);

      return list.hashCode();
    }

    /** Orders first by isset state of {@code success}, then by its value. */
    @Override
    public int compareTo(createClusterInterpreterProcess_result other) {
      if (!getClass().equals(other.getClass())) {
        return getClass().getName().compareTo(other.getClass().getName());
      }

      int lastComparison = 0;

      lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess());
      if (lastComparison != 0) {
        return lastComparison;
      }
      if (isSetSuccess()) {
        lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.success, other.success);
        if (lastComparison != 0) {
          return lastComparison;
        }
      }
      return 0;
    }

    /** Maps a wire-level field id back to its {@code _Fields} enum constant. */
    public _Fields fieldForId(int fieldId) {
      return _Fields.findByThriftId(fieldId);
    }
+
    /** Deserializes this struct using the scheme matching the protocol (standard or tuple). */
    public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
      schemes.get(iprot.getScheme()).getScheme().read(iprot, this);
    }

    /** Serializes this struct using the scheme matching the protocol (standard or tuple). */
    public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
      schemes.get(oprot.getScheme()).getScheme().write(oprot, this);
    }

    @Override
    public String toString() {
      StringBuilder sb = new StringBuilder("createClusterInterpreterProcess_result(");
      boolean first = true;

      sb.append("success:");
      sb.append(this.success);
      first = false;
      sb.append(")");
      return sb.toString();
    }

    /** Hook for struct-level validation; this struct has no required fields to check. */
    public void validate() throws org.apache.thrift.TException {
      // check for required fields
      // check for sub-struct validity
    }

    /** Java-serialization hook: delegates to Thrift's compact protocol. */
    private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
      try {
        write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
      } catch (org.apache.thrift.TException te) {
        throw new java.io.IOException(te);
      }
    }

    /** Java-serialization hook: reads the struct back via Thrift's compact protocol. */
    private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
      try {
        // Java deserialization bypasses the default constructor, so the isset
        // bitfield must be reset explicitly before the fields are read back in.
        __isset_bitfield = 0;
        read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
      } catch (org.apache.thrift.TException te) {
        throw new java.io.IOException(te);
      }
    }
+
    private static class createClusterInterpreterProcess_resultStandardSchemeFactory implements SchemeFactory {
      public createClusterInterpreterProcess_resultStandardScheme getScheme() {
        return new createClusterInterpreterProcess_resultStandardScheme();
      }
    }

    /** Field-tagged ("standard") wire-format reader/writer for this struct. */
    private static class createClusterInterpreterProcess_resultStandardScheme extends StandardScheme<createClusterInterpreterProcess_result> {

      /**
       * Reads field/value pairs until the STOP marker; unknown field ids and
       * type mismatches are skipped so old clients tolerate newer schemas.
       */
      public void read(org.apache.thrift.protocol.TProtocol iprot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
        org.apache.thrift.protocol.TField schemeField;
        iprot.readStructBegin();
        while (true)
        {
          schemeField = iprot.readFieldBegin();
          if (schemeField.type == org.apache.thrift.protocol.TType.STOP) {
            break;
          }
          switch (schemeField.id) {
            case 0: // SUCCESS
              if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) {
                struct.success = iprot.readBool();
                struct.setSuccessIsSet(true);
              } else {
                org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
              }
              break;
            default:
              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
          }
          iprot.readFieldEnd();
        }
        iprot.readStructEnd();

        // check for required fields of primitive type, which can't be checked in the validate method
        struct.validate();
      }

      /** Writes only fields that are set; SUCCESS is omitted entirely when unset. */
      public void write(org.apache.thrift.protocol.TProtocol oprot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
        struct.validate();

        oprot.writeStructBegin(STRUCT_DESC);
        if (struct.isSetSuccess()) {
          oprot.writeFieldBegin(SUCCESS_FIELD_DESC);
          oprot.writeBool(struct.success);
          oprot.writeFieldEnd();
        }
        oprot.writeFieldStop();
        oprot.writeStructEnd();
      }

    }
+
    private static class createClusterInterpreterProcess_resultTupleSchemeFactory implements SchemeFactory {
      public createClusterInterpreterProcess_resultTupleScheme getScheme() {
        return new createClusterInterpreterProcess_resultTupleScheme();
      }
    }

    /**
     * Compact ("tuple") wire-format reader/writer: a leading bitset says which
     * optional fields follow, then values are written back-to-back untagged.
     */
    private static class createClusterInterpreterProcess_resultTupleScheme extends TupleScheme<createClusterInterpreterProcess_result> {

      @Override
      public void write(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
        TTupleProtocol oprot = (TTupleProtocol) prot;
        BitSet optionals = new BitSet();
        if (struct.isSetSuccess()) {
          optionals.set(0);
        }
        // One optional field total, hence bitset width 1.
        oprot.writeBitSet(optionals, 1);
        if (struct.isSetSuccess()) {
          oprot.writeBool(struct.success);
        }
      }

      @Override
      public void read(org.apache.thrift.protocol.TProtocol prot, createClusterInterpreterProcess_result struct) throws org.apache.thrift.TException {
        TTupleProtocol iprot = (TTupleProtocol) prot;
        BitSet incoming = iprot.readBitSet(1);
        if (incoming.get(0)) {
          struct.success = iprot.readBool();
          struct.setSuccessIsSet(true);
        }
      }
    }
+
+  }
+
+}
diff --git a/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift 
b/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift
new file mode 100644
index 0000000..c6f208e
--- /dev/null
+++ b/zeppelin-interpreter/src/main/thrift/ClusterManagerService.thrift
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace java org.apache.zeppelin.interpreter.thrift
+
// Parameters describing an interpreter process that a cluster node should
// launch on behalf of a Zeppelin server.
struct ClusterIntpProcParameters {
  1: string host,                        // address of the requesting Zeppelin server
  2: i32 port,                           // port of the requesting Zeppelin server
  3: string userName,                    // user the interpreter runs for
  4: string noteId,                      // note that triggered the request
  5: string replName,                    // interpreter (repl) name, e.g. "spark"
  6: string defaultInterpreterSetting    // setting id/name to fall back on
}

// RPC surface of the cluster manager: asks a node to spawn an interpreter
// process; returns true on success.
service ClusterManagerService {
  bool createClusterInterpreterProcess(1: ClusterIntpProcParameters intpProcParameters);
}
diff --git a/zeppelin-interpreter/src/main/thrift/genthrift.sh 
b/zeppelin-interpreter/src/main/thrift/genthrift.sh
index 23a295a..31efeae 100755
--- a/zeppelin-interpreter/src/main/thrift/genthrift.sh
+++ b/zeppelin-interpreter/src/main/thrift/genthrift.sh
@@ -21,6 +21,7 @@ rm -rf gen-java
 rm -rf ../java/org/apache/zeppelin/interpreter/thrift
 thrift --gen java RemoteInterpreterService.thrift
 thrift --gen java RemoteInterpreterEventService.thrift
+thrift --gen java ClusterManagerService.thrift
 for file in gen-java/org/apache/zeppelin/interpreter/thrift/* ; do
   cat java_license_header.txt ${file} > ${file}.tmp
   mv -f ${file}.tmp ${file}
diff --git 
a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
new file mode 100644
index 0000000..ef4d7fd
--- /dev/null
+++ 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/cluster/ClusterManagerTest.java
@@ -0,0 +1,135 @@
+package org.apache.zeppelin.cluster;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.zeppelin.cluster.meta.ClusterMeta;
+import org.apache.zeppelin.cluster.meta.ClusterMetaType;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class ClusterManagerTest {
+  private static Logger LOGGER = 
LoggerFactory.getLogger(ClusterManagerTest.class);
+
+  private static ClusterManagerServer clusterManagerServer = null;
+  private static ClusterManagerClient clusterManagerClient = null;
+
+  private static ZeppelinConfiguration zconf = null;
+
+  static String zServerHost;
+  static int zServerPort;
+  static final String metaKey = "ClusterManagerTestKey";
+
+  @BeforeClass
+  public static void initClusterEnv() throws IOException, InterruptedException 
{
+    LOGGER.info("initClusterEnv >>>");
+
+    zconf = ZeppelinConfiguration.create();
+
+    // Set the cluster IP and port
+    zServerHost = RemoteInterpreterUtils.findAvailableHostAddress();
+    zServerPort = 
RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces();
+    zconf.setClusterAddress(zServerHost + ":" + zServerPort);
+
+    // mock cluster manager server
+    clusterManagerServer = ClusterManagerServer.getInstance();
+    clusterManagerServer.start(null);
+
+    // mock cluster manager client
+    clusterManagerClient = ClusterManagerClient.getInstance();
+    clusterManagerClient.start(metaKey);
+
+    // Waiting for cluster startup
+    int wait = 0;
+    while(wait++ < 100) {
+      if (clusterManagerServer.isClusterLeader()
+          && clusterManagerServer.raftInitialized()
+          && clusterManagerClient.raftInitialized()) {
+        LOGGER.info("wait {}(ms) found cluster leader", wait*3000);
+        break;
+      }
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
+    }
+    assertEquals(true, clusterManagerServer.isClusterLeader());
+    LOGGER.info("initClusterEnv <<<");
+  }
+
+  @Test
+  public void getServerMeta() {
+    LOGGER.info("serverMeta >>>");
+
+    // Get metadata for all services
+    Object meta = 
clusterManagerClient.getClusterMeta(ClusterMetaType.ServerMeta, "");
+
+    LOGGER.info(meta.toString());
+
+    assertNotNull(meta);
+    assertEquals(true, (meta instanceof HashMap));
+    HashMap hashMap = (HashMap) meta;
+
+    // Get metadata for the current service
+    Object values = hashMap.get(zServerHost + ":" + zServerPort);
+    assertEquals(true, (values instanceof HashMap));
+    HashMap mapMetaValues = (HashMap) values;
+
+    assertEquals(true, mapMetaValues.size()>0);
+
+    LOGGER.info("serverMeta <<<");
+  }
+
+  @Test
+  public void putIntpProcessMeta() {
+    // mock IntpProcess Meta
+    HashMap<String, Object> meta = new HashMap<>();
+    meta.put(ClusterMeta.SERVER_HOST, zServerHost);
+    meta.put(ClusterMeta.SERVER_PORT, zServerPort);
+    meta.put(ClusterMeta.SERVER_TSERVER_HOST, "SERVER_TSERVER_HOST");
+    meta.put(ClusterMeta.SERVER_TSERVER_PORT, "SERVER_TSERVER_PORT");
+    meta.put(ClusterMeta.INTP_TSERVER_HOST, "INTP_TSERVER_HOST");
+    meta.put(ClusterMeta.INTP_TSERVER_PORT, "INTP_TSERVER_PORT");
+    meta.put(ClusterMeta.CPU_CAPACITY, "CPU_CAPACITY");
+    meta.put(ClusterMeta.CPU_USED, "CPU_USED");
+    meta.put(ClusterMeta.MEMORY_CAPACITY, "MEMORY_CAPACITY");
+    meta.put(ClusterMeta.MEMORY_USED, "MEMORY_USED");
+
+    // put IntpProcess Meta
+    clusterManagerClient.putClusterMeta(ClusterMetaType.IntpProcessMeta, 
metaKey, meta);
+
+    // get IntpProcess Meta
+    HashMap<String, HashMap<String, Object>> check
+        = clusterManagerClient.getClusterMeta(ClusterMetaType.IntpProcessMeta, 
metaKey);
+
+    LOGGER.info(check.toString());
+
+    assertNotNull(check);
+    assertNotNull(check.get(metaKey));
+    assertEquals(true, check.get(metaKey).size()>0);
+  }
+}
diff --git a/zeppelin-interpreter/src/test/resources/zeppelin-site.xml 
b/zeppelin-interpreter/src/test/resources/zeppelin-site.xml
new file mode 100644
index 0000000..2499e44
--- /dev/null
+++ b/zeppelin-interpreter/src/test/resources/zeppelin-site.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+    <property>
+        <name>zeppelin.cluster.addr</name>
+        <value></value>
+        <description>Server cluster address</description>
+    </property>
+
+    <property>
+        <name>zeppelin.server.addr</name>
+        <value>0.0.0.0</value>
+        <description>Server address</description>
+    </property>
+
+    <property>
+        <name>zeppelin.server.port</name>
+        <value>8080</value>
+        <description>Server port.</description>
+    </property>
+</configuration>
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index fe85f75..cf9074f 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -31,6 +31,8 @@ import javax.servlet.ServletContextListener;
 import org.apache.commons.lang.StringUtils;
 import org.apache.shiro.web.env.EnvironmentLoaderListener;
 import org.apache.shiro.web.servlet.ShiroFilter;
+import org.apache.zeppelin.cluster.ClusterManager;
+import org.apache.zeppelin.cluster.ClusterManagerServer;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
@@ -42,6 +44,7 @@ import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
+import org.apache.zeppelin.interpreter.thrift.ClusterManagerService;
 import org.apache.zeppelin.notebook.NoteEventListener;
 import org.apache.zeppelin.notebook.Notebook;
 import org.apache.zeppelin.notebook.AuthorizationService;
@@ -159,6 +162,10 @@ public class ZeppelinServer extends ResourceConfig {
                 .to(NoteEventListener.class)
                 .to(WebSocketServlet.class)
                 .in(Singleton.class);
+            bindAsContract(ClusterManagerServer.class)
+                .to(ClusterManager.class)
+                .to(ClusterManagerService.Iface.class)
+                .in(Singleton.class);
             if (conf.isZeppelinNotebookCronEnable()) {
               
bind(QuartzSchedulerService.class).to(SchedulerService.class).in(Singleton.class);
             } else {
@@ -186,6 +193,9 @@ public class ZeppelinServer extends ResourceConfig {
     // Notebook server
     setupNotebookServer(webApp, conf, sharedServiceLocator);
 
+    // Cluster Manager Server
+    setupClusterManagerServer(sharedServiceLocator);
+
     // JMX Enable
     Stream.of("ZEPPELIN_JMX_ENABLE")
         .map(System::getenv)
@@ -345,6 +355,12 @@ public class ZeppelinServer extends ResourceConfig {
     webapp.addServlet(servletHolder, "/ws/*");
   }
 
+  private static void setupClusterManagerServer(ServiceLocator serviceLocator) 
{
+    InterpreterFactory interpreterFactory
+        = sharedServiceLocator.getService(InterpreterFactory.class);
+    
sharedServiceLocator.getService(ClusterManagerServer.class).start(interpreterFactory);
+  }
+
   private static SslContextFactory getSslContextFactory(ZeppelinConfiguration 
conf) {
     SslContextFactory sslContextFactory = new SslContextFactory();
 
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index a3bc333..7721808 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -31,7 +31,7 @@ import java.util.List;
  * Factory class for creating interpreters.
  *
  */
-public class InterpreterFactory {
+public class InterpreterFactory implements InterpreterFactoryInterface {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(InterpreterFactory.class);
 
   private final InterpreterSettingManager interpreterSettingManager;
@@ -41,6 +41,7 @@ public class InterpreterFactory {
     this.interpreterSettingManager = interpreterSettingManager;
   }
 
+  @Override
   public Interpreter getInterpreter(String user,
                                     String noteId,
                                     String replName,

Reply via email to