Repository: zeppelin Updated Branches: refs/heads/master 70bec2b2e -> 3e863a67c
[ZEPPELIN-966] job manager change information collection process ### What is this PR for? JOB menu offers traditional data collection has `Polling Get` way. The PR is switched to `Event` process. WebSocket is no longer periodically transmit the JOB communications. ### What type of PR is it? Improvement ### Todos - [x] - chage to event process from polling get on job manager ### What is the Jira issue? https://issues.apache.org/jira/browse/ZEPPELIN-966 ### How should this be tested? Create to pargaraph, and notebook or run job. ### Screenshots (if appropriate) <img width="683" alt="job manger-basic" src="https://cloud.githubusercontent.com/assets/10525473/16113612/0120dec8-33f8-11e6-8dec-c74048fae637.png"> ### Questions: * Does the licenses files need update? no * Is there breaking changes for older versions? no * Does this needs documentation? no Author: CloverHearts <[email protected]> Closes #1359 from cloverhearts/dev/eventBaseJob and squashes the following commits: ab58436 [CloverHearts] change to Packet Key Commnet c57a7f9 [CloverHearts] Merge branch 'master' into dev/eventBaseJob 5b635af [CloverHearts] added comment for update job api on method a50a369 [CloverHearts] Merge branch 'master' into dev/eventBaseJob e91b9c3 [CloverHearts] comment and changed method name. 5cdb826 [CloverHearts] Merge branch 'master' into dev/eventBaseJob 67b96ed [CloverHearts] Merge branch 'master' into dev/eventBaseJob fd8eda7 [CloverHearts] Merge branch 'master' into dev/eventBaseJob 2bd26d7 [CloverHearts] remove comment in jobmanager.js 899951f [CloverHearts] after change - change log type and message 740f1fd [CloverHearts] Merge branch 'master' into dev/eventBaseJob 30ffbb9 [CloverHearts] change event process for job manger information Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/3e863a67 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/3e863a67 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/3e863a67 Branch: refs/heads/master Commit: 3e863a67cc90a12dd0a38438cdcb3153076e92aa Parents: 70bec2b Author: CloverHearts <[email protected]> Authored: Sat Sep 10 11:07:31 2016 +0900 Committer: Lee moon soo <[email protected]> Committed: Sun Sep 11 15:38:11 2016 -0700 ---------------------------------------------------------------------- .../apache/zeppelin/rest/NotebookRestApi.java | 7 +- .../apache/zeppelin/server/ZeppelinServer.java | 1 + .../apache/zeppelin/socket/NotebookServer.java | 152 +++++++++++++++++-- .../src/app/jobmanager/jobmanager.controller.js | 6 - .../websocketEvents/websocketMsg.service.js | 2 +- .../org/apache/zeppelin/notebook/Notebook.java | 74 ++++++++- .../zeppelin/notebook/socket/Message.java | 4 +- 7 files changed, 219 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index c93a222..7272112 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -779,7 +779,8 @@ public class NotebookRestApi { LOG.info("Get notebook jobs for job manager"); AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - List<Map<String, Object>> notebookJobs = notebook.getJobListforNotebook(false, 0, subject); + List<Map<String, Object>> notebookJobs = notebook + .getJobListByUnixTime(false, 0, subject); Map<String, Object> response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); @@ -791,6 +792,8 @@ public class NotebookRestApi { /** * Get updated notebook jobs for job manager * + * Return the `Note` change information within the post unix timestamp. + * * @return JSON with status.OK * @throws IOException, IllegalArgumentException */ @@ -804,7 +807,7 @@ public class NotebookRestApi { List<Map<String, Object>> notebookJobs; AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - notebookJobs = notebook.getJobListforNotebook(false, lastUpdateUnixTime, subject); + notebookJobs = notebook.getJobListByUnixTime(false, lastUpdateUnixTime, subject); Map<String, Object> response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index 0f7d8a1..d352c08 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -104,6 +104,7 @@ public class ZeppelinServer extends Application { heliumApplicationFactory.setApplicationEventListener(notebookWsServer); notebook.addNotebookEventListener(heliumApplicationFactory); + notebook.addNotebookEventListener(notebookWsServer.getNotebookInformationListener()); } public static void main(String[] args) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 880b0f3..e332802 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -241,8 +241,8 @@ public class NotebookServer extends WebSocketServlet implements case LIST_NOTEBOOK_JOBS: unicastNotebookJobInfo(conn, messagereceived); break; - case LIST_UPDATE_NOTEBOOK_JOBS: - unicastUpdateNotebookJobInfo(conn, messagereceived); + case UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS: + unsubscribeNotebookJobInfo(conn); break; case GET_INTERPRETER_BINDINGS: getInterpreterBindings(conn, messagereceived); @@ -398,9 +398,10 @@ public class NotebookServer extends WebSocketServlet implements } public void unicastNotebookJobInfo(NotebookSocket conn, Message fromMessage) throws IOException { - + addConnectionToNote(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), conn); AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - List<Map<String, Object>> notebookJobs = notebook().getJobListforNotebook(false, 0, subject); + List<Map<String, Object>> notebookJobs = notebook() + .getJobListByUnixTime(false, 0, subject); Map<String, Object> response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); @@ -410,21 +411,25 @@ public class NotebookServer extends WebSocketServlet implements .put("notebookJobs", response))); } - public void unicastUpdateNotebookJobInfo(NotebookSocket conn, Message fromMessage) - throws IOException { - double lastUpdateUnixTimeRaw = (double) fromMessage.get("lastUpdateUnixTime"); - long lastUpdateUnixTime = new Double(lastUpdateUnixTimeRaw).longValue(); - - List<Map<String, Object>> notebookJobs; - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - notebookJobs = notebook().getJobListforNotebook(false, lastUpdateUnixTime, subject); + public void broadcastUpdateNotebookJobInfo(long lastUpdateUnixTime) throws IOException { + List<Map<String, Object>> notebookJobs = new LinkedList<>(); + Notebook notebookObject = notebook(); + List<Map<String, Object>> jobNotes = null; + if (notebookObject != null) { + jobNotes = notebook().getJobListByUnixTime(false, lastUpdateUnixTime, null); + notebookJobs = jobNotes == null ? notebookJobs : jobNotes; + } Map<String, Object> response = new HashMap<>(); response.put("lastResponseUnixTime", System.currentTimeMillis()); - response.put("jobs", notebookJobs); + response.put("jobs", notebookJobs != null ? notebookJobs : new LinkedList<>()); + + broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response)); + } - conn.send(serializeMessage(new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS) - .put("notebookRunningJobs", response))); + public void unsubscribeNotebookJobInfo(NotebookSocket conn) { + removeConnectionFromNote(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), conn); } public void saveInterpreterBindings(NotebookSocket conn, Message fromMessage) { @@ -1292,6 +1297,113 @@ public class NotebookServer extends WebSocketServlet implements } /** + * Notebook Information Change event + */ + public static class NotebookInformationListener implements NotebookEventListener { + private NotebookServer notebookServer; + + public NotebookInformationListener(NotebookServer notebookServer) { + this.notebookServer = notebookServer; + } + + @Override + public void onParagraphRemove(Paragraph p) { + try { + notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 5000); + } catch (IOException ioe) { + LOG.error("can not broadcast for job manager {}", ioe.getMessage()); + } + } + + @Override + public void onNoteRemove(Note note) { + try { + notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 5000); + } catch (IOException ioe) { + LOG.error("can not broadcast for job manager {}", ioe.getMessage()); + } + + List<Map<String, Object>> notesInfo = new LinkedList<>(); + Map<String, Object> info = new HashMap<>(); + info.put("notebookId", note.getId()); + // set paragraphs + List<Map<String, Object>> paragraphsInfo = new LinkedList<>(); + + // notebook json object root information. + info.put("isRunningJob", false); + info.put("unixTimeLastRun", 0); + info.put("isRemoved", true); + info.put("paragraphs", paragraphsInfo); + notesInfo.add(info); + + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notesInfo); + + notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response)); + + } + + @Override + public void onParagraphCreate(Paragraph p) { + Notebook notebook = notebookServer.notebook(); + List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId( + p.getId() + ); + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response)); + } + + @Override + public void onNoteCreate(Note note) { + Notebook notebook = notebookServer.notebook(); + List<Map<String, Object>> notebookJobs = notebook.getJobListBymNotebookId( + note.getId() + ); + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response)); + } + + @Override + public void onParagraphStatusChange(Paragraph p, Status status) { + Notebook notebook = notebookServer.notebook(); + List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId( + p.getId() + ); + + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response)); + } + + @Override + public void onUnbindInterpreter(Note note, InterpreterSetting setting) { + Notebook notebook = notebookServer.notebook(); + List<Map<String, Object>> notebookJobs = notebook.getJobListBymNotebookId( + note.getId() + ); + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notebookJobs); + + notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response)); + } + } + + /** * Need description here. * */ @@ -1334,6 +1446,12 @@ public class NotebookServer extends WebSocketServlet implements } } notebookServer.broadcastNote(note); + + try { + notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 5000); + } catch (IOException e) { + LOG.error("can not broadcast for job manager {}", e); + } } /** @@ -1374,6 +1492,10 @@ public class NotebookServer extends WebSocketServlet implements return new ParagraphListenerImpl(this, note); } + public NotebookEventListener getNotebookInformationListener() { + return new NotebookInformationListener(this); + } + private void sendAllAngularObjects(Note note, NotebookSocket conn) throws IOException { List<InterpreterSetting> settings = notebook().getInterpreterFactory().getInterpreterSettings(note.getId()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-web/src/app/jobmanager/jobmanager.controller.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/app/jobmanager/jobmanager.controller.js b/zeppelin-web/src/app/jobmanager/jobmanager.controller.js index 81696b5..85c68c6 100644 --- a/zeppelin-web/src/app/jobmanager/jobmanager.controller.js +++ b/zeppelin-web/src/app/jobmanager/jobmanager.controller.js @@ -32,14 +32,8 @@ angular.module('zeppelinWebApp') $scope.JobInfomationsByFilter = $scope.jobInfomations; websocketMsgSrv.getNotebookJobsList(); - var refreshObj = $interval(function() { - if ($scope.lastJobServerUnixTime !== undefined) { - websocketMsgSrv.getUpdateNotebookJobsList($scope.lastJobServerUnixTime); - } - }, 1000); $scope.$on('$destroy', function() { - $interval.cancel(refreshObj); websocketMsgSrv.unsubscribeJobManager(); }); }; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js ---------------------------------------------------------------------- diff --git a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js index 473299e..3dcdc3e 100644 --- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js +++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js @@ -195,7 +195,7 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', function($rootScope, }, unsubscribeJobManager: function() { - websocketEvents.sendNewEvent({op: 'UNSUBSCRIBE_JOBMANAGER'}); + websocketEvents.sendNewEvent({op: 'UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS'}); }, getInterpreterBindings: function(noteID) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index f1dee48..ae448e3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.stream.JsonReader; +import org.apache.commons.codec.binary.StringUtils; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; import org.quartz.JobBuilder; @@ -573,7 +574,78 @@ public class Notebook implements NoteEventListener { return lastRunningUnixTime; } - public List<Map<String, Object>> getJobListforNotebook(boolean needsReload, + public List<Map<String, Object>> getJobListByParagraphId(String paragraphID) { + String gotNoteId = null; + List<Note> notes = getAllNotes(); + for (Note note : notes) { + Paragraph p = note.getParagraph(paragraphID); + if (p != null) { + gotNoteId = note.getId(); + } + } + return getJobListBymNotebookId(gotNoteId); + } + + public List<Map<String, Object>> getJobListBymNotebookId(String notebookID) { + final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron"; + long lastRunningUnixTime = 0; + boolean isNotebookRunning = false; + Note jobNote = getNote(notebookID); + List<Map<String, Object>> notesInfo = new LinkedList<>(); + if (jobNote == null) { + return notesInfo; + } + + Map<String, Object> info = new HashMap<>(); + + info.put("notebookId", jobNote.getId()); + String notebookName = jobNote.getName(); + if (notebookName != null && !notebookName.equals("")) { + info.put("notebookName", jobNote.getName()); + } else { + info.put("notebookName", "Note " + jobNote.getId()); + } + // set notebook type ( cron or normal ) + if (jobNote.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) && !jobNote.getConfig() + .get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) { + info.put("notebookType", "cron"); + } else { + info.put("notebookType", "normal"); + } + + // set paragraphs + List<Map<String, Object>> paragraphsInfo = new LinkedList<>(); + for (Paragraph paragraph : jobNote.getParagraphs()) { + // check paragraph's status. + if (paragraph.getStatus().isRunning()) { + isNotebookRunning = true; + } + + // get data for the job manager. + Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph); + lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph); + + paragraphsInfo.add(paragraphItem); + } + + // set interpreter bind type + String interpreterGroupName = null; + if (replFactory.getInterpreterSettings(jobNote.getId()) != null + && replFactory.getInterpreterSettings(jobNote.getId()).size() >= 1) { + interpreterGroupName = replFactory.getInterpreterSettings(jobNote.getId()).get(0).getName(); + } + + // notebook json object root information. + info.put("interpreter", interpreterGroupName); + info.put("isRunningJob", isNotebookRunning); + info.put("unixTimeLastRun", lastRunningUnixTime); + info.put("paragraphs", paragraphsInfo); + notesInfo.add(info); + + return notesInfo; + }; + + public List<Map<String, Object>> getJobListByUnixTime(boolean needsReload, long lastUpdateServerUnixTime, AuthenticationInfo subject) { final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron"; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index e91dfbb..9175e30 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -127,8 +127,8 @@ public class Message { APP_STATUS_CHANGE, // [s-c] on app status change LIST_NOTEBOOK_JOBS, // [c-s] get notebook job management infomations - LIST_UPDATE_NOTEBOOK_JOBS, // [c-s] get job management informations for until unixtime - // @param unixTime + LIST_UPDATE_NOTEBOOK_JOBS, // [s-c] get job management informations + UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS, // [c-s] unsubscribe job information for job management GET_INTERPRETER_BINDINGS, // [c-s] get interpreter bindings // @param noteID SAVE_INTERPRETER_BINDINGS, // [c-s] save interpreter bindings
