Repository: zeppelin
Updated Branches:
  refs/heads/master 70bec2b2e -> 3e863a67c


[ZEPPELIN-966] job manager change information collection process

### What is this PR for?
JOB menu offers traditional data collection has `Polling Get` way.
The PR is switched to `Event` process.
WebSocket is no longer periodically transmit the JOB communications.

### What type of PR is it?
Improvement

### Todos
- [x] - chage to event process from polling get on job manager

### What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-966

### How should this be tested?
Create to pargaraph, and notebook or run job.

### Screenshots (if appropriate)
<img width="683" alt="job manger-basic" 
src="https://cloud.githubusercontent.com/assets/10525473/16113612/0120dec8-33f8-11e6-8dec-c74048fae637.png";>

### Questions:
* Does the licenses files need update? no
* Is there breaking changes for older versions? no
* Does this needs documentation? no

Author: CloverHearts <[email protected]>

Closes #1359 from cloverhearts/dev/eventBaseJob and squashes the following 
commits:

ab58436 [CloverHearts] change to Packet Key Commnet
c57a7f9 [CloverHearts] Merge branch 'master' into dev/eventBaseJob
5b635af [CloverHearts] added comment for update job api on method
a50a369 [CloverHearts] Merge branch 'master' into dev/eventBaseJob
e91b9c3 [CloverHearts] comment and changed method name.
5cdb826 [CloverHearts] Merge branch 'master' into dev/eventBaseJob
67b96ed [CloverHearts] Merge branch 'master' into dev/eventBaseJob
fd8eda7 [CloverHearts] Merge branch 'master' into dev/eventBaseJob
2bd26d7 [CloverHearts] remove comment in jobmanager.js
899951f [CloverHearts] after change - change log type and message
740f1fd [CloverHearts] Merge branch 'master' into dev/eventBaseJob
30ffbb9 [CloverHearts] change event process for job manger information


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/3e863a67
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/3e863a67
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/3e863a67

Branch: refs/heads/master
Commit: 3e863a67cc90a12dd0a38438cdcb3153076e92aa
Parents: 70bec2b
Author: CloverHearts <[email protected]>
Authored: Sat Sep 10 11:07:31 2016 +0900
Committer: Lee moon soo <[email protected]>
Committed: Sun Sep 11 15:38:11 2016 -0700

----------------------------------------------------------------------
 .../apache/zeppelin/rest/NotebookRestApi.java   |   7 +-
 .../apache/zeppelin/server/ZeppelinServer.java  |   1 +
 .../apache/zeppelin/socket/NotebookServer.java  | 152 +++++++++++++++++--
 .../src/app/jobmanager/jobmanager.controller.js |   6 -
 .../websocketEvents/websocketMsg.service.js     |   2 +-
 .../org/apache/zeppelin/notebook/Notebook.java  |  74 ++++++++-
 .../zeppelin/notebook/socket/Message.java       |   4 +-
 7 files changed, 219 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
index c93a222..7272112 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java
@@ -779,7 +779,8 @@ public class NotebookRestApi {
     LOG.info("Get notebook jobs for job manager");
 
     AuthenticationInfo subject = new 
AuthenticationInfo(SecurityUtils.getPrincipal());
-    List<Map<String, Object>> notebookJobs = 
notebook.getJobListforNotebook(false, 0, subject);
+    List<Map<String, Object>> notebookJobs = notebook
+      .getJobListByUnixTime(false, 0, subject);
     Map<String, Object> response = new HashMap<>();
 
     response.put("lastResponseUnixTime", System.currentTimeMillis());
@@ -791,6 +792,8 @@ public class NotebookRestApi {
   /**
    * Get updated notebook jobs for job manager
    *
+   * Return the `Note` change information within the post unix timestamp.
+   *
    * @return JSON with status.OK
    * @throws IOException, IllegalArgumentException
    */
@@ -804,7 +807,7 @@ public class NotebookRestApi {
 
     List<Map<String, Object>> notebookJobs;
     AuthenticationInfo subject = new 
AuthenticationInfo(SecurityUtils.getPrincipal());
-    notebookJobs = notebook.getJobListforNotebook(false, lastUpdateUnixTime, 
subject);
+    notebookJobs = notebook.getJobListByUnixTime(false, lastUpdateUnixTime, 
subject);
     Map<String, Object> response = new HashMap<>();
 
     response.put("lastResponseUnixTime", System.currentTimeMillis());

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
index 0f7d8a1..d352c08 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java
@@ -104,6 +104,7 @@ public class ZeppelinServer extends Application {
     heliumApplicationFactory.setApplicationEventListener(notebookWsServer);
 
     notebook.addNotebookEventListener(heliumApplicationFactory);
+    
notebook.addNotebookEventListener(notebookWsServer.getNotebookInformationListener());
   }
 
   public static void main(String[] args) throws InterruptedException {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
index 880b0f3..e332802 100644
--- 
a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
+++ 
b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java
@@ -241,8 +241,8 @@ public class NotebookServer extends WebSocketServlet 
implements
           case LIST_NOTEBOOK_JOBS:
             unicastNotebookJobInfo(conn, messagereceived);
             break;
-          case LIST_UPDATE_NOTEBOOK_JOBS:
-            unicastUpdateNotebookJobInfo(conn, messagereceived);
+          case UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS:
+            unsubscribeNotebookJobInfo(conn);
             break;
           case GET_INTERPRETER_BINDINGS:
             getInterpreterBindings(conn, messagereceived);
@@ -398,9 +398,10 @@ public class NotebookServer extends WebSocketServlet 
implements
   }
 
   public void unicastNotebookJobInfo(NotebookSocket conn, Message fromMessage) 
throws IOException {
-
+    addConnectionToNote(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), conn);
     AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    List<Map<String, Object>> notebookJobs = 
notebook().getJobListforNotebook(false, 0, subject);
+    List<Map<String, Object>> notebookJobs = notebook()
+      .getJobListByUnixTime(false, 0, subject);
     Map<String, Object> response = new HashMap<>();
 
     response.put("lastResponseUnixTime", System.currentTimeMillis());
@@ -410,21 +411,25 @@ public class NotebookServer extends WebSocketServlet 
implements
       .put("notebookJobs", response)));
   }
 
