Repository: zeppelin Updated Branches: refs/heads/master cbe68d227 -> e9caebcfa
ZEPPELIN-1692: Ability to access Spark jobs UI from the paragraph ### What is this PR for? Executing a paragraph may launch one or more Spark jobs. This PR adds the ability to open the Spark job UI (for the jobs generated by the paragraph run) directly from the paragraph. ### What type of PR is it? Improvement ### Todos * [x] Write tests ### What is the Jira issue? ZEPPELIN-1692 ### How should this be tested? Run paragraphs with Spark code (Scala, PySpark, SQL, R). Each paragraph will display a button in the top-right corner that, when clicked, opens the corresponding job UI. ### Screenshots (if appropriate)  ### Questions: * Do the license files need an update? NA * Are there breaking changes for older versions? NA * Does this need documentation? NA Author: Karup <[email protected]> Author: karuppayya <[email protected]> Author: karuppayya <[email protected]> Closes #1663 from karuppayya/ZEPPELIN-1692 and squashes the following commits: 4253d0b [Karup] Fix bad rebase d7eb3b6 [Karup] Fix paragraph.js 8e2cd85 [Karup] tryout: fix selenium tests based on moons suggstion 732b0a4 [karuppayya] Fix test 890107d [Karup] Fix test - tryout ed4685c [Karup] Fix tooltip d27221d [Karup] Adding license header 87214a7 [Karup] Fix incorrect rebase 19513a6 [Karup] Send para runtimeinfos via websocker, but dont persist in json 09fc0e2 [Karup] Fix compilation fc44d9b [Karup] Address review comments b837c6c [karuppayya] Fix incorrect variable used 42d92ac [karuppayya] Fix test d4e54e8 [karuppayya] Address review feedbacks 1a45284 [Karup] Fix test 717eedf [Karup] Add tests , refactor 25379aa [Karup] Clear job urls when we clear output 7383c0a [Karup] Address review comments e2cd4db [karuppayya] Fix NPE in tests 3d9a573 [karuppayya] Fix NPE and some refactoring 9b3a3e2 [karuppayya] Fix checkstyle f16422f [karuppayya] Ability to view spark job urls in each paragraph Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: 
http://git-wip-us.apache.org/repos/asf/zeppelin/commit/e9caebcf Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/e9caebcf Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/e9caebcf Branch: refs/heads/master Commit: e9caebcfa3ca2af272f3d396363052dc27f3945c Parents: cbe68d2 Author: Karup <[email protected]> Authored: Fri Feb 3 18:05:01 2017 +0530 Committer: Lee moon soo <[email protected]> Committed: Sun Feb 5 15:51:35 2017 +0900 ---------------------------------------------------------------------- .../zeppelin/spark/PySparkInterpreter.java | 2 +- .../apache/zeppelin/spark/SparkInterpreter.java | 54 ++++++++++--- .../zeppelin/spark/SparkRInterpreter.java | 8 +- .../zeppelin/spark/SparkSqlInterpreter.java | 11 +-- .../java/org/apache/zeppelin/spark/Utils.java | 17 ++++ .../apache/zeppelin/spark/ZeppelinContext.java | 21 ++++- .../zeppelin/spark/SparkInterpreterTest.java | 55 ++++++++++++- .../interpreter/remote/RemoteEventClient.java | 10 +++ .../remote/RemoteEventClientWrapper.java | 3 + .../remote/RemoteInterpreterEventClient.java | 4 + .../remote/RemoteInterpreterEventPoller.java | 16 +++- .../RemoteInterpreterProcessListener.java | 2 + .../remote/RemoteInterpreterUtils.java | 10 +++ .../thrift/RemoteInterpreterEventType.java | 5 +- .../main/thrift/RemoteInterpreterService.thrift | 3 +- .../RemoteInterpreterOutputTestStream.java | 6 ++ .../zeppelin/scheduler/RemoteSchedulerTest.java | 4 + .../json/NotebookTypeAdapterFactory.java | 85 ++++++++++++++++++++ .../zeppelin/rest/InterpreterRestApi.java | 10 ++- .../apache/zeppelin/server/ZeppelinServer.java | 2 +- .../apache/zeppelin/socket/NotebookServer.java | 70 +++++++++++++++- .../org/apache/zeppelin/AbstractZeppelinIT.java | 4 +- .../notebook/paragraph/paragraph-control.html | 36 ++++++++- .../notebook/paragraph/paragraph.controller.js | 80 +++++++++--------- .../websocketEvents/websocketEvents.factory.js | 2 + .../interpreter/InterpreterSetting.java | 27 ++++++- 
.../java/org/apache/zeppelin/notebook/Note.java | 2 + .../org/apache/zeppelin/notebook/Notebook.java | 1 + .../org/apache/zeppelin/notebook/Paragraph.java | 51 +++++++++++- .../zeppelin/notebook/ParagraphRuntimeInfo.java | 39 +++++++++ .../zeppelin/notebook/socket/Message.java | 3 +- 31 files changed, 569 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 5a8e040..0679fcc 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -377,7 +377,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand "pyspark " + sparkInterpreter.getSparkContext().version() + " is not supported")); return new InterpreterResult(Code.ERROR, errorMessage); } - String jobGroup = sparkInterpreter.getJobGroup(context); + String jobGroup = Utils.buildJobGroupId(context); ZeppelinContext z = sparkInterpreter.getZeppelinContext(); z.setInterpreterContext(context); z.setGui(context.getGui()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 0584a30..3c1288e 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -42,6 +42,8 @@ import 
org.apache.spark.repl.SparkILoop; import org.apache.spark.scheduler.ActiveJob; import org.apache.spark.scheduler.DAGScheduler; import org.apache.spark.scheduler.Pool; +import org.apache.spark.scheduler.SparkListenerApplicationEnd; +import org.apache.spark.scheduler.SparkListenerJobStart; import org.apache.spark.sql.SQLContext; import org.apache.spark.ui.SparkUI; import org.apache.spark.ui.jobs.JobProgressListener; @@ -57,6 +59,7 @@ import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.interpreter.util.InterpreterOutputStream; import org.apache.zeppelin.resource.ResourcePool; import org.apache.zeppelin.resource.WellKnownResourceName; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; import org.apache.zeppelin.scheduler.SchedulerFactory; @@ -112,7 +115,7 @@ public class SparkInterpreter extends Interpreter { private InterpreterOutputStream out; private SparkDependencyResolver dep; - private String sparkUrl; + private static String sparkUrl; /** * completer - org.apache.spark.repl.SparkJLineCompletion (scala 2.10) @@ -156,7 +159,36 @@ public class SparkInterpreter extends Interpreter { } static JobProgressListener setupListeners(SparkContext context) { - JobProgressListener pl = new JobProgressListener(context.getConf()); + JobProgressListener pl = new JobProgressListener(context.getConf()) { + @Override + public synchronized void onJobStart(SparkListenerJobStart jobStart) { + super.onJobStart(jobStart); + int jobId = jobStart.jobId(); + String jobGroupId = jobStart.properties().getProperty("spark.jobGroup.id"); + String jobUrl = getJobUrl(jobId); + String noteId = Utils.getNoteId(jobGroupId); + String paragraphId = Utils.getParagraphId(jobGroupId); + if (jobUrl != null && noteId != null && paragraphId != null) { + RemoteEventClientWrapper eventClient = ZeppelinContext.getEventClient(); + 
Map<String, String> infos = new java.util.HashMap<>(); + infos.put("jobUrl", jobUrl); + infos.put("label", "SPARK JOB"); + infos.put("tooltip", "View in Spark web UI"); + if (eventClient != null) { + eventClient.onParaInfosReceived(noteId, paragraphId, infos); + } + } + } + + private String getJobUrl(int jobId) { + String jobUrl = null; + if (sparkUrl != null) { + jobUrl = sparkUrl + "/jobs/job?id=" + jobId; + } + return jobUrl; + } + + }; try { Object listenerBus = context.getClass().getMethod("listenerBus").invoke(context); @@ -950,7 +982,10 @@ public class SparkInterpreter extends Interpreter { numReferenceOfSparkContext.incrementAndGet(); } - private String getSparkUIUrl() { + public String getSparkUIUrl() { + if (sparkUrl != null) { + return sparkUrl; + } Option<SparkUI> sparkUiOption = (Option<SparkUI>) Utils.invokeMethod(sc, "ui"); SparkUI sparkUi = sparkUiOption.get(); String sparkWebUrl = sparkUi.appUIAddress(); @@ -971,8 +1006,9 @@ public class SparkInterpreter extends Interpreter { Map<String, String> infos = new java.util.HashMap<>(); if (sparkUrl != null) { infos.put("url", sparkUrl); - logger.info("Sending metainfos to Zeppelin server: {}", infos.toString()); if (ctx != null && ctx.getClient() != null) { + logger.info("Sending metainfos to Zeppelin server: {}", infos.toString()); + getZeppelinContext().setEventClient(ctx.getClient()); ctx.getClient().onMetaInfosReceived(infos); } } @@ -1105,10 +1141,6 @@ public class SparkInterpreter extends Interpreter { return obj; } - String getJobGroup(InterpreterContext context){ - return "zeppelin-" + context.getParagraphId(); - } - /** * Interpret a single line. 
*/ @@ -1129,7 +1161,7 @@ public class SparkInterpreter extends Interpreter { public InterpreterResult interpret(String[] lines, InterpreterContext context) { synchronized (this) { z.setGui(context.getGui()); - sc.setJobGroup(getJobGroup(context), "Zeppelin", false); + sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false); InterpreterResult r = interpretInput(lines, context); sc.clearJobGroup(); return r; @@ -1252,12 +1284,12 @@ public class SparkInterpreter extends Interpreter { @Override public void cancel(InterpreterContext context) { - sc.cancelJobGroup(getJobGroup(context)); + sc.cancelJobGroup(Utils.buildJobGroupId(context)); } @Override public int getProgress(InterpreterContext context) { - String jobGroup = getJobGroup(context); + String jobGroup = Utils.buildJobGroupId(context); int completedTasks = 0; int totalTasks = 0; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 8f3e93c..16b1a21 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -102,7 +102,12 @@ public class SparkRInterpreter extends Interpreter { @Override public InterpreterResult interpret(String lines, InterpreterContext interpreterContext) { - getSparkInterpreter().populateSparkWebUrl(interpreterContext); + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + sparkInterpreter.populateSparkWebUrl(interpreterContext); + + String jobGroup = Utils.buildJobGroupId(interpreterContext); + sparkInterpreter.getSparkContext().setJobGroup(jobGroup, "Zeppelin", false); + String imageWidth = getProperty("zeppelin.R.image.width"); String[] sl = lines.split("\n"); @@ -122,7 
+127,6 @@ public class SparkRInterpreter extends Interpreter { } } - String jobGroup = getJobGroup(interpreterContext); String setJobGroup = ""; // assign setJobGroup to dummy__, otherwise it would print NULL for this statement if (Utils.isSpark2()) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java index e6fe137..1d5282f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkSqlInterpreter.java @@ -45,10 +45,6 @@ public class SparkSqlInterpreter extends Interpreter { Logger logger = LoggerFactory.getLogger(SparkSqlInterpreter.class); AtomicInteger num = new AtomicInteger(0); - private String getJobGroup(InterpreterContext context){ - return "zeppelin-" + context.getParagraphId(); - } - private int maxResult; public SparkSqlInterpreter(Properties property) { @@ -105,7 +101,7 @@ public class SparkSqlInterpreter extends Interpreter { sc.setLocalProperty("spark.scheduler.pool", null); } - sc.setJobGroup(getJobGroup(context), "Zeppelin", false); + sc.setJobGroup(Utils.buildJobGroupId(context), "Zeppelin", false); Object rdd = null; try { // method signature of sqlc.sql() is changed @@ -134,10 +130,11 @@ public class SparkSqlInterpreter extends Interpreter { @Override public void cancel(InterpreterContext context) { - SQLContext sqlc = getSparkInterpreter().getSQLContext(); + SparkInterpreter sparkInterpreter = getSparkInterpreter(); + SQLContext sqlc = sparkInterpreter.getSQLContext(); SparkContext sc = sqlc.sparkContext(); - sc.cancelJobGroup(getJobGroup(context)); + sc.cancelJobGroup(Utils.buildJobGroupId(context)); } @Override 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/spark/src/main/java/org/apache/zeppelin/spark/Utils.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java index 78304fd..17edb0d 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/Utils.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/Utils.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.spark; +import org.apache.zeppelin.interpreter.InterpreterContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,4 +107,20 @@ class Utils { return false; } } + + public static String buildJobGroupId(InterpreterContext context) { + return "zeppelin-" + context.getNoteId() + "-" + context.getParagraphId(); + } + + public static String getNoteId(String jobgroupId) { + int indexOf = jobgroupId.indexOf("-"); + int secondIndex = jobgroupId.indexOf("-", indexOf + 1); + return jobgroupId.substring(indexOf + 1, secondIndex); + } + + public static String getParagraphId(String jobgroupId) { + int indexOf = jobgroupId.indexOf("-"); + int secondIndex = jobgroupId.indexOf("-", indexOf + 1); + return jobgroupId.substring(secondIndex + 1, jobgroupId.length()); + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java ---------------------------------------------------------------------- diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java index d1234df..d62b68e 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinContext.java @@ -46,6 +46,7 @@ import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterException; import 
org.apache.zeppelin.interpreter.InterpreterHookRegistry; import org.apache.zeppelin.interpreter.RemoteWorksController; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; import org.apache.zeppelin.spark.dep.SparkDependencyResolver; import org.apache.zeppelin.resource.Resource; import org.apache.zeppelin.resource.ResourcePool; @@ -61,6 +62,7 @@ public class ZeppelinContext { // Map interpreter class name (to be used by hook registry) from // given replName in parapgraph private static final Map<String, String> interpreterClassMap; + private static RemoteEventClientWrapper eventClient; static { interpreterClassMap = new HashMap<>(); interpreterClassMap.put("spark", "org.apache.zeppelin.spark.SparkInterpreter"); @@ -221,7 +223,7 @@ public class ZeppelinContext { Object df, int maxResult) { Object[] rows = null; Method take; - String jobGroup = "zeppelin-" + interpreterContext.getParagraphId(); + String jobGroup = Utils.buildJobGroupId(interpreterContext); sc.setJobGroup(jobGroup, "Zeppelin", false); try { @@ -930,4 +932,21 @@ public class ZeppelinContext { return resourcePool.getAll(); } + /** + * Get the event client + */ + @ZeppelinApi + public static RemoteEventClientWrapper getEventClient() { + return eventClient; + } + + /** + * Set event client + */ + @ZeppelinApi + public void setEventClient(RemoteEventClientWrapper eventClient) { + if (ZeppelinContext.eventClient == null) { + ZeppelinContext.eventClient = eventClient; + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java ---------------------------------------------------------------------- diff --git a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java index 1410890..8c78b66 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java +++ 
b/spark/src/test/java/org/apache/zeppelin/spark/SparkInterpreterTest.java @@ -23,11 +23,13 @@ import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.interpreter.remote.RemoteEventClientWrapper; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.resource.LocalResourcePool; import org.apache.zeppelin.resource.WellKnownResourceName; @@ -54,6 +56,8 @@ public class SparkInterpreterTest { public static InterpreterGroup intpGroup; private InterpreterContext context; public static Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterTest.class); + private static Map<String, Map<String, String>> paraIdToInfosMap = + new HashMap<>(); /** * Get spark version number as a numerical value. @@ -92,6 +96,20 @@ public class SparkInterpreterTest { repl.open(); } + final RemoteEventClientWrapper remoteEventClientWrapper = new RemoteEventClientWrapper() { + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + Map<String, String> infos) { + if (infos != null) { + paraIdToInfosMap.put(paragraphId, infos); + } + } + + @Override + public void onMetaInfosReceived(Map<String, String> infos) { + } + }; context = new InterpreterContext("note", "id", null, "title", "text", new AuthenticationInfo(), new HashMap<String, Object>(), @@ -99,7 +117,19 @@ public class SparkInterpreterTest { new AngularObjectRegistry(intpGroup.getId(), null), new LocalResourcePool("id"), new LinkedList<InterpreterContextRunner>(), - new InterpreterOutput(null)); + new InterpreterOutput(null)) { + + @Override + public RemoteEventClientWrapper getClient() { + return remoteEventClientWrapper; + } + }; + // The first para interpretdr will set the Eventclient wrapper + 
//SparkInterpreter.interpret(String, InterpreterContext) -> + //SparkInterpreter.populateSparkWebUrl(InterpreterContext) -> + //ZeppelinContext.setEventClient(RemoteEventClientWrapper) + //running a dummy to ensure that we dont have any race conditions among tests + repl.interpret("sc", context); } @Test @@ -273,4 +303,27 @@ public class SparkInterpreterTest { List<InterpreterCompletion> completions = repl.completion("sc.", "sc.".length()); assertTrue(completions.size() > 0); } + + @Test + public void testParagraphUrls() { + String paraId = "test_para_job_url"; + InterpreterContext intpCtx = new InterpreterContext("note", paraId, null, "title", "text", + new AuthenticationInfo(), + new HashMap<String, Object>(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("id"), + new LinkedList<InterpreterContextRunner>(), + new InterpreterOutput(null)); + repl.interpret("sc.parallelize(1 to 10).map(x => {x}).collect", intpCtx); + Map<String, String> paraInfos = paraIdToInfosMap.get(intpCtx.getParagraphId()); + String jobUrl = null; + if (paraInfos != null) { + jobUrl = paraInfos.get("jobUrl"); + } + String sparkUIUrl = repl.getSparkUIUrl(); + assertNotNull(jobUrl); + assertTrue(jobUrl.startsWith(sparkUIUrl + "/jobs/job?id=")); + + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java index 3585a59..5015a3f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClient.java @@ -1,5 +1,6 @@ package 
org.apache.zeppelin.interpreter.remote; +import java.util.HashMap; import java.util.Map; /** @@ -21,4 +22,13 @@ public class RemoteEventClient implements RemoteEventClientWrapper { client.onMetaInfosReceived(infos); } + @Override + public void onParaInfosReceived(String noteId, String paragraphId, Map<String, String> infos) { + Map<String, String> paraInfos = new HashMap<String, String>(infos); + paraInfos.put("noteId", noteId); + paraInfos.put("paraId", paragraphId); + client.onParaInfosReceived(paraInfos); + } + + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java index 339f771..bf36cd6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteEventClientWrapper.java @@ -12,4 +12,7 @@ public interface RemoteEventClientWrapper { public void onMetaInfosReceived(Map<String, String> infos); + public void onParaInfosReceived(String noteId, String paragraphId, + Map<String, String> infos); + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 606d35f..4b721f5 100644 --- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -470,6 +470,10 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector { gson.toJson(infos))); } + public void onParaInfosReceived(Map<String, String> infos) { + sendEvent(new RemoteInterpreterEvent(RemoteInterpreterEventType.PARA_INFOS, + gson.toJson(infos))); + } /** * Wait for eventQueue becomes empty */ http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index e794140..f46d31a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -243,10 +243,20 @@ public class RemoteInterpreterEventPoller extends Thread { Map<String, String> metaInfos = gson.fromJson(event.getData(), new TypeToken<Map<String, String>>() { }.getType()); - String id = interpreterGroup.getId(); - int indexOfColon = id.indexOf(":"); - String settingId = id.substring(0, indexOfColon); + String settingId = RemoteInterpreterUtils. 
+ getInterpreterSettingId(interpreterGroup.getId()); listener.onMetaInfosReceived(settingId, metaInfos); + } else if (event.getType() == RemoteInterpreterEventType.PARA_INFOS) { + Map<String, String> paraInfos = gson.fromJson(event.getData(), + new TypeToken<Map<String, String>>() { + }.getType()); + String noteId = paraInfos.get("noteId"); + String paraId = paraInfos.get("paraId"); + String settingId = RemoteInterpreterUtils. + getInterpreterSettingId(interpreterGroup.getId()); + if (noteId != null && paraId != null && settingId != null) { + listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos); + } } logger.debug("Event from remote process {}", event.getType()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java index 66b08c9..0e9dc51 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -40,4 +40,6 @@ public interface RemoteInterpreterProcessListener { public void onFinished(Object resultObject); public void onError(); } + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map<String, String> metaInfos); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java index 2937e2d..8308222 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterUtils.java @@ -17,6 +17,7 @@ package org.apache.zeppelin.interpreter.remote; +import org.apache.zeppelin.interpreter.InterpreterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,4 +64,13 @@ public class RemoteInterpreterUtils { return false; } } + + public static String getInterpreterSettingId(String intpGrpId) { + String settingId = null; + if (intpGrpId != null) { + int indexOfColon = intpGrpId.indexOf(":"); + settingId = intpGrpId.substring(0, indexOfColon); + } + return settingId; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 7ca406c..9e5a0b4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -43,7 +43,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { APP_STATUS_UPDATE(12), META_INFOS(13), REMOTE_ZEPPELIN_SERVER_RESOURCE(14), - RESOURCE_INVOKE_METHOD(15); + RESOURCE_INVOKE_METHOD(15), + PARA_INFOS(16); private final int value; @@ -94,6 +95,8 @@ public enum 
RemoteInterpreterEventType implements org.apache.thrift.TEnum { return REMOTE_ZEPPELIN_SERVER_RESOURCE; case 15: return RESOURCE_INVOKE_METHOD; + case 16: + return PARA_INFOS; default: return null; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift index 08a15ad..fc09ade 100644 --- a/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift +++ b/zeppelin-interpreter/src/main/thrift/RemoteInterpreterService.thrift @@ -56,7 +56,8 @@ enum RemoteInterpreterEventType { APP_STATUS_UPDATE = 12, META_INFOS = 13, REMOTE_ZEPPELIN_SERVER_RESOURCE = 14, - RESOURCE_INVOKE_METHOD = 15 + RESOURCE_INVOKE_METHOD = 15, + PARA_INFOS = 16 } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java index e3dc6b4..3f865cb 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterOutputTestStream.java @@ -182,4 +182,10 @@ public class RemoteInterpreterOutputTestStream implements RemoteInterpreterProce public void onRemoteRunParagraph(String noteId, String ParagraphID) throws Exception { } + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String 
interpreterSettingId, Map<String, String> metaInfos) { + } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index d7b2007..ebb5100 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -355,6 +355,10 @@ public class RemoteSchedulerTest implements RemoteInterpreterProcessListener { @Override public void onRemoteRunParagraph(String noteId, String PsaragraphID) throws Exception { + } + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map<String, String> metaInfos) { } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-server/src/main/java/org/apache/zeppelin/json/NotebookTypeAdapterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/json/NotebookTypeAdapterFactory.java b/zeppelin-server/src/main/java/org/apache/zeppelin/json/NotebookTypeAdapterFactory.java new file mode 100644 index 0000000..a22c03b --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/json/NotebookTypeAdapterFactory.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.json; + +import java.io.IOException; + +import org.apache.zeppelin.socket.NotebookServer; + +import com.google.gson.Gson; +import com.google.gson.JsonElement; +import com.google.gson.TypeAdapter; +import com.google.gson.TypeAdapterFactory; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; +import com.google.gson.stream.JsonWriter; + +/** + * Custom adapter type factory + * Modify the jsonObject before serialization/deserialization + * Check sample implementation at {@link NotebookServer} + * @param <C> the type whose json is to be customized for serialization/deserialization + */ +public class NotebookTypeAdapterFactory<C> implements TypeAdapterFactory { + private final Class<C> customizedClass; + + public NotebookTypeAdapterFactory(Class<C> customizedClass) { + this.customizedClass = customizedClass; + } + + @SuppressWarnings("unchecked") + // we use a runtime check to guarantee that 'C' and 'T' are equal + public final <T> TypeAdapter<T> create(Gson gson, TypeToken<T> type) { + return type.getRawType() == customizedClass ? 
(TypeAdapter<T>) customizeTypeAdapter(gson, + (TypeToken<C>) type) : null; + } + + private TypeAdapter<C> customizeTypeAdapter(Gson gson, TypeToken<C> type) { + final TypeAdapter<C> delegate = gson.getDelegateAdapter(this, type); + final TypeAdapter<JsonElement> elementAdapter = gson.getAdapter(JsonElement.class); + return new TypeAdapter<C>() { + @Override + public void write(JsonWriter out, C value) throws IOException { + JsonElement tree = delegate.toJsonTree(value); + beforeWrite(value, tree); + elementAdapter.write(out, tree); + } + + @Override + public C read(JsonReader in) throws IOException { + JsonElement tree = elementAdapter.read(in); + afterRead(tree); + return delegate.fromJsonTree(tree); + } + }; + } + + /** + * Override this to change {@code toSerialize} before it is written to the + * outgoing JSON stream. + */ + protected void beforeWrite(C source, JsonElement toSerialize) { + } + + /** + * Override this to change {@code deserialized} before it is parsed into the + * application type. 
+ */ + protected void afterRead(JsonElement deserialized) { + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java index 0928007..57eb851 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/InterpreterRestApi.java @@ -51,6 +51,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.rest.message.NewInterpreterSettingRequest; import org.apache.zeppelin.rest.message.UpdateInterpreterSettingRequest; import org.apache.zeppelin.server.JsonResponse; +import org.apache.zeppelin.socket.NotebookServer; /** * Interpreter Rest API @@ -61,14 +62,17 @@ public class InterpreterRestApi { private static final Logger logger = LoggerFactory.getLogger(InterpreterRestApi.class); private InterpreterFactory interpreterFactory; + private NotebookServer notebookServer; Gson gson = new Gson(); public InterpreterRestApi() { } - public InterpreterRestApi(InterpreterFactory interpreterFactory) { + public InterpreterRestApi(InterpreterFactory interpreterFactory, + NotebookServer notebookWsServer) { this.interpreterFactory = interpreterFactory; + this.notebookServer = notebookWsServer; } /** @@ -179,18 +183,20 @@ public class InterpreterRestApi { @ZeppelinApi public Response restartSetting(String message, @PathParam("settingId") String settingId) { logger.info("Restart interpreterSetting {}, msg={}", settingId, message); + + InterpreterSetting setting = interpreterFactory.get(settingId); try { RestartInterpreterRequest request = gson.fromJson(message, RestartInterpreterRequest.class); String noteId = request == null ? 
null : request.getNoteId(); interpreterFactory.restart(settingId, noteId, SecurityUtils.getPrincipal()); + notebookServer.clearParagraphRuntimeInfo(setting); } catch (InterpreterException e) { logger.error("Exception in InterpreterRestApi while restartSetting ", e); return new JsonResponse<>(Status.NOT_FOUND, e.getMessage(), ExceptionUtils.getStackTrace(e)) .build(); } - InterpreterSetting setting = interpreterFactory.get(settingId); if (setting == null) { return new JsonResponse<>(Status.NOT_FOUND, "", settingId).build(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 0657127..d236214 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -377,7 +377,7 @@ public class ZeppelinServer extends Application { HeliumRestApi heliumApi = new HeliumRestApi(helium, notebook); singletons.add(heliumApi); - InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory); + InterpreterRestApi interpreterApi = new InterpreterRestApi(replFactory, notebookWsServer); singletons.add(interpreterApi); CredentialRestApi credentialApi = new CredentialRestApi(credentials); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 68b015d..e692b12 100644 --- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -54,6 +55,7 @@ import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.json.NotebookTypeAdapterFactory; import org.apache.zeppelin.notebook.JobListenerFactory; import org.apache.zeppelin.notebook.Folder; import org.apache.zeppelin.notebook.Note; @@ -62,6 +64,7 @@ import org.apache.zeppelin.notebook.NotebookAuthorization; import org.apache.zeppelin.notebook.NotebookEventListener; import org.apache.zeppelin.notebook.Paragraph; import org.apache.zeppelin.notebook.ParagraphJobListener; +import org.apache.zeppelin.notebook.ParagraphRuntimeInfo; import org.apache.zeppelin.notebook.repo.NotebookRepo.Revision; import org.apache.zeppelin.notebook.socket.Message; import org.apache.zeppelin.notebook.socket.Message.OP; @@ -86,6 +89,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.Queues; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; import com.google.gson.reflect.TypeToken; /** @@ -113,7 +120,20 @@ public class NotebookServer extends WebSocketServlet private static final Logger LOG = LoggerFactory.getLogger(NotebookServer.class); - Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); + Gson gson = new GsonBuilder() + .registerTypeAdapterFactory(new 
NotebookTypeAdapterFactory<Paragraph>(Paragraph.class) { + @Override + protected void beforeWrite(Paragraph source, JsonElement toSerialize) { + Map<String, ParagraphRuntimeInfo> runtimeInfos = source.getRuntimeInfos(); + if (runtimeInfos != null) { + JsonElement jsonTree = gson.toJsonTree(runtimeInfos); + if (toSerialize instanceof JsonObject) { + JsonObject jsonObj = (JsonObject) toSerialize; + jsonObj.add("runtimeInfos", jsonTree); + } + } + } + }).setDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").create(); final Map<String, List<NotebookSocket>> noteSocketMap = new HashMap<>(); final Queue<NotebookSocket> connectedSockets = new ConcurrentLinkedQueue<>(); final Map<String, Queue<NotebookSocket>> userConnectedSockets = new ConcurrentHashMap<>(); @@ -2314,4 +2334,52 @@ public class NotebookServer extends WebSocketServlet } } } + + @Override + public void onParaInfosReceived(String noteId, String paragraphId, + String interpreterSettingId, Map<String, String> metaInfos) { + Note note = notebook().getNote(noteId); + if (note != null) { + Paragraph paragraph = note.getParagraph(paragraphId); + if (paragraph != null) { + InterpreterSetting setting = notebook().getInterpreterFactory() + .get(interpreterSettingId); + setting.addNoteToPara(noteId, paragraphId); + String label = metaInfos.get("label"); + String tooltip = metaInfos.get("tooltip"); + List<String> keysToRemove = Arrays.asList("noteId", "paraId", "label", "tooltip"); + for (String removeKey : keysToRemove) { + metaInfos.remove(removeKey); + } + paragraph + .updateRuntimeInfos(label, tooltip, metaInfos, setting.getGroup(), setting.getId()); + broadcast( + note.getId(), + new Message(OP.PARAS_INFO).put("id", paragraphId).put("infos", + paragraph.getRuntimeInfos())); + } + } + } + + public void clearParagraphRuntimeInfo(InterpreterSetting setting) { + Map<String, Set<String>> noteIdAndParaMap = setting.getNoteIdAndParaMap(); + if (noteIdAndParaMap != null && !noteIdAndParaMap.isEmpty()) { + for (String noteId : 
noteIdAndParaMap.keySet()) { + Set<String> paraIdSet = noteIdAndParaMap.get(noteId); + if (paraIdSet != null && !paraIdSet.isEmpty()) { + for (String paraId : paraIdSet) { + Note note = notebook().getNote(noteId); + if (note != null) { + Paragraph paragraph = note.getParagraph(paraId); + if (paragraph != null) { + paragraph.clearRuntimeInfo(setting.getId()); + broadcast(noteId, new Message(OP.PARAGRAPH).put("paragraph", paragraph)); + } + } + } + } + } + } + setting.clearNoteIdAndParaMap(); + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-server/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java b/zeppelin-server/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java index f849868..41f4cec 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/AbstractZeppelinIT.java @@ -64,14 +64,14 @@ abstract public class AbstractZeppelinIT { protected boolean waitForParagraph(final int paragraphNo, final String state) { By locator = By.xpath(getParagraphXPath(paragraphNo) - + "//div[contains(@class, 'control')]//span[1][contains(.,'" + state + "')]"); + + "//div[contains(@class, 'control')]//span[2][contains(.,'" + state + "')]"); WebElement element = pollingWait(locator, MAX_PARAGRAPH_TIMEOUT_SEC); return element.isDisplayed(); } protected String getParagraphStatus(final int paragraphNo) { By locator = By.xpath(getParagraphXPath(paragraphNo) - + "//div[contains(@class, 'control')]//span[1]"); + + "//div[contains(@class, 'control')]/span[2]"); return driver.findElement(locator).getText(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html ---------------------------------------------------------------------- diff --git 
a/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html b/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html index 351fb5f..5f9c462 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph-control.html @@ -13,7 +13,27 @@ limitations under the License. --> <div id="{{paragraph.id}}_control" class="control" ng-show="!asIframe"> - + <span> + <span ng-show="paragraph.runtimeInfos.jobUrl.values.length == 1"> + <a href="{{paragraph.runtimeInfos.jobUrl.values[0]}}" target="_blank" style="text-decoration: none;" + tooltip-placement="top" tooltip="{{paragraph.runtimeInfos.jobUrl.tooltip}}" > + <span class="fa fa-tasks"></span> + {{paragraph.runtimeInfos.jobUrl.label}} + </a> + </span> + <span class="dropdown" ng-show="paragraph.runtimeInfos.jobUrl.values.length > 1"> + <span style="cursor:pointer;color:#3071A9" tooltip-placement="top" tooltip="{{paragraph.runtimeInfos.jobUrl.tooltip}}" + data-toggle="dropdown" type="button"> + <span class="fa fa-tasks"></span> + {{paragraph.runtimeInfos.jobUrl.label}}S + </span> + <ul class="dropdown-menu" role="menu" style="width:200px;z-index:1002"> + <li ng-class="{'option-disabled': !noteOperationsAllowed()}" ng-repeat="url in paragraph.runtimeInfos.jobUrl.values"> + <a href="{{url}}" target="_blank"><span class="fa fa-external-link-square"></span> Jobs #{{$index}}</a> + </li> + </ul> + </span> + </span> <span> {{paragraph.status}} </span> @@ -31,6 +51,20 @@ limitations under the License. tooltip="Cancel (Ctrl+{{ (isMac ? 
'Option' : 'Alt') }}+c)" ng-click="cancelParagraph(paragraph)" ng-show="paragraph.status=='RUNNING' || paragraph.status=='PENDING'"></span> + <span ng-show="paragraph.runtimeInfos.jobUrl.length == 1"> + <a href="{{paragraph.runtimeInfos.jobUrl[0]}}" target="_blank"><span class="fa fa-tasks"></span> Spark job </a> + </span> + <span class="dropdown" ng-show="paragraph.runtimeInfos.jobUrl.length > 1"> + <span class="fa fa-tasks" style="cursor:pointer;color:#3071A9" tooltip-placement="top" tooltip="Run this paragraph (Shift+Enter)" + data-toggle="dropdown" + type="button"> Spark Jobs + </span> + <ul class="dropdown-menu" role="menu" style="width:200px;z-index:1002"> + <li ng-class="{'option-disabled': !noteOperationsAllowed()}" ng-repeat="url in paragraph.runtimeInfos.jobUrl"> + <a href="{{url}}" target="_blank"><span class="fa fa-external-link-square"></span> Jobs #{{$index}}</a> + </li> + </ul> + </span> <span class="{{paragraph.config.editorHide ? 'icon-size-fullscreen' : 'icon-size-actual'}}" style="cursor:pointer" tooltip-placement="top" tooltip="{{(paragraph.config.editorHide ? 'Show' : 'Hide')}} editor (Ctrl+{{ (isMac ? 
'Option' : 'Alt') }}+e)" ng-click="toggleEditor(paragraph)"></span> http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js index 342d41f..e6f3244 100644 --- a/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js +++ b/zeppelin-web/src/app/notebook/paragraph/paragraph.controller.js @@ -1036,6 +1036,12 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat } }); + $scope.$on('updateParaInfos', function(event, data) { + if (data.id === $scope.paragraph.id) { + $scope.paragraph.runtimeInfos = data.infos; + } + }); + $scope.$on('angularObjectRemove', function(event, data) { var noteId = $route.current.pathParams.noteId; if (!data.noteId || data.noteId === noteId) { @@ -1083,7 +1089,8 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat isEmpty(newPara.results) !== isEmpty(oldPara.results) || newPara.errorMessage !== oldPara.errorMessage || !angular.equals(newPara.settings, oldPara.settings) || - !angular.equals(newPara.config, oldPara.config))) + !angular.equals(newPara.config, oldPara.config) || + !angular.equals(newPara.runtimeInfos, oldPara.runtimeInfos))) } $scope.updateAllScopeTexts = function(oldPara, newPara) { @@ -1126,46 +1133,47 @@ function ParagraphCtrl($scope, $rootScope, $route, $window, $routeParams, $locat $scope.paragraph.results = newPara.results; } $scope.paragraph.settings = newPara.settings; + $scope.paragraph.runtimeInfos = newPara.runtimeInfos; if ($scope.editor) { $scope.editor.setReadOnly($scope.isRunning(newPara)); } if (!$scope.asIframe) { - $scope.paragraph.config = newPara.config; - initializeDefault(newPara.config); - } else { - newPara.config.editorHide = true; - newPara.config.tableHide = 
false; - $scope.paragraph.config = newPara.config; - } - }; - - $scope.updateParagraph = function(oldPara, newPara, updateCallback) { - // 1. get status, refreshed - const statusChanged = (newPara.status !== oldPara.status); - const resultRefreshed = (newPara.dateFinished !== oldPara.dateFinished) || - isEmpty(newPara.results) !== isEmpty(oldPara.results) || - newPara.status === 'ERROR' || (newPara.status === 'FINISHED' && statusChanged); - - // 2. update texts managed by $scope - $scope.updateAllScopeTexts(oldPara, newPara); - - // 3. execute callback to update result - updateCallback(); - - // 4. update remaining paragraph objects - $scope.updateParagraphObjectWhenUpdated(newPara); - - // 5. handle scroll down by key properly if new paragraph is added - if (statusChanged || resultRefreshed) { - // when last paragraph runs, zeppelin automatically appends new paragraph. - // this broadcast will focus to the newly inserted paragraph - const paragraphs = angular.element('div[id$="_paragraphColumn_main"]'); - if (paragraphs.length >= 2 && paragraphs[paragraphs.length - 2].id.indexOf($scope.paragraph.id) === 0) { - // rendering output can took some time. So delay scrolling event firing for sometime. - setTimeout(() => { $rootScope.$broadcast('scrollToCursor'); }, 500); - } - } + $scope.paragraph.config = newPara.config; + initializeDefault(newPara.config); + } else { + newPara.config.editorHide = true; + newPara.config.tableHide = false; + $scope.paragraph.config = newPara.config; + } + }; + + $scope.updateParagraph = function(oldPara, newPara, updateCallback) { + // 1. get status, refreshed + const statusChanged = (newPara.status !== oldPara.status); + const resultRefreshed = (newPara.dateFinished !== oldPara.dateFinished) || + isEmpty(newPara.results) !== isEmpty(oldPara.results) || + newPara.status === 'ERROR' || (newPara.status === 'FINISHED' && statusChanged); + + // 2. update texts managed by $scope + $scope.updateAllScopeTexts(oldPara, newPara); + + // 3. 
execute callback to update result + updateCallback(); + + // 4. update remaining paragraph objects + $scope.updateParagraphObjectWhenUpdated(newPara); + + // 5. handle scroll down by key properly if new paragraph is added + if (statusChanged || resultRefreshed) { + // when last paragraph runs, zeppelin automatically appends new paragraph. + // this broadcast will focus to the newly inserted paragraph + const paragraphs = angular.element('div[id$="_paragraphColumn_main"]'); + if (paragraphs.length >= 2 && paragraphs[paragraphs.length - 2].id.indexOf($scope.paragraph.id) === 0) { + // rendering output can took some time. So delay scrolling event firing for sometime. + setTimeout(() => { $rootScope.$broadcast('scrollToCursor'); }, 500); + } + } }; $scope.$on('runParagraphUsingSpell', function(event, data) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js index aceffbb..df0ea48 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js +++ b/zeppelin-web/src/components/websocketEvents/websocketEvents.factory.js @@ -164,6 +164,8 @@ function websocketEvents($rootScope, $websocket, $location, baseUrlSrv) { $rootScope.$broadcast('updateNote', data.name, data.config, data.info); } else if (op === 'SET_NOTE_REVISION') { $rootScope.$broadcast('setNoteRevisionResult', data); + } else if (op === 'PARAS_INFO') { + $rootScope.$broadcast('updateParaInfos', data); } }); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index bd7d664..7442430 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -25,6 +25,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.gson.annotations.SerializedName; @@ -48,6 +49,10 @@ public class InterpreterSetting { private String group; private transient Map<String, String> infos; + // Map of the notes and paragraphs which have runtime infos generated by this interpreter setting. + // This map is used to clear the infos in paragraph when the interpreter setting is restarted + private transient Map<String, Set<String>> runtimeInfosToBeCleared; + /** * properties can be either Properties or Map<String, InterpreterProperty> * properties should be: @@ -125,7 +130,7 @@ public class InterpreterSetting { return name; } - String getGroup() { + public String getGroup() { return group; } @@ -401,4 +406,24 @@ public class InterpreterSetting { public void setInterpreterRunner(InterpreterRunner interpreterRunner) { this.interpreterRunner = interpreterRunner; } + + public void addNoteToPara(String noteId, String paraId) { + if (runtimeInfosToBeCleared == null) { + runtimeInfosToBeCleared = new HashMap<>(); + } + Set<String> paraIdSet = runtimeInfosToBeCleared.get(noteId); + if (paraIdSet == null) { + paraIdSet = new HashSet<>(); + runtimeInfosToBeCleared.put(noteId, paraIdSet); + } + paraIdSet.add(paraId); + } + + public Map<String, Set<String>> getNoteIdAndParaMap() { + return runtimeInfosToBeCleared; + } + + public void clearNoteIdAndParaMap() { + runtimeInfosToBeCleared = null; + } } 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index 26f4e1a..224dd4b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -379,6 +379,7 @@ public class Note implements Serializable, ParagraphJobListener { for (Paragraph p : paragraphs) { if (p.getId().equals(paragraphId)) { p.setReturn(null, null); + p.clearRuntimeInfo(null); return p; } } @@ -563,6 +564,7 @@ public class Note implements Serializable, ParagraphJobListener { return; } + p.clearRuntimeInfo(null); String requiredReplName = p.getRequiredReplName(); Interpreter intp = factory.getInterpreter(p.getUser(), getId(), requiredReplName); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 8b946f2..b853e07 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -480,6 +480,7 @@ public class Notebook implements NoteEventListener { if (p.getDateFinished() != null && lastUpdatedDate.before(p.getDateFinished())) { lastUpdatedDate = p.getDateFinished(); } + p.clearRuntimeInfo(null); } Map<String, List<AngularObject>> savedObjects = note.getAngularObjects(); 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 27a7071..1250194 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -70,9 +70,10 @@ public class Paragraph extends Job implements Serializable, Cloneable { // For backward compatibility of note.json format after ZEPPELIN-212 Object result; + private Map<String, ParagraphRuntimeInfo> runtimeInfos; /** - * Applicaiton states in this paragraph + * Application states in this paragraph */ private final List<ApplicationState> apps = new LinkedList<>(); @@ -676,4 +677,52 @@ public class Paragraph extends Job implements Serializable, Cloneable { return false; } } + + public void updateRuntimeInfos(String label, String tooltip, Map<String, String> infos, + String group, String intpSettingId) { + if (this.runtimeInfos == null) { + this.runtimeInfos = new HashMap<String, ParagraphRuntimeInfo>(); + } + + if (infos != null) { + for (String key : infos.keySet()) { + ParagraphRuntimeInfo info = this.runtimeInfos.get(key); + if (info == null) { + info = new ParagraphRuntimeInfo(key, label, tooltip, group, intpSettingId); + this.runtimeInfos.put(key, info); + } + info.addValue(infos.get(key)); + } + } + } + + /** + * Remove runtime infos that were obtained from the setting with id settingId + * @param settingId + */ + public void clearRuntimeInfo(String settingId) { + if (settingId != null) { + Set<String> keys = runtimeInfos.keySet(); + if (keys.size() > 0) { + List<String> infosToRemove = new ArrayList<>(); + for (String key : keys) { + ParagraphRuntimeInfo paragraphRuntimeInfo = 
runtimeInfos.get(key); + if (paragraphRuntimeInfo.getInterpreterSettingId().equals(settingId)) { + infosToRemove.add(key); + } + } + if (infosToRemove.size() > 0) { + for (String info : infosToRemove) { + runtimeInfos.remove(info); + } + } + } + } else { + this.runtimeInfos = null; + } + } + + public Map<String, ParagraphRuntimeInfo> getRuntimeInfos() { + return runtimeInfos; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphRuntimeInfo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphRuntimeInfo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphRuntimeInfo.java new file mode 100644 index 0000000..0042023 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ParagraphRuntimeInfo.java @@ -0,0 +1,39 @@ +package org.apache.zeppelin.notebook; + +import java.util.ArrayList; +import java.util.List; + +/** + * Stores the runtime infos of each paragraph + * + */ +public class ParagraphRuntimeInfo { + + private String propertyName; // Name of the property + private String label; // Label to be used in UI + private String tooltip; // Tooltip text to show in UI + private String group; // The interpreter group from which the info was derived + private List<String> values; // values for the property + private String interpreterSettingId; + + public ParagraphRuntimeInfo(String propertyName, String label, + String tooltip, String group, String intpSettingId) { + if (intpSettingId == null) { + throw new IllegalArgumentException("Interpreter setting Id cannot be null"); + } + this.propertyName = propertyName; + this.label = label; + this.tooltip = tooltip; + this.group = group; + this.interpreterSettingId = intpSettingId; + this.values = new ArrayList<>(); + } + + public void addValue(String value) { + values.add(value); + } + + public String 
getInterpreterSettingId() { + return interpreterSettingId; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/e9caebcf/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index a6d1546..d40afc2 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -174,7 +174,8 @@ public class Message { NOTE_UPDATED, // [s-c] paragraph updated(name, config) RUN_ALL_PARAGRAPHS, // [c-s] run all paragraphs PARAGRAPH_EXECUTED_BY_SPELL, // [c-s] paragraph was executed by spell - RUN_PARAGRAPH_USING_SPELL // [s-c] run paragraph using spell + RUN_PARAGRAPH_USING_SPELL, // [s-c] run paragraph using spell + PARAS_INFO // [s-c] paragraph runtime infos } public static final Message EMPTY = new Message(null);
