ZEPPELIN-25 Ability to create rich gui inside of Notebook This PR implements https://issues.apache.org/jira/browse/ZEPPELIN-25
Here's a short video demo of this feature. [Video demo](http://www.youtube.com/watch?v=xU5TBS_MsAs) For anyone who wants to try it, here's the API: ```scala // bind 'varName' variable with 'v' value z.angularBind(varName: String, v: Object) // unbind 'varName' z.angularUnbind(varName: String) // get value of 'varName' z.angular(varName:String) // add a watcher to the 'varName' variable // that monitors changes and runs 'func' when the value changes. z.angularWatch(varName:String, func: (Object, Object) => Unit) // remove watcher from 'varName' z.angularUnwatch(varName:String) ``` Any paragraph output starting with '%angular' will be treated as Angular code. %angular is also available as an interpreter.  Any feedback is welcome! Author: Lee moon soo <[email protected]> Closes #27 from Leemoonsoo/angular and squashes the following commits: 04d7175 [Lee moon soo] Remove implicit conversion because of side effect 34fa298 [Lee moon soo] jquery to angular 8076098 [Lee moon soo] Remove unnecessary type information 88fd635 [Lee moon soo] catch and print watcher user provided routine exception 46dba2f [Lee moon soo] Catch sql syntax error 2ebfa59 [Lee moon soo] Let z.run optionally take InterpreterContext ee29866 [Lee moon soo] ZEPPELIN-32 implement z.show() dac416d [Lee moon soo] Implement z.run() 0899011 [Lee moon soo] Fix test 0033d32 [Lee moon soo] Add angular interpreter 42ee479 [Lee moon soo] com.nflabs -> org.apache 4d32d19 [Lee moon soo] ZEPPELIN-25 prevent watcher called multiple times d4d270e [Lee moon soo] ZEPPELIN-25 add unittest 6ce8f36 [Lee moon soo] ZEPPELIN-25 implement watcher 6df7f23 [Lee moon soo] ZEPPELIN-25 broadcast angular object change to related notes 5954e29 [Lee moon soo] ZEPPELIN-25 save/restore angular object registry snapshot to the notebook file c288198 [Lee moon soo] ZEPPELIN-25 send scope variables when loading note 67f6926 [Lee moon soo] ZEPPELIN-25 impelemnet JS(angular) -JVM(scala) two-way binding bb52d7b [Lee moon soo] Add %angular display system a7c77b8 [Lee 
moon soo] Update license of ScreenCaptureHtmlUnitDriver.java 6d7e063 [Lee moon soo] Add source file license header Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/58b70e3b Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/58b70e3b Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/58b70e3b Branch: refs/heads/master Commit: 58b70e3bc0b9fb814a5919037318a79ae67f678f Parents: c0a7d08 Author: Lee moon soo <[email protected]> Authored: Sat Apr 11 11:15:05 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Sun Apr 12 14:58:32 2015 +0900 ---------------------------------------------------------------------- angular/pom.xml | 127 ++ .../zeppelin/angular/AngularInterpreter.java | 81 + conf/zeppelin-site.xml.template | 2 +- pom.xml | 1 + .../apache/zeppelin/spark/SparkInterpreter.java | 6 +- .../zeppelin/spark/SparkSqlInterpreter.java | 84 +- .../apache/zeppelin/spark/ZeppelinContext.java | 348 ++++- .../zeppelin/spark/DepInterpreterTest.java | 9 +- .../zeppelin/spark/SparkInterpreterTest.java | 10 +- .../zeppelin/spark/SparkSqlInterpreterTest.java | 14 +- .../apache/zeppelin/display/AngularObject.java | 129 ++ .../zeppelin/display/AngularObjectListener.java | 25 + .../zeppelin/display/AngularObjectRegistry.java | 106 ++ .../display/AngularObjectRegistryListener.java | 28 + .../zeppelin/display/AngularObjectWatcher.java | 37 + .../interpreter/InterpreterContext.java | 19 +- .../interpreter/InterpreterContextRunner.java | 58 + .../zeppelin/interpreter/InterpreterGroup.java | 21 +- .../zeppelin/interpreter/InterpreterResult.java | 1 + .../remote/InterpreterContextRunnerPool.java | 88 ++ .../interpreter/remote/RemoteAngularObject.java | 50 + .../remote/RemoteAngularObjectRegistry.java | 67 + .../interpreter/remote/RemoteInterpreter.java | 20 +- .../remote/RemoteInterpreterContextRunner.java | 38 + 
.../remote/RemoteInterpreterEventPoller.java | 126 ++ .../remote/RemoteInterpreterProcess.java | 50 +- .../remote/RemoteInterpreterServer.java | 140 +- .../thrift/RemoteInterpreterContext.java | 108 +- .../thrift/RemoteInterpreterEvent.java | 502 ++++++ .../thrift/RemoteInterpreterEventType.java | 54 + .../thrift/RemoteInterpreterService.java | 1462 ++++++++++++++++++ .../zeppelin/scheduler/ExecutorFactory.java | 83 + .../zeppelin/scheduler/SchedulerFactory.java | 9 +- .../main/thrift/RemoteInterpreterService.thrift | 19 +- .../display/AngularObjectRegistryTest.java | 67 + .../zeppelin/display/AngularObjectTest.java | 78 + .../remote/RemoteAngularObjectTest.java | 144 ++ .../remote/RemoteInterpreterProcessTest.java | 16 +- .../remote/RemoteInterpreterTest.java | 32 +- .../remote/mock/MockInterpreterAngular.java | 117 ++ .../zeppelin/scheduler/RemoteSchedulerTest.java | 12 +- .../apache/zeppelin/server/ZeppelinServer.java | 2 +- .../org/apache/zeppelin/socket/Message.java | 5 + .../apache/zeppelin/socket/NotebookServer.java | 151 +- .../zeppelin/rest/ZeppelinRestApiTest.java | 2 +- zeppelin-web/app/scripts/controllers/main.js | 4 +- .../app/scripts/controllers/notebook.js | 49 + .../app/scripts/controllers/paragraph.js | 26 +- zeppelin-web/app/views/paragraph.html | 5 + .../zeppelin/conf/ZeppelinConfiguration.java | 1 + .../interpreter/InterpreterFactory.java | 63 +- .../interpreter/InterpreterSetting.java | 10 +- .../java/org/apache/zeppelin/notebook/Note.java | 37 +- .../org/apache/zeppelin/notebook/Notebook.java | 77 +- .../org/apache/zeppelin/notebook/Paragraph.java | 49 +- .../interpreter/InterpreterFactoryTest.java | 6 +- .../apache/zeppelin/notebook/NotebookTest.java | 4 +- 57 files changed, 4676 insertions(+), 203 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/angular/pom.xml ---------------------------------------------------------------------- diff 
--git a/angular/pom.xml b/angular/pom.xml new file mode 100644 index 0000000..580b848 --- /dev/null +++ b/angular/pom.xml @@ -0,0 +1,127 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>zeppelin</artifactId> + <groupId>org.apache.zeppelin</groupId> + <version>0.5.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.zeppelin</groupId> + <artifactId>zeppelin-angular</artifactId> + <packaging>jar</packaging> + <version>0.5.0-SNAPSHOT</version> + <name>Zeppelin: Angular interpreter</name> + <url>http://zeppelin.incubator.apache.org</url> + + <dependencies> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>zeppelin-interpreter</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + 
<artifactId>slf4j-log4j12</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-deploy-plugin</artifactId> + <version>2.7</version> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <artifactId>maven-dependency-plugin</artifactId> + <version>2.8</version> + <executions> + <execution> + <id>copy-dependencies</id> + <phase>package</phase> + <goals> + <goal>copy-dependencies</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/angular</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + </configuration> + </execution> + <execution> + <id>copy-artifact</id> + <phase>package</phase> + <goals> + <goal>copy</goal> + </goals> + <configuration> + <outputDirectory>${project.build.directory}/../../interpreter/angular</outputDirectory> + <overWriteReleases>false</overWriteReleases> + <overWriteSnapshots>false</overWriteSnapshots> + <overWriteIfNewer>true</overWriteIfNewer> + <includeScope>runtime</includeScope> + <artifactItems> + <artifactItem> + <groupId>${project.groupId}</groupId> + <artifactId>${project.artifactId}</artifactId> + <version>${project.version}</version> + <type>${project.packaging}</type> + </artifactItem> + </artifactItems> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> 
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java ---------------------------------------------------------------------- diff --git a/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java new file mode 100644 index 0000000..c7a406d --- /dev/null +++ b/angular/src/main/java/org/apache/zeppelin/angular/AngularInterpreter.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.angular; + +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; + +/** + * + */ +public class AngularInterpreter extends Interpreter { + static { + Interpreter.register("angular", AngularInterpreter.class.getName()); + } + + public AngularInterpreter(Properties property) { + super(property); + } + + @Override + public void open() { + } + + @Override + public void close() { + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + return new InterpreterResult(Code.SUCCESS, Type.ANGULAR, st); + } + + @Override + public void cancel(InterpreterContext context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<String> completion(String buf, int cursor) { + return new LinkedList<String>(); + } + + @Override + public Scheduler getScheduler() { + return SchedulerFactory.singleton().createOrGetFIFOScheduler( + AngularInterpreter.class.getName() + this.hashCode()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index 32011b1..13d794c 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -60,7 +60,7 @@ <property> <name>zeppelin.interpreters</name> - 
<value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter</value> + <value>org.apache.zeppelin.spark.SparkInterpreter,org.apache.zeppelin.spark.PySparkInterpreter,org.apache.zeppelin.spark.SparkSqlInterpreter,org.apache.zeppelin.spark.DepInterpreter,org.apache.zeppelin.markdown.Markdown,org.apache.zeppelin.angular.AngularInterpreter,org.apache.zeppelin.shell.ShellInterpreter,org.apache.zeppelin.hive.HiveInterpreter</value> <description>Comma separated interpreter configurations. First interpreter become a default</description> </property> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 20a073e..d2fc77c 100644 --- a/pom.xml +++ b/pom.xml @@ -88,6 +88,7 @@ <module>zeppelin-zengine</module> <module>spark</module> <module>markdown</module> + <module>angular</module> <module>shell</module> <module>hive</module> <module>zeppelin-web</module> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 2461109..b038dd6 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -107,6 +107,7 @@ public class SparkInterpreter extends Interpreter { + "we should set this value") .add("zeppelin.spark.useHiveContext", "true", "Use HiveContext instead of SQLContext if it is true.") + 
.add("zeppelin.spark.maxResult", "1000", "Max number of SparkSQL result to display.") .add("args", "", "spark commandline args").build()); } @@ -398,7 +399,8 @@ public class SparkInterpreter extends Interpreter { dep = getDependencyResolver(); - z = new ZeppelinContext(sc, sqlc, null, dep, printStream); + z = new ZeppelinContext(sc, sqlc, null, dep, printStream, + Integer.parseInt(getProperty("zeppelin.spark.maxResult"))); try { if (sc.version().startsWith("1.1") || sc.version().startsWith("1.2")) { @@ -510,7 +512,7 @@ public class SparkInterpreter extends Interpreter { } String getJobGroup(InterpreterContext context){ - return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId(); + return "zeppelin-" + context.getParagraphId(); } /** http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index 2555988..618579d 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -29,18 +29,15 @@ import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Stage; import org.apache.spark.sql.SQLContext; -import org.apache.spark.sql.SQLContext.QueryExecution; -import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.ui.jobs.JobProgressListener; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; import org.apache.zeppelin.interpreter.InterpreterResult; -import 
org.apache.zeppelin.interpreter.InterpreterUtils; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; @@ -76,7 +73,7 @@ public class SparkSqlInterpreter extends Interpreter { } private String getJobGroup(InterpreterContext context){ - return "zeppelin-" + this.hashCode() + "-" + context.getParagraphId(); + return "zeppelin-" + context.getParagraphId(); } private int maxResult; @@ -126,82 +123,13 @@ public class SparkSqlInterpreter extends Interpreter { sc.setLocalProperty("spark.scheduler.pool", null); } - sc.setJobGroup(getJobGroup(context), "Zeppelin", false); - - // SchemaRDD - spark 1.1, 1.2, DataFrame - spark 1.3 - Object rdd; - Object[] rows = null; try { - rdd = sqlc.sql(st); - - Method take = rdd.getClass().getMethod("take", int.class); - rows = (Object[]) take.invoke(rdd, maxResult + 1); + Object rdd = sqlc.sql(st); + String msg = ZeppelinContext.showRDD(sc, context, rdd, maxResult); + return new InterpreterResult(Code.SUCCESS, msg); } catch (Exception e) { - logger.error("Error", e); - sc.clearJobGroup(); - return new InterpreterResult(Code.ERROR, InterpreterUtils.getMostRelevantMessage(e)); - } - - String msg = null; - - // get field names - Method queryExecution; - QueryExecution qe; - try { - queryExecution = rdd.getClass().getMethod("queryExecution"); - qe = (QueryExecution) queryExecution.invoke(rdd); - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - throw new InterpreterException(e); - } - - List<Attribute> columns = - scala.collection.JavaConverters.asJavaListConverter( - qe.analyzed().output()).asJava(); - - for (Attribute col : columns) { 
- if (msg == null) { - msg = col.name(); - } else { - msg += "\t" + col.name(); - } - } - - msg += "\n"; - - // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType, - // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType, - // NullType, NumericType, ShortType, StringType, StructType - - try { - for (int r = 0; r < maxResult && r < rows.length; r++) { - Object row = rows[r]; - Method isNullAt = row.getClass().getMethod("isNullAt", int.class); - Method apply = row.getClass().getMethod("apply", int.class); - - for (int i = 0; i < columns.size(); i++) { - if (!(Boolean) isNullAt.invoke(row, i)) { - msg += apply.invoke(row, i).toString(); - } else { - msg += "null"; - } - if (i != columns.size() - 1) { - msg += "\t"; - } - } - msg += "\n"; - } - } catch (NoSuchMethodException | SecurityException | IllegalAccessException - | IllegalArgumentException | InvocationTargetException e) { - throw new InterpreterException(e); - } - - if (rows.length > maxResult) { - msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>"; + return new InterpreterResult(Code.ERROR, e.getMessage()); } - InterpreterResult rett = new InterpreterResult(Code.SUCCESS, "%table " + msg); - sc.clearJobGroup(); - return rett; } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index 87cd188..2c03f1c 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -22,19 +22,31 @@ import static scala.collection.JavaConversions.asJavaIterable; import static scala.collection.JavaConversions.collectionAsScalaIterable; import 
java.io.PrintStream; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import org.apache.spark.SparkContext; import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SQLContext.QueryExecution; +import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.hive.HiveContext; +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectWatcher; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.display.Input.ParamOption; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.spark.dep.DependencyResolver; import scala.Tuple2; +import scala.Unit; import scala.collection.Iterable; /** @@ -47,15 +59,18 @@ public class ZeppelinContext extends HashMap<String, Object> { private DependencyResolver dep; private PrintStream out; private InterpreterContext interpreterContext; + private int maxResult; public ZeppelinContext(SparkContext sc, SQLContext sql, InterpreterContext interpreterContext, - DependencyResolver dep, PrintStream printStream) { + DependencyResolver dep, PrintStream printStream, + int maxResult) { this.sc = sc; this.sqlContext = sql; this.interpreterContext = interpreterContext; this.dep = dep; this.out = printStream; + this.maxResult = maxResult; } public SparkContext sc; @@ -63,12 +78,6 @@ public class ZeppelinContext extends HashMap<String, Object> { public HiveContext hiveContext; private GUI gui; - /* spark-1.3 - public SchemaRDD sql(String sql) { - return sqlContext.sql(sql); - } - */ - /** * Load dependency for interpreter and runtime (driver). 
* And distribute them to spark cluster (sc.add()) @@ -221,25 +230,6 @@ public class ZeppelinContext extends HashMap<String, Object> { this.gui = o; } - public void run(String lines) { - /* - String intpName = Paragraph.getRequiredReplName(lines); - String scriptBody = Paragraph.getScriptBody(lines); - Interpreter intp = interpreterContext.getParagraph().getRepl(intpName); - InterpreterResult ret = intp.interpret(scriptBody, interpreterContext); - if (ret.code() == InterpreterResult.Code.SUCCESS) { - out.println("%" + ret.type().toString().toLowerCase() + " " + ret.message()); - } else if (ret.code() == InterpreterResult.Code.ERROR) { - out.println("Error: " + ret.message()); - } else if (ret.code() == InterpreterResult.Code.INCOMPLETE) { - out.println("Incomplete"); - } else { - out.println("Unknown error"); - } - */ - throw new RuntimeException("Missing implementation"); - } - private void restartInterpreter() { } @@ -251,4 +241,310 @@ public class ZeppelinContext extends HashMap<String, Object> { this.interpreterContext = interpreterContext; } + public void setMaxResult(int maxResult) { + this.maxResult = maxResult; + } + + /** + * show DataFrame or SchemaRDD + * @param o DataFrame or SchemaRDD object + */ + public void show(Object o) { + show(o, maxResult); + } + + /** + * show DataFrame or SchemaRDD + * @param o DataFrame or SchemaRDD object + * @param maxResult maximum number of rows to display + */ + public void show(Object o, int maxResult) { + Class cls = null; + try { + cls = this.getClass().forName("org.apache.spark.sql.DataFrame"); + } catch (ClassNotFoundException e) { + } + + if (cls == null) { + try { + cls = this.getClass().forName("org.apache.spark.sql.SchemaRDD"); + } catch (ClassNotFoundException e) { + } + } + + if (cls == null) { + throw new InterpreterException("Can not road DataFrame/SchemaRDD class"); + } + + if (cls.isInstance(o)) { + out.print(showRDD(sc, interpreterContext, o, maxResult)); + } else { + out.print(o.toString()); + } + } + + 
public static String showRDD(SparkContext sc, + InterpreterContext interpreterContext, + Object rdd, int maxResult) { + Object[] rows = null; + Method take; + String jobGroup = "zeppelin-" + interpreterContext.getParagraphId(); + sc.setJobGroup(jobGroup, "Zeppelin", false); + + try { + take = rdd.getClass().getMethod("take", int.class); + rows = (Object[]) take.invoke(rdd, maxResult + 1); + + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + sc.clearJobGroup(); + throw new InterpreterException(e); + } + + String msg = null; + + // get field names + Method queryExecution; + QueryExecution qe; + try { + queryExecution = rdd.getClass().getMethod("queryExecution"); + qe = (QueryExecution) queryExecution.invoke(rdd); + } catch (NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + throw new InterpreterException(e); + } + + List<Attribute> columns = + scala.collection.JavaConverters.asJavaListConverter( + qe.analyzed().output()).asJava(); + + for (Attribute col : columns) { + if (msg == null) { + msg = col.name(); + } else { + msg += "\t" + col.name(); + } + } + + msg += "\n"; + + // ArrayType, BinaryType, BooleanType, ByteType, DecimalType, DoubleType, DynamicType, + // FloatType, FractionalType, IntegerType, IntegralType, LongType, MapType, NativeType, + // NullType, NumericType, ShortType, StringType, StructType + + try { + for (int r = 0; r < maxResult && r < rows.length; r++) { + Object row = rows[r]; + Method isNullAt = row.getClass().getMethod("isNullAt", int.class); + Method apply = row.getClass().getMethod("apply", int.class); + + for (int i = 0; i < columns.size(); i++) { + if (!(Boolean) isNullAt.invoke(row, i)) { + msg += apply.invoke(row, i).toString(); + } else { + msg += "null"; + } + if (i != columns.size() - 1) { + msg += "\t"; + } + } + msg += "\n"; + } + } catch (NoSuchMethodException | 
SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + throw new InterpreterException(e); + } + + if (rows.length > maxResult) { + msg += "\n<font color=red>Results are limited by " + maxResult + ".</font>"; + } + sc.clearJobGroup(); + return "%table " + msg; + } + + /** + * Run paragraph by id + * @param id + */ + public void run(String id) { + run(id, interpreterContext); + } + + /** + * Run paragraph by id + * @param id + * @param context + */ + public void run(String id, InterpreterContext context) { + if (id.equals(context.getParagraphId())) { + throw new InterpreterException("Can not run current Paragraph"); + } + + for (InterpreterContextRunner r : context.getRunners()) { + if (id.equals(r.getParagraphId())) { + r.run(); + return; + } + } + + throw new InterpreterException("Paragraph " + id + " not found"); + } + + /** + * Run paragraph at idx + * @param idx + */ + public void run(int idx) { + run(idx, interpreterContext); + } + + /** + * Run paragraph at index + * @param idx index starting from 0 + * @param context interpreter context + */ + public void run(int idx, InterpreterContext context) { + if (idx >= context.getRunners().size()) { + throw new InterpreterException("Index out of bound"); + } + + InterpreterContextRunner runner = context.getRunners().get(idx); + if (runner.getParagraphId().equals(context.getParagraphId())) { + throw new InterpreterException("Can not run current Paragraph"); + } + + runner.run(); + } + + public void run(List<Object> paragraphIdOrIdx) { + run(paragraphIdOrIdx, interpreterContext); + } + + /** + * Run paragraphs + * @param paragraphIdOrIdxs list of paragraph id or idx + */ + public void run(List<Object> paragraphIdOrIdx, InterpreterContext context) { + for (Object idOrIdx : paragraphIdOrIdx) { + if (idOrIdx instanceof String) { + String id = (String) idOrIdx; + run(id, context); + } else if (idOrIdx instanceof Integer) { + Integer idx = (Integer) idOrIdx; + run(idx, 
context); + } else { + throw new InterpreterException("Paragraph " + idOrIdx + " not found"); + } + } + } + + public void runAll() { + runAll(interpreterContext); + } + + /** + * Run all paragraphs. except this. + */ + public void runAll(InterpreterContext context) { + for (InterpreterContextRunner r : context.getRunners()) { + if (r.getParagraphId().equals(context.getParagraphId())) { + // skip itself + continue; + } + r.run(); + } + } + + public List<String> listParagraphs() { + List<String> paragraphs = new LinkedList<String>(); + + for (InterpreterContextRunner r : interpreterContext.getRunners()) { + paragraphs.add(r.getParagraphId()); + } + + return paragraphs; + } + + + + public Object angular(String name) { + AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry(); + AngularObject ao = registry.get(name); + if (ao == null) { + return null; + } else { + return ao.get(); + } + } + + public void angularBind(String name, Object o) { + AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry(); + if (registry.get(name) == null) { + registry.add(name, o); + } else { + registry.get(name).set(o); + } + } + + public void angularBind(String name, Object o, AngularObjectWatcher w) { + AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry(); + if (registry.get(name) == null) { + registry.add(name, o); + } else { + registry.get(name).set(o); + } + angularWatch(name, w); + } + + public void angularWatch(String name, AngularObjectWatcher w) { + AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry(); + if (registry.get(name) != null) { + registry.get(name).addWatcher(w); + } + } + + + public void angularWatch(String name, + final scala.Function2<Object, Object, Unit> func) { + AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) { + @Override + public void watch(Object oldObject, Object newObject, + InterpreterContext context) { + func.apply(newObject, 
newObject); + } + }; + angularWatch(name, w); + } + + public void angularWatch( + String name, + final scala.Function3<Object, Object, InterpreterContext, Unit> func) { + AngularObjectWatcher w = new AngularObjectWatcher(getInterpreterContext()) { + @Override + public void watch(Object oldObject, Object newObject, + InterpreterContext context) { + func.apply(oldObject, newObject, context); + } + }; + angularWatch(name, w); + } + + public void angularUnwatch(String name, AngularObjectWatcher w) { + AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry(); + if (registry.get(name) != null) { + registry.get(name).removeWatcher(w); + } + } + + public void angularUnwatch(String name) { + AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry(); + if (registry.get(name) != null) { + registry.get(name).clearAllWatchers(); + } + } + + public void angularUnbind(String name) { + AngularObjectRegistry registry = interpreterContext.getAngularObjectRegistry(); + registry.remove(name); + } } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java index 8d24cc4..2f8254d 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/DepInterpreterTest.java @@ -21,15 +21,16 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.util.HashMap; +import java.util.LinkedList; import java.util.Properties; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; import 
org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.spark.DepInterpreter; -import org.apache.zeppelin.spark.SparkInterpreter; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -57,7 +58,9 @@ public class DepInterpreterTest { intpGroup.add(dep); dep.setInterpreterGroup(intpGroup); - context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI()); + context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>()); } @After http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 20f7fa4..a5e0fe2 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -22,13 +22,16 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.util.HashMap; +import java.util.LinkedList; import java.util.Properties; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.spark.SparkInterpreter; import org.junit.After; import org.junit.Before; 
import org.junit.FixMethodOrder; @@ -55,7 +58,10 @@ public class SparkInterpreterTest { repl.open(); } - context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI()); + InterpreterGroup intpGroup = new InterpreterGroup(); + context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>()); } @After http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java index 71f088d..27198b3 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/SparkSqlInterpreterTest.java @@ -20,15 +20,16 @@ package org.apache.zeppelin.spark; import static org.junit.Assert.assertEquals; import java.util.HashMap; +import java.util.LinkedList; import java.util.Properties; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Type; -import org.apache.zeppelin.spark.SparkInterpreter; -import org.apache.zeppelin.spark.SparkSqlInterpreter; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -38,6 +39,7 @@ public class SparkSqlInterpreterTest { private SparkSqlInterpreter sql; private SparkInterpreter repl; private InterpreterContext context; + private InterpreterGroup 
intpGroup; @Before public void setUp() throws Exception { @@ -55,13 +57,15 @@ public class SparkSqlInterpreterTest { sql = new SparkSqlInterpreter(p); - InterpreterGroup intpGroup = new InterpreterGroup(); + intpGroup = new InterpreterGroup(); intpGroup.add(repl); intpGroup.add(sql); sql.setInterpreterGroup(intpGroup); sql.open(); } - context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI()); + context = new InterpreterContext("id", "title", "text", new HashMap<String, Object>(), new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList<InterpreterContextRunner>()); } @After @@ -79,7 +83,7 @@ public class SparkSqlInterpreterTest { assertEquals(Type.TABLE, ret.type()); assertEquals("name\tage\nmoon\t33\npark\t34\n", ret.message()); - assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code()); + assertEquals(InterpreterResult.Code.ERROR, sql.interpret("select wrong syntax", context).code()); assertEquals(InterpreterResult.Code.SUCCESS, sql.interpret("select case when name==\"aa\" then name else name end from people", context).code()); } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java new file mode 100644 index 0000000..bbfcd1b --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObject.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.display; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; + +import org.apache.zeppelin.scheduler.ExecutorFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + * + * @param <T> + */ +public class AngularObject<T> { + private String name; + private T object; + private transient AngularObjectListener listener; + private transient List<AngularObjectWatcher> watchers + = new LinkedList<AngularObjectWatcher>(); + + protected AngularObject(String name, T o, + AngularObjectListener listener) { + this.name = name; + this.listener = listener; + object = o; + } + + public String getName() { + return name; + } + + @Override + public boolean equals(Object o) { + if (o instanceof AngularObject) { + return name.equals(((AngularObject) o).name); + } else { + return false; + } + } + + public Object get() { + return object; + } + + public void emit(){ + if (listener != null) { + listener.updated(this); + } + } + + public void set(T o) { + set(o, true); + } + + public void set(T o, boolean emit) { + final T before = object; + final T after = o; + object = o; + if (emit) { + emit(); + } + + final Logger logger = LoggerFactory.getLogger(AngularObject.class); + List<AngularObjectWatcher> ws = new LinkedList<AngularObjectWatcher>(); + synchronized (watchers) { + ws.addAll(watchers); + } + + 
ExecutorService executor = ExecutorFactory.singleton().createOrGet("angularObjectWatcher", 50); + for (final AngularObjectWatcher w : ws) { + executor.submit(new Runnable() { + @Override + public void run() { + try { + w.watch(before, after); + } catch (Exception e) { + logger.error("Exception on watch", e); + } + } + }); + } + } + + public void setListener(AngularObjectListener listener) { + this.listener = listener; + } + + public AngularObjectListener getListener() { + return listener; + } + + public void addWatcher(AngularObjectWatcher watcher) { + synchronized (watchers) { + watchers.add(watcher); + } + } + + public void removeWatcher(AngularObjectWatcher watcher) { + synchronized (watchers) { + watchers.remove(watcher); + } + } + + public void clearAllWatchers() { + synchronized (watchers) { + watchers.clear(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java new file mode 100644 index 0000000..880e487 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectListener.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.display; + +/** + * + */ +public interface AngularObjectListener { + public void updated(AngularObject updatedObject); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java new file mode 100644 index 0000000..56eca22 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistry.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.display; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * + * + */ +public class AngularObjectRegistry { + Map<String, AngularObject> registry = new HashMap<String, AngularObject>(); + private AngularObjectRegistryListener listener; + private String interpreterId; + + AngularObjectListener angularObjectListener; + + public AngularObjectRegistry(final String interpreterId, + final AngularObjectRegistryListener listener) { + this.interpreterId = interpreterId; + this.listener = listener; + angularObjectListener = new AngularObjectListener() { + @Override + public void updated(AngularObject updatedObject) { + if (listener != null) { + listener.onUpdate(interpreterId, updatedObject); + } + } + }; + } + + public AngularObjectRegistryListener getListener() { + return listener; + } + + public AngularObject add(String name, Object o) { + return add(name, o, true); + } + + public AngularObject add(String name, Object o, boolean emit) { + AngularObject ao = createNewAngularObject(name, o); + + synchronized (registry) { + registry.put(name, ao); + if (listener != null && emit) { + listener.onAdd(interpreterId, ao); + } + } + + return ao; + } + + protected AngularObject createNewAngularObject(String name, Object o) { + return new AngularObject(name, o, angularObjectListener); + } + + protected AngularObjectListener getAngularObjectListener() { + return angularObjectListener; + } + + public AngularObject remove(String name) { + synchronized (registry) { + AngularObject o = registry.remove(name); + if (listener != null) { + listener.onRemove(interpreterId, o);; + } + return o; + } + } + + public AngularObject get(String name) { + synchronized (registry) { + return registry.get(name); + } + } + + public List<AngularObject> getAll() { + List<AngularObject> all = new LinkedList<AngularObject>(); + synchronized (registry) { + all.addAll(registry.values()); + } + return all; + } + 
+ public String getInterpreterGroupId() { + return interpreterId; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java new file mode 100644 index 0000000..3f08efa --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectRegistryListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.display; + +/** + * + * + */ +public interface AngularObjectRegistryListener { + public void onAdd(String interpreterGroupId, AngularObject object); + public void onUpdate(String interpreterGroupId, AngularObject object); + public void onRemove(String interpreterGroupId, AngularObject object); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java new file mode 100644 index 0000000..c5bd5e2 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/display/AngularObjectWatcher.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.display; + +import org.apache.zeppelin.interpreter.InterpreterContext; + +/** + * + */ +public abstract class AngularObjectWatcher { + private InterpreterContext context; + + public AngularObjectWatcher(InterpreterContext context) { + this.context = context; + } + + void watch(Object oldObject, Object newObject) { + watch(oldObject, newObject, context); + } + + public abstract void watch(Object oldObject, Object newObject, InterpreterContext context); +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 2d70c8e..2e4564e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -17,8 +17,10 @@ package org.apache.zeppelin.interpreter; +import java.util.List; import java.util.Map; +import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; /** @@ -30,19 +32,24 @@ public class InterpreterContext { private final String paragraphText; private final Map<String, Object> config; private GUI gui; - + private AngularObjectRegistry angularObjectRegistry; + private List<InterpreterContextRunner> runners; public InterpreterContext(String paragraphId, String paragraphTitle, String paragraphText, Map<String, Object> config, - GUI gui + GUI gui, + AngularObjectRegistry angularObjectRegistry, + List<InterpreterContextRunner> runners ) { this.paragraphId = paragraphId; this.paragraphTitle = paragraphTitle; this.paragraphText = paragraphText; this.config = config; this.gui = gui; + 
this.angularObjectRegistry = angularObjectRegistry; + this.runners = runners; } public String getParagraphId() { @@ -65,4 +72,12 @@ public class InterpreterContext { return gui; } + public AngularObjectRegistry getAngularObjectRegistry() { + return angularObjectRegistry; + } + + public List<InterpreterContextRunner> getRunners() { + return runners; + } + } http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java new file mode 100644 index 0000000..7a2df10 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContextRunner.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter; + +/** + */ +public abstract class InterpreterContextRunner implements Runnable { + String noteId; + private String paragraphId; + + public InterpreterContextRunner(String noteId, String paragraphId) { + this.noteId = noteId; + this.paragraphId = paragraphId; + } + + @Override + public boolean equals(Object o) { + if (o instanceof InterpreterContextRunner) { + InterpreterContextRunner io = ((InterpreterContextRunner) o); + if (io.getParagraphId().equals(paragraphId) && + io.getNoteId().equals(noteId)) { + return true; + } else { + return false; + } + + } else { + return false; + } + } + + @Override + public abstract void run(); + + public String getNoteId() { + return noteId; + } + + public String getParagraphId() { + return paragraphId; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java index 834630a..9baaef3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterGroup.java @@ -21,6 +21,8 @@ import java.util.LinkedList; import java.util.Properties; import java.util.Random; +import org.apache.zeppelin.display.AngularObjectRegistry; + /** * InterpreterGroup is list of interpreters in the same group. * And unit of interpreter instantiate, restart, bind, unbind. 
@@ -28,6 +30,16 @@ import java.util.Random; public class InterpreterGroup extends LinkedList<Interpreter>{ String id; + AngularObjectRegistry angularObjectRegistry; + + public InterpreterGroup(String id) { + this.id = id; + } + + public InterpreterGroup() { + getId(); + } + private static String generateId() { return "InterpreterGroup_" + System.currentTimeMillis() + "_" + new Random().nextInt(); @@ -42,7 +54,6 @@ public class InterpreterGroup extends LinkedList<Interpreter>{ } } - public Properties getProperty() { Properties p = new Properties(); for (Interpreter intp : this) { @@ -51,6 +62,14 @@ public class InterpreterGroup extends LinkedList<Interpreter>{ return p; } + public AngularObjectRegistry getAngularObjectRegistry() { + return angularObjectRegistry; + } + + public void setAngularObjectRegistry(AngularObjectRegistry angularObjectRegistry) { + this.angularObjectRegistry = angularObjectRegistry; + } + public void close() { for (Interpreter intp : this) { intp.close(); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java index 0659a47..5d8d96f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterResult.java @@ -48,6 +48,7 @@ public class InterpreterResult implements Serializable { public static enum Type { TEXT, HTML, + ANGULAR, TABLE, IMG, SVG, http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java 
---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java new file mode 100644 index 0000000..ca2df12 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.remote; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class InterpreterContextRunnerPool { + Logger logger = LoggerFactory.getLogger(InterpreterContextRunnerPool.class); + private Map<String, List<InterpreterContextRunner>> interpreterContextRunners; + + public InterpreterContextRunnerPool() { + interpreterContextRunners = new HashMap<String, List<InterpreterContextRunner>>(); + + } + + // add runner + public void add(String noteId, InterpreterContextRunner runner) { + synchronized (interpreterContextRunners) { + if (!interpreterContextRunners.containsKey(noteId)) { + interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>()); + } + + interpreterContextRunners.get(noteId).add(runner); + } + } + + // replace all runners to noteId + public void addAll(String noteId, List<InterpreterContextRunner> runners) { + synchronized (interpreterContextRunners) { + if (!interpreterContextRunners.containsKey(noteId)) { + interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>()); + } + + interpreterContextRunners.get(noteId).addAll(runners); + } + } + + public void clear(String noteId) { + synchronized (interpreterContextRunners) { + interpreterContextRunners.remove(noteId); + } + } + + + public void run(String noteId, String paragraphId) { + synchronized (interpreterContextRunners) { + List<InterpreterContextRunner> list = interpreterContextRunners.get(noteId); + if (list != null) { + for (InterpreterContextRunner r : list) { + if (noteId.equals(r.getNoteId()) && paragraphId.equals(r.getParagraphId())) { + logger.info("run paragraph {} on note {} from InterpreterContext", + r.getParagraphId(), r.getNoteId()); + 
r.run(); + return; + } + } + } + + throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java new file mode 100644 index 0000000..3abd764 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectListener; + +/** + * + */ +public class RemoteAngularObject extends AngularObject { + + private transient RemoteInterpreterProcess remoteInterpreterProcess; + + RemoteAngularObject(String name, Object o, String interpreterGroupId, + AngularObjectListener listener, + RemoteInterpreterProcess remoteInterpreterProcess) { + super(name, o, listener); + this.remoteInterpreterProcess = remoteInterpreterProcess; + } + + @Override + public void set(Object o, boolean emit) { + set(o, emit, true); + } + + public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) { + super.set(o, emitWeb); + + if (emitRemoteProcess) { + // send updated value to remote interpreter + remoteInterpreterProcess.updateRemoteAngularObject(getName(), o); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java new file mode 100644 index 0000000..c711f69 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.WrappedInterpreter; + +/** + * + */ +public class RemoteAngularObjectRegistry extends AngularObjectRegistry { + + private InterpreterGroup interpreterGroup; + + public RemoteAngularObjectRegistry(String interpreterId, + AngularObjectRegistryListener listener, + InterpreterGroup interpreterGroup) { + super(interpreterId, listener); + this.interpreterGroup = interpreterGroup; + } + + private RemoteInterpreterProcess getRemoteInterpreterProcess() { + if (interpreterGroup.size() == 0) { + throw new RuntimeException("Can't get remoteInterpreterProcess"); + } + Interpreter p = interpreterGroup.get(0); + while (p instanceof WrappedInterpreter) { + p = ((WrappedInterpreter) p).getInnerInterpreter(); + } + + if (p instanceof RemoteInterpreter) { + return ((RemoteInterpreter) p).getInterpreterProcess(); + } else { + throw new RuntimeException("Can't get remoteInterpreterProcess"); + } + } + + @Override + protected AngularObject createNewAngularObject(String name, Object o) { + RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + throw new RuntimeException("Remote Interpreter process not found"); + } + return 
new RemoteAngularObject(name, o, getInterpreterGroupId(), + getAngularObjectListener(), + getRemoteInterpreterProcess()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index e905d5f..3e6128f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -26,6 +26,7 @@ import org.apache.thrift.TException; import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; @@ -56,6 +57,8 @@ public class RemoteInterpreter extends Interpreter { static Map<String, RemoteInterpreterProcess> interpreterGroupReference = new HashMap<String, RemoteInterpreterProcess>(); + private InterpreterContextRunnerPool interpreterContextRunnerPool; + public RemoteInterpreter(Properties property, String className, String interpreterRunner, @@ -67,6 +70,7 @@ public class RemoteInterpreter extends Interpreter { this.interpreterRunner = interpreterRunner; this.interpreterPath = interpreterPath; env = new HashMap<String, String>(); + interpreterContextRunnerPool = new InterpreterContextRunnerPool(); } public RemoteInterpreter(Properties property, @@ -119,7 +123,7 @@ public class RemoteInterpreter extends Interpreter { } 
} - int rc = interpreterProcess.reference(); + int rc = interpreterProcess.reference(getInterpreterGroup()); synchronized (interpreterProcess) { // when first process created @@ -187,6 +191,15 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e1); } + List<InterpreterContextRunner> runners = context.getRunners(); + if (runners != null && runners.size() != 0) { + // assume all runners in this InterpreterContext have the same note id + String noteId = runners.get(0).getNoteId(); + + interpreterContextRunnerPool.clear(noteId); + interpreterContextRunnerPool.addAll(noteId, runners); + } + try { GUI settings = context.getGui(); RemoteInterpreterResult remoteResult = client.interpret(className, st, convert(context)); @@ -316,7 +329,7 @@ public class RemoteInterpreter extends Interpreter { .containsKey(getInterpreterGroupKey(interpreterGroup))) { interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup), new RemoteInterpreterProcess(interpreterRunner, - interpreterPath, env)); + interpreterPath, env, interpreterContextRunnerPool)); logger.info("setInterpreterGroup = " + getInterpreterGroupKey(interpreterGroup) + " class=" + className @@ -335,7 +348,8 @@ public class RemoteInterpreter extends Interpreter { ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getConfig()), - gson.toJson(ic.getGui())); + gson.toJson(ic.getGui()), + gson.toJson(ic.getRunners())); } private InterpreterResult convert(RemoteInterpreterResult result) { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java new 
file mode 100644 index 0000000..8d16ec5 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterContextRunner.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; + +/** + * Concrete InterpreterContextRunner that only carries a note id and a paragraph id. + * It exists so that gson has a non-abstract type to deserialize runner data into; + * run() is not expected to be called and always throws InterpreterException. + */ +public class RemoteInterpreterContextRunner extends InterpreterContextRunner { + + public RemoteInterpreterContextRunner(String noteId, String paragraphId) { + super(noteId, paragraphId); + } + + @Override + public void run() { + // this class should be used only for gson deserialize abstract class + // code should not reach here + throw new InterpreterException("Assert"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java new file mode 100644 index 0000000..dc9ef0b --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.remote;
+
+import org.apache.thrift.TException;
+import org.apache.zeppelin.display.AngularObject;
+import org.apache.zeppelin.display.AngularObjectRegistry;
+import org.apache.zeppelin.interpreter.InterpreterContextRunner;
+import org.apache.zeppelin.interpreter.InterpreterGroup;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType;
+import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.gson.Gson;
+
+/**
+ * Thread that repeatedly asks the remote interpreter process for its next event and
+ * applies it on this side: angular object add / update / remove events are applied to
+ * the interpreter group's AngularObjectRegistry, and runner events are dispatched to
+ * the process's InterpreterContextRunnerPool. The loop ends after shutdown() is called.
+ */
+public class RemoteInterpreterEventPoller extends Thread {
+  /** Pause before polling again after a failed attempt, in milliseconds. */
+  private static final long RETRY_INTERVAL_MS = 1000;
+
+  Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class);
+  /** Created once and reused for every event instead of once per loop iteration. */
+  private final Gson gson = new Gson();
+  private RemoteInterpreterProcess interpreterProcess;
+  // Written by shutdown() from another thread and read by the polling loop,
+  // so it must be volatile for the loop to be guaranteed to see the change.
+  volatile boolean shutdown;
+  private InterpreterGroup interpreterGroup;
+
+  /**
+   * @param interpreterGroup group whose angular object registry receives the events
+   * @param interpreterProcess remote process to poll; also supplies the thrift clients
+   */
+  public RemoteInterpreterEventPoller(
+      InterpreterGroup interpreterGroup,
+      RemoteInterpreterProcess interpreterProcess) {
+    this.interpreterGroup = interpreterGroup;
+    this.interpreterProcess = interpreterProcess;
+    shutdown = false;
+  }
+
+  /**
+   * Polls for events until shutdown is requested. A failure to obtain a client or to
+   * fetch an event is logged and retried after a short pause; a failure while handling
+   * an event is logged and polling continues with the next event.
+   */
+  @Override
+  public void run() {
+    while (!shutdown) {
+      Client client;
+      try {
+        client = interpreterProcess.getClient();
+      } catch (Exception e) {
+        logger.error("Can't get RemoteInterpreterEvent", e);
+        pauseBeforeRetry();
+        continue;
+      }
+
+      RemoteInterpreterEvent event = null;
+      try {
+        event = client.getEvent();
+      } catch (TException e) {
+        logger.error("Can't get RemoteInterpreterEvent", e);
+      } finally {
+        // Give the client back on the failure path too. Previously a TException
+        // skipped the release, so every failed poll kept one client checked out.
+        // NOTE(review): a client that just failed may be unusable; if the pool can
+        // discard broken clients, that would be preferable here - confirm.
+        interpreterProcess.releaseClient(client);
+      }
+
+      if (event == null) {
+        // nothing was fetched; pause only after the client has been released
+        pauseBeforeRetry();
+        continue;
+      }
+
+      try {
+        handleEvent(event);
+      } catch (Exception e) {
+        logger.error("Can't handle event " + event, e);
+      }
+    }
+  }
+
+  /** Applies a single event received from the remote process. */
+  private void handleEvent(RemoteInterpreterEvent event) {
+    if (event.getType() == RemoteInterpreterEventType.NO_OP) {
+      return;
+    }
+
+    // looked up here, inside the caller's try/catch, so a failure cannot end the thread
+    AngularObjectRegistry angularObjectRegistry = interpreterGroup.getAngularObjectRegistry();
+
+    if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_ADD) {
+      AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
+      angularObjectRegistry.add(angularObject.getName(), angularObject.get());
+    } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_UPDATE) {
+      AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
+      AngularObject localAngularObject = angularObjectRegistry.get(angularObject.getName());
+      if (localAngularObject == null) {
+        // update for a name this registry does not hold; nothing to apply it to
+        logger.warn("Can't update unknown angular object " + angularObject.getName());
+      } else if (localAngularObject instanceof RemoteAngularObject) {
+        // to avoid ping-pong loop
+        ((RemoteAngularObject) localAngularObject).set(
+            angularObject.get(), true, false);
+      } else {
+        localAngularObject.set(angularObject.get());
+      }
+    } else if (event.getType() == RemoteInterpreterEventType.ANGULAR_OBJECT_REMOVE) {
+      AngularObject angularObject = gson.fromJson(event.getData(), AngularObject.class);
+      angularObjectRegistry.remove(angularObject.getName());
+    } else if (event.getType() == RemoteInterpreterEventType.RUN_INTERPRETER_CONTEXT_RUNNER) {
+      InterpreterContextRunner runnerFromRemote = gson.fromJson(
+          event.getData(), RemoteInterpreterContextRunner.class);
+
+      interpreterProcess.getInterpreterContextRunnerPool().run(
+          runnerFromRemote.getNoteId(), runnerFromRemote.getParagraphId());
+    }
+  }
+
+  /** Waits up to RETRY_INTERVAL_MS, returning early when shutdown() notifies this thread. */
+  private void pauseBeforeRetry() {
+    try {
+      synchronized (this) {
+        // checked under the monitor so a shutdown() that already ran is not slept through
+        if (!shutdown) {
+          wait(RETRY_INTERVAL_MS);
+        }
+      }
+    } catch (InterruptedException ignored) {
+      // As before, an interrupt only cuts the pause short; the loop condition decides
+      // whether polling continues.
+    }
+  }
+
+  /** Asks the polling loop to stop and wakes it if it is pausing before a retry. */
+  public void shutdown() {
+    shutdown = true;
+    synchronized (this) {
+      notify();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/58b70e3b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index a128cd7..dbfaa35 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -28,11 +28,15 @@ import org.apache.commons.exec.ExecuteResultHandler; import org.apache.commons.exec.ExecuteWatchdog; import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.pool2.impl.GenericObjectPool; +import org.apache.thrift.TException; import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.gson.Gson; + /** * */ @@ -48,11 +52,17 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { private GenericObjectPool<Client> clientPool; private Map<String, String> env; + private RemoteInterpreterEventPoller remoteInterpreterEventPoller; + private InterpreterContextRunnerPool interpreterContextRunnerPool; - public RemoteInterpreterProcess(String intpRunner, String intpDir, Map<String, String> env) { + public RemoteInterpreterProcess(String intpRunner, + String intpDir, + Map<String, String> env, + InterpreterContextRunnerPool interpreterContextRunnerPool) { this.interpreterRunner = intpRunner; this.interpreterDir = intpDir; this.env = env; + this.interpreterContextRunnerPool = interpreterContextRunnerPool; referenceCount = new AtomicInteger(0); } @@ -60,7 +70,7 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { return port; } - public int reference() { + public int reference(InterpreterGroup interpreterGroup) { synchronized (referenceCount) { if (executor == null) { // start server process @@ -108,6 +118,9 @@ public class 
RemoteInterpreterProcess implements ExecuteResultHandler { } clientPool = new GenericObjectPool<Client>(new ClientFactory("localhost", port)); + + remoteInterpreterEventPoller = new RemoteInterpreterEventPoller(interpreterGroup, this); + remoteInterpreterEventPoller.start(); } return referenceCount.incrementAndGet(); } @@ -126,6 +139,8 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { int r = referenceCount.decrementAndGet(); if (r == 0) { logger.info("shutdown interpreter process"); + remoteInterpreterEventPoller.shutdown(); + // first try shutdown try { Client client = getClient(); @@ -205,4 +220,35 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler {
       return clientPool.getNumIdle();
     }
   }
+
+  /**
+   * Called when angular object is updated in client side to propagate
+   * change to the remote process. Does nothing when no client can be obtained.
+   *
+   * @param name name of the angular object that changed
+   * @param o new value; it is serialized to JSON with gson before being sent
+   */
+  public void updateRemoteAngularObject(String name, Object o) {
+    Client client = null;
+    try {
+      client = getClient();
+    } catch (NullPointerException e) {
+      // remote process not started
+      return;
+    } catch (Exception e) {
+      logger.error("Can't update angular object", e);
+      // No client was obtained, so there is nothing to send and nothing to release.
+      // Falling through here used to dereference the null client below.
+      return;
+    }
+
+    try {
+      Gson gson = new Gson();
+      client.angularObjectUpdate(name, gson.toJson(o));
+    } catch (TException e) {
+      logger.error("Can't update angular object", e);
+    } finally {
+      releaseClient(client);
+    }
+  }
+
+  public InterpreterContextRunnerPool getInterpreterContextRunnerPool() {
+    return interpreterContextRunnerPool;
+  }
 }