-  public void unicastUpdateNotebookJobInfo(NotebookSocket conn, Message 
fromMessage)
-      throws IOException {
-    double lastUpdateUnixTimeRaw = (double) 
fromMessage.get("lastUpdateUnixTime");
-    long lastUpdateUnixTime = new Double(lastUpdateUnixTimeRaw).longValue();
-
-    List<Map<String, Object>> notebookJobs;
-    AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal);
-    notebookJobs = notebook().getJobListforNotebook(false, lastUpdateUnixTime, 
subject);
+  public void broadcastUpdateNotebookJobInfo(long lastUpdateUnixTime) throws 
IOException {
+    List<Map<String, Object>> notebookJobs = new LinkedList<>();
+    Notebook notebookObject = notebook();
+    List<Map<String, Object>> jobNotes = null;
+    if (notebookObject != null) {
+      jobNotes = notebook().getJobListByUnixTime(false, lastUpdateUnixTime, 
null);
+      notebookJobs = jobNotes == null ? notebookJobs : jobNotes;
+    }
 
     Map<String, Object> response = new HashMap<>();
     response.put("lastResponseUnixTime", System.currentTimeMillis());
-    response.put("jobs", notebookJobs);
+    response.put("jobs", notebookJobs != null ? notebookJobs : new 
LinkedList<>());
+
+    broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
+      new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", 
response));
+  }
 
