http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 7c849d2..9e7eb35 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -17,23 +17,7 @@ package org.apache.zeppelin.notebook; -import com.google.common.base.Preconditions; -import com.google.common.base.Predicate; -import com.google.common.collect.FluentIterable; import com.google.common.collect.Sets; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.TimeUnit; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; @@ -46,12 +30,10 @@ import org.apache.zeppelin.interpreter.InterpreterNotFoundException; import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; -import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepoSync; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl; import org.apache.zeppelin.notebook.repo.NotebookRepoWithVersionControl.Revision; -import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; @@ -68,19 +50,32 @@ import org.quartz.impl.StdSchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + /** - * Collection of Notes. + * High level api of Notebook related operations, such as create, move & delete note/folder. + * It will also do other thing which is caused by these operation, such as update index, + * refresh cron and update InterpreterSetting, these are done through NoteEventListener. + * */ -public class Notebook implements NoteEventListener { - private static final Logger logger = LoggerFactory.getLogger(Notebook.class); +public class Notebook { + private static final Logger LOGGER = LoggerFactory.getLogger(Notebook.class); + + private NoteManager noteManager; private InterpreterFactory replFactory; private InterpreterSettingManager interpreterSettingManager; - /** - * Keep the order. - */ - private final Map<String, Note> notes = new LinkedHashMap<>(); - private final FolderView folders = new FolderView(); private ZeppelinConfiguration conf; private StdSchedulerFactory quertzSchedFact; org.quartz.Scheduler quartzSched; @@ -88,14 +83,12 @@ public class Notebook implements NoteEventListener { private NotebookRepo notebookRepo; private SearchService noteSearchService; private NotebookAuthorization notebookAuthorization; - private final List<NotebookEventListener> notebookEventListeners = - Collections.synchronizedList(new LinkedList<>()); + private List<NoteEventListener> noteEventListeners = new ArrayList<>(); private Credentials credentials; /** * Main constructor \w manual Dependency Injection * - * @param noteSearchService - (nullable) for indexing all notebooks on creating. * @throws IOException * @throws SchedulerException */ @@ -106,8 +99,8 @@ public class Notebook implements NoteEventListener { InterpreterSettingManager interpreterSettingManager, SearchService noteSearchService, NotebookAuthorization notebookAuthorization, - Credentials credentials) - throws IOException, SchedulerException { + Credentials credentials) throws IOException, SchedulerException { + this.noteManager = new NoteManager(notebookRepo); this.conf = conf; this.notebookRepo = notebookRepo; this.replFactory = replFactory; @@ -120,15 +113,9 @@ public class Notebook implements NoteEventListener { quartzSched.start(); CronJob.notebook = this; - AuthenticationInfo anonymous = AuthenticationInfo.ANONYMOUS; - loadAllNotes(anonymous); - if (this.noteSearchService != null) { - long start = System.nanoTime(); - logger.info("Notebook indexing started..."); - noteSearchService.addIndexDocs(notes.values()); - logger.info("Notebook indexing finished: {} indexed in {}s", notes.size(), - TimeUnit.NANOSECONDS.toSeconds(start - System.nanoTime())); - } + this.noteEventListeners.add(this.noteSearchService); + this.noteEventListeners.add(this.notebookAuthorization); + this.noteEventListeners.add(this.interpreterSettingManager); } /** @@ -140,33 +127,37 @@ public class Notebook implements NoteEventListener { } /** - * Create new note. + * Creating new note. defaultInterpreterGroup is not provided, so the global + * defaultInterpreterGroup (zeppelin.interpreter.group.default) is used * + * @param notePath + * @param subject + * @return * @throws IOException */ - public Note createNote(AuthenticationInfo subject) throws IOException { - return createNote("", interpreterSettingManager.getDefaultInterpreterSetting().getName(), subject); + public Note createNote(String notePath, + AuthenticationInfo subject) throws IOException { + return createNote(notePath, interpreterSettingManager.getDefaultInterpreterSetting().getName(), + subject); } /** - * Create new note. + * Creating new note. * + * @param notePath + * @param defaultInterpreterGroup + * @param subject + * @return * @throws IOException */ - public Note createNote(String name, String defaultInterpreterGroup, AuthenticationInfo subject) - throws IOException { + public Note createNote(String notePath, + String defaultInterpreterGroup, + AuthenticationInfo subject) throws IOException { Note note = - new Note(name, defaultInterpreterGroup, notebookRepo, replFactory, interpreterSettingManager, - paragraphJobListener, noteSearchService, credentials, this); - note.setNoteNameListener(folders); - - synchronized (notes) { - notes.put(note.getId(), note); - } - notebookAuthorization.setNewNotePermissions(note.getId(), subject); - noteSearchService.addIndexDoc(note); - note.persist(subject); - fireNoteCreateEvent(note); + new Note(notePath, defaultInterpreterGroup, replFactory, interpreterSettingManager, + paragraphJobListener, credentials, noteEventListeners); + saveNote(note, subject); + fireNoteCreateEvent(note, subject); return note; } @@ -177,10 +168,10 @@ public class Notebook implements NoteEventListener { * @return Note JSON * @throws IOException, IllegalArgumentException */ - public String exportNote(String noteId) throws IOException, IllegalArgumentException { + public String exportNote(String noteId) throws IOException { Note note = getNote(noteId); if (note == null) { - throw new IllegalArgumentException(noteId + " not found"); + throw new IOException(noteId + " not found"); } return note.toJson(); } @@ -189,34 +180,21 @@ public class Notebook implements NoteEventListener { * import JSON as a new note. * * @param sourceJson - the note JSON to import - * @param noteName - the name of the new note + * @param notePath - the path of the new note * @return note ID * @throws IOException */ - public Note importNote(String sourceJson, String noteName, AuthenticationInfo subject) + public Note importNote(String sourceJson, String notePath, AuthenticationInfo subject) throws IOException { - Note newNote; - try { - Note oldNote = Note.fromJson(sourceJson); - newNote = createNote(subject); - if (noteName != null) { - newNote.setName(noteName); - } else { - newNote.setName(oldNote.getName()); - } - newNote.setCronSupported(getConf()); - List<Paragraph> paragraphs = oldNote.getParagraphs(); - for (Paragraph p : paragraphs) { - newNote.addCloneParagraph(p, subject); - } - - notebookAuthorization.setNewNotePermissions(newNote.getId(), subject); - newNote.persist(subject); - } catch (IOException e) { - logger.error(e.toString(), e); - throw e; + Note oldNote = Note.fromJson(sourceJson); + if (notePath == null) { + notePath = oldNote.getName(); + } + Note newNote = createNote(notePath, subject); + List<Paragraph> paragraphs = oldNote.getParagraphs(); + for (Paragraph p : paragraphs) { + newNote.addCloneParagraph(p, subject); } - return newNote; } @@ -224,183 +202,138 @@ public class Notebook implements NoteEventListener { * Clone existing note. * * @param sourceNoteId - the note ID to clone - * @param newNoteName - the name of the new note + * @param newNotePath - the path of the new note * @return noteId * @throws IOException, CloneNotSupportedException, IllegalArgumentException */ - public Note cloneNote(String sourceNoteId, String newNoteName, AuthenticationInfo subject) - throws IOException, IllegalArgumentException { - + public Note cloneNote(String sourceNoteId, String newNotePath, AuthenticationInfo subject) + throws IOException { Note sourceNote = getNote(sourceNoteId); if (sourceNote == null) { - throw new IllegalArgumentException(sourceNoteId + "not found"); - } - Note newNote = createNote(subject); - if (newNoteName != null) { - newNote.setName(newNoteName); - } else { - newNote.setName("Note " + newNote.getId()); + throw new IOException("Source note: " + sourceNoteId + " not found"); } - newNote.setCronSupported(getConf()); - + Note newNote = createNote(newNotePath, subject); List<Paragraph> paragraphs = sourceNote.getParagraphs(); for (Paragraph p : paragraphs) { newNote.addCloneParagraph(p, subject); } - - noteSearchService.addIndexDoc(newNote); - newNote.persist(subject); return newNote; } - public List<InterpreterSetting> getBindedInterpreterSettings(String noteId) { + public void removeNote(String noteId, AuthenticationInfo subject) throws IOException { + LOGGER.info("Remove note " + noteId); Note note = getNote(noteId); - if (note != null) { - Set<InterpreterSetting> settings = new HashSet<>(); - for (Paragraph p : note.getParagraphs()) { - try { - Interpreter intp = p.getBindedInterpreter(); - settings.add(( - (ManagedInterpreterGroup) intp.getInterpreterGroup()).getInterpreterSetting()); - } catch (InterpreterNotFoundException e) { - // ignore this - } - } - // add the default interpreter group - InterpreterSetting defaultIntpSetting = - interpreterSettingManager.getByName(note.getDefaultInterpreterGroup()); - if (defaultIntpSetting != null) { - settings.add(defaultIntpSetting); - } - return new ArrayList<>(settings); - } else { - return new LinkedList<>(); - } + noteManager.removeNote(noteId, subject); + fireNoteRemoveEvent(note, subject); } public Note getNote(String id) { - synchronized (notes) { - return notes.get(id); + try { + Note note = noteManager.getNote(id); + if (note == null) { + return null; + } + note.setInterpreterFactory(replFactory); + note.setInterpreterSettingManager(interpreterSettingManager); + note.setParagraphJobListener(paragraphJobListener); + note.setNoteEventListeners(noteEventListeners); + note.setCredentials(credentials); + for (Paragraph p : note.getParagraphs()) { + p.setNote(note); + } + return note; + } catch (IOException e) { + LOGGER.warn("Fail to get note: " + id, e); + return null; } } - public Folder getFolder(String folderId) { - synchronized (folders) { - return folders.getFolder(folderId); - } + public void saveNote(Note note, AuthenticationInfo subject) throws IOException { + noteManager.saveNote(note, subject); + fireNoteUpdateEvent(note, subject); } - public boolean hasFolder(String folderId) { - synchronized (folders) { - return folders.hasFolder(folderId); - } + public boolean containsNote(String notePath) { + return noteManager.containsNote(notePath); } - public void moveNoteToTrash(String noteId) { -// try { -//// interpreterSettingManager.setInterpreterBinding("", noteId, new ArrayList<String>()); -// } catch (IOException e) { -// e.printStackTrace(); -// } + public boolean containsFolder(String folderPath) { + return noteManager.containsFolder(folderPath); } - public void removeNote(String id, AuthenticationInfo subject) { - Preconditions.checkNotNull(subject, "AuthenticationInfo should not be null"); + public void moveNote(String noteId, String newNotePath, AuthenticationInfo subject) throws IOException { + LOGGER.info("Move note " + noteId + " to " + newNotePath); + noteManager.moveNote(noteId, newNotePath, subject); + } - Note note; + public void moveFolder(String folderPath, String newFolderPath, AuthenticationInfo subject) throws IOException { + LOGGER.info("Move folder from " + folderPath + " to " + newFolderPath); + noteManager.moveFolder(folderPath, newFolderPath, subject); + } - synchronized (notes) { - note = notes.remove(id); - folders.removeNote(note); + public void removeFolder(String folderPath, AuthenticationInfo subject) throws IOException { + LOGGER.info("Remove folder " + folderPath); + // TODO(zjffdu) NotebookRepo.remove is called twice here + List<Note> notes = noteManager.removeFolder(folderPath, subject); + for (Note note : notes) { + fireNoteRemoveEvent(note, subject); } + } - noteSearchService.deleteIndexDocs(note); - notebookAuthorization.removeNote(id); - - // remove from all interpreter instance's angular object registry - for (InterpreterSetting settings : interpreterSettingManager.get()) { - InterpreterGroup interpreterGroup = settings.getInterpreterGroup(subject.getUser(), id); - if (interpreterGroup != null) { - AngularObjectRegistry registry = interpreterGroup.getAngularObjectRegistry(); - if (registry instanceof RemoteAngularObjectRegistry) { - // remove paragraph scope object - for (Paragraph p : note.getParagraphs()) { - ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId()); + public void emptyTrash(AuthenticationInfo subject) throws IOException { + LOGGER.info("Empty Trash"); + removeFolder("/" + NoteManager.TRASH_FOLDER, subject); + } - // remove app scope object - List<ApplicationState> appStates = p.getAllApplicationStates(); - if (appStates != null) { - for (ApplicationState app : appStates) { - ((RemoteAngularObjectRegistry) registry) - .removeAllAndNotifyRemoteProcess(id, app.getId()); - } - } - } - // remove note scope object - ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null); - } else { - // remove paragraph scope object - for (Paragraph p : note.getParagraphs()) { - registry.removeAll(id, p.getId()); - - // remove app scope object - List<ApplicationState> appStates = p.getAllApplicationStates(); - if (appStates != null) { - for (ApplicationState app : appStates) { - registry.removeAll(id, app.getId()); - } - } - } - // remove note scope object - registry.removeAll(id, null); - } - } + public void restoreAll(AuthenticationInfo subject) throws IOException { + NoteManager.Folder trash = noteManager.getTrashFolder(); + // restore notes under trash folder + for (NoteManager.NoteNode noteNode : trash.getNotes().values()) { + moveNote(noteNode.getNoteId(), noteNode.getNotePath().replace("/~Trash", ""), subject); } - - interpreterSettingManager.removeResourcesBelongsToNote(id); - - fireNoteRemoveEvent(note); - - try { - note.unpersist(subject); - } catch (IOException e) { - logger.error(e.toString(), e); + // restore folders under trash folder + for (NoteManager.Folder folder : trash.getFolders().values()) { + moveFolder(folder.getPath(), folder.getPath().replace("/~Trash", ""), subject); } } - public Revision checkpointNote(String noteId, String checkpointMessage, + public Revision checkpointNote(String noteId, String noteName, String checkpointMessage, AuthenticationInfo subject) throws IOException { if (((NotebookRepoSync) notebookRepo).isRevisionSupportedInDefaultRepo()) { return ((NotebookRepoWithVersionControl) notebookRepo) - .checkpoint(noteId, checkpointMessage, subject); + .checkpoint(noteId, noteName, checkpointMessage, subject); } else { return null; - } } - public List<Revision> listRevisionHistory(String noteId, AuthenticationInfo subject) { + public List<Revision> listRevisionHistory(String noteId, + String noteName, + AuthenticationInfo subject) throws IOException { if (((NotebookRepoSync) notebookRepo).isRevisionSupportedInDefaultRepo()) { - return ((NotebookRepoWithVersionControl) notebookRepo).revisionHistory(noteId, subject); + return ((NotebookRepoWithVersionControl) notebookRepo) + .revisionHistory(noteId, noteName, subject); } else { return null; } } - public Note setNoteRevision(String noteId, String revisionId, AuthenticationInfo subject) + public Note setNoteRevision(String noteId, String noteName, String revisionId, AuthenticationInfo subject) throws IOException { if (((NotebookRepoSync) notebookRepo).isRevisionSupportedInDefaultRepo()) { return ((NotebookRepoWithVersionControl) notebookRepo) - .setNoteRevision(noteId, revisionId, subject); + .setNoteRevision(noteId, noteName, revisionId, subject); } else { return null; } } - public Note getNoteByRevision(String noteId, String revisionId, AuthenticationInfo subject) + public Note getNoteByRevision(String noteId, String noteName, + String revisionId, AuthenticationInfo subject) throws IOException { if (((NotebookRepoSync) notebookRepo).isRevisionSupportedInDefaultRepo()) { - return ((NotebookRepoWithVersionControl) notebookRepo).get(noteId, revisionId, subject); + return ((NotebookRepoWithVersionControl) notebookRepo).get(noteId, noteName, + revisionId, subject); } else { return null; } @@ -410,23 +343,21 @@ public class Notebook implements NoteEventListener { public Note loadNoteFromRepo(String id, AuthenticationInfo subject) { Note note = null; try { - note = notebookRepo.get(id, subject); + note = noteManager.getNote(id); } catch (IOException e) { - logger.error("Failed to load " + id, e); + LOGGER.error("Failed to load " + id, e); } if (note == null) { return null; } //Manually inject ALL dependencies, as DI constructor was NOT used - note.setIndex(this.noteSearchService); note.setCredentials(this.credentials); note.setInterpreterFactory(replFactory); note.setInterpreterSettingManager(interpreterSettingManager); note.setParagraphJobListener(this.paragraphJobListener); - note.setNotebookRepo(notebookRepo); note.setCronSupported(getConf()); if (note.getDefaultInterpreterGroup() == null) { @@ -460,14 +391,7 @@ public class Notebook implements NoteEventListener { } } - note.setNoteEventListener(this); - note.setNoteNameListener(folders); - - synchronized (notes) { - notes.put(note.getId(), note); - folders.putNote(note); - refreshCron(note.getId()); - } + note.setNoteEventListeners(this.noteEventListeners); for (String name : angularObjectSnapshot.keySet()) { SnapshotAngularObject snapshot = angularObjectSnapshot.get(name); @@ -491,14 +415,6 @@ public class Notebook implements NoteEventListener { return note; } - void loadAllNotes(AuthenticationInfo subject) throws IOException { - List<NoteInfo> noteInfos = notebookRepo.list(subject); - - for (NoteInfo info : noteInfos) { - loadNoteFromRepo(info.getId(), subject); - } - } - /** * Reload all notes from repository after clearing `notes` and `folders` * to reflect the changes of added/deleted/modified notes on file system level. @@ -506,12 +422,7 @@ public class Notebook implements NoteEventListener { * @throws IOException */ public void reloadAllNotes(AuthenticationInfo subject) throws IOException { - synchronized (notes) { - notes.clear(); - } - synchronized (folders) { - folders.clear(); - } + this.noteManager.reloadNotes(); if (notebookRepo instanceof NotebookRepoSync) { NotebookRepoSync mainRepo = (NotebookRepoSync) notebookRepo; @@ -519,12 +430,6 @@ public class Notebook implements NoteEventListener { mainRepo.sync(subject); } } - - List<NoteInfo> noteInfos = notebookRepo.list(subject); - - for (NoteInfo info : noteInfos) { - loadNoteFromRepo(info.getId(), subject); - } } private class SnapshotAngularObject { @@ -552,73 +457,87 @@ public class Notebook implements NoteEventListener { } } - public Folder renameFolder(String oldFolderId, String newFolderId) { - return folders.renameFolder(oldFolderId, newFolderId); - } - - public List<NotebookEventListener> getNotebookEventListeners() { - return notebookEventListeners; + public List<NoteInfo> getNotesInfo() { + return noteManager.getNotesInfo().entrySet().stream() + .map(entry -> new NoteInfo(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); } - public List<Note> getNotesUnderFolder(String folderId) { - return folders.getFolder(folderId).getNotesRecursively(); - } - - public List<Note> getNotesUnderFolder(String folderId, - Set<String> userAndRoles) { - return folders.getFolder(folderId).getNotesRecursively(userAndRoles, notebookAuthorization); + public List<Note> getAllNotes() { + List<Note> noteList = noteManager.getAllNotes(); + Collections.sort(noteList, Comparator.comparing(Note::getPath)); + return noteList; } - public List<Note> getAllNotes() { - synchronized (notes) { - List<Note> noteList = new ArrayList<>(notes.values()); - Collections.sort(noteList, new Comparator<Note>() { - @Override - public int compare(Note note1, Note note2) { - String name1 = note1.getId(); - if (note1.getName() != null) { - name1 = note1.getName(); - } - String name2 = note2.getId(); - if (note2.getName() != null) { - name2 = note2.getName(); - } - return name1.compareTo(name2); - } - }); - return noteList; + public List<Note> getAllNotes(Set<String> userAndRoles) { + final Set<String> entities = Sets.newHashSet(); + if (userAndRoles != null) { + entities.addAll(userAndRoles); } + return getAllNotes().stream() + .filter(note -> notebookAuthorization.isReader(note.getId(), entities)) + .collect(Collectors.toList()); } - public List<Note> getAllNotes(Set<String> userAndRoles) { + public List<NoteInfo> getNotesInfo(Set<String> userAndRoles) { final Set<String> entities = Sets.newHashSet(); if (userAndRoles != null) { entities.addAll(userAndRoles); } + String homescreenNoteId = conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN); + boolean hideHomeScreenNotebookFromList = + conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_HOMESCREEN_HIDE); + + synchronized (noteManager.getNotesInfo()) { + List<NoteInfo> notesInfo = noteManager.getNotesInfo().entrySet().stream().filter(entry -> + notebookAuthorization.isReader(entry.getKey(), entities) && + ((!hideHomeScreenNotebookFromList) || + ((hideHomeScreenNotebookFromList) && !entry.getKey().equals(homescreenNoteId)))) + .map(entry -> new NoteInfo(entry.getKey(), entry.getValue())) + .collect(Collectors.toList()); + + notesInfo.sort((note1, note2) -> { + String name1 = note1.getId(); + if (note1.getPath() != null) { + name1 = note1.getPath(); + } + String name2 = note2.getId(); + if (note2.getPath() != null) { + name2 = note2.getPath(); + } + return name1.compareTo(name2); + }); + return notesInfo; + } + } - synchronized (notes) { - return FluentIterable.from(notes.values()).filter(new Predicate<Note>() { - @Override - public boolean apply(Note input) { - return input != null && notebookAuthorization.isReader(input.getId(), entities); - } - }).toSortedList(new Comparator<Note>() { - @Override - public int compare(Note note1, Note note2) { - String name1 = note1.getId(); - if (note1.getName() != null) { - name1 = note1.getName(); - } - String name2 = note2.getId(); - if (note2.getName() != null) { - name2 = note2.getName(); - } - return name1.compareTo(name2); + + public List<InterpreterSetting> getBindedInterpreterSettings(String noteId) { + Note note = getNote(noteId); + if (note != null) { + Set<InterpreterSetting> settings = new HashSet<>(); + for (Paragraph p : note.getParagraphs()) { + try { + Interpreter intp = p.getBindedInterpreter(); + settings.add(( + (ManagedInterpreterGroup) intp.getInterpreterGroup()).getInterpreterSetting()); + } catch (InterpreterNotFoundException e) { + // ignore this } - }); + } + // add the default interpreter group + InterpreterSetting defaultIntpSetting = + interpreterSettingManager.getByName(note.getDefaultInterpreterGroup()); + if (defaultIntpSetting != null) { + settings.add(defaultIntpSetting); + } + return new ArrayList<>(settings); + } else { + return new LinkedList<>(); } } + /** * Cron task for the note. */ @@ -630,15 +549,14 @@ public class Notebook implements NoteEventListener { String noteId = context.getJobDetail().getJobDataMap().getString("noteId"); Note note = notebook.getNote(noteId); - if (note.isRunningOrPending()) { - logger.warn("execution of the cron job is skipped because there is a running or pending " + + LOGGER.warn("execution of the cron job is skipped because there is a running or pending " + "paragraph (note id: {})", noteId); return; } if (!note.isCronSupported(notebook.getConf())) { - logger.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); + LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); return; } @@ -655,7 +573,7 @@ public class Notebook implements NoteEventListener { cronExecutingUser = (String) config.get("cronExecutingUser"); } } catch (ClassCastException e) { - logger.error(e.getMessage(), e); + LOGGER.error(e.getMessage(), e); } if (releaseResource) { for (InterpreterSetting setting : notebook.getInterpreterSettingManager() @@ -664,7 +582,7 @@ public class Notebook implements NoteEventListener { notebook.getInterpreterSettingManager().restart(setting.getId(), noteId, cronExecutingUser != null ? cronExecutingUser : "anonymous"); } catch (InterpreterException e) { - logger.error("Fail to restart interpreter: " + setting.getId(), e); + LOGGER.error("Fail to restart interpreter: " + setting.getId(), e); } } } @@ -673,61 +591,59 @@ public class Notebook implements NoteEventListener { public void refreshCron(String id) { removeCron(id); - synchronized (notes) { - - Note note = notes.get(id); - if (note == null || note.isTrash()) { - return; - } - Map<String, Object> config = note.getConfig(); - if (config == null) { - return; - } + Note note = getNote(id); + if (note == null || note.isTrash()) { + return; + } + Map<String, Object> config = note.getConfig(); + if (config == null) { + return; + } - if (!note.isCronSupported(getConf())) { - logger.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); - return; - } + if (!note.isCronSupported(getConf())) { + LOGGER.warn("execution of the cron job is skipped cron is not enabled from Zeppelin server"); + return; + } - String cronExpr = (String) note.getConfig().get("cron"); - if (cronExpr == null || cronExpr.trim().length() == 0) { - return; - } + String cronExpr = (String) note.getConfig().get("cron"); + if (cronExpr == null || cronExpr.trim().length() == 0) { + return; + } - JobDetail newJob = - JobBuilder.newJob(CronJob.class).withIdentity(id, "note").usingJobData("noteId", id) - .build(); + JobDetail newJob = + JobBuilder.newJob(CronJob.class).withIdentity(id, "note").usingJobData("noteId", id) + .build(); - Map<String, Object> info = note.getInfo(); - info.put("cron", null); + Map<String, Object> info = note.getInfo(); + info.put("cron", null); - CronTrigger trigger = null; - try { - trigger = TriggerBuilder.newTrigger().withIdentity("trigger_" + id, "note") - .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr)).forJob(id, "note").build(); - } catch (Exception e) { - logger.error("Error", e); - info.put("cron", e.getMessage()); - } + CronTrigger trigger = null; + try { + trigger = TriggerBuilder.newTrigger().withIdentity("trigger_" + id, "note") + .withSchedule(CronScheduleBuilder.cronSchedule(cronExpr)).forJob(id, "note").build(); + } catch (Exception e) { + LOGGER.error("Error", e); + info.put("cron", e.getMessage()); + } - try { - if (trigger != null) { - quartzSched.scheduleJob(newJob, trigger); - } - } catch (SchedulerException e) { - logger.error("Error", e); - info.put("cron", "Scheduler Exception"); + try { + if (trigger != null) { + quartzSched.scheduleJob(newJob, trigger); } + } catch (SchedulerException e) { + LOGGER.error("Error", e); + info.put("cron", "Scheduler Exception"); } + } public void removeCron(String id) { try { quartzSched.deleteJob(new JobKey(id, "note")); } catch (SchedulerException e) { - logger.error("Can't remove quertz " + id, e); + LOGGER.error("Can't remove quertz " + id, e); } } @@ -752,19 +668,25 @@ public class Notebook implements NoteEventListener { this.noteSearchService.close(); } - public void addNotebookEventListener(NotebookEventListener listener) { - notebookEventListeners.add(listener); + public void addNotebookEventListener(NoteEventListener listener) { + noteEventListeners.add(listener); } - private void fireNoteCreateEvent(Note note) { - for (NotebookEventListener listener : notebookEventListeners) { - listener.onNoteCreate(note); + private void fireNoteCreateEvent(Note note, AuthenticationInfo subject) throws IOException { + for (NoteEventListener listener : noteEventListeners) { + listener.onNoteCreate(note, subject); } } - private void fireNoteRemoveEvent(Note note) { - for (NotebookEventListener listener : notebookEventListeners) { - listener.onNoteRemove(note); + private void fireNoteUpdateEvent(Note note, AuthenticationInfo subject) throws IOException { + for (NoteEventListener listener : noteEventListeners) { + listener.onNoteUpdate(note, subject); + } + } + + private void fireNoteRemoveEvent(Note note, AuthenticationInfo subject) throws IOException { + for (NoteEventListener listener : noteEventListeners) { + listener.onNoteRemove(note, subject); } } @@ -777,25 +699,4 @@ public class Notebook implements NoteEventListener { return false; } } - - @Override - public void onParagraphRemove(Paragraph p) { - for (NotebookEventListener listener : notebookEventListeners) { - listener.onParagraphRemove(p); - } - } - - @Override - public void onParagraphCreate(Paragraph p) { - for (NotebookEventListener listener : notebookEventListeners) { - listener.onParagraphCreate(p); - } - } - - @Override - public void onParagraphStatusChange(Paragraph p, Job.Status status) { - for (NotebookEventListener listener : notebookEventListeners) { - listener.onParagraphStatusChange(p, status); - } - } }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java index 137af65..0443891 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookAuthorization.java @@ -17,13 +17,7 @@ package org.apache.zeppelin.notebook; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -35,6 +29,7 @@ import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.storage.ConfigStorage; import org.apache.zeppelin.user.AuthenticationInfo; import org.slf4j.Logger; @@ -43,13 +38,11 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.Sets; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; /** * Contains authorization information for notes */ -public class NotebookAuthorization { +public class NotebookAuthorization implements NoteEventListener { private static final Logger LOG = LoggerFactory.getLogger(NotebookAuthorization.class); private static NotebookAuthorization instance = null; /* @@ -410,4 +403,39 @@ public class NotebookAuthorization { } } } + + @Override + public void onNoteCreate(Note note, AuthenticationInfo subject) { + setNewNotePermissions(note.getId(), subject); + } + + @Override + public void onNoteRemove(Note note, AuthenticationInfo subject) { + removeNote(note.getId()); + } + + @Override + public void onNoteUpdate(Note note, AuthenticationInfo subject) { + + } + + @Override + public void onParagraphRemove(Paragraph p) { + + } + + @Override + public void onParagraphCreate(Paragraph p) { + + } + + @Override + public void onParagraphUpdate(Paragraph p) throws IOException { + + } + + @Override + public void onParagraphStatusChange(Paragraph p, Job.Status status) { + + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java deleted file mode 100644 index 01ebec6..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.notebook; - -import org.apache.zeppelin.interpreter.InterpreterSetting; - -/** - * Notebook event - */ -public interface NotebookEventListener extends NoteEventListener { - public void onNoteRemove(Note note); - public void onNoteCreate(Note note); -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index 288bb71..cfec18b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -95,7 +95,6 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen /************** Transient fields which are not serializabled into note json **************/ private transient String intpText; private transient String scriptText; - private transient InterpreterFactory interpreterFactory; private transient Interpreter interpreter; private transient Note note; private transient AuthenticationInfo subject; @@ -110,23 +109,19 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen super(generateId(), null); } - public Paragraph(String paragraphId, Note note, JobListener listener, - InterpreterFactory interpreterFactory) { + public Paragraph(String paragraphId, Note note, JobListener listener) { super(paragraphId, generateId(), listener); this.note = note; - this.interpreterFactory = interpreterFactory; } - public Paragraph(Note note, JobListener listener, InterpreterFactory interpreterFactory) { + public Paragraph(Note note, JobListener listener) { super(generateId(), listener); this.note = note; - this.interpreterFactory = interpreterFactory; } // used for clone paragraph public Paragraph(Paragraph p2) { super(p2.getId(), null); - this.interpreterFactory = p2.interpreterFactory; this.note = p2.note; this.settings.setParams(Maps.newHashMap(p2.settings.getParams())); this.settings.setForms(Maps.newLinkedHashMap(p2.settings.getForms())); @@ -273,7 +268,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen } public Interpreter getBindedInterpreter() throws InterpreterNotFoundException { - return this.interpreterFactory.getInterpreter(user, note.getId(), intpText, + return this.note.getInterpreterFactory().getInterpreter(user, note.getId(), intpText, note.getDefaultInterpreterGroup()); } @@ -312,10 +307,6 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen return cursor; } - public void setInterpreterFactory(InterpreterFactory factory) { - this.interpreterFactory = factory; - } - @Override public InterpreterResult getReturn() { return results; @@ -381,14 +372,14 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen @Override protected InterpreterResult jobRun() throws Throwable { - LOGGER.info("Run paragraph [paragraph_id: {}, interpreter: {}, note_id: {}, user: {}]", - getId(), intpText, note.getId(), subject.getUser()); this.runtimeInfos.clear(); this.interpreter = getBindedInterpreter(); if (this.interpreter == null) { LOGGER.error("Can not find interpreter name " + intpText); throw new RuntimeException("Can not find interpreter for " + intpText); } + LOGGER.info("Run paragraph [paragraph_id: {}, interpreter: {}, note_id: {}, user: {}]", + getId(), this.interpreter.getClassName(), note.getId(), subject.getUser()); InterpreterSetting interpreterSetting = ((ManagedInterpreterGroup) interpreter.getInterpreterGroup()).getInterpreterSetting(); if (interpreterSetting != null) { @@ -643,7 +634,7 @@ public class Paragraph extends JobWithProgressPoller<InterpreterResult> implemen public boolean isValidInterpreter(String replName) { try { - return interpreterFactory.getInterpreter(user, note.getId(), replName, + return note.getInterpreterFactory().getInterpreter(user, note.getId(), replName, note.getDefaultInterpreterGroup()) != null; } catch (InterpreterNotFoundException e) { return false; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/InMemoryNotebookRepo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/InMemoryNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/InMemoryNotebookRepo.java new file mode 100644 index 0000000..64e1f38 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/InMemoryNotebookRepo.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.notebook.repo; + +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.user.AuthenticationInfo; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class InMemoryNotebookRepo implements NotebookRepo { + + private Map<String, Note> notes = new HashMap<>(); + + @Override + public void init(ZeppelinConfiguration zConf) throws IOException { + + } + + @Override + public Map<String, NoteInfo> list(AuthenticationInfo subject) throws IOException { + Map<String, NoteInfo> notesInfo = new HashMap<>(); + for (Note note : notes.values()) { + notesInfo.put(note.getId(), new NoteInfo(note)); + } + return notesInfo; + } + + @Override + public Note get(String noteId, String notePath, AuthenticationInfo subject) throws IOException { + return notes.get(noteId); + } + + @Override + public void save(Note note, AuthenticationInfo subject) throws IOException { + notes.put(note.getId(), note); + } + + @Override + public void move(String noteId, String notePath, String newNotePath, AuthenticationInfo subject) { + + } + + @Override + public void move(String folderPath, String newFolderPath, AuthenticationInfo subject) { + + } + + @Override + public void remove(String noteId, String notePath, AuthenticationInfo subject) throws IOException { + notes.remove(noteId); + } + + @Override + public void remove(String folderPath, AuthenticationInfo subject) { + + } + + @Override + public void close() { + + } + + @Override + public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject) { + return null; + } + + @Override + public void updateSettings(Map<String, String> settings, AuthenticationInfo subject) { + + } + + public void reset() { + this.notes.clear(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java index 02e5114..c7956e3 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java @@ -17,17 +17,16 @@ package org.apache.zeppelin.notebook.repo; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.commons.lang.StringUtils; import org.apache.zeppelin.annotation.ZeppelinApi; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.user.AuthenticationInfo; +import java.io.IOException; +import java.util.List; +import java.util.Map; + /** * Notebook repository (persistence layer) abstraction */ @@ -36,46 +35,89 @@ public interface NotebookRepo { void init(ZeppelinConfiguration zConf) throws IOException; /** - * Lists notebook information about all notebooks in storage. + * Lists notebook information about all notebooks in storage. This method should only read + * the metadata of note, rather than reading all notes which usually takes long time. + * * @param subject contains user information. * @return * @throws IOException */ - @ZeppelinApi public List<NoteInfo> list(AuthenticationInfo subject) throws IOException; + @ZeppelinApi + Map<String, NoteInfo> list(AuthenticationInfo subject) throws IOException; /** - * Get the notebook with the given id. - * @param noteId is note id. - * @param subject contains user information. + * Get the notebook with the given id and given notePath. + * + * @param noteId is note id. + * @param notePath is note path + * @param subject contains user information. * @return * @throws IOException */ - @ZeppelinApi public Note get(String noteId, AuthenticationInfo subject) throws IOException; + @ZeppelinApi + Note get(String noteId, String notePath, AuthenticationInfo subject) throws IOException; /** * Save given note in storage - * @param note is the note itself. + * + * @param note is the note itself. * @param subject contains user information. * @throws IOException */ - @ZeppelinApi public void save(Note note, AuthenticationInfo subject) throws IOException; + @ZeppelinApi + void save(Note note, AuthenticationInfo subject) throws IOException; /** - * Remove note with given id. - * @param noteId is the note id. - * @param subject contains user information. + * + * Move given note to another path + * + * @param noteId + * @param notePath + * @param newNotePath * @throws IOException */ - @ZeppelinApi public void remove(String noteId, AuthenticationInfo subject) throws IOException; + @ZeppelinApi + void move(String noteId, String notePath, String newNotePath, + AuthenticationInfo subject) throws IOException; /** - * Release any underlying resources + * Move folder to another path + * + * @param folderPath + * @param newFolderPath + * @param subject + * @throws IOException + */ + void move(String folderPath, String newFolderPath, + AuthenticationInfo subject) throws IOException; + + /** + * Remove note with given id and notePath + * + * @param noteId is note id. + * @param notePath is note path + * @param subject contains user information. + * @throws IOException + */ + @ZeppelinApi + void remove(String noteId, String notePath, AuthenticationInfo subject) throws IOException; + + /** + * Remove folder + * + * @param folderPath + * @param subject + * @throws IOException */ - @ZeppelinApi public void close(); + @ZeppelinApi + void remove(String folderPath, AuthenticationInfo subject) throws IOException; /** - * Versioning API (optional, preferred to have). + * Release any underlying resources */ + @ZeppelinApi + void close(); + /** * Get NotebookRepo settings got the given user. @@ -83,7 +125,8 @@ public interface NotebookRepo { * @param subject * @return */ - @ZeppelinApi public List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject); + @ZeppelinApi + List<NotebookRepoSettingsInfo> getSettings(AuthenticationInfo subject); /** * update notebook repo settings. @@ -91,6 +134,49 @@ public interface NotebookRepo { * @param settings * @param subject */ - @ZeppelinApi public void updateSettings(Map<String, String> settings, AuthenticationInfo subject); + @ZeppelinApi + void updateSettings(Map<String, String> settings, AuthenticationInfo subject); + + default String buildNoteFileName(String noteId, String notePath) throws IOException { + if (!notePath.startsWith("/")) { + throw new IOException("Invalid notePath: " + notePath); + } + return (notePath + "_" + noteId + ".zpln").substring(1); + } + + default String buildNoteFileName(Note note) throws IOException { + return buildNoteFileName(note.getId(), note.getPath()); + } + + default String buildNoteTempFileName(Note note) { + return (note.getPath() + "_" + note.getId() + ".tmp").substring(1); + } + + default String getNoteId(String noteFileName) throws IOException { + int separatorIndex = noteFileName.lastIndexOf("_"); + if (separatorIndex == -1) { + throw new IOException( + "Invalid note name, no '_' in note name: " + noteFileName); + } + try { + int dotIndex = noteFileName.lastIndexOf("."); + return noteFileName.substring(separatorIndex + 1, dotIndex); + } catch (StringIndexOutOfBoundsException e) { + throw new IOException("Invalid note name: " + noteFileName); + } + } + default String getNotePath(String rootNoteFolder, String noteFileName) + throws IOException { + int index = noteFileName.lastIndexOf("_"); + if (index == -1) { + throw new IOException( + "Invalid note name, no '_' in note name: " + noteFileName); + } + try { + return noteFileName.substring(rootNoteFolder.length(), index); + } catch (StringIndexOutOfBoundsException e) { + throw new IOException("Invalid note name: " + noteFileName); + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java index 38665ff..ad486b9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoSync.java @@ -30,21 +30,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.util.*; /** * Notebook repository sync with remote storage */ public class NotebookRepoSync implements NotebookRepoWithVersionControl { - private static final Logger LOG = LoggerFactory.getLogger(NotebookRepoSync.class); + private static final Logger LOGGER = LoggerFactory.getLogger(NotebookRepoSync.class); private static final int maxRepoNum = 2; private static final String pushKey = "pushNoteIds"; private static final String pullKey = "pullNoteIds"; private static final String delDstKey = "delDstNoteIds"; - private static ZeppelinConfiguration config; private static final String defaultStorage = "org.apache.zeppelin.notebook.repo.GitNotebookRepo"; private List<NotebookRepo> repos = new ArrayList<>(); @@ -59,16 +56,16 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { } public void init(ZeppelinConfiguration conf) throws IOException { - config = conf; oneWaySync = conf.getBoolean(ConfVars.ZEPPELIN_NOTEBOOK_ONE_WAY_SYNC); String allStorageClassNames = conf.getNotebookStorageClass().trim(); if (allStorageClassNames.isEmpty()) { allStorageClassNames = defaultStorage; - LOG.warn("Empty ZEPPELIN_NOTEBOOK_STORAGE conf parameter, using default {}", defaultStorage); + LOGGER.warn("Empty ZEPPELIN_NOTEBOOK_STORAGE conf parameter, using default {}", + defaultStorage); } String[] storageClassNames = allStorageClassNames.split(","); if (storageClassNames.length > getMaxRepoNum()) { - LOG.warn("Unsupported number {} of storage classes in ZEPPELIN_NOTEBOOK_STORAGE : {}\n" + + LOGGER.warn("Unsupported number {} of storage classes in ZEPPELIN_NOTEBOOK_STORAGE : {}\n" + "first {} will be used", storageClassNames.length, allStorageClassNames, getMaxRepoNum()); } @@ -81,7 +78,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { } // couldn't initialize any storage, use default if (getRepoCount() == 0) { - LOG.info("No storage could be initialized, using default {} storage", defaultStorage); + LOGGER.info("No storage could be initialized, using default {} storage", defaultStorage); NotebookRepo defaultNotebookRepo = PluginManager.get().loadNotebookRepo(defaultStorage); defaultNotebookRepo.init(conf); repos.add(defaultNotebookRepo); @@ -91,7 +88,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { try { sync(AuthenticationInfo.ANONYMOUS); } catch (IOException e) { - LOG.error("Couldn't sync on start ", e); + LOGGER.error("Couldn't sync on start ", e); } } } @@ -133,26 +130,26 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { * Lists Notebooks from the first repository */ @Override - public List<NoteInfo> list(AuthenticationInfo subject) throws IOException { + public Map<String, NoteInfo> list(AuthenticationInfo subject) throws IOException { return getRepo(0).list(subject); } /* list from specific repo (for tests) */ List<NoteInfo> list(int repoIndex, AuthenticationInfo subject) throws IOException { - return getRepo(repoIndex).list(subject); + return new ArrayList<>(getRepo(repoIndex).list(subject).values()); } /** * Returns from Notebook from the first repository */ @Override - public Note get(String noteId, AuthenticationInfo subject) throws IOException { - return getRepo(0).get(noteId, subject); + public Note get(String noteId, String notePath, AuthenticationInfo subject) throws IOException { + return getRepo(0).get(noteId, notePath, subject); } /* get note from specific repo (for tests) */ - Note get(int repoIndex, String noteId, AuthenticationInfo subject) throws IOException { - return getRepo(repoIndex).get(noteId, subject); + Note get(int repoIndex, String noteId, String noteName, AuthenticationInfo subject) throws IOException { + return getRepo(repoIndex).get(noteId, noteName, subject); } /** @@ -166,7 +163,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { getRepo(1).save(note, subject); } catch (IOException e) { - LOG.info(e.getMessage() + ": Failed to write to secondary storage"); + LOGGER.info(e.getMessage() + ": Failed to write to secondary storage"); } } } @@ -177,15 +174,44 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { } @Override - public void remove(String noteId, AuthenticationInfo subject) throws IOException { + public void move(String noteId, String notePath, String newNotePath, + AuthenticationInfo subject) throws IOException { + getRepo(0).move(noteId, notePath, newNotePath, subject); + if (getRepoCount() > 1) { + try { + getRepo(1).move(noteId, notePath, newNotePath, subject); + } + catch (IOException e) { + LOGGER.info(e.getMessage() + ": Failed to write to secondary storage"); + } + } + } + + @Override + public void move(String folderPath, String newFolderPath, + AuthenticationInfo subject) throws IOException { for (NotebookRepo repo : repos) { - repo.remove(noteId, subject); + repo.move(folderPath, newFolderPath, subject); + } + } + + @Override + public void remove(String noteId, String notePath, AuthenticationInfo subject) throws IOException { + for (NotebookRepo repo : repos) { + repo.remove(noteId, notePath, subject); } /* TODO(khalid): handle case when removing from secondary storage fails */ } - void remove(int repoIndex, String noteId, AuthenticationInfo subject) throws IOException { - getRepo(repoIndex).remove(noteId, subject); + @Override + public void remove(String folderPath, AuthenticationInfo subject) throws IOException { + for (NotebookRepo repo : repos) { + repo.remove(folderPath, subject); + } + } + + void remove(int repoIndex, String noteId, String noteName, AuthenticationInfo subject) throws IOException { + getRepo(repoIndex).remove(noteId, noteName, subject); } /** @@ -194,67 +220,67 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { * @throws IOException */ void sync(int sourceRepoIndex, int destRepoIndex, AuthenticationInfo subject) throws IOException { - LOG.info("Sync started"); + LOGGER.info("Sync started"); NotebookAuthorization auth = NotebookAuthorization.getInstance(); NotebookRepo srcRepo = getRepo(sourceRepoIndex); NotebookRepo dstRepo = getRepo(destRepoIndex); - List <NoteInfo> allSrcNotes = srcRepo.list(subject); - List <NoteInfo> srcNotes = auth.filterByUser(allSrcNotes, subject); - List <NoteInfo> dstNotes = dstRepo.list(subject); + List<NoteInfo> allSrcNotes = new ArrayList<>(srcRepo.list(subject).values()); + List<NoteInfo> srcNotes = auth.filterByUser(allSrcNotes, subject); + List<NoteInfo> dstNotes = new ArrayList<>(dstRepo.list(subject).values()); - Map<String, List<String>> noteIds = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo, + Map<String, List<NoteInfo>> noteIds = notesCheckDiff(srcNotes, srcRepo, dstNotes, dstRepo, subject); - List<String> pushNoteIds = noteIds.get(pushKey); - List<String> pullNoteIds = noteIds.get(pullKey); - List<String> delDstNoteIds = noteIds.get(delDstKey); + List<NoteInfo> pushNoteIds = noteIds.get(pushKey); + List<NoteInfo> pullNoteIds = noteIds.get(pullKey); + List<NoteInfo> delDstNoteIds = noteIds.get(delDstKey); if (!pushNoteIds.isEmpty()) { - LOG.info("Notes with the following IDs will be pushed"); - for (String id : pushNoteIds) { - LOG.info("ID : " + id); + LOGGER.info("The following notes will be pushed"); + for (NoteInfo noteInfo : pushNoteIds) { + LOGGER.info("Note : " + noteIds); } pushNotes(subject, pushNoteIds, srcRepo, dstRepo, false); } else { - LOG.info("Nothing to push"); + LOGGER.info("Nothing to push"); } if (!pullNoteIds.isEmpty()) { - LOG.info("Notes with the following IDs will be pulled"); - for (String id : pullNoteIds) { - LOG.info("ID : " + id); + LOGGER.info("The following notes will be pulled"); + for (NoteInfo noteInfo : pullNoteIds) { + LOGGER.info("Note : " + noteInfo); } pushNotes(subject, pullNoteIds, dstRepo, srcRepo, true); } else { - LOG.info("Nothing to pull"); + LOGGER.info("Nothing to pull"); } if (!delDstNoteIds.isEmpty()) { - LOG.info("Notes with the following IDs will be deleted from dest"); - for (String id : delDstNoteIds) { - LOG.info("ID : " + id); + LOGGER.info("The following notes will be deleted from dest"); + for (NoteInfo noteInfo : delDstNoteIds) { + LOGGER.info("Note : " + noteIds); } deleteNotes(subject, delDstNoteIds, dstRepo); } else { - LOG.info("Nothing to delete from dest"); + LOGGER.info("Nothing to delete from dest"); } - LOG.info("Sync ended"); + LOGGER.info("Sync ended"); } public void sync(AuthenticationInfo subject) throws IOException { sync(0, 1, subject); } - private void pushNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo localRepo, + private void pushNotes(AuthenticationInfo subject, List<NoteInfo> notesInfo, NotebookRepo localRepo, NotebookRepo remoteRepo, boolean setPermissions) { - for (String id : ids) { + for (NoteInfo noteInfo : notesInfo) { try { - remoteRepo.save(localRepo.get(id, subject), subject); - if (setPermissions && emptyNoteAcl(id)) { - makePrivate(id, subject); + remoteRepo.save(localRepo.get(noteInfo.getId(), noteInfo.getPath(), subject), subject); + if (setPermissions && emptyNoteAcl(noteInfo.getId())) { + makePrivate(noteInfo.getId(), subject); } } catch (IOException e) { - LOG.error("Failed to push note to storage, moving onto next one", e); + LOGGER.error("Failed to push note to storage, moving onto next one", e); } } } @@ -269,7 +295,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { private void makePrivate(String noteId, AuthenticationInfo subject) { if (AuthenticationInfo.isAnonymous(subject)) { - LOG.info("User is anonymous, permissions are not set for pulled notes"); + LOGGER.info("User is anonymous, permissions are not set for pulled notes"); return; } NotebookAuthorization notebookAuthorization = NotebookAuthorization.getInstance(); @@ -287,10 +313,10 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { notebookAuthorization.setWriters(noteId, users); } - private void deleteNotes(AuthenticationInfo subject, List<String> ids, NotebookRepo repo) + private void deleteNotes(AuthenticationInfo subject, List<NoteInfo> noteInfos, NotebookRepo repo) throws IOException { - for (String id : ids) { - repo.remove(id, subject); + for (NoteInfo noteInfo : noteInfos) { + repo.remove(noteInfo.getId(), noteInfo.getPath(), subject); } } @@ -310,12 +336,12 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { return repos.get(repoIndex); } - private Map<String, List<String>> notesCheckDiff(List<NoteInfo> sourceNotes, + private Map<String, List<NoteInfo>> notesCheckDiff(List<NoteInfo> sourceNotes, NotebookRepo sourceRepo, List<NoteInfo> destNotes, NotebookRepo destRepo, AuthenticationInfo subject) { - List <String> pushIDs = new ArrayList<>(); - List <String> pullIDs = new ArrayList<>(); - List <String> delDstIDs = new ArrayList<>(); + List<NoteInfo> pushIDs = new ArrayList<>(); + List<NoteInfo> pullIDs = new ArrayList<>(); + List<NoteInfo> delDstIDs = new ArrayList<>(); NoteInfo dnote; Date sdate, ddate; @@ -324,10 +350,10 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { if (dnote != null) { try { /* note exists in source and destination storage systems */ - sdate = lastModificationDate(sourceRepo.get(snote.getId(), subject)); - ddate = lastModificationDate(destRepo.get(dnote.getId(), subject)); + sdate = lastModificationDate(sourceRepo.get(snote.getId(), snote.getPath(), subject)); + ddate = lastModificationDate(destRepo.get(dnote.getId(), dnote.getPath(), subject)); } catch (IOException e) { - LOG.error("Cannot access previously listed note {} from storage ", dnote.getId(), e); + LOGGER.error("Cannot access previously listed note {} from storage ", dnote.getId(), e); continue; } @@ -335,19 +361,19 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { if (sdate.after(ddate) || oneWaySync) { /* if source contains more up to date note - push * if oneWaySync is enabled, always push no matter who's newer */ - pushIDs.add(snote.getId()); - LOG.info("Modified note is added to push list : " + sdate); + pushIDs.add(snote); + LOGGER.info("Modified note is added to push list : " + sdate); } else { /* destination contains more up to date note - pull */ - LOG.info("Modified note is added to pull list : " + ddate); - pullIDs.add(snote.getId()); + LOGGER.info("Modified note is added to pull list : " + ddate); + pullIDs.add(snote); } } } else { /* note exists in source storage, and absent in destination * view source as up to date - push * (another scenario : note was deleted from destination - not considered)*/ - pushIDs.add(snote.getId()); + pushIDs.add(snote); } } @@ -357,17 +383,17 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { /* note exists in destination storage, and absent in source */ if (oneWaySync) { /* if oneWaySync is enabled, delete the note from destination */ - LOG.info("Extraneous note is added to delete dest list : " + note.getId()); - delDstIDs.add(note.getId()); + LOGGER.info("Extraneous note is added to delete dest list : " + note.getId()); + delDstIDs.add(note); } else { /* if oneWaySync is disabled, pull the note from destination */ - LOG.info("Missing note is added to pull list : " + note.getId()); - pullIDs.add(note.getId()); + LOGGER.info("Missing note is added to pull list : " + note.getId()); + pullIDs.add(note); } } } - Map<String, List<String>> map = new HashMap<>(); + Map<String, List<NoteInfo>> map = new HashMap<>(); map.put(pushKey, pushIDs); map.put(pullKey, pullIDs); map.put(delDstKey, delDstIDs); @@ -410,7 +436,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { @Override public void close() { - LOG.info("Closing all notebook storages"); + LOGGER.info("Closing all notebook storages"); for (NotebookRepo repo: repos) { repo.close(); } @@ -426,14 +452,14 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { return true; } } catch (IOException e) { - LOG.error("Error getting default repo", e); + LOGGER.error("Error getting default repo", e); } return false; } //checkpoint to all available storages @Override - public Revision checkpoint(String noteId, String checkpointMsg, AuthenticationInfo subject) + public Revision checkpoint(String noteId, String noteName, String checkpointMsg, AuthenticationInfo subject) throws IOException { int repoCount = getRepoCount(); int repoBound = Math.min(repoCount, getMaxRepoNum()); @@ -446,10 +472,10 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { if (isRevisionSupportedInRepo(i)) { allRepoCheckpoints .add(((NotebookRepoWithVersionControl) getRepo(i)) - .checkpoint(noteId, checkpointMsg, subject)); + .checkpoint(noteId, noteName, checkpointMsg, subject)); } } catch (IOException e) { - LOG.warn("Couldn't checkpoint in {} storage with index {} for note {}", + LOGGER.warn("Couldn't checkpoint in {} storage with index {} for note {}", getRepo(i).getClass().toString(), i, noteId); errorMessage += "Error on storage class " + getRepo(i).getClass().toString() + " with index " + i + " : " + e.getMessage() + "\n"; @@ -471,27 +497,30 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { } @Override - public Note get(String noteId, String revId, AuthenticationInfo subject) { + public Note get(String noteId, String noteName, String revId, AuthenticationInfo subject) { Note revisionNote = null; try { if (isRevisionSupportedInDefaultRepo()) { - revisionNote = ((NotebookRepoWithVersionControl) getRepo(0)).get(noteId, revId, subject); + revisionNote = ((NotebookRepoWithVersionControl) getRepo(0)).get(noteId, noteName, + revId, subject); } } catch (IOException e) { - LOG.error("Failed to get revision {} of note {}", revId, noteId, e); + LOGGER.error("Failed to get revision {} of note {}", revId, noteId, e); } return revisionNote; } @Override - public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject) { + public List<Revision> revisionHistory(String noteId, String noteName, + AuthenticationInfo subject) { List<Revision> revisions = Collections.emptyList(); try { if (isRevisionSupportedInDefaultRepo()) { - revisions = ((NotebookRepoWithVersionControl) getRepo(0)).revisionHistory(noteId, subject); + revisions = ((NotebookRepoWithVersionControl) getRepo(0)) + .revisionHistory(noteId, noteName, subject); } } catch (IOException e) { - LOG.error("Failed to list revision history", e); + LOGGER.error("Failed to list revision history", e); } return revisions; } @@ -502,7 +531,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { try { repoSettings = getRepo(0).getSettings(subject); } catch (IOException e) { - LOG.error("Cannot get notebook repo settings", e); + LOGGER.error("Cannot get notebook repo settings", e); } return repoSettings; } @@ -512,12 +541,12 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { try { getRepo(0).updateSettings(settings, subject); } catch (IOException e) { - LOG.error("Cannot update notebook repo settings", e); + LOGGER.error("Cannot update notebook repo settings", e); } } @Override - public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject) + public Note setNoteRevision(String noteId, String noteName, String revId, AuthenticationInfo subject) throws IOException { int repoCount = getRepoCount(); int repoBound = Math.min(repoCount, getMaxRepoNum()); @@ -526,7 +555,7 @@ public class NotebookRepoSync implements NotebookRepoWithVersionControl { try { if (isRevisionSupportedInRepo(i)) { currentNote = ((NotebookRepoWithVersionControl) getRepo(i)) - .setNoteRevision(noteId, revId, subject); + .setNoteRevision(noteId, noteName, revId, subject); } } catch (IOException e) { // already logged http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java index 05c846e..ba5f4cf 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepoWithVersionControl.java @@ -34,49 +34,57 @@ public interface NotebookRepoWithVersionControl extends NotebookRepo { /** * chekpoint (set revision) for notebook. - * @param noteId Id of the Notebook + * @param noteId Id of the note + * @param noteName name of the note * @param checkpointMsg message description of the checkpoint * @return Rev * @throws IOException */ - @ZeppelinApi public Revision checkpoint(String noteId, String checkpointMsg, - AuthenticationInfo subject) throws IOException; + @ZeppelinApi Revision checkpoint(String noteId, + String noteName, + String checkpointMsg, + AuthenticationInfo subject) throws IOException; /** * Get particular revision of the Notebook. * - * @param noteId Id of the Notebook + * @param noteId Id of the note + * @param noteName name of the note * @param revId revision of the Notebook * @return a Notebook * @throws IOException */ - @ZeppelinApi public Note get(String noteId, String revId, AuthenticationInfo subject) + @ZeppelinApi Note get(String noteId, String noteName, String revId, AuthenticationInfo subject) throws IOException; /** * List of revisions of the given Notebook. * - * @param noteId id of the Notebook + * @param noteId id of the note + * @param noteName name of the note + * @param subject * @return list of revisions */ - @ZeppelinApi public List<Revision> revisionHistory(String noteId, AuthenticationInfo subject); + @ZeppelinApi List<Revision> revisionHistory(String noteId, + String noteName, + AuthenticationInfo subject) throws IOException; /** * Set note to particular revision. * * @param noteId Id of the Notebook + * @param noteName name of the note * @param revId revision of the Notebook * @return a Notebook * @throws IOException */ - @ZeppelinApi - public Note setNoteRevision(String noteId, String revId, AuthenticationInfo subject) - throws IOException; + @ZeppelinApi Note setNoteRevision(String noteId, String noteName, String revId, + AuthenticationInfo subject) throws IOException; /** * Represents the 'Revision' a point in life of the notebook */ - static class Revision { + class Revision { public static final Revision EMPTY = new Revision(StringUtils.EMPTY, StringUtils.EMPTY, 0); public String id; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java index 4c9de8a..177dfec 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/scheduler/RemoteScheduler.java @@ -151,7 +151,6 @@ public class RemoteScheduler extends AbstractScheduler { return getLastStatus(); } Status status = Status.valueOf(remoteInterpreter.getStatus(job.getId())); - LOGGER.info("getStatus:" + status.name()); if (status == Status.UNKNOWN) { // not found this job in the remote schedulers. // maybe not submitted, maybe already finished @@ -159,7 +158,6 @@ public class RemoteScheduler extends AbstractScheduler { } lastStatus = status; listener.onStatusChange(job, null, status); - LOGGER.info("status:" + getLastStatus()); return status; } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java index ccdc574..d04dcd4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/LuceneSearch.java @@ -69,7 +69,7 @@ import org.slf4j.LoggerFactory; * new IndexReader every time. Index is thread-safe, as re-uses single IndexWriter, which is * thread-safe. */ -public class LuceneSearch implements SearchService { +public class LuceneSearch extends SearchService { private static final Logger logger = LoggerFactory.getLogger(LuceneSearch.class); private static final String SEARCH_FIELD_TEXT = "contents"; @@ -85,6 +85,7 @@ public class LuceneSearch implements SearchService { private IndexWriter indexWriter; public LuceneSearch(ZeppelinConfiguration zeppelinConfiguration) { + super("LuceneSearch-Thread"); this.zeppelinConfiguration = zeppelinConfiguration; if (zeppelinConfiguration.isZeppelinSearchUseDisk()) { try { @@ -361,8 +362,8 @@ public class LuceneSearch implements SearchService { * @see org.apache.zeppelin.search.Search#deleteIndexDocs(org.apache.zeppelin.notebook.Note) */ @Override - public void deleteIndexDocs(Note note) { - deleteDoc(note, null); + public void deleteIndexDocs(String noteId) { + deleteDoc(noteId, null); } /* (non-Javadoc) @@ -370,22 +371,18 @@ public class LuceneSearch implements SearchService { * #deleteIndexDoc(org.apache.zeppelin.notebook.Note, org.apache.zeppelin.notebook.Paragraph) */ @Override - public void deleteIndexDoc(Note note, Paragraph p) { - deleteDoc(note, p); + public void deleteIndexDoc(String noteId, Paragraph p) { + deleteDoc(noteId, p); } - private void deleteDoc(Note note, Paragraph p) { - if (null == note) { - logger.error("Trying to delete note by reference to NULL"); - return; - } - String fullNoteOrJustParagraph = formatDeleteId(note.getId(), p); - logger.debug("Deleting note {}, out of: {}", note.getId(), indexWriter.numDocs()); + private void deleteDoc(String noteId, Paragraph p) { + String fullNoteOrJustParagraph = formatDeleteId(noteId, p); + logger.debug("Deleting note {}, out of: {}", noteId, indexWriter.numDocs()); try { indexWriter.deleteDocuments(new WildcardQuery(new Term(ID_FIELD, fullNoteOrJustParagraph))); indexWriter.commit(); } catch (IOException e) { - logger.error("Failed to delete {} from index by '{}'", note, fullNoteOrJustParagraph, e); + logger.error("Failed to delete {} from index by '{}'", noteId, fullNoteOrJustParagraph, e); } logger.debug("Done, index contains {} docs now" + indexWriter.numDocs()); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java index 0b86ac6..53d5e92 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/search/SearchService.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteEventAsyncListener; import org.apache.zeppelin.notebook.Paragraph; /** @@ -31,7 +32,11 @@ import org.apache.zeppelin.notebook.Paragraph; * - local Lucene (in-memory, on-disk) * - remote Elasticsearch */ -public interface SearchService { +public abstract class SearchService extends NoteEventAsyncListener { + + public SearchService(String name) { + super(name); + } /** * Full-text search in all the notes @@ -39,7 +44,7 @@ public interface SearchService { * @param queryStr a query * @return A list of matching paragraphs (id, text, snippet w/ highlight) */ - public List<Map<String, String>> query(String queryStr); + public abstract List<Map<String, String>> query(String queryStr); /** * Updates all documents in index for the given note: @@ -49,39 +54,83 @@ public interface SearchService { * @param note a Note to update index for * @throws IOException */ - public void updateIndexDoc(Note note) throws IOException; + public abstract void updateIndexDoc(Note note) throws IOException; /** * Indexes full collection of notes: all the paragraphs + Note names * * @param collection of Notes */ - public void addIndexDocs(Collection<Note> collection); + public abstract void addIndexDocs(Collection<Note> collection); /** * Indexes the given note. * * @throws IOException If there is a low-level I/O error */ - public void addIndexDoc(Note note); + public abstract void addIndexDoc(Note note); /** * Deletes all docs on given Note from index */ - public void deleteIndexDocs(Note note); + public abstract void deleteIndexDocs(String noteId); /** * Deletes doc for a given * - * @param note + * @param noteId * @param p * @throws IOException */ - public void deleteIndexDoc(Note note, Paragraph p); + public abstract void deleteIndexDoc(String noteId, Paragraph p); /** * Frees the recourses used by index */ - public void close(); + public void close() { + super.close(); + } + + @Override + public void handleNoteCreateEvent(NoteCreateEvent noteCreateEvent) { + addIndexDoc(noteCreateEvent.getNote()); + } + + @Override + public void handleNoteRemoveEvent(NoteRemoveEvent noteRemoveEvent) { + deleteIndexDocs(noteRemoveEvent.getNote().getId()); + } + + @Override + public void handleNoteUpdateEvent(NoteUpdateEvent noteUpdateEvent) { + try { + updateIndexDoc(noteUpdateEvent.getNote()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void handleParagraphCreateEvent(ParagraphCreateEvent paragraphCreateEvent) { + try { + updateIndexDoc(paragraphCreateEvent.getParagraph().getNote()); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void handleParagraphRemoveEvent(ParagraphRemoveEvent paragraphRemoveEvent) { + Paragraph p = paragraphRemoveEvent.getParagraph(); + deleteIndexDoc(p.getNote().getId(), p); + } + @Override + public void handleParagraphUpdateEvent(ParagraphUpdateEvent paragraphUpdateEvent) { + try { + updateIndexDoc(paragraphUpdateEvent.getParagraph().getNote()); + } catch (IOException e) { + e.printStackTrace(); + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java index 1d44dd5..4b57b00 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java @@ -69,6 +69,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { notebookAuthorization, new Credentials(false, null, null)); + heliumAppFactory.setNotebook(notebook); notebook.addNotebookEventListener(heliumAppFactory); @@ -94,7 +95,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { new String[][]{}, "", ""); - Note note1 = notebook.createNote(anonymous); + Note note1 = notebook.createNote("note1", anonymous); Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -139,8 +140,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { new String[][]{}, "", ""); - Note note1 = notebook.createNote(anonymous); - + Note note1 = notebook.createNote("note1", anonymous); Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); // make sure interpreter process running @@ -178,7 +178,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { new String[][]{}, "", ""); - Note note1 = notebook.createNote(anonymous); + Note note1 = notebook.createNote("note1", anonymous); Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -205,7 +205,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { @Test public void testInterpreterUnbindOfNullReplParagraph() throws IOException { // create note - Note note1 = notebook.createNote(anonymous); + Note note1 = notebook.createNote("note1", anonymous); // add paragraph with invalid magic Paragraph p1 = note1.addNewParagraph(AuthenticationInfo.ANONYMOUS); @@ -236,7 +236,7 @@ public class HeliumApplicationFactoryTest extends AbstractInterpreterTest { new String[][]{}, "", ""); - Note note1 = notebook.createNote(anonymous); + Note note1 = notebook.createNote("note1", anonymous); String mock1IntpSettingId = null; for (InterpreterSetting setting : notebook.getBindedInterpreterSettings(note1.getId())) { if (setting.getName().equals("mock1")) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/085efeb6/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java index cd556a6..4f975fb 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/AbstractInterpreterTest.java @@ -71,7 +71,7 @@ public abstract class AbstractInterpreterTest { } protected Note createNote() { - return new Note("test", "test", null, interpreterFactory, interpreterSettingManager, null, null, null, null); + return new Note("test", "test", interpreterFactory, interpreterSettingManager, null, null, null); } protected InterpreterContext createDummyInterpreterContext() {
