ZEPPELIN-25 Ability to create rich gui inside of Notebook

This PR implements https://issues.apache.org/jira/browse/ZEPPELIN-25

Here's a short video demo of this feature.
[![IMAGE ALT TEXT 
HERE](http://img.youtube.com/vi/xU5TBS_MsAs/0.jpg)](http://www.youtube.com/watch?v=xU5TBS_MsAs)

for someone who want to try, here's api
```scala

// bind 'varName' variable with 'v' value
z.angularBind(varName: String, v: Object)

// unbind 'varName'
z.angularUnbind(varName: String)

// get value of 'varName'
z.angular(varName:String)

// add watcher to 'varName' variable.
// that is monitoring change and run 'func' when it is changed.
z.angularWatch(varName:String, func: (Object, Object) => Unit))

// remove watcher from 'varName'
z.angularUnwatch(varName:String)
```

Any paragraph's output starting with '%angular'  will be considered as angular 
code. %angular as a interpreter also available.
![image](https://cloud.githubusercontent.com/assets/1540981/7003457/a1e6fe2a-dc95-11e4-8272-380f11c6ae81.png)

Any feedback is welcome!

Author: Lee moon soo <[email protected]>

Closes #27 from Leemoonsoo/angular and squashes the following commits:

04d7175 [Lee moon soo] Remove implicit conversion because of side effect
34fa298 [Lee moon soo] jquery to angular
8076098 [Lee moon soo] Remove unnecessary type information
88fd635 [Lee moon soo] catch and print watcher user provided routine exception
46dba2f [Lee moon soo] Catch sql syntax error
2ebfa59 [Lee moon soo] Let z.run optionally take InterpreterContext
ee29866 [Lee moon soo] ZEPPELIN-32 implement z.show()
dac416d [Lee moon soo] Implement z.run()
0899011 [Lee moon soo] Fix test
0033d32 [Lee moon soo] Add angular interpreter
42ee479 [Lee moon soo] com.nflabs -> org.apache
4d32d19 [Lee moon soo] ZEPPELIN-25 prevent watcher called multiple times
d4d270e [Lee moon soo] ZEPPELIN-25 add unittest
6ce8f36 [Lee moon soo] ZEPPELIN-25 implement watcher
6df7f23 [Lee moon soo] ZEPPELIN-25 broadcast angular object change to related 
notes
5954e29 [Lee moon soo] ZEPPELIN-25 save/restore angular object registry 
snapshot to the notebook file
c288198 [Lee moon soo] ZEPPELIN-25 send scope variables when loading note
67f6926 [Lee moon soo] ZEPPELIN-25 impelemnet JS(angular) -JVM(scala) two-way 
binding
bb52d7b [Lee moon soo] Add %angular display system
a7c77b8 [Lee moon soo] Update license of ScreenCaptureHtmlUnitDriver.java
6d7e063 [Lee moon soo] Add source file license header


Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/58b70e3b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/58b70e3b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/58b70e3b

Branch: refs/heads/master
Commit: 58b70e3bc0b9fb814a5919037318a79ae67f678f
Parents: c0a7d08
Author: Lee moon soo <[email protected]>
Authored: Sat Apr 11 11:15:05 2015 +0900
Committer: Lee moon soo <[email protected]>
Committed: Sun Apr 12 14:58:32 2015 +0900

----------------------------------------------------------------------
 angular/pom.xml                                 |  127 ++
 .../zeppelin/angular/AngularInterpreter.java    |   81 +
 conf/zeppelin-site.xml.template                 |    2 +-
 pom.xml                                         |    1 +
 .../apache/zeppelin/spark/SparkInterpreter.java |    6 +-
 .../zeppelin/spark/SparkSqlInterpreter.java     |   84 +-
 .../apache/zeppelin/spark/ZeppelinContext.java  |  348 ++++-
 .../zeppelin/spark/DepInterpreterTest.java      |    9 +-
 .../zeppelin/spark/SparkInterpreterTest.java    |   10 +-
 .../zeppelin/spark/SparkSqlInterpreterTest.java |   14 +-
 .../apache/zeppelin/display/AngularObject.java  |  129 ++
 .../zeppelin/display/AngularObjectListener.java |   25 +
 .../zeppelin/display/AngularObjectRegistry.java |  106 ++
 .../display/AngularObjectRegistryListener.java  |   28 +
 .../zeppelin/display/AngularObjectWatcher.java  |   37 +
 .../interpreter/InterpreterContext.java         |   19 +-
 .../interpreter/InterpreterContextRunner.java   |   58 +
 .../zeppelin/interpreter/InterpreterGroup.java  |   21 +-
 .../zeppelin/interpreter/InterpreterResult.java |    1 +
 .../remote/InterpreterContextRunnerPool.java    |   88 ++
 .../interpreter/remote/RemoteAngularObject.java |   50 +
 .../remote/RemoteAngularObjectRegistry.java     |   67 +
 .../interpreter/remote/RemoteInterpreter.java   |   20 +-
 .../remote/RemoteInterpreterContextRunner.java  |   38 +
 .../remote/RemoteInterpreterEventPoller.java    |  126 ++
 .../remote/RemoteInterpreterProcess.java        |   50 +-
 .../remote/RemoteInterpreterServer.java         |  140 +-
 .../thrift/RemoteInterpreterContext.java        |  108 +-
 .../thrift/RemoteInterpreterEvent.java          |  502 ++++++
 .../thrift/RemoteInterpreterEventType.java      |   54 +
 .../thrift/RemoteInterpreterService.java        | 1462 ++++++++++++++++++
 .../zeppelin/scheduler/ExecutorFactory.java     |   83 +
 .../zeppelin/scheduler/SchedulerFactory.java    |    9 +-
 .../main/thrift/RemoteInterpreterService.thrift |   19 +-
 .../display/AngularObjectRegistryTest.java      |   67 +
 .../zeppelin/display/AngularObjectTest.java     |   78 +
 .../remote/RemoteAngularObjectTest.java         |  144 ++
 .../remote/RemoteInterpreterProcessTest.java    |   16 +-
 .../remote/RemoteInterpreterTest.java           |   32 +-
 .../remote/mock/MockInterpreterAngular.java     |  117 ++
 .../zeppelin/scheduler/RemoteSchedulerTest.java |   12 +-
 .../apache/zeppelin/server/ZeppelinServer.java  |    2 +-
 .../org/apache/zeppelin/socket/Message.java     |    5 +
 .../apache/zeppelin/socket/NotebookServer.java  |  151 +-
 .../zeppelin/rest/ZeppelinRestApiTest.java      |    2 +-
 zeppelin-web/app/scripts/controllers/main.js    |    4 +-
 .../app/scripts/controllers/notebook.js         |   49 +
 .../app/scripts/controllers/paragraph.js        |   26 +-
 zeppelin-web/app/views/paragraph.html           |    5 +
 .../zeppelin/conf/ZeppelinConfiguration.java    |    1 +
 .../interpreter/InterpreterFactory.java         |   63 +-
 .../interpreter/InterpreterSetting.java         |   10 +-
 .../java/org/apache/zeppelin/notebook/Note.java |   37 +-
 .../org/apache/zeppelin/notebook/Notebook.java  |   77 +-
 .../org/apache/zeppelin/notebook/Paragraph.java |   49 +-
 .../interpreter/InterpreterFactoryTest.java     |    6 +-
 .../apache/zeppelin/notebook/NotebookTest.java  |    4 +-
 57 files changed, 4676 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/angular/pom.xml
----------------------------------------------------------------------
diff --git a/angular/pom.xml b/angular/pom.xml
new file mode 100644
index 0000000..580b848
--- /dev/null
+++ b/angular/pom.xml
@@ -0,0 +1,127 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+  
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>zeppelin</artifactId>
+    <groupId>org.apache.zeppelin</groupId>
+    <version>0.5.0-SNAPSHOT</version>
+  </parent>
+
+  <groupId>org.apache.zeppelin</groupId>
+  <artifactId>zeppelin-angular</artifactId>
+  <packaging>jar</packaging>
+  <version>0.5.0-SNAPSHOT</version>
+  <name>Zeppelin: Angular interpreter</name>
+  <url>http://zeppelin.incubator.apache.org</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>zeppelin-interpreter</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency> 
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+    </dependency>
+    
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-deploy-plugin</artifactId>
+        <version>2.7</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.3.1</version>            
+        <executions> 
+          <execution> 
+            <id>enforce</id> 
+            <phase>none</phase> 
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <version>2.8</version>
+        <executions>
+          <execution>
+            <id>copy-dependencies</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy-dependencies</goal>
+            </goals>
+            <configuration>
+              
<outputDirectory>${project.build.directory}/../../interpreter/angular</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-artifact</id>
+            <phase>package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              
<outputDirectory>${project.build.directory}/../../interpreter/angular</outputDirectory>
+              <overWriteReleases>false</overWriteReleases>
+              <overWriteSnapshots>false</overWriteSnapshots>
+              <overWriteIfNewer>true</overWriteIfNewer>
+              <includeScope>runtime</includeScope>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>${project.groupId}</groupId>
+                  <artifactId>${project.artifactId}</artifactId>
+                  <version>${project.version}</version>
+                  <type>${project.packaging}</type>
+                </artifactItem>
+              </artifactItems>              
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java
----------------------------------------------------------------------
diff --git 
a/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java 
b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java
new file mode 100644
index 0000000..c7a406d
--- /dev/null
+++ b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.angular;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import org.apache.zeppelin.interpreter.InterpreterResult.Type;
+import org.apache.zeppelin.scheduler.Scheduler;
+import org.apache.zeppelin.scheduler.SchedulerFactory;
+
+/**
+ *
+ */
+public class AngularInterpreter extends Interpreter {
+  static {
+    Interpreter.register("angular", AngularInterpreter.class.getName());
+  }
+
+  public AngularInterpreter(Properties property) {
+    super(property);
+  }
+
+  @Override
+  public void open() {
+  }
+
+  @Override
+  public void close() {
+  }
+
+  @Override
+  public InterpreterResult interpret(String st, InterpreterContext context) {
+    return new InterpreterResult(Code.SUCCESS, Type.ANGULAR, st);
+  }
+
+  @Override
+  public void cancel(InterpreterContext context) {
+  }
+
+  @Override
+  public FormType getFormType() {
+    return FormType.NATIVE;
+  }
+
+  @Override
+  public int getProgress(InterpreterContext context) {
+    return 0;
+  }
+
+  @Override
+  public List<String> completion(String buf, int cursor) {
+    return new LinkedList<String>();
+  }
+
+  @Override
+  public Scheduler getScheduler() {
+    return SchedulerFactory.singleton().createOrGetFIFOScheduler(
+        AngularInterpreter.class.getName() + this.hashCode());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/conf/zeppelin-site.xml.template
----------------------------------------------------------------------
diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template
index 32011b1..13d794c 100644
--- a/conf/zeppelin-site.xml.template
+++ b/conf/zeppelin-site.xml.template
@@ -60,7 +60,7 @@
 
 <property>
   <name>zeppelin.interpreters</name>
-  
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter</value>
+  
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter</value>
   <description>Comma separated interpreter configurations. First interpreter 
become a default</description>
 </property>
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 20a073e..d2fc77c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
     <module>zeppelin-zengine</module>
     <module>spark</module>
     <module>markdown</module>
+    <module>angular</module>
     <module>shell</module>
     <module>hive</module>
     <module>zeppelin-web</module>

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 2461109..b038dd6 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -107,6 +107,7 @@ public class SparkInterpreter extends Interpreter {
                 + "we should set this value")
             .add("zeppelin.spark.useHiveContext", "true",
                  "Use HiveContext instead of SQLContext if it is true.")
+            .add("zeppelin.spark.maxResult", "1000", "Max number of SparkSQL 
result to display.")
             .add("args", "", "spark commandline args").build());
 
   }
@@ -398,7 +399,8 @@ public class SparkInterpreter extends Interpreter {
 
     dep = getDependencyResolver();
 
-    z = new ZeppelinContext(sc, sqlc, null, dep, printStream);
+    z = new ZeppelinContext(sc, sqlc, null, dep, printStream,
+        Integer.parseInt(getProperty("zeppelin.spark.maxResult")));
 
     try {
       if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) {
@@ -510,7 +512,7 @@ public class SparkInterpreter extends Interpreter {
   }
 
   String getJobGroup(InterpreterContext context){
-    return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
+    return "zeppelin-" + context.getParagraphId();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java 
b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
index 2555988..618579d 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java
@@ -29,18 +29,15 @@ import org.apache.spark.scheduler.ActiveJob;
 import org.apache.spark.scheduler.DAGScheduler;
 import org.apache.spark.scheduler.Stage;
 import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SQLContext.QueryExecution;
-import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.ui.jobs.JobProgressListener;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder;
 import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterUtils;
+import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.WrappedInterpreter;
-import org.apache.zeppelin.interpreter.InterpreterResult.Code;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 import org.slf4j.Logger;
@@ -76,7 +73,7 @@ public class SparkSqlInterpreter extends Interpreter {
   }
 
   private String getJobGroup(InterpreterContext context){
-    return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId();
+    return "zeppelin-" + context.getParagraphId();
   }
 
   private int maxResult;
@@ -126,82 +123,13 @@ public class SparkSqlInterpreter extends Interpreter {
       sc.setLocalProperty("spark.scheduler.pool", null);
     }
 
-    sc.setJobGroup(getJobGroup(context), "Zeppelin", false);
-
-    // SchemaRDD - spark 1.1, 1.2, DataFrame - spark 1.3
-    Object rdd;
-    Object[] rows = null;
     try {
-      rdd = sqlc.sql(st);
-
-      Method take = rdd.getClass().getMethod("take", int.class);
-      rows = (Object[]) take.invoke(rdd, maxResult + 1);
+      Object rdd = sqlc.sql(st);
+      String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult);
+      return new InterpreterResult(Code.SUCCESS, msg);
     } catch (Exception e) {
-      logger.error("Error", e);
-      sc.clearJobGroup();
-      return new InterpreterResult(Code.ERROR, 
InterpreterUtils.getMostRelevantMessage(e));
-    }
-
-    String msg = null;
-
-    // get field names
-    Method queryExecution;
-    QueryExecution qe;
-    try {
-      queryExecution = rdd.getClass().getMethod("queryExecution");
-      qe = (QueryExecution) queryExecution.invoke(rdd);
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-    List<Attribute> columns =
-        scala.collection.JavaConverters.asJavaListConverter(
-            qe.analyzed().output()).asJava();
-
-    for (Attribute col : columns) {
-      if (msg == null) {
-        msg = col.name();
-      } else {
-        msg += "\t" + col.name();
-      }
-    }
-
-    msg += "\n";
-
-    // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, 
DynamicType,
-    // FloatType, FractionalType, IntegerType, IntegralType, LongType, 
MapType, NativeType,
-    // NullType, NumericType, ShortType, StringType, StructType
-
-    try {
-      for (int r = 0; r < maxResult && r < rows.length; r++) {
-        Object row = rows[r];
-        Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
-        Method apply = row.getClass().getMethod("apply", int.class);
-
-        for (int i = 0; i < columns.size(); i++) {
-          if (!(Boolean) isNullAt.invoke(row, i)) {
-            msg += apply.invoke(row, i).toString();
-          } else {
-            msg += "null";
-          }
-          if (i != columns.size() - 1) {
-            msg += "\t";
-          }
-        }
-        msg += "\n";
-      }
-    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
-        | IllegalArgumentException | InvocationTargetException e) {
-      throw new InterpreterException(e);
-    }
-
-    if (rows.length > maxResult) {
-      msg += "\n<font color=red>Results are limited by " + maxResult + 
".</font>";
+      return new InterpreterResult(Code.ERROR, e.getMessage());
     }
-    InterpreterResult rett = new InterpreterResult(Code.SUCCESS, "%table " + 
msg);
-    sc.clearJobGroup();
-    return rett;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java 
b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
index 87cd188..2c03f1c 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java
@@ -22,19 +22,31 @@ import static 
scala.collection.JavaConversions.asJavaIterable;
 import static scala.collection.JavaConversions.collectionAsScalaIterable;
 
 import java.io.PrintStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SQLContext.QueryExecution;
+import org.apache.spark.sql.catalyst.expressions.Attribute;
 import org.apache.spark.sql.hive.HiveContext;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectWatcher;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.display.Input.ParamOption;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.spark.dep.DependencyResolver;
 
 import scala.Tuple2;
+import scala.Unit;
 import scala.collection.Iterable;
 
 /**
@@ -47,15 +59,18 @@ public class ZeppelinContext extends HashMap<String, 
Object> {
   private DependencyResolver dep;
   private PrintStream out;
   private InterpreterContext interpreterContext;
+  private int maxResult;
 
   public ZeppelinContext(SparkContext sc, SQLContext sql,
       InterpreterContext interpreterContext,
-      DependencyResolver dep, PrintStream printStream) {
+      DependencyResolver dep, PrintStream printStream,
+      int maxResult) {
     this.sc = sc;
     this.sqlContext = sql;
     this.interpreterContext = interpreterContext;
     this.dep = dep;
     this.out = printStream;
+    this.maxResult = maxResult;
   }
 
   public SparkContext sc;
@@ -63,12 +78,6 @@ public class ZeppelinContext extends HashMap<String, Object> 
{
   public HiveContext hiveContext;
   private GUI gui;
 
-  /* spark-1.3
-  public SchemaRDD sql(String sql) {
-    return sqlContext.sql(sql);
-  }
-  */
-
   /**
    * Load dependency for interpreter and runtime (driver).
    * And distribute them to spark cluster (sc.add())
@@ -221,25 +230,6 @@ public class ZeppelinContext extends HashMap<String, 
Object> {
     this.gui = o;
   }
 
-  public void run(String lines) {
-    /*
-    String intpName = Paragraph.getRequiredReplName(lines);
-    String scriptBody = Paragraph.getScriptBody(lines);
-    Interpreter intp = interpreterContext.getParagraph().getRepl(intpName);
-    InterpreterResult ret = intp.interpret(scriptBody, interpreterContext);
-    if (ret.code() == InterpreterResult.Code.SUCCESS) {
-      out.println("%" + ret.type().toString().toLowerCase() + " " + 
ret.message());
-    } else if (ret.code() == InterpreterResult.Code.ERROR) {
-      out.println("Error: " + ret.message());
-    } else if (ret.code() == InterpreterResult.Code.INCOMPLETE) {
-      out.println("Incomplete");
-    } else {
-      out.println("Unknown error");
-    }
-    */
-    throw new RuntimeException("Missing implementation");
-  }
-
   private void restartInterpreter() {
   }
 
@@ -251,4 +241,310 @@ public class ZeppelinContext extends HashMap<String, 
Object> {
     this.interpreterContext = interpreterContext;
   }
 
+  public void setMaxResult(int maxResult) {
+    this.maxResult = maxResult;
+  }
+
+  /**
+   * show DataFrame or SchemaRDD
+   * @param o DataFrame or SchemaRDD object
+   */
+  public void show(Object o) {
+    show(o, maxResult);
+  }
+
+  /**
+   * show DataFrame or SchemaRDD
+   * @param o DataFrame or SchemaRDD object
+   * @param maxResult maximum number of rows to display
+   */
+  public void show(Object o, int maxResult) {
+    Class cls = null;
+    try {
+      cls = this.getClass().forName("org.apache.spark.sql.DataFrame");
+    } catch (ClassNotFoundException e) {
+    }
+
+    if (cls == null) {
+      try {
+        cls = this.getClass().forName("org.apache.spark.sql.SchemaRDD");
+      } catch (ClassNotFoundException e) {
+      }
+    }
+
+    if (cls == null) {
+      throw new InterpreterException("Can not road DataFrame/SchemaRDD class");
+    }
+
+    if (cls.isInstance(o)) {
+      out.print(showRDD(sc, interpreterContext, o, maxResult));
+    } else {
+      out.print(o.toString());
+    }
+  }
+
+  public static String showRDD(SparkContext sc,
+      InterpreterContext interpreterContext,
+      Object rdd, int maxResult) {
+    Object[] rows = null;
+    Method take;
+    String jobGroup = "zeppelin-" + interpreterContext.getParagraphId();
+    sc.setJobGroup(jobGroup, "Zeppelin", false);
+
+    try {
+      take = rdd.getClass().getMethod("take", int.class);
+      rows = (Object[]) take.invoke(rdd, maxResult + 1);
+
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      sc.clearJobGroup();
+      throw new InterpreterException(e);
+    }
+
+    String msg = null;
+
+    // get field names
+    Method queryExecution;
+    QueryExecution qe;
+    try {
+      queryExecution = rdd.getClass().getMethod("queryExecution");
+      qe = (QueryExecution) queryExecution.invoke(rdd);
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
+    }
+
+    List<Attribute> columns =
+        scala.collection.JavaConverters.asJavaListConverter(
+            qe.analyzed().output()).asJava();
+
+    for (Attribute col : columns) {
+      if (msg == null) {
+        msg = col.name();
+      } else {
+        msg += "\t" + col.name();
+      }
+    }
+
+    msg += "\n";
+
+    // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, 
DynamicType,
+    // FloatType, FractionalType, IntegerType, IntegralType, LongType, 
MapType, NativeType,
+    // NullType, NumericType, ShortType, StringType, StructType
+
+    try {
+      for (int r = 0; r < maxResult && r < rows.length; r++) {
+        Object row = rows[r];
+        Method isNullAt = row.getClass().getMethod("isNullAt", int.class);
+        Method apply = row.getClass().getMethod("apply", int.class);
+
+        for (int i = 0; i < columns.size(); i++) {
+          if (!(Boolean) isNullAt.invoke(row, i)) {
+            msg += apply.invoke(row, i).toString();
+          } else {
+            msg += "null";
+          }
+          if (i != columns.size() - 1) {
+            msg += "\t";
+          }
+        }
+        msg += "\n";
+      }
+    } catch (NoSuchMethodException | SecurityException | IllegalAccessException
+        | IllegalArgumentException | InvocationTargetException e) {
+      throw new InterpreterException(e);
+    }
+
+    if (rows.length > maxResult) {
+      msg += "\n<font color=red>Results are limited by " + maxResult + 
".</font>";
+    }
+    sc.clearJobGroup();
+    return "%table " + msg;
+  }
+
+  /**
+   * Run paragraph by id
+   * @param id
+   */
+  public void run(String id) {
+    run(id, interpreterContext);
+  }
+
+  /**
+   * Run paragraph by id
+   * @param id
+   * @param context
+   */
+  public void run(String id, InterpreterContext context) {
+    if (id.equals(context.getParagraphId())) {
+      throw new InterpreterException("Can not run current Paragraph");
+    }
+
+    for (InterpreterContextRunner r : context.getRunners()) {
+      if (id.equals(r.getParagraphId())) {
+        r.run();
+        return;
+      }
+    }
+
+    throw new InterpreterException("Paragraph " + id + " not found");
+  }
+
+  /**
+   * Run paragraph at idx
+   * @param idx
+   */
+  public void run(int idx) {
+    run(idx, interpreterContext);
+  }
+
+  /**
+   * Run paragraph at index
+   * @param idx index starting from 0
+   * @param context interpreter context
+   */
+  public void run(int idx, InterpreterContext context) {
+    if (idx >= context.getRunners().size()) {
+      throw new InterpreterException("Index out of bound");
+    }
+
+    InterpreterContextRunner runner = context.getRunners().get(idx);
+    if (runner.getParagraphId().equals(context.getParagraphId())) {
+      throw new InterpreterException("Can not run current Paragraph");
+    }
+
+    runner.run();
+  }
+
+  public void run(List<Object> paragraphIdOrIdx) {
+    run(paragraphIdOrIdx, interpreterContext);
+  }
+
+  /**
+   * Run paragraphs
+   * @param paragraphIdOrIdxs list of paragraph id or idx
+   */
+  public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) {
+    for (Object idOrIdx : paragraphIdOrIdx) {
+      if (idOrIdx instanceof String) {
+        String id = (String) idOrIdx;
+        run(id, context);
+      } else if (idOrIdx instanceof Integer) {
+        Integer idx = (Integer) idOrIdx;
+        run(idx, context);
+      } else {
+        throw new InterpreterException("Paragraph " + idOrIdx + " not found");
+      }
+    }
+  }
+
+  public void runAll() {
+    runAll(interpreterContext);
+  }
+
+  /**
+   * Run all paragraphs. except this.
+   */
+  public void runAll(InterpreterContext context) {
+    for (InterpreterContextRunner r : context.getRunners()) {
+      if (r.getParagraphId().equals(context.getParagraphId())) {
+        // skip itself
+        continue;
+      }
+      r.run();
+    }
+  }
+
+  public List<String> listParagraphs() {
+    List<String> paragraphs = new LinkedList<String>();
+
+    for (InterpreterContextRunner r : interpreterContext.getRunners()) {
+      paragraphs.add(r.getParagraphId());
+    }
+
+    return paragraphs;
+  }
+
+
+
+  public Object angular(String name) {
+    AngularObjectRegistry registry = 
interpreterContext.getAngularObjectRegistry();
+    AngularObject ao = registry.get(name);
+    if (ao == null) {
+      return null;
+    } else {
+      return ao.get();
+    }
+  }
+
+  public void angularBind(String name, Object o) {
+    AngularObjectRegistry registry = 
interpreterContext.getAngularObjectRegistry();
+    if (registry.get(name) == null) {
+      registry.add(name, o);
+    } else {
+      registry.get(name).set(o);
+    }
+  }
+
+  public void angularBind(String name, Object o, AngularObjectWatcher w) {
+    AngularObjectRegistry registry = 
interpreterContext.getAngularObjectRegistry();
+    if (registry.get(name) == null) {
+      registry.add(name, o);
+    } else {
+      registry.get(name).set(o);
+    }
+    angularWatch(name, w);
+  }
+
+  public void angularWatch(String name, AngularObjectWatcher w) {
+    AngularObjectRegistry registry = 
interpreterContext.getAngularObjectRegistry();
+    if (registry.get(name) != null) {
+      registry.get(name).addWatcher(w);
+    }
+  }
+
+
+  public void angularWatch(String name,
+      final scala.Function2<Object, Object, Unit> func) {
+    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) 
{
+      @Override
+      public void watch(Object oldObject, Object newObject,
+          InterpreterContext context) {
+        func.apply(newObject, newObject);
+      }
+    };
+    angularWatch(name, w);
+  }
+
+  public void angularWatch(
+      String name,
+      final scala.Function3<Object, Object, InterpreterContext, Unit> func) {
+    AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) 
{
+      @Override
+      public void watch(Object oldObject, Object newObject,
+          InterpreterContext context) {
+        func.apply(oldObject, newObject, context);
+      }
+    };
+    angularWatch(name, w);
+  }
+
+  public void angularUnwatch(String name, AngularObjectWatcher w) {
+    AngularObjectRegistry registry = 
interpreterContext.getAngularObjectRegistry();
+    if (registry.get(name) != null) {
+      registry.get(name).removeWatcher(w);
+    }
+  }
+
+  public void angularUnwatch(String name) {
+    AngularObjectRegistry registry = 
interpreterContext.getAngularObjectRegistry();
+    if (registry.get(name) != null) {
+      registry.get(name).clearAllWatchers();
+    }
+  }
+
+  public void angularUnbind(String name) {
+    AngularObjectRegistry registry = 
interpreterContext.getAngularObjectRegistry();
+    registry.remove(name);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java 
b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
index 8d24cc4..2f8254d 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java
@@ -21,15 +21,16 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Properties;
 
+import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.spark.DepInterpreter;
-import org.apache.zeppelin.spark.SparkInterpreter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,7 +58,9 @@ public class DepInterpreterTest {
     intpGroup.add(dep);
     dep.setInterpreterGroup(intpGroup);
 
-    context = new InterpreterContext("id", "title", "text", new 
HashMap<String, Object>(), new GUI());
+    context = new InterpreterContext("id", "title", "text", new 
HashMap<String, Object>(), new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LinkedList<InterpreterContextRunner>());
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java 
b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
index 20f7fa4..a5e0fe2 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java
@@ -22,13 +22,16 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Properties;
 
+import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Code;
-import org.apache.zeppelin.spark.SparkInterpreter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.FixMethodOrder;
@@ -55,7 +58,10 @@ public class SparkInterpreterTest {
          repl.open();
          }
 
-    context = new InterpreterContext("id", "title", "text", new 
HashMap<String, Object>(), new GUI());
+         InterpreterGroup intpGroup = new InterpreterGroup();
+    context = new InterpreterContext("id", "title", "text", new 
HashMap<String, Object>(), new GUI(),
+        new AngularObjectRegistry(intpGroup.getId(), null),
+        new LinkedList<InterpreterContextRunner>());
        }
 
   @After

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
----------------------------------------------------------------------
diff --git 
a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java 
b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
index 71f088d..27198b3 100644
--- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
+++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java
@@ -20,15 +20,16 @@ package org.apache.zeppelin.spark;
 import static org.junit.Assert.assertEquals;
 
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Properties;
 
+import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResult.Type;
-import org.apache.zeppelin.spark.SparkInterpreter;
-import org.apache.zeppelin.spark.SparkSqlInterpreter;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -38,6 +39,7 @@ public class SparkSqlInterpreterTest {
        private SparkSqlInterpreter sql;
   private SparkInterpreter repl;
   private InterpreterContext context;
+  private InterpreterGroup intpGroup;
 
        @Before
        public void setUp() throws Exception {
@@ -55,13 +57,15 @@ public class SparkSqlInterpreterTest {
 
                sql = new SparkSqlInterpreter(p);
 
-               InterpreterGroup intpGroup = new InterpreterGroup();
+               intpGroup = new InterpreterGroup();
                  intpGroup.add(repl);
                  intpGroup.add(sql);
                  sql.setInterpreterGroup(intpGroup);
                  sql.open();
                }
-               context = new InterpreterContext("id", "title", "text", new 
HashMap<String, Object>(), new GUI());
+               context = new InterpreterContext("id", "title", "text", new 
HashMap<String, Object>(), new GUI(),
+                   new AngularObjectRegistry(intpGroup.getId(), null),
+                   new LinkedList<InterpreterContextRunner>());
        }
 
        @After
@@ -79,7 +83,7 @@ public class SparkSqlInterpreterTest {
                assertEquals(Type.TABLE, ret.type());
                assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message());
 
-               assertEquals(InterpreterResult.Code.ERROR, 
sql.interpret("select wrong syntax", context).code());
+         assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select 
wrong syntax", context).code());
                assertEquals(InterpreterResult.Code.SUCCESS, 
sql.interpret("select case when name==\"aa\" then name else name end from 
people", context).code());
        }
 

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
new file mode 100644
index 0000000..bbfcd1b
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.zeppelin.scheduler.ExecutorFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ *
+ * @param <T>
+ */
+public class AngularObject<T> {
+  private String name;
+  private T object;
+  private transient AngularObjectListener listener;
+  private transient List<AngularObjectWatcher> watchers
+    = new LinkedList<AngularObjectWatcher>();
+
+  protected AngularObject(String name, T o,
+      AngularObjectListener listener) {
+    this.name = name;
+    this.listener = listener;
+    object = o;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof AngularObject) {
+      return name.equals(((AngularObject) o).name);
+    } else {
+      return false;
+    }
+  }
+
+  public Object get() {
+    return object;
+  }
+
+  public void emit(){
+    if (listener != null) {
+      listener.updated(this);
+    }
+  }
+
+  public void set(T o) {
+    set(o, true);
+  }
+
+  public void set(T o, boolean emit) {
+    final T before = object;
+    final T after = o;
+    object = o;
+    if (emit) {
+      emit();
+    }
+
+    final Logger logger = LoggerFactory.getLogger(AngularObject.class);
+    List<AngularObjectWatcher> ws = new LinkedList<AngularObjectWatcher>();
+    synchronized (watchers) {
+      ws.addAll(watchers);
+    }
+
+    ExecutorService executor = 
ExecutorFactory.singleton().createOrGet("angularObjectWatcher", 50);
+    for (final AngularObjectWatcher w : ws) {
+      executor.submit(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            w.watch(before, after);
+          } catch (Exception e) {
+            logger.error("Exception on watch", e);
+          }
+        }
+      });
+    }
+  }
+
+  public void setListener(AngularObjectListener listener) {
+    this.listener = listener;
+  }
+
+  public AngularObjectListener getListener() {
+    return listener;
+  }
+
+  public void addWatcher(AngularObjectWatcher watcher) {
+    synchronized (watchers) {
+      watchers.add(watcher);
+    }
+  }
+
+  public void removeWatcher(AngularObjectWatcher watcher) {
+    synchronized (watchers) {
+      watchers.remove(watcher);
+    }
+  }
+
+  public void clearAllWatchers() {
+    synchronized (watchers) {
+      watchers.clear();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java
new file mode 100644
index 0000000..880e487
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+/**
+ *
+ */
+public interface AngularObjectListener {
+  public void updated(AngularObject updatedObject);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
new file mode 100644
index 0000000..56eca22
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ *
+ *
+ */
+public class AngularObjectRegistry {
+  Map<String, AngularObject> registry = new HashMap<String, AngularObject>();
+  private AngularObjectRegistryListener listener;
+  private String interpreterId;
+
+  AngularObjectListener angularObjectListener;
+
+  public AngularObjectRegistry(final String interpreterId,
+      final AngularObjectRegistryListener listener) {
+    this.interpreterId = interpreterId;
+    this.listener = listener;
+    angularObjectListener = new AngularObjectListener() {
+      @Override
+      public void updated(AngularObject updatedObject) {
+        if (listener != null) {
+          listener.onUpdate(interpreterId, updatedObject);
+        }
+      }
+    };
+  }
+
+  public AngularObjectRegistryListener getListener() {
+    return listener;
+  }
+
+  public AngularObject add(String name, Object o) {
+    return add(name, o, true);
+  }
+
+  public AngularObject add(String name, Object o, boolean emit) {
+    AngularObject ao = createNewAngularObject(name, o);
+
+    synchronized (registry) {
+      registry.put(name, ao);
+      if (listener != null && emit) {
+        listener.onAdd(interpreterId, ao);
+      }
+    }
+
+    return ao;
+  }
+
+  protected AngularObject createNewAngularObject(String name, Object o) {
+    return new AngularObject(name, o, angularObjectListener);
+  }
+
+  protected AngularObjectListener getAngularObjectListener() {
+    return angularObjectListener;
+  }
+
+  public AngularObject remove(String name) {
+    synchronized (registry) {
+      AngularObject o = registry.remove(name);
+      if (listener != null) {
+        listener.onRemove(interpreterId, o);;
+      }
+      return o;
+    }
+  }
+
+  public AngularObject get(String name) {
+    synchronized (registry) {
+      return registry.get(name);
+    }
+  }
+
+  public List<AngularObject> getAll() {
+    List<AngularObject> all = new LinkedList<AngularObject>();
+    synchronized (registry) {
+      all.addAll(registry.values());
+    }
+    return all;
+  }
+
+  public String getInterpreterGroupId() {
+    return interpreterId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java
new file mode 100644
index 0000000..3f08efa
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+/**
+ *
+ *
+ */
+public interface AngularObjectRegistryListener {
+  public void onAdd(String interpreterGroupId, AngularObject object);
+  public void onUpdate(String interpreterGroupId, AngularObject object);
+  public void onRemove(String interpreterGroupId, AngularObject object);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java
new file mode 100644
index 0000000..c5bd5e2
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.display;
+
+import org.apache.zeppelin.interpreter.InterpreterContext;
+
+/**
+ *
+ */
+public abstract class AngularObjectWatcher {
+  private InterpreterContext context;
+
+  public AngularObjectWatcher(InterpreterContext context) {
+    this.context = context;
+  }
+
+  void watch(Object oldObject, Object newObject) {
+    watch(oldObject, newObject, context);
+  }
+
+  public abstract void watch(Object oldObject, Object newObject, 
InterpreterContext context);
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
index 2d70c8e..2e4564e 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java
@@ -17,8 +17,10 @@
 
 package org.apache.zeppelin.interpreter;
 
+import java.util.List;
 import java.util.Map;
 
+import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.GUI;
 
 /**
@@ -30,19 +32,24 @@ public class InterpreterContext {
   private final String paragraphText;
   private final Map<String, Object> config;
   private GUI gui;
-
+  private AngularObjectRegistry angularObjectRegistry;
+  private List<InterpreterContextRunner> runners;
 
   public InterpreterContext(String paragraphId,
                             String paragraphTitle,
                             String paragraphText,
                             Map<String, Object> config,
-                            GUI gui
+                            GUI gui,
+                            AngularObjectRegistry angularObjectRegistry,
+                            List<InterpreterContextRunner> runners
                             ) {
     this.paragraphId = paragraphId;
     this.paragraphTitle = paragraphTitle;
     this.paragraphText = paragraphText;
     this.config = config;
     this.gui = gui;
+    this.angularObjectRegistry = angularObjectRegistry;
+    this.runners = runners;
   }
 
   public String getParagraphId() {
@@ -65,4 +72,12 @@ public class InterpreterContext {
     return gui;
   }
 
+  public AngularObjectRegistry getAngularObjectRegistry() {
+    return angularObjectRegistry;
+  }
+
+  public List<InterpreterContextRunner> getRunners() {
+    return runners;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java
new file mode 100644
index 0000000..7a2df10
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter;
+
+/**
+ */
+public abstract class InterpreterContextRunner implements Runnable {
+  String noteId;
+  private String paragraphId;
+
+  public InterpreterContextRunner(String noteId, String paragraphId) {
+    this.noteId = noteId;
+    this.paragraphId = paragraphId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof InterpreterContextRunner) {
+      InterpreterContextRunner io = ((InterpreterContextRunner) o);
+      if (io.getParagraphId().equals(paragraphId) &&
+          io.getNoteId().equals(noteId)) {
+        return true;
+      } else {
+        return false;
+      }
+
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public abstract void run();
+
+  public String getNoteId() {
+    return noteId;
+  }
+
+  public String getParagraphId() {
+    return paragraphId;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
index 834630a..9baaef3 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java
@@ -21,6 +21,8 @@ import java.util.LinkedList;
 import java.util.Properties;
 import java.util.Random;
 
+import org.apache.zeppelin.display.AngularObjectRegistry;
+
 /**
  * InterpreterGroup is list of interpreters in the same group.
  * And unit of interpreter instantiate, restart, bind, unbind.
@@ -28,6 +30,16 @@ import java.util.Random;
 public class InterpreterGroup extends LinkedList<Interpreter>{
   String id;
 
+  AngularObjectRegistry angularObjectRegistry;
+
+  public InterpreterGroup(String id) {
+    this.id = id;
+  }
+
+  public InterpreterGroup() {
+    getId();
+  }
+
   private static String generateId() {
     return "InterpreterGroup_" + System.currentTimeMillis() + "_"
            + new Random().nextInt();
@@ -42,7 +54,6 @@ public class InterpreterGroup extends LinkedList<Interpreter>{
     }
   }
 
-
   public Properties getProperty() {
     Properties p = new Properties();
     for (Interpreter intp : this) {
@@ -51,6 +62,14 @@ public class InterpreterGroup extends 
LinkedList<Interpreter>{
     return p;
   }
 
+  public AngularObjectRegistry getAngularObjectRegistry() {
+    return angularObjectRegistry;
+  }
+
+  public void setAngularObjectRegistry(AngularObjectRegistry 
angularObjectRegistry) {
+    this.angularObjectRegistry = angularObjectRegistry;
+  }
+
   public void close() {
     for (Interpreter intp : this) {
       intp.close();

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
index 0659a47..5d8d96f 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java
@@ -48,6 +48,7 @@ public class InterpreterResult implements Serializable {
   public static enum Type {
     TEXT,
     HTML,
+    ANGULAR,
     TABLE,
     IMG,
     SVG,

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
new file mode 100644
index 0000000..ca2df12
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class InterpreterContextRunnerPool {
+  Logger logger = LoggerFactory.getLogger(InterpreterContextRunnerPool.class);
+  private Map<String, List<InterpreterContextRunner>> 
interpreterContextRunners;
+
+  public InterpreterContextRunnerPool() {
+    interpreterContextRunners = new HashMap<String, 
List<InterpreterContextRunner>>();
+
+  }
+
+  // add runner
+  public void add(String noteId, InterpreterContextRunner runner) {
+    synchronized (interpreterContextRunners) {
+      if (!interpreterContextRunners.containsKey(noteId)) {
+        interpreterContextRunners.put(noteId, new 
LinkedList<InterpreterContextRunner>());
+      }
+
+      interpreterContextRunners.get(noteId).add(runner);
+    }
+  }
+
+  // replace all runners to noteId
+  public void addAll(String noteId, List<InterpreterContextRunner> runners) {
+    synchronized (interpreterContextRunners) {
+      if (!interpreterContextRunners.containsKey(noteId)) {
+        interpreterContextRunners.put(noteId, new 
LinkedList<InterpreterContextRunner>());
+      }
+
+      interpreterContextRunners.get(noteId).addAll(runners);
+    }
+  }
+
+  public void clear(String noteId) {
+    synchronized (interpreterContextRunners) {
+      interpreterContextRunners.remove(noteId);
+    }
+  }
+
+
+  public void run(String noteId, String paragraphId) {
+    synchronized (interpreterContextRunners) {
+      List<InterpreterContextRunner> list = 
interpreterContextRunners.get(noteId);
+      if (list != null) {
+        for (InterpreterContextRunner r : list) {
+          if (noteId.equals(r.getNoteId()) && 
paragraphId.equals(r.getParagraphId())) {
+            logger.info("run paragraph {} on note {} from InterpreterContext",
+                r.getParagraphId(), r.getNoteId());
+            r.run();
+            return;
+          }
+        }
+      }
+
+      throw new InterpreterException("Can not run paragraph " + paragraphId + 
" on " + noteId);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
new file mode 100644
index 0000000..3abd764
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectListener;
+
+/**
+ *
+ */
+public class RemoteAngularObject extends AngularObject {
+
+  private transient RemoteInterpreterProcess remoteInterpreterProcess;
+
+  RemoteAngularObject(String name, Object o, String interpreterGroupId,
+      AngularObjectListener listener,
+      RemoteInterpreterProcess remoteInterpreterProcess) {
+    super(name, o, listener);
+    this.remoteInterpreterProcess = remoteInterpreterProcess;
+  }
+
+  @Override
+  public void set(Object o, boolean emit) {
+    set(o,  emit, true);
+  }
+
+  public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) {
+    super.set(o, emitWeb);
+
+    if (emitRemoteProcess) {
+      // send updated value to remote interpreter
+      remoteInterpreterProcess.updateRemoteAngularObject(getName(), o);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
new file mode 100644
index 0000000..c711f69
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.display.AngularObjectRegistryListener;
+import org.apache.zeppelin.interpreter.Interpreter;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.WrappedInterpreter;
+
+/**
+ *
+ */
+public class RemoteAngularObjectRegistry extends AngularObjectRegistry {
+
+  private InterpreterGroup interpreterGroup;
+
+  public RemoteAngularObjectRegistry(String interpreterId,
+      AngularObjectRegistryListener listener,
+      InterpreterGroup interpreterGroup) {
+    super(interpreterId, listener);
+    this.interpreterGroup = interpreterGroup;
+  }
+
+  private RemoteInterpreterProcess getRemoteInterpreterProcess() {
+    if (interpreterGroup.size() == 0) {
+      throw new RuntimeException("Can't get remoteInterpreterProcess");
+    }
+    Interpreter p = interpreterGroup.get(0);
+    while (p instanceof WrappedInterpreter) {
+      p = ((WrappedInterpreter) p).getInnerInterpreter();
+    }
+
+    if (p instanceof RemoteInterpreter) {
+      return ((RemoteInterpreter) p).getInterpreterProcess();
+    } else {
+      throw new RuntimeException("Can't get remoteInterpreterProcess");
+    }
+  }
+
+  @Override
+  protected AngularObject createNewAngularObject(String name, Object o) {
+    RemoteInterpreterProcess remoteInterpreterProcess = 
getRemoteInterpreterProcess();
+    if (remoteInterpreterProcess == null) {
+      throw new RuntimeException("Remote Interpreter process not found");
+    }
+    return new RemoteAngularObject(name, o, getInterpreterGroupId(),
+        getAngularObjectListener(),
+        getRemoteInterpreterProcess());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
index e905d5f..3e6128f 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java
@@ -26,6 +26,7 @@ import org.apache.thrift.TException;
 import org.apache.zeppelin.display.GUI;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -56,6 +57,8 @@ public class RemoteInterpreter extends Interpreter {
   static Map<String, RemoteInterpreterProcess> interpreterGroupReference
     = new HashMap<String, RemoteInterpreterProcess>();
 
+  private InterpreterContextRunnerPool interpreterContextRunnerPool;
+
   public RemoteInterpreter(Properties property,
       String className,
       String interpreterRunner,
@@ -67,6 +70,7 @@ public class RemoteInterpreter extends Interpreter {
     this.interpreterRunner = interpreterRunner;
     this.interpreterPath = interpreterPath;
     env = new HashMap<String, String>();
+    interpreterContextRunnerPool = new InterpreterContextRunnerPool();
   }
 
   public RemoteInterpreter(Properties property,
@@ -119,7 +123,7 @@ public class RemoteInterpreter extends Interpreter {
       }
     }
 
-    int rc = interpreterProcess.reference();
+    int rc = interpreterProcess.reference(getInterpreterGroup());
 
     synchronized (interpreterProcess) {
       // when first process created
@@ -187,6 +191,15 @@ public class RemoteInterpreter extends Interpreter {
       throw new InterpreterException(e1);
     }
 
+    List<InterpreterContextRunner> runners = context.getRunners();
+    if (runners != null && runners.size() != 0) {
+      // assume all runners in this InterpreterContext have the same note id
+      String noteId = runners.get(0).getNoteId();
+
+      interpreterContextRunnerPool.clear(noteId);
+      interpreterContextRunnerPool.addAll(noteId, runners);
+    }
+
     try {
       GUI settings = context.getGui();
       RemoteInterpreterResult remoteResult = client.interpret(className, st, 
convert(context));
@@ -316,7 +329,7 @@ public class RemoteInterpreter extends Interpreter {
           .containsKey(getInterpreterGroupKey(interpreterGroup))) {
         interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup),
             new RemoteInterpreterProcess(interpreterRunner,
-                interpreterPath, env));
+                interpreterPath, env, interpreterContextRunnerPool));
 
         logger.info("setInterpreterGroup = "
             + getInterpreterGroupKey(interpreterGroup) + " class=" + className
@@ -335,7 +348,8 @@ public class RemoteInterpreter extends Interpreter {
         ic.getParagraphTitle(),
         ic.getParagraphText(),
         gson.toJson(ic.getConfig()),
-        gson.toJson(ic.getGui()));
+        gson.toJson(ic.getGui()),
+        gson.toJson(ic.getRunners()));
   }
 
   private InterpreterResult convert(RemoteInterpreterResult result) {

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
new file mode 100644
index 0000000..8d16ec5
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterException;
+
+/**
+ *
+ */
+public class RemoteInterpreterContextRunner extends InterpreterContextRunner {
+
+  public RemoteInterpreterContextRunner(String noteId, String paragraphId) {
+    super(noteId, paragraphId);
+  }
+
+  @Override
+  public void run() {
+    // this class should be used only for gson deserialize abstract class
+    // code should not reach here
+    throw new InterpreterException("Assert");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
new file mode 100644
index 0000000..dc9ef0b
--- /dev/null
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ *
+ */
+public class RemoteInterpreterEventPoller extends Thread {
+  Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+  private RemoteInterpreterProcess interpreterProcess;
+  boolean shutdown;
+  private InterpreterGroup interpreterGroup;
+
+  public RemoteInterpreterEventPoller(
+      InterpreterGroup interpreterGroup,
+      RemoteInterpreterProcess interpreterProcess) {
+    this.interpreterGroup = interpreterGroup;
+    this.interpreterProcess = interpreterProcess;
+    shutdown = false;
+  }
+
+  @Override
+  public void run() {
+    Client client = null;
+
+    while (shutdown == false) {
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e1) {
+        logger.error("Can't get RemoteInterpreterEvent", e1);
+        try {
+          synchronized (this) {
+            wait(1000);
+          }
+        } catch (InterruptedException e) {
+        }
+        continue;
+      }
+
+      RemoteInterpreterEvent event = null;
+      try {
+        event = client.getEvent();
+      } catch (TException e) {
+        logger.error("Can't get RemoteInterpreterEvent", e);
+        try {
+          synchronized (this) {
+            wait(1000);
+          }
+        } catch (InterruptedException e1) {
+        }
+        continue;
+      }
+
+      interpreterProcess.releaseClient(client);
+
+      Gson gson = new Gson();
+
+      AngularObjectRegistry angularObjectRegistry = 
interpreterGroup.getAngularObjectRegistry();
+
+      try {
+        if (event.getType() == RemoteInterpreterEventType.NO_OP) {
+          continue;
+        } else if (event.getType() == 
RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
+          AngularObject angularObject = gson.fromJson(event.getData(), 
AngularObject.class);
+          angularObjectRegistry.add(angularObject.getName(), 
angularObject.get());
+        } else if (event.getType() == 
RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
+          AngularObject angularObject = gson.fromJson(event.getData(), 
AngularObject.class);
+          AngularObject localAngularObject = 
angularObjectRegistry.get(angularObject.getName());
+          if (localAngularObject instanceof RemoteAngularObject) {
+            // to avoid ping-pong loop
+            ((RemoteAngularObject) localAngularObject).set(
+                angularObject.get(), true, false);
+          } else {
+            localAngularObject.set(angularObject.get());
+          }
+        } else if (event.getType() == 
RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
+          AngularObject angularObject = gson.fromJson(event.getData(), 
AngularObject.class);
+          angularObjectRegistry.remove(angularObject.getName());
+        } else if (event.getType() == 
RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
+          InterpreterContextRunner runnerFromRemote = gson.fromJson(
+              event.getData(), RemoteInterpreterContextRunner.class);
+
+          interpreterProcess.getInterpreterContextRunnerPool().run(
+              runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+        }
+      } catch (Exception e) {
+        logger.error("Can't handle event " + event, e);
+      }
+    }
+  }
+
+  public void shutdown() {
+    shutdown = true;
+    synchronized (this) {
+      notify();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
index a128cd7..dbfaa35 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
@@ -28,11 +28,15 @@ import org.apache.commons.exec.ExecuteResultHandler;
 import org.apache.commons.exec.ExecuteWatchdog;
 import org.apache.commons.exec.environment.EnvironmentUtils;
 import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.thrift.TException;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.gson.Gson;
+
 /**
  *
  */
@@ -48,11 +52,17 @@ public class RemoteInterpreterProcess implements 
ExecuteResultHandler {
 
   private GenericObjectPool<Client> clientPool;
   private Map<String, String> env;
+  private RemoteInterpreterEventPoller remoteInterpreterEventPoller;
+  private InterpreterContextRunnerPool interpreterContextRunnerPool;
 
-  public RemoteInterpreterProcess(String intpRunner, String intpDir, 
Map<String, String> env) {
+  public RemoteInterpreterProcess(String intpRunner,
+      String intpDir,
+      Map<String, String> env,
+      InterpreterContextRunnerPool interpreterContextRunnerPool) {
     this.interpreterRunner = intpRunner;
     this.interpreterDir = intpDir;
     this.env = env;
+    this.interpreterContextRunnerPool = interpreterContextRunnerPool;
     referenceCount = new AtomicInteger(0);
   }
 
@@ -60,7 +70,7 @@ public class RemoteInterpreterProcess implements 
ExecuteResultHandler {
     return port;
   }
 
-  public int reference() {
+  public int reference(InterpreterGroup interpreterGroup) {
     synchronized (referenceCount) {
       if (executor == null) {
         // start server process
@@ -108,6 +118,9 @@ public class RemoteInterpreterProcess implements 
ExecuteResultHandler {
         }
 
         clientPool = new GenericObjectPool<Client>(new 
ClientFactory("localhost", port));
+
+        remoteInterpreterEventPoller = new 
RemoteInterpreterEventPoller(interpreterGroup, this);
+        remoteInterpreterEventPoller.start();
       }
       return referenceCount.incrementAndGet();
     }
@@ -126,6 +139,8 @@ public class RemoteInterpreterProcess implements 
ExecuteResultHandler {
       int r = referenceCount.decrementAndGet();
       if (r == 0) {
         logger.info("shutdown interpreter process");
+        remoteInterpreterEventPoller.shutdown();
+
         // first try shutdown
         try {
           Client client = getClient();
@@ -205,4 +220,35 @@ public class RemoteInterpreterProcess implements 
ExecuteResultHandler {
       return clientPool.getNumIdle();
     }
   }
+
+  /**
+   * Called when angular object is updated in client side to propagate
+   * change to the remote process
+   * @param name
+   * @param o
+   */
+  public void updateRemoteAngularObject(String name, Object o) {
+    Client client = null;
+    try {
+      client = getClient();
+    } catch (NullPointerException e) {
+      // remote process not started
+      return;
+    } catch (Exception e) {
+      logger.error("Can't update angular object", e);
+    }
+
+    try {
+      Gson gson = new Gson();
+      client.angularObjectUpdate(name, gson.toJson(o));
+    } catch (TException e) {
+      logger.error("Can't update angular object", e);
+    } finally {
+      releaseClient(client);
+    }
+  }
+
+  public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
+    return interpreterContextRunnerPool;
+  }
 }

Reply via email to