-    conn.send(serializeMessage(new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS)
-            .put("notebookRunningJobs", response)));
+  public void unsubscribeNotebookJobInfo(NotebookSocket conn) {
+    removeConnectionFromNote(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(), 
conn);
   }
 
   public void saveInterpreterBindings(NotebookSocket conn, Message 
fromMessage) {
@@ -1292,6 +1297,113 @@ public class NotebookServer extends WebSocketServlet 
implements
   }
 
   /**
+   * Notebook Information Change event
+   */
+  public static class NotebookInformationListener implements 
NotebookEventListener {
+    private NotebookServer notebookServer;
+
+    public NotebookInformationListener(NotebookServer notebookServer) {
+      this.notebookServer = notebookServer;
+    }
+
+    @Override
+    public void onParagraphRemove(Paragraph p) {
+      try {
+        
notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 
5000);
+      } catch (IOException ioe) {
+        LOG.error("can not broadcast for job manager {}", ioe.getMessage());
+      }
+    }
+
+    @Override
+    public void onNoteRemove(Note note) {
+      try {
+        
notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 
5000);
+      } catch (IOException ioe) {
+        LOG.error("can not broadcast for job manager {}", ioe.getMessage());
+      }
+
+      List<Map<String, Object>> notesInfo = new LinkedList<>();
+      Map<String, Object> info = new HashMap<>();
+      info.put("notebookId", note.getId());
+      // set paragraphs
+      List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
+
+      // notebook json object root information.
+      info.put("isRunningJob", false);
+      info.put("unixTimeLastRun", 0);
+      info.put("isRemoved", true);
+      info.put("paragraphs", paragraphsInfo);
+      notesInfo.add(info);
+
+      Map<String, Object> response = new HashMap<>();
+      response.put("lastResponseUnixTime", System.currentTimeMillis());
+      response.put("jobs", notesInfo);
+
+      notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
+        new Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", 
response));
+
+    }
+
+    @Override
+    public void onParagraphCreate(Paragraph p) {
+      Notebook notebook = notebookServer.notebook();
+      List<Map<String, Object>> notebookJobs = 
notebook.getJobListByParagraphId(
+              p.getId()
+      );
+      Map<String, Object> response = new HashMap<>();
+      response.put("lastResponseUnixTime", System.currentTimeMillis());
+      response.put("jobs", notebookJobs);
+
+      notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
+              new 
Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
+    }
+
+    @Override
+    public void onNoteCreate(Note note) {
+      Notebook notebook = notebookServer.notebook();
+      List<Map<String, Object>> notebookJobs = 
notebook.getJobListBymNotebookId(
+              note.getId()
+      );
+      Map<String, Object> response = new HashMap<>();
+      response.put("lastResponseUnixTime", System.currentTimeMillis());
+      response.put("jobs", notebookJobs);
+
+      notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
+              new 
Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
+    }
+
+    @Override
+    public void onParagraphStatusChange(Paragraph p, Status status) {
+      Notebook notebook = notebookServer.notebook();
+      List<Map<String, Object>> notebookJobs = 
notebook.getJobListByParagraphId(
+        p.getId()
+      );
+
+      Map<String, Object> response = new HashMap<>();
+      response.put("lastResponseUnixTime", System.currentTimeMillis());
+      response.put("jobs", notebookJobs);
+
+      notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
+              new 
Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
+    }
+
+    @Override
+    public void onUnbindInterpreter(Note note, InterpreterSetting setting) {
+      Notebook notebook = notebookServer.notebook();
+      List<Map<String, Object>> notebookJobs = 
notebook.getJobListBymNotebookId(
+              note.getId()
+      );
+      Map<String, Object> response = new HashMap<>();
+      response.put("lastResponseUnixTime", System.currentTimeMillis());
+      response.put("jobs", notebookJobs);
+
+      notebookServer.broadcast(JOB_MANAGER_SERVICE.JOB_MANAGER_PAGE.getKey(),
+              new 
Message(OP.LIST_UPDATE_NOTEBOOK_JOBS).put("notebookRunningJobs", response));
+    }
+  }
+
+  /**
    * Need description here.
    *
    */
@@ -1334,6 +1446,12 @@ public class NotebookServer extends WebSocketServlet 
implements
         }
       }
       notebookServer.broadcastNote(note);
