http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
new file mode 100644
index 0000000..6759f97
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+import com.google.gson.Gson;
+import org.apache.thrift.TException;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
+import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
+import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService;
+import org.apache.zeppelin.notebook.*;
+import org.apache.zeppelin.scheduler.ExecutorFactory;
+import org.apache.zeppelin.scheduler.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * HeliumApplicationFactory
+ */
+public class HeliumApplicationFactory implements ApplicationEventListener, 
NotebookEventListener {
+  private final Logger logger = 
LoggerFactory.getLogger(HeliumApplicationFactory.class);
+  private final ExecutorService executor;
+  private final Gson gson = new Gson();
+  private Notebook notebook;
+  private ApplicationEventListener applicationEventListener;
+
+  public HeliumApplicationFactory() {
+    // The executor is obtained via createOrGet under this class's name;
+    // 10 is presumably the number of worker threads - TODO confirm.
+    String poolName = HeliumApplicationFactory.class.getName();
+    executor = ExecutorFactory.singleton().createOrGet(poolName, 10);
+  }
+
+  /**
+   * Tells whether the group's angular object registry is a RemoteAngularObjectRegistry.
+   */
+  private boolean isRemote(InterpreterGroup group) {
+    Object registry = group.getAngularObjectRegistry();
+    return registry instanceof RemoteAngularObjectRegistry;
+  }
+
+
+  /**
+   * Creates (or reuses) the paragraph's ApplicationState for the package, fires onLoad,
+   * and submits a LoadApplication task to the executor.
+   *
+   * @param pkg package to load
+   * @param paragraph paragraph that hosts the application
+   * @return id of the application state
+   */
+  public String loadAndRun(HeliumPackage pkg, Paragraph paragraph) {
+    final ApplicationState state = paragraph.createOrGetApplicationState(pkg);
+    final String noteId = paragraph.getNote().getId();
+    final String paragraphId = paragraph.getId();
+
+    onLoad(noteId, paragraphId, state.getId(), state.getHeliumPackage());
+    executor.submit(new LoadApplication(state, pkg, paragraph));
+
+    return state.getId();
+  }
+
+  /**
+   * Task that loads an application into the paragraph's remote interpreter process
+   * and then runs it on the same thread.
+   */
+  private class LoadApplication implements Runnable {
+    private final HeliumPackage pkg;
+    private final Paragraph paragraph;
+    private final ApplicationState appState;
+
+    public LoadApplication(ApplicationState appState, HeliumPackage pkg, Paragraph paragraph) {
+      this.appState = appState;
+      this.pkg = pkg;
+      this.paragraph = paragraph;
+    }
+
+    /**
+     * Loads the application, then runs it. Any failure is logged, the application
+     * status is set to ERROR and the exception message becomes the application output.
+     */
+    @Override
+    public void run() {
+      try {
+        // get interpreter process
+        Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName());
+        InterpreterGroup intpGroup = intp.getInterpreterGroup();
+        RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess();
+        if (intpProcess == null) {
+          throw new ApplicationException("Target interpreter process is not running");
+        }
+
+        // load application
+        load(intpProcess, appState);
+
+        // run application (directly, not through the executor)
+        RunApplication runTask = new RunApplication(paragraph, appState.getId());
+        runTask.run();
+      } catch (Exception e) {
+        logger.error(e.getMessage(), e);
+
+        if (appState != null) {
+          appStatusChange(paragraph, appState.getId(), ApplicationState.Status.ERROR);
+          appState.setOutput(e.getMessage());
+        }
+      }
+    }
+
+    /**
+     * Asks the remote process to load the application unless it is already LOADED.
+     * Status moves to LOADING before the remote call and to LOADED on success.
+     *
+     * @param intpProcess remote interpreter process to load into
+     * @param appState state of the application being loaded; also used as the lock
+     * @throws Exception ApplicationException when the remote side reports failure,
+     *         TException on a Thrift failure, or whatever getClient() throws
+     */
+    private void load(RemoteInterpreterProcess intpProcess, ApplicationState appState)
+        throws Exception {
+
+      RemoteInterpreterService.Client client = null;
+
+      synchronized (appState) {
+        if (appState.getStatus() == ApplicationState.Status.LOADED) {
+          // already loaded
+          return;
+        }
+
+        try {
+          appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING);
+          String pkgInfo = gson.toJson(pkg);
+          String appId = appState.getId();
+
+          client = intpProcess.getClient();
+          RemoteApplicationResult ret = client.loadApplication(
+              appId,
+              pkgInfo,
+              paragraph.getNote().getId(),
+              paragraph.getId());
+
+          if (ret.isSuccess()) {
+            appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED);
+          } else {
+            throw new ApplicationException(ret.getMsg());
+          }
+        } catch (TException e) {
+          // Hand the client back as broken and clear the reference so the finally
+          // block does not release the same client a second time.
+          if (client != null) {
+            intpProcess.releaseBrokenClient(client);
+            client = null;
+          }
+          throw e;
+        } finally {
+          if (client != null) {
+            intpProcess.releaseClient(client);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Looks up an application state on the given paragraph.
+   *
+   * @param paragraph paragraph to query
+   * @param appId id of the application
+   * @return the result of paragraph.getApplicationState(appId)
+   */
+  public ApplicationState get(Paragraph paragraph, String appId) {
+    ApplicationState found = paragraph.getApplicationState(appId);
+    return found;
+  }
+
+  /**
+   * Queues an UnloadApplication task on the executor.
+   * The ApplicationState itself is not removed.
+   *
+   * @param paragraph paragraph that hosts the application
+   * @param appId id of the application to unload
+   */
+  public void unload(Paragraph paragraph, String appId) {
+    Runnable task = new UnloadApplication(paragraph, appId);
+    executor.execute(task);
+  }
+
+  /**
+   * Task that unloads an application from the paragraph's remote interpreter process.
+   */
+  private class UnloadApplication implements Runnable {
+    private final Paragraph paragraph;
+    private final String appId;
+
+    public UnloadApplication(Paragraph paragraph, String appId) {
+      this.paragraph = paragraph;
+      this.appId = appId;
+    }
+
+    /**
+     * Unloads the application if it is known to the paragraph and not already UNLOADED.
+     * Any failure is logged, the status is set to ERROR and the exception message
+     * becomes the application output.
+     */
+    @Override
+    public void run() {
+      ApplicationState appState = null;
+      try {
+        appState = paragraph.getApplicationState(appId);
+
+        if (appState == null) {
+          logger.warn("Can not find {} to unload from {}", appId, paragraph.getId());
+          return;
+        }
+        if (appState.getStatus() == ApplicationState.Status.UNLOADED) {
+          // not loaded
+          return;
+        }
+        unload(appState);
+      } catch (Exception e) {
+        logger.error(e.getMessage(), e);
+        if (appState != null) {
+          appStatusChange(paragraph, appId, ApplicationState.Status.ERROR);
+          appState.setOutput(e.getMessage());
+        }
+      }
+    }
+
+    /**
+     * Asks the remote process to unload the application. Only a LOADED application
+     * can be unloaded; status moves to UNLOADING and then to UNLOADED on success.
+     *
+     * @param appsToUnload state of the application to unload; also used as the lock
+     * @throws ApplicationException when the status is not LOADED, no interpreter or
+     *         remote process is available, or the remote call fails
+     */
+    private void unload(ApplicationState appsToUnload) throws ApplicationException {
+      synchronized (appsToUnload) {
+        if (appsToUnload.getStatus() != ApplicationState.Status.LOADED) {
+          throw new ApplicationException(
+              "Can't unload application status " + appsToUnload.getStatus());
+        }
+        appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADING);
+        Interpreter intp = paragraph.getCurrentRepl();
+        if (intp == null) {
+          throw new ApplicationException("No interpreter found");
+        }
+
+        RemoteInterpreterProcess intpProcess =
+            intp.getInterpreterGroup().getRemoteInterpreterProcess();
+        if (intpProcess == null) {
+          throw new ApplicationException("Target interpreter process is not running");
+        }
+
+        RemoteInterpreterService.Client client;
+        try {
+          client = intpProcess.getClient();
+        } catch (Exception e) {
+          throw new ApplicationException(e);
+        }
+
+        try {
+          RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId());
+
+          if (ret.isSuccess()) {
+            appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED);
+          } else {
+            throw new ApplicationException(ret.getMsg());
+          }
+        } catch (TException e) {
+          // Hand the client back as broken and clear the reference so the finally
+          // block does not release the same client a second time.
+          intpProcess.releaseBrokenClient(client);
+          client = null;
+          throw new ApplicationException(e);
+        } finally {
+          if (client != null) {
+            intpProcess.releaseClient(client);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Queues a RunApplication task on the executor.
+   *
+   * @param paragraph paragraph that hosts the application
+   * @param appId id of the application to run
+   */
+  public void run(Paragraph paragraph, String appId) {
+    Runnable task = new RunApplication(paragraph, appId);
+    executor.execute(task);
+  }
+
+  /**
+   * Task that runs an already loaded application in the remote interpreter process.
+   */
+  private class RunApplication implements Runnable {
+    private final Paragraph paragraph;
+    private final String appId;
+
+    public RunApplication(Paragraph paragraph, String appId) {
+      this.paragraph = paragraph;
+      this.appId = appId;
+    }
+
+    /**
+     * Runs the application if the paragraph knows it. Any failure is logged and the
+     * exception message becomes the application output.
+     */
+    @Override
+    public void run() {
+      ApplicationState appState = null;
+      try {
+        appState = paragraph.getApplicationState(appId);
+
+        if (appState == null) {
+          logger.warn("Can not find {} to run in {}", appId, paragraph.getId());
+          return;
+        }
+
+        run(appState);
+      } catch (Exception e) {
+        logger.error(e.getMessage(), e);
+        if (appState != null) {
+          // NOTE(review): a failed run resets the status to UNLOADED, whereas the load
+          // and unload tasks use ERROR - confirm this is intended.
+          appStatusChange(paragraph, appId, ApplicationState.Status.UNLOADED);
+          appState.setOutput(e.getMessage());
+        }
+      }
+    }
+
+    /**
+     * Asks the remote process to run the application. Only a LOADED application can run.
+     *
+     * @param app state of the application to run; also used as the lock
+     * @throws ApplicationException when the status is not LOADED, no interpreter or
+     *         remote process is available, or the remote call fails
+     */
+    private void run(ApplicationState app) throws ApplicationException {
+      synchronized (app) {
+        if (app.getStatus() != ApplicationState.Status.LOADED) {
+          throw new ApplicationException(
+              "Can't run application status " + app.getStatus());
+        }
+
+        Interpreter intp = paragraph.getCurrentRepl();
+        if (intp == null) {
+          throw new ApplicationException("No interpreter found");
+        }
+
+        RemoteInterpreterProcess intpProcess =
+            intp.getInterpreterGroup().getRemoteInterpreterProcess();
+        if (intpProcess == null) {
+          throw new ApplicationException("Target interpreter process is not running");
+        }
+        RemoteInterpreterService.Client client = null;
+        try {
+          client = intpProcess.getClient();
+        } catch (Exception e) {
+          throw new ApplicationException(e);
+        }
+
+        try {
+          RemoteApplicationResult ret = client.runApplication(app.getId());
+
+          if (!ret.isSuccess()) {
+            throw new ApplicationException(ret.getMsg());
+          }
+        } catch (TException e) {
+          // The broken client is handed back here; clearing the reference keeps the
+          // finally block from releasing it again.
+          intpProcess.releaseBrokenClient(client);
+          client = null;
+          throw new ApplicationException(e);
+        } finally {
+          if (client != null) {
+            intpProcess.releaseClient(client);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Appends output to the matching application state (logging an error when it cannot
+   * be found) and forwards the event to the registered listener, if any.
+   */
+  @Override
+  public void onOutputAppend(String noteId, String paragraphId, String appId, String output) {
+    ApplicationState target = getAppState(noteId, paragraphId, appId);
+
+    if (target == null) {
+      logger.error("Can't find app {}", appId);
+    } else {
+      target.appendOutput(output);
+    }
+
+    if (applicationEventListener == null) {
+      return;
+    }
+    applicationEventListener.onOutputAppend(noteId, paragraphId, appId, output);
+  }
+
+  /**
+   * Replaces the output of the matching application state (logging an error when it
+   * cannot be found) and forwards the event to the registered listener, if any.
+   */
+  @Override
+  public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) {
+    ApplicationState target = getAppState(noteId, paragraphId, appId);
+
+    if (target == null) {
+      logger.error("Can't find app {}", appId);
+    } else {
+      target.setOutput(output);
+    }
+
+    if (applicationEventListener == null) {
+      return;
+    }
+    applicationEventListener.onOutputUpdated(noteId, paragraphId, appId, output);
+  }
+
+  /**
+   * Forwards the load event to the registered listener; does nothing without one.
+   */
+  @Override
+  public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) {
+    if (applicationEventListener == null) {
+      return;
+    }
+    applicationEventListener.onLoad(noteId, paragraphId, appId, pkg);
+  }
+
+  /**
+   * Applies the status (parsed with ApplicationState.Status.valueOf) to the matching
+   * application state when one is found, then forwards the event to the registered
+   * listener, if any.
+   */
+  @Override
+  public void onStatusChange(String noteId, String paragraphId, String appId, String status) {
+    ApplicationState target = getAppState(noteId, paragraphId, appId);
+    if (target != null) {
+      ApplicationState.Status parsed = ApplicationState.Status.valueOf(status);
+      target.setStatus(parsed);
+    }
+
+    if (applicationEventListener == null) {
+      return;
+    }
+    applicationEventListener.onStatusChange(noteId, paragraphId, appId, status);
+  }
+
+  /**
+   * Sets the status on the paragraph's application state and announces the change
+   * through onStatusChange.
+   *
+   * @param paragraph paragraph that hosts the application
+   * @param appId id of the application
+   * @param status new status
+   */
+  private void appStatusChange(Paragraph paragraph,
+                               String appId,
+                               ApplicationState.Status status) {
+    ApplicationState app = paragraph.getApplicationState(appId);
+    if (app == null) {
+      // Other callers in this class treat getApplicationState() as nullable. This method
+      // is also invoked from error handlers, so it must not throw on a missing state.
+      logger.warn("Can't find app {} in paragraph {} to set status {}",
+          appId, paragraph.getId(), status);
+    } else {
+      app.setStatus(status);
+    }
+    onStatusChange(paragraph.getNote().getId(), paragraph.getId(), appId, status.toString());
+  }
+
+  /**
+   * Resolves note, then paragraph, then application state through the notebook.
+   *
+   * @return the application state, or null when no notebook is set or when the note or
+   *         paragraph cannot be found (the latter two are logged as errors)
+   */
+  private ApplicationState getAppState(String noteId, String paragraphId, String appId) {
+    if (notebook != null) {
+      Note owner = notebook.getNote(noteId);
+      if (owner != null) {
+        Paragraph host = owner.getParagraph(paragraphId);
+        if (host != null) {
+          return host.getApplicationState(appId);
+        }
+        logger.error("Can't get paragraph {}", paragraphId);
+      } else {
+        logger.error("Can't get note {}", noteId);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * @return the notebook used to resolve notes by id; null until setNotebook is called
+   */
+  public Notebook getNotebook() {
+    return notebook;
+  }
+
+  /**
+   * @param notebook notebook used by the event callbacks to resolve application states;
+   *                 while it is null those lookups return null
+   */
+  public void setNotebook(Notebook notebook) {
+    this.notebook = notebook;
+  }
+
+  /**
+   * @return the listener that application events are forwarded to; may be null
+   */
+  public ApplicationEventListener getApplicationEventListener() {
+    return applicationEventListener;
+  }
+
+  /**
+   * @param applicationEventListener listener that application events are forwarded to;
+   *                                 when null, events are not forwarded
+   */
+  public void setApplicationEventListener(ApplicationEventListener 
applicationEventListener) {
+    this.applicationEventListener = applicationEventListener;
+  }
+
+  @Override
+  public void onNoteRemove(Note note) {
+    // No action is taken when a note is removed.
+  }
+
+  @Override
+  public void onNoteCreate(Note note) {
+    // No action is taken when a note is created.
+
+  }
+
+  /**
+   * Unloads the applications of every paragraph whose current interpreter belongs to
+   * the setting being unbound (matched by interpreter class name).
+   *
+   * @param note note the setting is unbound from
+   * @param setting interpreter setting being unbound
+   */
+  @Override
+  public void onUnbindInterpreter(Note note, InterpreterSetting setting) {
+    // The setting's interpreter list does not depend on the paragraph; fetch it once.
+    List<InterpreterSetting.InterpreterInfo> infos = setting.getInterpreterInfos();
+    for (Paragraph p : note.getParagraphs()) {
+      Interpreter currentInterpreter = p.getCurrentRepl();
+      if (currentInterpreter == null) {
+        // getCurrentRepl() is treated as nullable elsewhere in this class; a paragraph
+        // without an interpreter cannot match the setting.
+        continue;
+      }
+      String currentClassName = currentInterpreter.getClassName();
+      for (InterpreterSetting.InterpreterInfo info : infos) {
+        if (info.getClassName().equals(currentClassName)) {
+          onParagraphRemove(p);
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Runs an UnloadApplication task, on the calling thread, for every application state
+   * of the paragraph.
+   */
+  @Override
+  public void onParagraphRemove(Paragraph paragraph) {
+    for (ApplicationState state : paragraph.getAllApplicationStates()) {
+      new UnloadApplication(paragraph, state.getId()).run();
+    }
+  }
+
+  @Override
+  public void onParagraphCreate(Paragraph p) {
+    // No action is taken when a paragraph is created.
+
+  }
+
+  /**
+   * When the paragraph reaches FINISHED, calls loadAndRun again for each of its
+   * application states; other statuses are ignored.
+   */
+  @Override
+  public void onParagraphStatusChange(Paragraph p, Job.Status status) {
+    if (status != Job.Status.FINISHED) {
+      return;
+    }
+
+    // refresh application
+    for (ApplicationState state : p.getAllApplicationStates()) {
+      loadAndRun(state.getHeliumPackage(), p);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumConf.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumConf.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumConf.java
new file mode 100644
index 0000000..2a93caa
--- /dev/null
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumConf.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Helium configuration holder. This object will be persisted to conf/heliumc.conf
+ */
+public class HeliumConf {
+  // Registries known to this configuration; starts out empty.
+  List<HeliumRegistry> registry = new LinkedList<>();
+
+  /**
+   * @return the registry list held by this configuration (the live list, not a copy)
+   */
+  public List<HeliumRegistry> getRegistry() {
+    return this.registry;
+  }
+
+  /**
+   * @param registry list that replaces the current registry list
+   */
+  public void setRegistry(List<HeliumRegistry> registry) {
+    this.registry = registry;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumLocalRegistry.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumLocalRegistry.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumLocalRegistry.java
new file mode 100644
index 0000000..ef28835
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumLocalRegistry.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+import com.google.gson.Gson;
+import com.google.gson.stream.JsonReader;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.URI;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Simple Helium registry on local filesystem.
+ * Every non-hidden file directly under the registry uri is read as one JSON package
+ * description.
+ */
+public class HeliumLocalRegistry extends HeliumRegistry {
+  Logger logger = LoggerFactory.getLogger(HeliumLocalRegistry.class);
+
+  private final Gson gson;
+
+  /**
+   * @param name registry name
+   * @param uri path of the directory that holds the package description files
+   */
+  public HeliumLocalRegistry(String name, String uri) {
+    super(name, uri);
+    gson = new Gson();
+  }
+
+  /**
+   * Lists the packages described by the files in the registry directory.
+   *
+   * @return packages that could be read; empty when the directory cannot be listed.
+   *         Files that cannot be read or parsed are logged and skipped.
+   */
+  @Override
+  public synchronized List<HeliumPackage> getAll() throws IOException {
+    List<HeliumPackage> result = new LinkedList<HeliumPackage>();
+
+    File file = new File(uri());
+    File[] files = file.listFiles();
+    if (files == null) {
+      // not a directory, or it could not be listed
+      return result;
+    }
+
+    for (File f : files) {
+      if (f.getName().startsWith(".")) {
+        continue;
+      }
+      if (!f.isFile()) {
+        // only regular files can hold a package description; reading a directory
+        // would only fail and log an error
+        continue;
+      }
+
+      HeliumPackage pkgInfo = readPackageInfo(f);
+      if (pkgInfo != null) {
+        result.add(pkgInfo);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Parses one package description file with a lenient JSON reader.
+   *
+   * @param f file to read
+   * @return the parsed package, or null when the file cannot be read or is not valid JSON
+   */
+  private HeliumPackage readPackageInfo(File f) {
+    try {
+      JsonReader reader = new JsonReader(new StringReader(FileUtils.readFileToString(f)));
+      reader.setLenient(true);
+
+      return gson.fromJson(reader, HeliumPackage.class);
+    } catch (IOException | com.google.gson.JsonParseException e) {
+      // Gson reports malformed content with an unchecked JsonParseException; one bad
+      // file must not abort the listing of the whole registry.
+      logger.error("Can't read package info from {}", f.getAbsolutePath(), e);
+      return null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSearchResult.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSearchResult.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSearchResult.java
new file mode 100644
index 0000000..57a9d45
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSearchResult.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+/**
+ * One search hit: a package together with the name of the registry it came from.
+ */
+public class HeliumPackageSearchResult {
+  private final String registry;
+  private final HeliumPackage pkg;
+
+  /**
+   * Builds a search result item.
+   *
+   * @param registry registry name
+   * @param pkg package information
+   */
+  public HeliumPackageSearchResult(String registry, HeliumPackage pkg) {
+    this.pkg = pkg;
+    this.registry = registry;
+  }
+
+  /**
+   * @return registry name given at construction
+   */
+  public String getRegistry() {
+    return this.registry;
+  }
+
+  /**
+   * @return package information given at construction
+   */
+  public HeliumPackage getPkg() {
+    return this.pkg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSuggestion.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSuggestion.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSuggestion.java
new file mode 100644
index 0000000..45c1640
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSuggestion.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Suggested apps: a list of available packages that can be ordered by package name.
+ */
+public class HeliumPackageSuggestion {
+  private final List<HeliumPackageSearchResult> available = new LinkedList<>();
+
+  /*
+   * possible future improvement
+   * provides n - 'favorite' list, based on occurrence of apps in notebook
+   */
+
+  public HeliumPackageSuggestion() {
+  }
+
+  /**
+   * Adds a package to the end of the available list.
+   *
+   * @param r search result to add
+   */
+  public void addAvailablePackage(HeliumPackageSearchResult r) {
+    available.add(r);
+  }
+
+  /**
+   * Sorts the available packages in place by package name, using String.compareTo.
+   */
+  public void sort() {
+    Comparator<HeliumPackageSearchResult> byPackageName =
+        new Comparator<HeliumPackageSearchResult>() {
+          @Override
+          public int compare(HeliumPackageSearchResult o1, HeliumPackageSearchResult o2) {
+            String left = o1.getPkg().getName();
+            String right = o2.getPkg().getName();
+            return left.compareTo(right);
+          }
+        };
+    Collections.sort(available, byPackageName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistry.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistry.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistry.java
new file mode 100644
index 0000000..125ad92
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistry.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+/**
+ * Helium package registry: a named source, located by a uri string, that can list
+ * the packages it holds.
+ */
+public abstract class HeliumRegistry {
+  private final String name;
+  private final String uri;
+
+  /**
+   * @param name registry name
+   * @param uri registry location, stored as the given string
+   */
+  public HeliumRegistry(String name, String uri) {
+    this.uri = uri;
+    this.name = name;
+  }
+
+  /**
+   * @return the name given at construction
+   */
+  public String name() {
+    return this.name;
+  }
+
+  /**
+   * @return the uri string given at construction
+   */
+  public String uri() {
+    return this.uri;
+  }
+
+  /**
+   * @return all packages provided by this registry
+   * @throws IOException declared for implementations that fail to read the registry
+   */
+  public abstract List<HeliumPackage> getAll() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistrySerializer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistrySerializer.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistrySerializer.java
new file mode 100644
index 0000000..3abcb9f
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistrySerializer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+import com.google.gson.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Type;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * HeliumRegistrySerializer (and deserializer) for gson.
+ * A registry is stored as its implementation class name plus its name and uri, and is
+ * restored by calling that class's (String name, String uri) constructor.
+ */
+public class HeliumRegistrySerializer
+    implements JsonSerializer<HeliumRegistry>, JsonDeserializer<HeliumRegistry> {
+  Logger logger = LoggerFactory.getLogger(HeliumRegistrySerializer.class);
+
+  /**
+   * Restores a registry from its "class", "uri" and "name" fields.
+   *
+   * @return the restored registry, or null when the class cannot be loaded, is not a
+   *         HeliumRegistry, or cannot be instantiated (the failure is logged)
+   * @throws JsonParseException when one of the required fields is missing
+   */
+  @Override
+  public HeliumRegistry deserialize(JsonElement json,
+                                Type type,
+                                JsonDeserializationContext jsonDeserializationContext)
+      throws JsonParseException {
+    JsonObject jsonObject = json.getAsJsonObject();
+    String className = requiredString(jsonObject, "class");
+    String uri = requiredString(jsonObject, "uri");
+    String name = requiredString(jsonObject, "name");
+
+    try {
+      logger.info("Restore helium registry {} {} {}", name, className, uri);
+      // NOTE(review): the class name comes straight from the persisted JSON. asSubclass
+      // makes sure that only HeliumRegistry implementations are ever instantiated.
+      Class<? extends HeliumRegistry> cls =
+          getClass().getClassLoader().loadClass(className).asSubclass(HeliumRegistry.class);
+      Constructor<? extends HeliumRegistry> constructor =
+          cls.getConstructor(String.class, String.class);
+      return constructor.newInstance(name, uri);
+    } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
+        InstantiationException | InvocationTargetException | ClassCastException e) {
+      logger.error(e.getMessage(), e);
+      return null;
+    }
+  }
+
+  /**
+   * Reads a mandatory string field, failing with the exception type the deserializer
+   * declares rather than a NullPointerException.
+   */
+  private static String requiredString(JsonObject jsonObject, String key) {
+    JsonElement value = jsonObject.get(key);
+    if (value == null || value.isJsonNull()) {
+      throw new JsonParseException(
+          "Helium registry entry is missing required field '" + key + "'");
+    }
+    return value.getAsString();
+  }
+
+  /**
+   * Writes the registry as a JSON object with "class", "uri" and "name" properties.
+   */
+  @Override
+  public JsonElement serialize(HeliumRegistry heliumRegistry,
+                               Type type,
+                               JsonSerializationContext jsonSerializationContext) {
+    JsonObject json = new JsonObject();
+    json.addProperty("class", heliumRegistry.getClass().getName());
+    json.addProperty("uri", heliumRegistry.uri());
+    json.addProperty("name", heliumRegistry.name());
+    return json;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
index 5595c14..47a4325 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java
@@ -30,7 +30,10 @@ import org.apache.zeppelin.dep.Dependency;
 import org.apache.zeppelin.dep.DependencyResolver;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.helium.ApplicationEventListener;
 import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter;
+import org.apache.zeppelin.interpreter.dev.DevInterpreter;
+import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener;
@@ -82,22 +85,29 @@ public class InterpreterFactory implements 
InterpreterGroupFactory {
 
   private AngularObjectRegistryListener angularObjectRegistryListener;
   private final RemoteInterpreterProcessListener 
remoteInterpreterProcessListener;
+  private final ApplicationEventListener appEventListener;
 
   private DependencyResolver depResolver;
 
+  private Map<String, String> env = new HashMap<String, String>();
+
+  private Interpreter devInterpreter;
+
   public InterpreterFactory(ZeppelinConfiguration conf,
       AngularObjectRegistryListener angularObjectRegistryListener,
       RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appEventListener,
       DependencyResolver depResolver)
       throws InterpreterException, IOException, RepositoryException {
     this(conf, new InterpreterOption(true), angularObjectRegistryListener,
-        remoteInterpreterProcessListener, depResolver);
+            remoteInterpreterProcessListener, appEventListener, depResolver);
   }
 
 
   public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption 
defaultOption,
       AngularObjectRegistryListener angularObjectRegistryListener,
       RemoteInterpreterProcessListener remoteInterpreterProcessListener,
+      ApplicationEventListener appEventListener,
       DependencyResolver depResolver)
       throws InterpreterException, IOException, RepositoryException {
     this.conf = conf;
@@ -106,6 +116,7 @@ public class InterpreterFactory implements 
InterpreterGroupFactory {
     this.depResolver = depResolver;
     this.interpreterRepositories = depResolver.getRepos();
     this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
+    this.appEventListener = appEventListener;
     String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS);
     interpreterClassList = replsConf.split(",");
     String groupOrder = 
conf.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER);
@@ -547,8 +558,18 @@ public class InterpreterFactory implements 
InterpreterGroupFactory {
         Interpreter intp;
 
         if (option.isRemote()) {
-          intp = createRemoteRepl(info.getPath(), key, info.getClassName(), 
properties,
-              interpreterSetting.id());
+          if (option.isConnectExistingProcess()) {
+            intp = connectToRemoteRepl(
+                noteId,
+                info.getClassName(),
+                option.getHost(), option.getPort(), properties);
+          } else {
+            intp = createRemoteRepl(info.getPath(),
+                key,
+                info.getClassName(),
+                properties,
+                interpreterSetting.id());
+          }
         } else {
           intp = createRepl(info.getPath(), info.getClassName(), properties);
         }
@@ -850,6 +871,26 @@ public class InterpreterFactory implements 
InterpreterGroupFactory {
     }
   }
 
+  private Interpreter connectToRemoteRepl(String noteId,
+                                          String className,
+                                          String host,
+                                          int port,
+                                          Properties property) {
+    int connectTimeout = 
conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT);
+    int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE);
+    LazyOpenInterpreter intp = new LazyOpenInterpreter(
+        new RemoteInterpreter(
+            property,
+            noteId,
+            className,
+            host,
+            port,
+            connectTimeout,
+            maxPoolSize,
+            remoteInterpreterProcessListener,
+            appEventListener));
+    return intp;
+  }
 
   private Interpreter createRemoteRepl(String interpreterPath, String noteId, 
String className,
       Properties property, String interpreterSettingId) {
@@ -859,10 +900,14 @@ public class InterpreterFactory implements 
InterpreterGroupFactory {
 
     updatePropertiesFromRegisteredInterpreter(property, className);
 
-    LazyOpenInterpreter intp = new LazyOpenInterpreter(new 
RemoteInterpreter(property, noteId,
-        className, conf.getInterpreterRemoteRunnerPath(), interpreterPath, 
localRepoPath,
-        connectTimeout, maxPoolSize, remoteInterpreterProcessListener));
-    return intp;
+
+    RemoteInterpreter remoteInterpreter = new RemoteInterpreter(
+        property, noteId, className, conf.getInterpreterRemoteRunnerPath(),
+        interpreterPath, localRepoPath, connectTimeout,
+        maxPoolSize, remoteInterpreterProcessListener, appEventListener);
+    remoteInterpreter.setEnv(env);
+
+    return new LazyOpenInterpreter(remoteInterpreter);
   }
 
   private Properties updatePropertiesFromRegisteredInterpreter(Properties 
properties,
@@ -1037,6 +1082,11 @@ public class InterpreterFactory implements 
InterpreterGroupFactory {
       }
     }
 
+    // dev interpreter
+    if (DevInterpreter.isInterpreterName(replName)) {
+      return getDevInterpreter();
+    }
+
     return null;
   }
 
@@ -1073,4 +1123,34 @@ public class InterpreterFactory implements 
InterpreterGroupFactory {
     depResolver.delRepo(id);
     saveToFile();
   }
+
+  public Map<String, String> getEnv() {
+    return env;
+  }
+
+  public void setEnv(Map<String, String> env) {
+    this.env = env;
+  }
+
+
+  public Interpreter getDevInterpreter() {
+    if (devInterpreter == null) {
+      InterpreterOption option = new InterpreterOption();
+      option.setRemote(true);
+
+      InterpreterGroup interpreterGroup = createInterpreterGroup("dev", 
option);
+
+      devInterpreter = connectToRemoteRepl("dev", 
DevInterpreter.class.getName(),
+          "localhost",
+          ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT,
+          new Properties());
+
+      LinkedList<Interpreter> intpList = new LinkedList<Interpreter>();
+      intpList.add(devInterpreter);
+      interpreterGroup.put("dev", intpList);
+
+      devInterpreter.setInterpreterGroup(interpreterGroup);
+    }
+    return devInterpreter;
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
index f9e43ab..7aac781 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java
@@ -22,16 +22,13 @@ package org.apache.zeppelin.interpreter;
  */
 public class InterpreterOption {
   boolean remote;
+  String host = null;
+  int port = -1;
   boolean perNoteSession;
   boolean perNoteProcess;
   
   boolean isExistingProcess;
 
-  String host;
-  String port;
-
-
-  
 
   public boolean isExistingProcess() {
     return isExistingProcess;
@@ -41,18 +38,10 @@ public class InterpreterOption {
     this.isExistingProcess = isExistingProcess;
   }
 
-  public String getPort() {
-    return port;
-  }
-
-  public void setPort(String port) {
+  public void setPort(int port) {
     this.port = port;
   }
 
-  public String getHost() {
-    return host;
-  }
-
   public void setHost(String host) {
     this.host = host;
   }
@@ -82,6 +71,18 @@ public class InterpreterOption {
     this.perNoteSession = perNoteSession;
   }
 
+  public boolean isConnectExistingProcess() {
+    return (host != null && port != -1);
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public int getPort() {
+    return port;
+  }
+
   public boolean isPerNoteProcess() {
     return perNoteProcess;
   }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
new file mode 100644
index 0000000..1505db9
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook;
+
+import org.apache.zeppelin.helium.HeliumPackage;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+
+/**
+ * Current state of application
+ */
+public class ApplicationState {
+
+  /**
+   * Status of Application
+   */
+  public static enum Status {
+    LOADING,
+    LOADED,
+    UNLOADING,
+    UNLOADED,
+    ERROR
+  };
+
+  Status status = Status.UNLOADED;
+
+  String id;   // unique id for this instance. Similar to note id or paragraph 
id
+  HeliumPackage pkg;
+  String output;
+
+  public ApplicationState(String id, HeliumPackage pkg) {
+    this.id = id;
+    this.pkg = pkg;
+  }
+
+  /**
+   * After ApplicationState is restored from NotebookRepo,
+   * such as after Zeppelin daemon starts or Notebook import,
+   * Application status needs to be reset.
+   */
+  public void resetStatus() {
+    if (status != Status.ERROR) {
+      status = Status.UNLOADED;
+    }
+  }
+
+
+  @Override
+  public boolean equals(Object o) {
+    String compareName;
+    if (o instanceof ApplicationState) {
+      return pkg.equals(((ApplicationState) o).getHeliumPackage());
+    } else if (o instanceof HeliumPackage) {
+      return pkg.equals((HeliumPackage) o);
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return pkg.hashCode();
+  }
+
+  public String getId() {
+    return id;
+  }
+
+  public void setStatus(Status status) {
+    this.status = status;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public String getOutput() {
+    return output;
+  }
+
+  public void setOutput(String output) {
+    this.output = output;
+  }
+
+  public synchronized void appendOutput(String output) {
+    if (this.output == null) {
+      this.output = output;
+    } else {
+      this.output += output;
+    }
+  }
+
+  public HeliumPackage getHeliumPackage() {
+    return pkg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
index e57ed9b..0a8a45d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java
@@ -30,6 +30,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.Input;
+import org.apache.zeppelin.helium.HeliumApplicationFactory;
 import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
@@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Binded interpreters for a note
  */
-public class Note implements Serializable, JobListener {
+public class Note implements Serializable, ParagraphJobListener {
   static Logger logger = LoggerFactory.getLogger(Note.class);
   private static final long serialVersionUID = 7920699076577612429L;
 
@@ -78,6 +79,7 @@ public class Note implements Serializable, JobListener {
   private transient NotebookRepo repo;
   private transient SearchService index;
   private transient ScheduledFuture delayedPersist;
+  private transient NoteEventListener noteEventListener;
   private transient Credentials credentials;
 
   /**
@@ -98,11 +100,13 @@ public class Note implements Serializable, JobListener {
   public Note() {}
 
   public Note(NotebookRepo repo, InterpreterFactory factory,
-      JobListenerFactory jlFactory, SearchService noteIndex, Credentials 
credentials) {
+      JobListenerFactory jlFactory, SearchService noteIndex, Credentials 
credentials,
+      NoteEventListener noteEventListener) {
     this.repo = repo;
     this.factory = factory;
     this.jobListenerFactory = jlFactory;
     this.index = noteIndex;
+    this.noteEventListener = noteEventListener;
     this.credentials = credentials;
     generateId();
   }
@@ -202,6 +206,9 @@ public class Note implements Serializable, JobListener {
     synchronized (paragraphs) {
       paragraphs.add(p);
     }
+    if (noteEventListener != null) {
+      noteEventListener.onParagraphCreate(p);
+    }
     return p;
   }
 
@@ -239,6 +246,9 @@ public class Note implements Serializable, JobListener {
     synchronized (paragraphs) {
       paragraphs.add(newParagraph);
     }
+    if (noteEventListener != null) {
+      noteEventListener.onParagraphCreate(newParagraph);
+    }
   }
 
   /**
@@ -252,6 +262,9 @@ public class Note implements Serializable, JobListener {
     synchronized (paragraphs) {
       paragraphs.add(index, p);
     }
+    if (noteEventListener != null) {
+      noteEventListener.onParagraphCreate(p);
+    }
     return p;
   }
 
@@ -287,12 +300,14 @@ public class Note implements Serializable, JobListener {
         if (p.getId().equals(paragraphId)) {
           index.deleteIndexDoc(this, p);
           i.remove();
+
+          if (noteEventListener != null) {
+            noteEventListener.onParagraphRemove(p);
+          }
           return p;
         }
       }
     }
-
-
     return null;
   }
 
@@ -431,9 +446,11 @@ public class Note implements Serializable, JobListener {
         AuthenticationInfo authenticationInfo = new AuthenticationInfo();
         authenticationInfo.setUser(cronExecutingUser);
         p.setAuthenticationInfo(authenticationInfo);
+
         p.setInterpreterFactory(factory);
         p.setListener(jobListenerFactory.getParagraphJobListener(this));
         Interpreter intp = factory.getInterpreter(getId(), 
p.getRequiredReplName());
+
         intp.getScheduler().submit(p);
       }
     }
@@ -450,6 +467,7 @@ public class Note implements Serializable, JobListener {
     p.setListener(jobListenerFactory.getParagraphJobListener(this));
     String requiredReplName = p.getRequiredReplName();
     Interpreter intp = factory.getInterpreter(getId(), requiredReplName);
+
     if (intp == null) {
       // TODO(jongyoul): Make "%jdbc" configurable from JdbcInterpreter
       if (conf.getUseJdbcAlias() && null != (intp = 
factory.getInterpreter(getId(), "jdbc"))) {
@@ -526,8 +544,25 @@ public class Note implements Serializable, JobListener {
       if (registry instanceof RemoteAngularObjectRegistry) {
         // remove paragraph scope object
         ((RemoteAngularObjectRegistry) 
registry).removeAllAndNotifyRemoteProcess(id, paragraphId);
+
+        // remove app scope object
+        List<ApplicationState> appStates = 
getParagraph(paragraphId).getAllApplicationStates();
+        if (appStates != null) {
+          for (ApplicationState app : appStates) {
+            ((RemoteAngularObjectRegistry) 
registry).removeAllAndNotifyRemoteProcess(
+                id, app.getId());
+          }
+        }
       } else {
         registry.removeAll(id, paragraphId);
+
+        // remove app scope object
+        List<ApplicationState> appStates = 
getParagraph(paragraphId).getAllApplicationStates();
+        if (appStates != null) {
+          for (ApplicationState app : appStates) {
+            registry.removeAll(id, app.getId());
+          }
+        }
       }
     }
   }
@@ -624,12 +659,67 @@ public class Note implements Serializable, JobListener {
 
   @Override
   public void beforeStatusChange(Job job, Status before, Status after) {
+    if (jobListenerFactory != null) {
+      ParagraphJobListener listener = 
jobListenerFactory.getParagraphJobListener(this);
+      if (listener != null) {
+        listener.beforeStatusChange(job, before, after);
+      }
+    }
   }
 
   @Override
   public void afterStatusChange(Job job, Status before, Status after) {
+    if (jobListenerFactory != null) {
+      ParagraphJobListener listener = 
jobListenerFactory.getParagraphJobListener(this);
+      if (listener != null) {
+        listener.afterStatusChange(job, before, after);
+      }
+    }
+
+    if (noteEventListener != null) {
+      noteEventListener.onParagraphStatusChange((Paragraph) job, after);
+    }
   }
 
   @Override
-  public void onProgressUpdate(Job job, int progress) {}
+  public void onProgressUpdate(Job job, int progress) {
+    if (jobListenerFactory != null) {
+      ParagraphJobListener listener = 
jobListenerFactory.getParagraphJobListener(this);
+      if (listener != null) {
+        listener.onProgressUpdate(job, progress);
+      }
+    }
+  }
+
+
+  @Override
+  public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, 
String output) {
+    if (jobListenerFactory != null) {
+      ParagraphJobListener listener = 
jobListenerFactory.getParagraphJobListener(this);
+      if (listener != null) {
+        listener.onOutputAppend(paragraph, out, output);
+      }
+    }
+  }
+
+  @Override
+  public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, 
String output) {
+    if (jobListenerFactory != null) {
+      ParagraphJobListener listener = 
jobListenerFactory.getParagraphJobListener(this);
+      if (listener != null) {
+        listener.onOutputUpdate(paragraph, out, output);
+      }
+    }
+  }
+
+
+
+  public NoteEventListener getNoteEventListener() {
+    return noteEventListener;
+  }
+
+  public void setNoteEventListener(NoteEventListener noteEventListener) {
+    this.noteEventListener = noteEventListener;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventListener.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventListener.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventListener.java
new file mode 100644
index 0000000..5f98f70
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook;
+
+import org.apache.zeppelin.scheduler.Job;
+
+/**
+ * Listener for paragraph create, remove, and status-change events within a note.
+ */
+public interface NoteEventListener {
+  public void onParagraphRemove(Paragraph p);
+  public void onParagraphCreate(Paragraph p);
+  public void onParagraphStatusChange(Paragraph p, Job.Status status);
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index 3243ba7..ab2ce5d 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -41,6 +41,7 @@ import 
org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry;
 import org.apache.zeppelin.notebook.repo.NotebookRepo;
 import org.apache.zeppelin.notebook.repo.NotebookRepoSync;
 import org.apache.zeppelin.resource.ResourcePoolUtils;
+import org.apache.zeppelin.scheduler.Job;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.apache.zeppelin.search.SearchService;
 import org.apache.zeppelin.user.AuthenticationInfo;
@@ -64,7 +65,7 @@ import com.google.gson.stream.JsonReader;
 /**
  * Collection of Notes.
  */
-public class Notebook {
+public class Notebook implements NoteEventListener {
   static Logger logger = LoggerFactory.getLogger(Notebook.class);
 
   @SuppressWarnings("unused") @Deprecated //TODO(bzz): remove unused
@@ -80,6 +81,8 @@ public class Notebook {
   private NotebookRepo notebookRepo;
   private SearchService notebookIndex;
   private NotebookAuthorization notebookAuthorization;
+  private final List<NotebookEventListener> notebookEventListeners =
+      Collections.synchronizedList(new LinkedList<NotebookEventListener>());
   private Credentials credentials;
 
   /**
@@ -151,7 +154,13 @@ public class Notebook {
    */
   public Note createNote(List<String> interpreterIds, AuthenticationInfo 
subject)
       throws IOException {
-    Note note = new Note(notebookRepo, replFactory, jobListenerFactory, 
notebookIndex, credentials);
+    Note note = new Note(
+        notebookRepo,
+        replFactory,
+        jobListenerFactory,
+        notebookIndex,
+        credentials,
+        this);
     synchronized (notes) {
       notes.put(note.id(), note);
     }
@@ -162,6 +171,7 @@ public class Notebook {
 
     notebookIndex.addIndexDoc(note);
     note.persist(subject);
+    fireNoteCreateEvent(note);
     return note;
   }
   
@@ -216,7 +226,7 @@ public class Notebook {
       logger.error(e.toString(), e);
       throw e;
     }
-    
+
     return newNote;
   }
 
@@ -256,6 +266,13 @@ public class Notebook {
       List<String> interpreterSettingIds) throws IOException {
     Note note = getNote(id);
     if (note != null) {
+      List<InterpreterSetting> currentBindings = 
replFactory.getInterpreterSettings(id);
+      for (InterpreterSetting setting : currentBindings) {
+        if (!interpreterSettingIds.contains(setting.id())) {
+          fireUnbindInterpreter(note, setting);
+        }
+      }
+
       replFactory.setInterpreters(note.getId(), interpreterSettingIds);
       // comment out while note.getNoteReplLoader().setInterpreters(...) do 
the same
       // replFactory.putNoteInterpreterSettingBinding(id, 
interpreterSettingIds);
@@ -303,6 +320,15 @@ public class Notebook {
         // remove paragraph scope object
         for (Paragraph p : note.getParagraphs()) {
           ((RemoteAngularObjectRegistry) 
registry).removeAllAndNotifyRemoteProcess(id, p.getId());
+
+          // remove app scope object
+          List<ApplicationState> appStates = p.getAllApplicationStates();
+          if (appStates != null) {
+            for (ApplicationState app : appStates) {
+              ((RemoteAngularObjectRegistry) 
registry).removeAllAndNotifyRemoteProcess(
+                  id, app.getId());
+            }
+          }
         }
         // remove notebook scope object
         ((RemoteAngularObjectRegistry) 
registry).removeAllAndNotifyRemoteProcess(id, null);
@@ -310,6 +336,14 @@ public class Notebook {
         // remove paragraph scope object
         for (Paragraph p : note.getParagraphs()) {
           registry.removeAll(id, p.getId());
+
+          // remove app scope object
+          List<ApplicationState> appStates = p.getAllApplicationStates();
+          if (appStates != null) {
+            for (ApplicationState app : appStates) {
+              registry.removeAll(id, app.getId());
+            }
+          }
         }
         // remove notebook scope object
         registry.removeAll(id, null);
@@ -318,6 +352,8 @@ public class Notebook {
 
     ResourcePoolUtils.removeResourcesBelongsToNote(id);
 
+    fireNoteRemoveEvent(note);
+
     try {
       note.unpersist(subject);
     } catch (IOException e) {
@@ -379,6 +415,8 @@ public class Notebook {
       }
     }
 
+    note.setNoteEventListener(this);
+
     synchronized (notes) {
       notes.put(note.id(), note);
       refreshCron(note.id());
@@ -402,6 +440,7 @@ public class Notebook {
         }
       }
     }
+
     return note;
   }
 
@@ -737,4 +776,46 @@ public class Notebook {
     this.notebookIndex.close();
   }
 
+  public void addNotebookEventListener(NotebookEventListener listener) {
+    notebookEventListeners.add(listener);
+  }
+
+  private void fireNoteCreateEvent(Note note) {
+    for (NotebookEventListener listener : notebookEventListeners) {
+      listener.onNoteCreate(note);
+    }
+  }
+
+  private void fireNoteRemoveEvent(Note note) {
+    for (NotebookEventListener listener : notebookEventListeners) {
+      listener.onNoteRemove(note);
+    }
+  }
+
+  private void fireUnbindInterpreter(Note note, InterpreterSetting setting) {
+    for (NotebookEventListener listener : notebookEventListeners) {
+      listener.onUnbindInterpreter(note, setting);
+    }
+  }
+
+  @Override
+  public void onParagraphRemove(Paragraph p) {
+    for (NotebookEventListener listener : notebookEventListeners) {
+      listener.onParagraphRemove(p);
+    }
+  }
+
+  @Override
+  public void onParagraphCreate(Paragraph p) {
+    for (NotebookEventListener listener : notebookEventListeners) {
+      listener.onParagraphCreate(p);
+    }
+  }
+
+  @Override
+  public void onParagraphStatusChange(Paragraph p, Job.Status status) {
+    for (NotebookEventListener listener : notebookEventListeners) {
+      listener.onParagraphStatusChange(p, status);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java
new file mode 100644
index 0000000..904eba0
--- /dev/null
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.notebook;
+
+import org.apache.zeppelin.interpreter.InterpreterSetting;
+
+/**
+ * Listener for note create/remove and interpreter-unbind events, plus paragraph events.
+ */
+public interface NotebookEventListener extends NoteEventListener {
+  public void onNoteRemove(Note note);
+  public void onNoteCreate(Note note);
+
+  public void onUnbindInterpreter(Note note, InterpreterSetting setting);
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
index df4765d..d1a7824 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.notebook;
 
 import org.apache.zeppelin.display.AngularObject;
 import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.helium.HeliumPackage;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.apache.zeppelin.user.Credentials;
@@ -61,6 +62,11 @@ public class Paragraph extends Job implements Serializable, 
Cloneable {
   private Map<String, Object> config; // paragraph configs like isOpen, 
colWidth, etc
   public final GUI settings;          // form and parameter settings
 
+  /**
+   * Application states in this paragraph
+   */
+  private final List<ApplicationState> apps =  new 
LinkedList<ApplicationState>();
+
   @VisibleForTesting
   Paragraph() {
     super(generateId(), null);
@@ -230,7 +236,7 @@ public class Paragraph extends Job implements Serializable, 
Cloneable {
     String replName = getRequiredReplName();
     Interpreter repl = getRepl(replName);
     if (repl != null) {
-      return repl.getProgress(getInterpreterContext());
+      return repl.getProgress(getInterpreterContext(null));
     } else {
       return 0;
     }
@@ -283,7 +289,6 @@ public class Paragraph extends Job implements Serializable, 
Cloneable {
       context.out.flush();
       InterpreterResult.Type outputType = context.out.getType();
       byte[] interpreterOutput = context.out.toByteArray();
-      context.out.clear();
 
       if (interpreterOutput != null && interpreterOutput.length > 0) {
         message = new String(interpreterOutput);
@@ -323,12 +328,44 @@ public class Paragraph extends Job implements 
Serializable, Cloneable {
     if (job != null) {
       job.setStatus(Status.ABORT);
     } else {
-      repl.cancel(getInterpreterContext());
+      repl.cancel(getInterpreterContext(null));
     }
     return true;
   }
 
   private InterpreterContext getInterpreterContext() {
+    final Paragraph self = this;
+
+    return getInterpreterContext(new InterpreterOutput(new 
InterpreterOutputListener() {
+      @Override
+      public void onAppend(InterpreterOutput out, byte[] line) {
+        updateParagraphResult(out);
+        ((ParagraphJobListener) getListener()).onOutputAppend(self, out, new 
String(line));
+      }
+
+      @Override
+      public void onUpdate(InterpreterOutput out, byte[] output) {
+        updateParagraphResult(out);
+        ((ParagraphJobListener) getListener()).onOutputUpdate(self, out,
+            new String(output));
+      }
+
+      private void updateParagraphResult(InterpreterOutput out) {
+        // update paragraph result
+        Throwable t = null;
+        String message = null;
+        try {
+          message = new String(out.toByteArray());
+        } catch (IOException e) {
+          logger().error(e.getMessage(), e);
+          t = e;
+        }
+        setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), 
t);
+      }
+    }));
+  }
+
+  private InterpreterContext getInterpreterContext(InterpreterOutput output) {
     AngularObjectRegistry registry = null;
     ResourcePool resourcePool = null;
 
@@ -363,33 +400,7 @@ public class Paragraph extends Job implements 
Serializable, Cloneable {
             registry,
             resourcePool,
             runners,
-            new InterpreterOutput(new InterpreterOutputListener() {
-              @Override
-              public void onAppend(InterpreterOutput out, byte[] line) {
-                updateParagraphResult(out);
-                ((ParagraphJobListener) getListener()).onOutputAppend(self, 
out, new String(line));
-              }
-
-              @Override
-              public void onUpdate(InterpreterOutput out, byte[] output) {
-                updateParagraphResult(out);
-                ((ParagraphJobListener) getListener()).onOutputUpdate(self, 
out,
-                        new String(output));
-              }
-
-              private void updateParagraphResult(InterpreterOutput out) {
-                // update paragraph result
-                Throwable t = null;
-                String message = null;
-                try {
-                  message = new String(out.toByteArray());
-                } catch (IOException e) {
-                  logger().error(e.getMessage(), e);
-                  t = e;
-                }
-                setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), 
message), t);
-              }
-            }));
+            output);
     return interpreterContext;
   }
 
@@ -433,6 +444,44 @@ public class Paragraph extends Job implements 
Serializable, Cloneable {
     return paraClone;
   }
 
+  /**
+   * Builds the application id for a package attached to this paragraph:
+   * {@code "app_" + noteId + "-" + paragraphId + packageName}, with every '.' in the
+   * package name replaced by '_'.
+   */
+  private String getApplicationId(HeliumPackage pkg) {
+    String prefix = "app_" + getNote().getId() + "-" + getId();
+    return prefix + pkg.getName().replace('.', '_');
+  }
+
+  /**
+   * Returns the application state already registered for {@code pkg}, or registers and
+   * returns a new one. The lookup and the insertion happen under the lock on {@code apps}.
+   *
+   * @param pkg package to look up or register
+   * @return the existing or newly created state
+   */
+  public ApplicationState createOrGetApplicationState(HeliumPackage pkg) {
+    synchronized (apps) {
+      for (ApplicationState existing : apps) {
+        // NOTE(review): compares an ApplicationState with a HeliumPackage; this relies on
+        // ApplicationState.equals accepting a package argument - confirm.
+        if (existing.equals(pkg)) {
+          return existing;
+        }
+      }
+
+      ApplicationState created = new ApplicationState(getApplicationId(pkg), pkg);
+      apps.add(created);
+      return created;
+    }
+  }
+
+
+  public ApplicationState getApplicationState(String appId) {
+    synchronized (apps) {
+      for (ApplicationState as : apps) {
+        if (as.getId().equals(appId)) {
+          return as;
+        }
+      }
+    }
+
+    return null;
+  }
+
+  /**
+   * Returns a copy of the registered application states, taken under the lock on
+   * {@code apps}; changes to the returned list do not affect this paragraph.
+   */
+  public List<ApplicationState> getAllApplicationStates() {
+    List<ApplicationState> snapshot;
+    synchronized (apps) {
+      snapshot = new LinkedList<ApplicationState>(apps);
+    }
+    return snapshot;
+  }
+
   String extractVariablesFromAngularRegistry(String scriptBody, Map<String, 
Input> inputs,
                                              AngularObjectRegistry 
angularRegistry) {
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java
index 152e087..a74e6c7 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java
@@ -36,6 +36,7 @@ import org.apache.commons.vfs2.Selectors;
 import org.apache.commons.vfs2.VFS;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars;
+import org.apache.zeppelin.notebook.ApplicationState;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.NoteInfo;
 import org.apache.zeppelin.notebook.Paragraph;
@@ -172,6 +173,15 @@ public class VFSNotebookRepo implements NotebookRepo {
       if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) {
         p.setStatus(Status.ABORT);
       }
+
+      List<ApplicationState> appStates = p.getAllApplicationStates();
+      if (appStates != null) {
+        for (ApplicationState app : appStates) {
+          if (app.getStatus() != ApplicationState.Status.ERROR) {
+            app.setStatus(ApplicationState.Status.UNLOADED);
+          }
+        }
+      }
     }
 
     return note;

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
index 320709e..08b3235 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
@@ -111,9 +111,15 @@ public class Message {
     CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations
                   // @param settings serialized Map<String, String> object
 
-    CHECKPOINT_NOTEBOOK,     // [c-s] checkpoint notebook to storage repository
-                             // @param noteId
-                             // @param checkpointName
+    CHECKPOINT_NOTEBOOK,    // [c-s] checkpoint notebook to storage repository
+                            // @param noteId
+                            // @param checkpointName
+
+    APP_APPEND_OUTPUT,      // [s-c] append output
+    APP_UPDATE_OUTPUT,      // [s-c] update (replace) output
+    APP_LOAD,               // [s-c] on app load
+    APP_STATUS_CHANGE,      // [s-c] on app status change
+
     LIST_NOTEBOOK_JOBS,     // [c-s] get notebook job management infomations
     LIST_UPDATE_NOTEBOOK_JOBS // [c-s] get job management informations for 
until unixtime
                                // @param unixTime

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
new file mode 100644
index 0000000..602f384
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zeppelin.conf.ZeppelinConfiguration;
+import org.apache.zeppelin.dep.DependencyResolver;
+import org.apache.zeppelin.interpreter.*;
+import org.apache.zeppelin.interpreter.mock.MockInterpreter1;
+import org.apache.zeppelin.interpreter.mock.MockInterpreter2;
+import org.apache.zeppelin.notebook.*;
+import org.apache.zeppelin.notebook.repo.VFSNotebookRepo;
+import org.apache.zeppelin.scheduler.ExecutorFactory;
+import org.apache.zeppelin.scheduler.Job;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+import org.apache.zeppelin.search.SearchService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Exercises loading, running and unloading of a Helium application attached to a
+ * paragraph, using the mock interpreters.
+ *
+ * <p>All asynchronous state changes are awaited through the bounded {@code wait*} helpers
+ * below, so a change that never happens fails the test instead of hanging it.
+ */
+public class HeliumApplicationFactoryTest implements JobListenerFactory {
+  /** Upper bound, in milliseconds, for each wait on asynchronous work. */
+  private static final long WAIT_TIMEOUT_MS = 60 * 1000L;
+
+  private File tmpDir;
+  private File notebookDir;
+  private ZeppelinConfiguration conf;
+  private SchedulerFactory schedulerFactory;
+  private DependencyResolver depResolver;
+  private InterpreterFactory factory;
+  private VFSNotebookRepo notebookRepo;
+  private Notebook notebook;
+  private HeliumApplicationFactory heliumAppFactory;
+
+  @Before
+  public void setUp() throws Exception {
+    tmpDir = new File(
+        System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis());
+    tmpDir.mkdirs();
+    File confDir = new File(tmpDir, "conf");
+    confDir.mkdirs();
+    notebookDir = new File(tmpDir + "/notebook");
+    notebookDir.mkdirs();
+
+    // the "note" resource is at zeppelin/zeppelin-zengine/target/test-classes/note
+    File home = new File(getClass().getClassLoader().getResource("note").getFile())
+        .getParentFile()   // zeppelin/zeppelin-zengine/target/test-classes
+        .getParentFile()   // zeppelin/zeppelin-zengine/target
+        .getParentFile()   // zeppelin/zeppelin-zengine
+        .getParentFile();  // zeppelin
+
+    System.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(),
+        home.getAbsolutePath());
+    System.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(),
+        tmpDir.getAbsolutePath() + "/conf");
+    System.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(),
+        notebookDir.getAbsolutePath());
+    System.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName(),
+        "org.apache.zeppelin.interpreter.mock.MockInterpreter1,"
+            + "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
+
+    conf = new ZeppelinConfiguration();
+
+    this.schedulerFactory = new SchedulerFactory();
+
+    MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1");
+    MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2");
+
+    heliumAppFactory = new HeliumApplicationFactory();
+    depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo");
+    factory = new InterpreterFactory(conf,
+        new InterpreterOption(true), null, null, heliumAppFactory, depResolver);
+    HashMap<String, String> env = new HashMap<String, String>();
+    env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath());
+    factory.setEnv(env);
+
+    SearchService search = mock(SearchService.class);
+    notebookRepo = new VFSNotebookRepo(conf);
+    NotebookAuthorization notebookAuthorization = new NotebookAuthorization(conf);
+    notebook = new Notebook(
+        conf,
+        notebookRepo,
+        schedulerFactory,
+        factory,
+        this,
+        search,
+        notebookAuthorization,
+        null);
+
+    heliumAppFactory.setNotebook(notebook);
+
+    notebook.addNotebookEventListener(heliumAppFactory);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    List<InterpreterSetting> settings = factory.get();
+    for (InterpreterSetting setting : settings) {
+      for (InterpreterGroup intpGroup : setting.getAllInterpreterGroups()) {
+        intpGroup.close();
+      }
+    }
+
+    FileUtils.deleteDirectory(tmpDir);
+    System.setProperty(
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(),
+        ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getStringValue());
+  }
+
+  /** Creates the application package fixture shared by all tests. */
+  private static HeliumPackage newTestPackage() {
+    return new HeliumPackage(HeliumPackage.Type.APPLICATION,
+        "name1",
+        "desc1",
+        "",
+        HeliumTestApplication.class.getName(),
+        new String[][]{});
+  }
+
+  /** Returns the wall-clock time after which a wait started now must give up. */
+  private static long newDeadline() {
+    return System.currentTimeMillis() + WAIT_TIMEOUT_MS;
+  }
+
+  /**
+   * Yields the CPU once, or fails the test when {@code deadline} has passed.
+   *
+   * @param deadline value obtained from {@link #newDeadline()} when the wait started
+   * @param waitingFor description of the awaited condition, used in the failure message
+   */
+  private static void yieldOrFail(long deadline, String waitingFor) {
+    if (System.currentTimeMillis() > deadline) {
+      throw new AssertionError(
+          "Timed out after " + WAIT_TIMEOUT_MS + " ms waiting for " + waitingFor);
+    }
+    Thread.yield();
+  }
+
+  /** Waits until the paragraph is terminated and has a result. */
+  private static void waitUntilFinished(Paragraph p) {
+    long deadline = newDeadline();
+    while (!p.isTerminated() || p.getResult() == null) {
+      yieldOrFail(deadline, "paragraph " + p.getId() + " to finish");
+    }
+  }
+
+  /** Waits until the application reports status LOADED. */
+  private static void waitUntilLoaded(ApplicationState app) {
+    long deadline = newDeadline();
+    while (app.getStatus() != ApplicationState.Status.LOADED) {
+      yieldOrFail(deadline, "application " + app.getId() + " to be loaded");
+    }
+  }
+
+  /** Waits until the application reports any status other than LOADED. */
+  private static void waitWhileLoaded(ApplicationState app) {
+    long deadline = newDeadline();
+    while (app.getStatus() == ApplicationState.Status.LOADED) {
+      yieldOrFail(deadline, "application " + app.getId() + " to leave LOADED");
+    }
+  }
+
+  /** Waits until the application output equals {@code expected}. */
+  private static void waitForOutput(ApplicationState app, String expected) {
+    long deadline = newDeadline();
+    while (!expected.equals(app.getOutput())) {
+      yieldOrFail(deadline, "application " + app.getId() + " output \"" + expected + "\"");
+    }
+  }
+
+  @Test
+  public void testLoadRunUnloadApplication()
+      throws IOException, ApplicationException, InterruptedException {
+    // given
+    HeliumPackage pkg1 = newTestPackage();
+
+    Note note1 = notebook.createNote(null);
+    factory.setInterpreters(note1.getId(), factory.getDefaultInterpreterSettingList());
+
+    Paragraph p1 = note1.addParagraph();
+
+    // make sure interpreter process running
+    p1.setText("%mock1 job");
+    note1.run(p1.getId());
+    waitUntilFinished(p1);
+
+    assertEquals("repl1: job", p1.getResult().message());
+
+    // when
+    assertEquals(0, p1.getAllApplicationStates().size());
+    String appId = heliumAppFactory.loadAndRun(pkg1, p1);
+    assertEquals(1, p1.getAllApplicationStates().size());
+    ApplicationState app = p1.getApplicationState(appId);
+    // poll for the first run's output instead of sleeping a fixed interval
+    waitForOutput(app, "Hello world 1");
+
+    // then
+    assertEquals("Hello world 1", app.getOutput());
+
+    // when
+    heliumAppFactory.run(p1, appId);
+    waitForOutput(app, "Hello world 2");
+
+    // then
+    assertEquals("Hello world 2", app.getOutput());
+
+    // clean
+    heliumAppFactory.unload(p1, appId);
+    notebook.removeNote(note1.getId(), null);
+  }
+
+  @Test
+  public void testUnloadOnParagraphRemove() throws IOException {
+    // given
+    HeliumPackage pkg1 = newTestPackage();
+
+    Note note1 = notebook.createNote(null);
+    factory.setInterpreters(note1.getId(), factory.getDefaultInterpreterSettingList());
+
+    Paragraph p1 = note1.addParagraph();
+
+    // make sure interpreter process running
+    p1.setText("%mock1 job");
+    note1.run(p1.getId());
+    waitUntilFinished(p1);
+
+    assertEquals(0, p1.getAllApplicationStates().size());
+    String appId = heliumAppFactory.loadAndRun(pkg1, p1);
+    ApplicationState app = p1.getApplicationState(appId);
+    waitUntilLoaded(app);
+
+    // when remove paragraph
+    note1.removeParagraph(p1.getId());
+
+    // then
+    assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());
+
+    // clean
+    notebook.removeNote(note1.getId(), null);
+  }
+
+  @Test
+  public void testUnloadOnInterpreterUnbind() throws IOException {
+    // given
+    HeliumPackage pkg1 = newTestPackage();
+
+    Note note1 = notebook.createNote(null);
+    notebook.bindInterpretersToNote(note1.getId(), factory.getDefaultInterpreterSettingList());
+
+    Paragraph p1 = note1.addParagraph();
+
+    // make sure interpreter process running
+    p1.setText("%mock1 job");
+    note1.run(p1.getId());
+    waitUntilFinished(p1);
+
+    assertEquals(0, p1.getAllApplicationStates().size());
+    String appId = heliumAppFactory.loadAndRun(pkg1, p1);
+    ApplicationState app = p1.getApplicationState(appId);
+    waitUntilLoaded(app);
+
+    // when unbind interpreter
+    notebook.bindInterpretersToNote(note1.getId(), new LinkedList<String>());
+
+    // then
+    assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());
+
+    // clean
+    notebook.removeNote(note1.getId(), null);
+  }
+
+  @Test
+  public void testUnloadOnInterpreterRestart() throws IOException {
+    // given
+    HeliumPackage pkg1 = newTestPackage();
+
+    Note note1 = notebook.createNote(null);
+    notebook.bindInterpretersToNote(note1.getId(), factory.getDefaultInterpreterSettingList());
+    String mock1IntpSettingId = null;
+    for (InterpreterSetting setting : notebook.getBindedInterpreterSettings(note1.getId())) {
+      if (setting.getName().equals("mock1")) {
+        mock1IntpSettingId = setting.id();
+        break;
+      }
+    }
+
+    Paragraph p1 = note1.addParagraph();
+
+    // make sure interpreter process running
+    p1.setText("%mock1 job");
+    note1.run(p1.getId());
+    waitUntilFinished(p1);
+    assertEquals(0, p1.getAllApplicationStates().size());
+    String appId = heliumAppFactory.loadAndRun(pkg1, p1);
+    ApplicationState app = p1.getApplicationState(appId);
+    waitUntilLoaded(app);
+    // wait until application is executed
+    waitForOutput(app, "Hello world 1");
+
+    // when restart interpreter
+    factory.restart(mock1IntpSettingId);
+    waitWhileLoaded(app);
+
+    // then
+    assertEquals(ApplicationState.Status.UNLOADED, app.getStatus());
+
+    // clean
+    notebook.removeNote(note1.getId(), null);
+  }
+
+  /** Supplies a listener that ignores every event; the tests read state directly. */
+  @Override
+  public ParagraphJobListener getParagraphJobListener(Note note) {
+    return new ParagraphJobListener() {
+      @Override
+      public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) {
+      }
+
+      @Override
+      public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) {
+      }
+
+      @Override
+      public void onProgressUpdate(Job job, int progress) {
+      }
+
+      @Override
+      public void beforeStatusChange(Job job, Job.Status before, Job.Status after) {
+      }
+
+      @Override
+      public void afterStatusChange(Job job, Job.Status before, Job.Status after) {
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumLocalRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumLocalRegistryTest.java
 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumLocalRegistryTest.java
new file mode 100644
index 0000000..e90cbc3
--- /dev/null
+++ 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumLocalRegistryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+import com.google.gson.Gson;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+
+/** Checks that a local registry picks up package descriptor files from its directory. */
+public class HeliumLocalRegistryTest {
+  private File tmpDir;
+
+  @Before
+  public void setUp() throws Exception {
+    String baseDir = System.getProperty("java.io.tmpdir");
+    tmpDir = new File(baseDir + "/ZeppelinLTest_" + System.currentTimeMillis());
+    tmpDir.mkdirs();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.deleteDirectory(tmpDir);
+  }
+
+  @Test
+  public void testGetAllPackage() throws IOException {
+    // given: a registry over a directory with no package files
+    File registryDir = new File(tmpDir, "r1");
+    HeliumLocalRegistry registry = new HeliumLocalRegistry("r1", registryDir.getAbsolutePath());
+    assertEquals(0, registry.getAll().size());
+
+    // when: one package descriptor is written into that directory as JSON
+    HeliumPackage pkg = new HeliumPackage(HeliumPackage.Type.APPLICATION,
+        "app1",
+        "desc1",
+        "artifact1",
+        "classname1",
+        new String[][]{});
+    File pkgFile = new File(registryDir, "pkg1.json");
+    FileUtils.writeStringToFile(pkgFile, new Gson().toJson(pkg));
+
+    // then: the registry reports exactly that one package
+    assertEquals(1, registry.getAll().size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java 
b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
new file mode 100644
index 0000000..1ed31c5
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zeppelin.helium;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Tests saving/restoring the Helium configuration and aggregating registry packages. */
+public class HeliumTest {
+  private File tmpDir;
+  private File localRegistryPath;
+
+  @Before
+  public void setUp() throws Exception {
+    String baseDir = System.getProperty("java.io.tmpdir");
+    tmpDir = new File(baseDir + "/ZeppelinLTest_" + System.currentTimeMillis());
+    tmpDir.mkdirs();
+    localRegistryPath = new File(tmpDir, "helium");
+    localRegistryPath.mkdirs();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtils.deleteDirectory(tmpDir);
+  }
+
+  /** Creates a Helium over {@code confFile} and this test's local registry directory. */
+  private Helium openHelium(File confFile) throws IOException, URISyntaxException {
+    return new Helium(confFile.getAbsolutePath(), localRegistryPath.getAbsolutePath());
+  }
+
+  @Test
+  public void testSaveLoadConf() throws IOException, URISyntaxException {
+    // given: a Helium whose conf file has not been written yet
+    File confFile = new File(tmpDir, "helium.conf");
+    Helium helium = openHelium(confFile);
+    assertFalse(confFile.exists());
+    HeliumTestRegistry extraRegistry = new HeliumTestRegistry("r1", "r1");
+    helium.addRegistry(extraRegistry);
+    assertEquals(2, helium.getAllRegistry().size());
+    assertEquals(0, helium.getAllPackageInfo().size());
+
+    // when
+    helium.save();
+
+    // then: the conf file exists ...
+    assertTrue(confFile.exists());
+
+    // ... and a fresh instance reading it sees both registries
+    Helium restored = openHelium(confFile);
+    assertEquals(2, restored.getAllRegistry().size());
+  }
+
+  @Test
+  public void testRestoreRegistryInstances() throws IOException, URISyntaxException {
+    // given: two test registries added to a new Helium
+    File confFile = new File(tmpDir, "helium.conf");
+    Helium helium = openHelium(confFile);
+    HeliumTestRegistry firstRegistry = new HeliumTestRegistry("r1", "r1");
+    HeliumTestRegistry secondRegistry = new HeliumTestRegistry("r2", "r2");
+    helium.addRegistry(firstRegistry);
+    helium.addRegistry(secondRegistry);
+
+    // when: each registry receives one package
+    HeliumPackage firstPackage = new HeliumPackage(
+        HeliumPackage.Type.APPLICATION,
+        "name1",
+        "desc1",
+        "artifact1",
+        "className1",
+        new String[][]{});
+    firstRegistry.add(firstPackage);
+
+    HeliumPackage secondPackage = new HeliumPackage(
+        HeliumPackage.Type.APPLICATION,
+        "name2",
+        "desc2",
+        "artifact2",
+        "className2",
+        new String[][]{});
+    secondRegistry.add(secondPackage);
+
+    // then: Helium reports both packages
+    assertEquals(2, helium.getAllPackageInfo().size());
+  }
+}

Reply via email to