Repository: zeppelin Updated Branches: refs/heads/master bfa84f5b8 -> 001c621c7
ZEPPELIN-3737. Wrap JobManager page related stuff into class JobManagerService ### What is this PR for? This is refactoring PR which move all JobManager page related stuff into class JobManagerService. ### What type of PR is it? [Refactoring] ### Todos * [ ] - Task ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3737 ### How should this be tested? * CI pass ### Screenshots (if appropriate) ### Questions: * Does the licenses files need update? No * Is there breaking changes for older versions? No * Does this needs documentation? No Author: Jeff Zhang <[email protected]> Closes #3154 from zjffdu/ZEPPELIN-3737 and squashes the following commits: 8e334e81b [Jeff Zhang] ZEPPELIN-3737. Wrap JobManager page related stuff into class JobManagerService Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/001c621c Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/001c621c Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/001c621c Branch: refs/heads/master Commit: 001c621c773db837b4c90bbf7868c94f284247ce Parents: bfa84f5 Author: Jeff Zhang <[email protected]> Authored: Mon Aug 20 17:32:26 2018 +0800 Committer: Jeff Zhang <[email protected]> Committed: Wed Aug 22 11:31:10 2018 +0800 ---------------------------------------------------------------------- .../apache/zeppelin/rest/NotebookRestApi.java | 20 +- .../zeppelin/service/JobManagerService.java | 161 +++++++++++++++ .../apache/zeppelin/socket/NotebookServer.java | 168 ++++++++------- .../org/apache/zeppelin/notebook/Notebook.java | 204 ------------------- 4 files changed, 260 insertions(+), 293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java index f831f3f..8411263 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/rest/NotebookRestApi.java @@ -41,6 +41,7 @@ import org.apache.zeppelin.rest.message.RunParagraphWithParametersRequest; import org.apache.zeppelin.rest.message.UpdateParagraphRequest; import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.server.JsonResponse; +import org.apache.zeppelin.service.JobManagerService; import org.apache.zeppelin.service.NotebookService; import org.apache.zeppelin.service.ServiceContext; import org.apache.zeppelin.socket.NotebookServer; @@ -83,12 +84,14 @@ public class NotebookRestApi extends AbstractRestApi { private SearchService noteSearchService; private NotebookAuthorization notebookAuthorization; private NotebookService notebookService; + private JobManagerService jobManagerService; @Inject public NotebookRestApi(Notebook notebook, NotebookServer notebookServer, SearchService search) { this.notebook = notebook; this.notebookServer = notebookServer; this.notebookService = new NotebookService(notebook); + this.jobManagerService = new JobManagerService(notebook); this.noteSearchService = search; this.notebookAuthorization = notebook.getNotebookAuthorization(); this.zConf = notebook.getConf(); @@ -919,15 +922,11 @@ public class NotebookRestApi extends AbstractRestApi { @ZeppelinApi public Response getJobListforNote() throws IOException, IllegalArgumentException { LOG.info("Get note jobs for job manager"); - - AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - List<Map<String, Object>> noteJobs = notebook - .getJobListByUnixTime(false, 0, subject); + List<JobManagerService.NoteJobInfo> noteJobs = jobManagerService + .getNoteJobInfoByUnixTime(0, getServiceContext(), new RestServiceCallback<>()); Map<String, Object> response = new HashMap<>(); - response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", noteJobs); - return new JsonResponse<>(Status.OK, response).build(); } @@ -946,15 +945,12 @@ public class NotebookRestApi extends AbstractRestApi { public Response getUpdatedJobListforNote(@PathParam("lastUpdateUnixtime") long lastUpdateUnixTime) throws IOException, IllegalArgumentException { LOG.info("Get updated note jobs lastUpdateTime {}", lastUpdateUnixTime); - - List<Map<String, Object>> noteJobs; - AuthenticationInfo subject = new AuthenticationInfo(SecurityUtils.getPrincipal()); - noteJobs = notebook.getJobListByUnixTime(false, lastUpdateUnixTime, subject); + List<JobManagerService.NoteJobInfo> noteJobs = + jobManagerService.getNoteJobInfoByUnixTime(lastUpdateUnixTime, getServiceContext(), + new RestServiceCallback<>()); Map<String, Object> response = new HashMap<>(); - response.put("lastResponseUnixTime", System.currentTimeMillis()); response.put("jobs", noteJobs); - return new JsonResponse<>(Status.OK, response).build(); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java new file mode 100644 index 0000000..374d8ff --- /dev/null +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/JobManagerService.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.service; + +import org.apache.commons.lang3.StringUtils; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.scheduler.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/** + * Service class for JobManager Page + */ +public class JobManagerService { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobManagerService.class); + + private Notebook notebook; + + public JobManagerService(Notebook notebook) { + this.notebook = notebook; + } + + public List<NoteJobInfo> getNoteJobInfo(String noteId, + ServiceContext context, + ServiceCallback<List<NoteJobInfo>> callback) + throws IOException { + List<NoteJobInfo> notesJobInfo = new ArrayList<>(); + Note jobNote = notebook.getNote(noteId); + notesJobInfo.add(new NoteJobInfo(jobNote)); + callback.onSuccess(notesJobInfo, context); + return notesJobInfo; + } + + /** + * Get all NoteJobInfo after lastUpdateServerUnixTime + */ + public List<NoteJobInfo> getNoteJobInfoByUnixTime(long lastUpdateServerUnixTime, + ServiceContext context, + ServiceCallback<List<NoteJobInfo>> callback) + throws IOException { + List<Note> notes = notebook.getAllNotes(); + List<NoteJobInfo> notesJobInfo = new ArrayList<>(); + for (Note note : notes) { + NoteJobInfo noteJobInfo = new NoteJobInfo(note); + if (noteJobInfo.unixTimeLastRun > lastUpdateServerUnixTime) { + notesJobInfo.add(noteJobInfo); + } + } + callback.onSuccess(notesJobInfo, context); + return notesJobInfo; + } + + public void removeNoteJobInfo(String noteId, + ServiceContext context, + ServiceCallback<List<NoteJobInfo>> callback) throws IOException { + List<NoteJobInfo> notesJobInfo = new ArrayList<>(); + notesJobInfo.add(new NoteJobInfo(noteId, true)); + callback.onSuccess(notesJobInfo, context); + } + + private static long getUnixTimeLastRunParagraph(Paragraph paragraph) { + if (paragraph.isTerminated() && paragraph.getDateFinished() != null) { + return paragraph.getDateFinished().getTime(); + } else if (paragraph.isRunning()) { + return new Date().getTime(); + } else { + return paragraph.getDateCreated().getTime(); + } + } + + + public static class ParagraphJobInfo { + private String id; + private String name; + private Job.Status status; + + public ParagraphJobInfo(Paragraph p) { + this.id = p.getId(); + if (StringUtils.isBlank(p.getTitle())) { + this.name = p.getId(); + } else { + this.name = p.getTitle(); + } + this.status = p.getStatus(); + } + } + + public static class NoteJobInfo { + private String noteId; + private String noteName; + private String noteType; + private String interpreter; + private boolean isRunningJob; + private boolean isRemoved = false; + private long unixTimeLastRun; + private List<ParagraphJobInfo> paragraphs; + + public NoteJobInfo(Note note) { + boolean isNoteRunning = false; + long lastRunningUnixTime = 0; + this.noteId = note.getId(); + this.noteName = note.getName(); + // set note type ( cron or normal ) + if (isCron(note)) { + this.noteType = "cron"; + } else { + this.noteType = "normal"; + } + this.interpreter = note.getDefaultInterpreterGroup(); + + // set paragraphs + this.paragraphs = new ArrayList<>(); + for (Paragraph paragraph : note.getParagraphs()) { + // check paragraph's status. + if (paragraph.getStatus().isRunning()) { + isNoteRunning = true; + } + // get data for the job manager. + ParagraphJobInfo paragraphItem = new ParagraphJobInfo(paragraph); + lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph); + paragraphs.add(paragraphItem); + } + + this.isRunningJob = isNoteRunning; + this.unixTimeLastRun = lastRunningUnixTime; + } + + private boolean isCron(Note note) { + return note.getConfig().containsKey("cron") && + !StringUtils.isBlank(note.getConfig().get("cron").toString()); + } + + public NoteJobInfo(String noteId, boolean isRemoved) { + this.noteId = noteId; + this.isRemoved = isRemoved; + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java index 35da481..16719f3 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/socket/NotebookServer.java @@ -57,6 +57,7 @@ import org.apache.zeppelin.rest.exception.ForbiddenException; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.server.ZeppelinServer; import org.apache.zeppelin.service.ConfigurationService; +import org.apache.zeppelin.service.JobManagerService; import org.apache.zeppelin.service.NotebookService; import org.apache.zeppelin.service.ServiceContext; import org.apache.zeppelin.service.SimpleServiceCallback; @@ -115,11 +116,11 @@ public class NotebookServer extends WebSocketServlet /** * Job manager service type. */ - protected enum JobManagerService { + protected enum JobManagerServiceType { JOB_MANAGER_PAGE("JOB_MANAGER_PAGE"); private String serviceTypeKey; - JobManagerService(String serviceType) { + JobManagerServiceType(String serviceType) { this.serviceTypeKey = serviceType; } @@ -146,6 +147,7 @@ public class NotebookServer extends WebSocketServlet private NotebookService notebookService; private ConfigurationService configurationService; + private JobManagerService jobManagerService; private ExecutorService executorService = Executors.newFixedThreadPool(10); @@ -175,6 +177,13 @@ public class NotebookServer extends WebSocketServlet return this.configurationService; } + public synchronized JobManagerService getJobManagerService() { + if (this.jobManagerService == null) { + this.jobManagerService = new JobManagerService(notebook()); + } + return this.jobManagerService; + } + @Override public void configure(WebSocketServletFactory factory) { factory.setCreator(new NotebookWebSocketCreator(this)); @@ -609,36 +618,49 @@ public class NotebookServer extends WebSocketServlet } public void unicastNoteJobInfo(NotebookSocket conn, Message fromMessage) throws IOException { - addConnectionToNote(JobManagerService.JOB_MANAGER_PAGE.getKey(), conn); - AuthenticationInfo subject = new AuthenticationInfo(fromMessage.principal); - List<Map<String, Object>> noteJobs = notebook().getJobListByUnixTime(false, 0, subject); - Map<String, Object> response = new HashMap<>(); - - response.put("lastResponseUnixTime", System.currentTimeMillis()); - response.put("jobs", noteJobs); + addConnectionToNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); + getJobManagerService().getNoteJobInfoByUnixTime(0, getServiceContext(fromMessage), + new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(conn) { + @Override + public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo, + ServiceContext context) throws IOException { + super.onSuccess(notesJobInfo, context); + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notesJobInfo); + conn.send(serializeMessage(new Message(OP.LIST_NOTE_JOBS).put("noteJobs", response))); + } - conn.send(serializeMessage(new Message(OP.LIST_NOTE_JOBS).put("noteJobs", response))); + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + LOG.warn(ex.getMessage()); + } + }); } public void broadcastUpdateNoteJobInfo(long lastUpdateUnixTime) throws IOException { - List<Map<String, Object>> noteJobs = new LinkedList<>(); - Notebook notebookObject = notebook(); - List<Map<String, Object>> jobNotes; - if (notebookObject != null) { - jobNotes = notebook().getJobListByUnixTime(false, lastUpdateUnixTime, null); - noteJobs = jobNotes == null ? noteJobs : jobNotes; - } - - Map<String, Object> response = new HashMap<>(); - response.put("lastResponseUnixTime", System.currentTimeMillis()); - response.put("jobs", noteJobs != null ? noteJobs : new LinkedList<>()); + getJobManagerService().getNoteJobInfoByUnixTime(lastUpdateUnixTime, null, + new WebSocketServiceCallback<List<JobManagerService.NoteJobInfo>>(null) { + @Override + public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo, + ServiceContext context) throws IOException { + super.onSuccess(notesJobInfo, context); + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notesJobInfo); + broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); + } - broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(), - new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); + @Override + public void onFailure(Exception ex, ServiceContext context) throws IOException { + LOG.warn(ex.getMessage()); + } + }); } public void unsubscribeNoteJobInfo(NotebookSocket conn) { - removeConnectionFromNote(JobManagerService.JOB_MANAGER_PAGE.getKey(), conn); + removeConnectionFromNote(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), conn); } public void getInterpreterBindings(NotebookSocket conn, Message fromMessage) throws IOException { @@ -2033,7 +2055,7 @@ public class NotebookServer extends WebSocketServlet /** * Notebook Information Change event. */ - public static class NotebookInformationListener implements NotebookEventListener { + public class NotebookInformationListener implements NotebookEventListener { private NotebookServer notebookServer; public NotebookInformationListener(NotebookServer notebookServer) { @@ -2043,9 +2065,10 @@ public class NotebookServer extends WebSocketServlet @Override public void onParagraphRemove(Paragraph p) { try { - notebookServer.broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000); - } catch (IOException ioe) { - LOG.error("can not broadcast for job manager {}", ioe.getMessage()); + getJobManagerService().getNoteJobInfoByUnixTime(System.currentTimeMillis() - 5000, null, + new JobManagerServiceCallback()); + } catch (IOException e) { + LOG.warn("can not broadcast for job manager: " + e.getMessage(), e); } } @@ -2053,72 +2076,63 @@ public class NotebookServer extends WebSocketServlet public void onNoteRemove(Note note) { try { notebookServer.broadcastUpdateNoteJobInfo(System.currentTimeMillis() - 5000); - } catch (IOException ioe) { - LOG.error("can not broadcast for job manager {}", ioe.getMessage()); + } catch (IOException e) { + LOG.warn("can not broadcast for job manager: " + e.getMessage(), e); } - List<Map<String, Object>> notesInfo = new LinkedList<>(); - Map<String, Object> info = new HashMap<>(); - info.put("noteId", note.getId()); - // set paragraphs - List<Map<String, Object>> paragraphsInfo = new LinkedList<>(); - - // notebook json object root information. - info.put("isRunningJob", false); - info.put("unixTimeLastRun", 0); - info.put("isRemoved", true); - info.put("paragraphs", paragraphsInfo); - notesInfo.add(info); - - Map<String, Object> response = new HashMap<>(); - response.put("lastResponseUnixTime", System.currentTimeMillis()); - response.put("jobs", notesInfo); - - notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(), - new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); + try { + getJobManagerService().removeNoteJobInfo(note.getId(), null, + new JobManagerServiceCallback()); + } catch (IOException e) { + LOG.warn("can not broadcast for job manager: " + e.getMessage(), e); + } } @Override public void onParagraphCreate(Paragraph p) { - Notebook notebook = notebookServer.notebook(); - List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(p.getId()); - Map<String, Object> response = new HashMap<>(); - response.put("lastResponseUnixTime", System.currentTimeMillis()); - response.put("jobs", notebookJobs); - - notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(), - new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); + try { + notebookServer.getJobManagerService().getNoteJobInfo(p.getNote().getId(), null, + new JobManagerServiceCallback()); + } catch (IOException e) { + LOG.warn("can not broadcast for job manager: " + e.getMessage(), e); + } } @Override public void onNoteCreate(Note note) { - Notebook notebook = notebookServer.notebook(); - List<Map<String, Object>> notebookJobs = notebook.getJobListByNoteId(note.getId()); - Map<String, Object> response = new HashMap<>(); - response.put("lastResponseUnixTime", System.currentTimeMillis()); - response.put("jobs", notebookJobs); - - notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(), - new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); + try { + notebookServer.getJobManagerService().getNoteJobInfo(note.getId(), null, + new JobManagerServiceCallback()); + } catch (IOException e) { + LOG.warn("can not broadcast for job manager: " + e.getMessage(), e); + } } @Override public void onParagraphStatusChange(Paragraph p, Status status) { - Notebook notebook = notebookServer.notebook(); - List<Map<String, Object>> notebookJobs = notebook.getJobListByParagraphId(p.getId()); - - Map<String, Object> response = new HashMap<>(); - response.put("lastResponseUnixTime", System.currentTimeMillis()); - response.put("jobs", notebookJobs); - - notebookServer.broadcast(JobManagerService.JOB_MANAGER_PAGE.getKey(), - new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); + try { + notebookServer.getJobManagerService().getNoteJobInfo(p.getNote().getId(), null, + new JobManagerServiceCallback()); + } catch (IOException e) { + LOG.warn("can not broadcast for job manager: " + e.getMessage(), e); + } } + private class JobManagerServiceCallback + extends SimpleServiceCallback<List<JobManagerService.NoteJobInfo>> { + @Override + public void onSuccess(List<JobManagerService.NoteJobInfo> notesJobInfo, + ServiceContext context) throws IOException { + super.onSuccess(notesJobInfo, context); + Map<String, Object> response = new HashMap<>(); + response.put("lastResponseUnixTime", System.currentTimeMillis()); + response.put("jobs", notesJobInfo); + broadcast(JobManagerServiceType.JOB_MANAGER_PAGE.getKey(), + new Message(OP.LIST_UPDATE_NOTE_JOBS).put("noteRunningJobs", response)); + } + } } - - @Override public void onProgressUpdate(Paragraph p, int progress) { broadcast(p.getNote().getId(), @@ -2458,7 +2472,7 @@ public class NotebookServer extends WebSocketServlet return new ServiceContext(authInfo, userAndRoles); } - private class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> { + public class WebSocketServiceCallback<T> extends SimpleServiceCallback<T> { private NotebookSocket conn; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/001c621c/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 7cf0f54..b7dcdc3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -613,210 +613,6 @@ public class Notebook implements NoteEventListener { } } - private Map<String, Object> getParagraphForJobManagerItem(Paragraph paragraph) { - Map<String, Object> paragraphItem = new HashMap<>(); - - // set paragraph id - paragraphItem.put("id", paragraph.getId()); - - // set paragraph name - String paragraphName = paragraph.getTitle(); - if (paragraphName != null) { - paragraphItem.put("name", paragraphName); - } else { - paragraphItem.put("name", paragraph.getId()); - } - - // set status for paragraph. - paragraphItem.put("status", paragraph.getStatus().toString()); - - return paragraphItem; - } - - private long getUnixTimeLastRunParagraph(Paragraph paragraph) { - - Date lastRunningDate; - long lastRunningUnixTime; - - Date paragaraphDate = paragraph.getDateStarted(); - // diff started time <-> finishied time - if (paragaraphDate == null) { - paragaraphDate = paragraph.getDateFinished(); - } else { - if (paragraph.getDateFinished() != null && paragraph.getDateFinished() - .after(paragaraphDate)) { - paragaraphDate = paragraph.getDateFinished(); - } - } - - // finished time and started time is not exists. - if (paragaraphDate == null) { - paragaraphDate = paragraph.getDateCreated(); - } - - // set last update unixtime(ms). - lastRunningDate = paragaraphDate; - - lastRunningUnixTime = lastRunningDate.getTime(); - - return lastRunningUnixTime; - } - - public List<Map<String, Object>> getJobListByParagraphId(String paragraphId) { - String gotNoteId = null; - List<Note> notes = getAllNotes(); - for (Note note : notes) { - Paragraph p = note.getParagraph(paragraphId); - if (p != null) { - gotNoteId = note.getId(); - } - } - return getJobListByNoteId(gotNoteId); - } - - public List<Map<String, Object>> getJobListByNoteId(String noteId) { - final String CRON_TYPE_NOTE_KEYWORD = "cron"; - long lastRunningUnixTime = 0; - boolean isNoteRunning = false; - Note jobNote = getNote(noteId); - List<Map<String, Object>> notesInfo = new LinkedList<>(); - if (jobNote == null) { - return notesInfo; - } - - Map<String, Object> info = new HashMap<>(); - - info.put("noteId", jobNote.getId()); - String noteName = jobNote.getName(); - if (noteName != null && !noteName.equals("")) { - info.put("noteName", jobNote.getName()); - } else { - info.put("noteName", "Note " + jobNote.getId()); - } - // set note type ( cron or normal ) - if (jobNote.getConfig().containsKey(CRON_TYPE_NOTE_KEYWORD) && !jobNote.getConfig() - .get(CRON_TYPE_NOTE_KEYWORD).equals("")) { - info.put("noteType", "cron"); - } else { - info.put("noteType", "normal"); - } - - // set paragraphs - List<Map<String, Object>> paragraphsInfo = new LinkedList<>(); - for (Paragraph paragraph : jobNote.getParagraphs()) { - // check paragraph's status. - if (paragraph.getStatus().isRunning()) { - isNoteRunning = true; - } - - // get data for the job manager. - Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph); - lastRunningUnixTime = getUnixTimeLastRunParagraph(paragraph); - - paragraphsInfo.add(paragraphItem); - } - - // set interpreter bind type - String interpreterGroupName = null; - if (interpreterSettingManager.getInterpreterSettings(jobNote.getId()) != null - && interpreterSettingManager.getInterpreterSettings(jobNote.getId()).size() >= 1) { - interpreterGroupName = - interpreterSettingManager.getInterpreterSettings(jobNote.getId()).get(0).getName(); - } - - // note json object root information. - info.put("interpreter", interpreterGroupName); - info.put("isRunningJob", isNoteRunning); - info.put("unixTimeLastRun", lastRunningUnixTime); - info.put("paragraphs", paragraphsInfo); - notesInfo.add(info); - - return notesInfo; - }; - - public List<Map<String, Object>> getJobListByUnixTime(boolean needsReload, - long lastUpdateServerUnixTime, AuthenticationInfo subject) { - final String CRON_TYPE_NOTE_KEYWORD = "cron"; - - if (needsReload) { - try { - reloadAllNotes(subject); - } catch (IOException e) { - logger.error("Fail to reload notes from repository"); - } - } - - List<Note> notes = getAllNotes(); - List<Map<String, Object>> notesInfo = new LinkedList<>(); - for (Note note : notes) { - boolean isNoteRunning = false; - boolean isUpdateNote = false; - long lastRunningUnixTime = 0; - Map<String, Object> info = new HashMap<>(); - - // set note ID - info.put("noteId", note.getId()); - - // set note Name - String noteName = note.getName(); - if (noteName != null && !noteName.equals("")) { - info.put("noteName", note.getName()); - } else { - info.put("noteName", "Note " + note.getId()); - } - - // set note type ( cron or normal ) - if (note.getConfig().containsKey(CRON_TYPE_NOTE_KEYWORD) && !note.getConfig() - .get(CRON_TYPE_NOTE_KEYWORD).equals("")) { - info.put("noteType", "cron"); - } else { - info.put("noteType", "normal"); - } - - // set paragraphs - List<Map<String, Object>> paragraphsInfo = new LinkedList<>(); - for (Paragraph paragraph : note.getParagraphs()) { - // check paragraph's status. - if (paragraph.getStatus().isRunning()) { - isNoteRunning = true; - isUpdateNote = true; - } - - // get data for the job manager. - Map<String, Object> paragraphItem = getParagraphForJobManagerItem(paragraph); - lastRunningUnixTime = Math.max(getUnixTimeLastRunParagraph(paragraph), lastRunningUnixTime); - - // is update note for last server update time. - if (lastRunningUnixTime > lastUpdateServerUnixTime) { - isUpdateNote = true; - } - paragraphsInfo.add(paragraphItem); - } - - // set interpreter bind type - String interpreterGroupName = null; - if (interpreterSettingManager.getInterpreterSettings(note.getId()) != null - && interpreterSettingManager.getInterpreterSettings(note.getId()).size() >= 1) { - interpreterGroupName = - interpreterSettingManager.getInterpreterSettings(note.getId()).get(0).getName(); - } - - // not update and not running -> pass - if (!isUpdateNote && !isNoteRunning) { - continue; - } - - // note json object root information. - info.put("interpreter", interpreterGroupName); - info.put("isRunningJob", isNoteRunning); - info.put("unixTimeLastRun", lastRunningUnixTime); - info.put("paragraphs", paragraphsInfo); - notesInfo.add(info); - } - - return notesInfo; - } - /** * Cron task for the note. */