+
+      try {
+        
notebookServer.broadcastUpdateNotebookJobInfo(System.currentTimeMillis() - 
5000);
+      } catch (IOException e) {
+        LOG.error("can not broadcast for job manager {}", e);
+      }
     }
 
     /**
@@ -1374,6 +1492,10 @@ public class NotebookServer extends WebSocketServlet 
implements
     return new ParagraphListenerImpl(this, note);
   }
 
+  public NotebookEventListener getNotebookInformationListener() {
+    return new NotebookInformationListener(this);
+  }
+
   private void sendAllAngularObjects(Note note, NotebookSocket conn) throws 
IOException {
     List<InterpreterSetting> settings =
         
notebook().getInterpreterFactory().getInterpreterSettings(note.getId());

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-web/src/app/jobmanager/jobmanager.controller.js
----------------------------------------------------------------------
diff --git a/zeppelin-web/src/app/jobmanager/jobmanager.controller.js 
b/zeppelin-web/src/app/jobmanager/jobmanager.controller.js
index 81696b5..85c68c6 100644
--- a/zeppelin-web/src/app/jobmanager/jobmanager.controller.js
+++ b/zeppelin-web/src/app/jobmanager/jobmanager.controller.js
@@ -32,14 +32,8 @@ angular.module('zeppelinWebApp')
         $scope.JobInfomationsByFilter = $scope.jobInfomations;
 
         websocketMsgSrv.getNotebookJobsList();
-        var refreshObj = $interval(function() {
-          if ($scope.lastJobServerUnixTime !== undefined) {
-            
websocketMsgSrv.getUpdateNotebookJobsList($scope.lastJobServerUnixTime);
-          }
-        }, 1000);
 
         $scope.$on('$destroy', function() {
-          $interval.cancel(refreshObj);
           websocketMsgSrv.unsubscribeJobManager();
         });
       };

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js
----------------------------------------------------------------------
diff --git 
a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js 
b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js
index 473299e..3dcdc3e 100644
--- a/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js
+++ b/zeppelin-web/src/components/websocketEvents/websocketMsg.service.js
@@ -195,7 +195,7 @@ angular.module('zeppelinWebApp').service('websocketMsgSrv', 
function($rootScope,
     },
 
     unsubscribeJobManager: function() {
-      websocketEvents.sendNewEvent({op: 'UNSUBSCRIBE_JOBMANAGER'});
+      websocketEvents.sendNewEvent({op: 'UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS'});
     },
 
     getInterpreterBindings: function(noteID) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
index f1dee48..ae448e3 100644
--- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
+++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.stream.JsonReader;
+import org.apache.commons.codec.binary.StringUtils;
 import org.quartz.CronScheduleBuilder;
 import org.quartz.CronTrigger;
 import org.quartz.JobBuilder;
@@ -573,7 +574,78 @@ public class Notebook implements NoteEventListener {
     return lastRunningUnixTime;
   }
 
-  public List<Map<String, Object>> getJobListforNotebook(boolean needsReload,
+  public List<Map<String, Object>> getJobListByParagraphId(String paragraphID) 
{
+    String gotNoteId = null;
+    List<Note> notes = getAllNotes();
+    for (Note note : notes) {
+      Paragraph p = note.getParagraph(paragraphID);
+      if (p != null) {
+        gotNoteId = note.getId();
+      }
+    }
+    return getJobListBymNotebookId(gotNoteId);
+  }
+
+  public List<Map<String, Object>> getJobListBymNotebookId(String notebookID) {
+    final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron";
+    long lastRunningUnixTime = 0;
+    boolean isNotebookRunning = false;
+    Note jobNote = getNote(notebookID);
+    List<Map<String, Object>> notesInfo = new LinkedList<>();
+    if (jobNote == null) {
+      return notesInfo;
+    }
+
+    Map<String, Object> info = new HashMap<>();
+
+    info.put("notebookId", jobNote.getId());
+    String notebookName = jobNote.getName();
+    if (notebookName != null && !notebookName.equals("")) {
+      info.put("notebookName", jobNote.getName());
+    } else {
+      info.put("notebookName", "Note " + jobNote.getId());
+    }
+    // set notebook type ( cron or normal )
+    if (jobNote.getConfig().containsKey(CRON_TYPE_NOTEBOOK_KEYWORD) && 
!jobNote.getConfig()
+            .get(CRON_TYPE_NOTEBOOK_KEYWORD).equals("")) {
+      info.put("notebookType", "cron");
+    } else {
+      info.put("notebookType", "normal");
+    }
+
+    // set paragraphs
+    List<Map<String, Object>> paragraphsInfo = new LinkedList<>();
+    for (Paragraph paragraph : jobNote.getParagraphs()) {
+      // check paragraph's status.
+      if (paragraph.getStatus().isRunning()) {
+        isNotebookRunning = true;
+      }
+
+      // get data for the job manager.
+      Map<String, Object> paragraphItem = 
getParagraphForJobManagerItem(paragraph);
+      lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph);
+
+      paragraphsInfo.add(paragraphItem);
+    }
+
+    // set interpreter bind type
+    String interpreterGroupName = null;
+    if (replFactory.getInterpreterSettings(jobNote.getId()) != null
+            && replFactory.getInterpreterSettings(jobNote.getId()).size() >= 
1) {
+      interpreterGroupName = 
replFactory.getInterpreterSettings(jobNote.getId()).get(0).getName();
+    }
+
+    // notebook json object root information.
+    info.put("interpreter", interpreterGroupName);
+    info.put("isRunningJob", isNotebookRunning);
+    info.put("unixTimeLastRun", lastRunningUnixTime);
+    info.put("paragraphs", paragraphsInfo);
+    notesInfo.add(info);
+
+    return notesInfo;
+  };
+
+  public List<Map<String, Object>> getJobListByUnixTime(boolean needsReload,
       long lastUpdateServerUnixTime, AuthenticationInfo subject) {
     final String CRON_TYPE_NOTEBOOK_KEYWORD = "cron";
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/3e863a67/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
index e91dfbb..9175e30 100644
--- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
+++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java
@@ -127,8 +127,8 @@ public class Message {
     APP_STATUS_CHANGE,      // [s-c] on app status change
 
     LIST_NOTEBOOK_JOBS,     // [c-s] get notebook job management infomations
-    LIST_UPDATE_NOTEBOOK_JOBS, // [c-s] get job management informations for 
until unixtime
-                               // @param unixTime
+    LIST_UPDATE_NOTEBOOK_JOBS, // [s-c] get job management informations
+    UNSUBSCRIBE_UPDATE_NOTEBOOK_JOBS, // [c-s] unsubscribe job information for 
job management
     GET_INTERPRETER_BINDINGS, // [c-s] get interpreter bindings
                               // @param noteID
     SAVE_INTERPRETER_BINDINGS, // [c-s] save interpreter bindings

Reply via email to