Repository: incubator-zeppelin Updated Branches: refs/heads/master 87f28ab4b -> 4fa3db81d
ZEPPELIN-26 Pluggable notebook persistence layer See https://issues.apache.org/jira/browse/ZEPPELIN-26 * [x] Notebook persistence layer abstraction * [x] Make persistence layer implementation selection configurable This PR abstracts the notebook persistence layer and provides one implementation based on commons-vfs. Author: Lee moon soo <[email protected]> Closes #44 from Leemoonsoo/ZEPPELIN-26 and squashes the following commits: c847d1c [Lee moon soo] Restore reference of note from paragraph 48fdd8e [Lee moon soo] Remove URI param from NotebookRepo constructor 44d6a6f [Lee moon soo] ZEPPELIN-26 let implementation configurable 1d63234 [Lee moon soo] Add notebook storage abstraction and implementation based on commons-vfs2 Project: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/commit/4fa3db81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/tree/4fa3db81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/diff/4fa3db81 Branch: refs/heads/master Commit: 4fa3db81d393c1ae2fff555e651df327ec76f97a Parents: 87f28ab Author: Lee moon soo <[email protected]> Authored: Mon Apr 27 11:32:52 2015 +0900 Committer: Lee moon soo <[email protected]> Committed: Tue Apr 28 07:02:15 2015 +0900 ---------------------------------------------------------------------- conf/zeppelin-site.xml.template | 8 +- .../apache/zeppelin/server/ZeppelinServer.java | 11 +- zeppelin-zengine/pom.xml | 24 ++ .../zeppelin/conf/ZeppelinConfiguration.java | 4 +- .../java/org/apache/zeppelin/notebook/Note.java | 90 ++----- .../org/apache/zeppelin/notebook/NoteInfo.java | 67 ++++++ .../org/apache/zeppelin/notebook/Notebook.java | 131 +++++----- .../zeppelin/notebook/repo/NotebookRepo.java | 34 +++ .../zeppelin/notebook/repo/VFSNotebookRepo.java | 241 +++++++++++++++++++ .../apache/zeppelin/notebook/NotebookTest.java | 8 +- 10 files changed, 479 insertions(+), 139 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/conf/zeppelin-site.xml.template ---------------------------------------------------------------------- diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index fae1104..9f773d5 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -49,7 +49,13 @@ <property> <name>zeppelin.notebook.dir</name> <value>notebook</value> - <description>notebook persist</description> + <description>path or URI for notebook persist</description> +</property> + +<property> + <name>zeppelin.notebook.storage</name> + <value>org.apache.zeppelin.notebook.repo.VFSNotebookRepo</value> + <description>notebook persistence layer implementation</description> </property> <property> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index bd55b2d..0072b87 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -19,6 +19,7 @@ package org.apache.zeppelin.server; import java.io.File; import java.io.IOException; +import java.lang.reflect.Constructor; import java.util.EnumSet; import java.util.HashSet; import java.util.Set; @@ -32,6 +33,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.notebook.Notebook; +import org.apache.zeppelin.notebook.repo.NotebookRepo; import 
org.apache.zeppelin.rest.InterpreterRestApi; import org.apache.zeppelin.rest.NotebookRestApi; import org.apache.zeppelin.rest.ZeppelinRestApi; @@ -71,6 +73,8 @@ public class ZeppelinServer extends Application { private InterpreterFactory replFactory; + private NotebookRepo notebookRepo; + public static void main(String[] args) throws Exception { ZeppelinConfiguration conf = ZeppelinConfiguration.create(); conf.setProperty("args", args); @@ -299,7 +303,12 @@ public class ZeppelinServer extends Application { this.schedulerFactory = new SchedulerFactory(); this.replFactory = new InterpreterFactory(conf, notebookServer); - notebook = new Notebook(conf, schedulerFactory, replFactory, notebookServer); + Class<?> notebookStorageClass = getClass().forName( + conf.getString(ConfVars.ZEPPELIN_NOTEBOOK_STORAGE)); + Constructor<?> constructor = notebookStorageClass.getConstructor( + ZeppelinConfiguration.class); + this.notebookRepo = (NotebookRepo) constructor.newInstance(conf); + notebook = new Notebook(conf, notebookRepo, schedulerFactory, replFactory, notebookServer); } @Override http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/pom.xml ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml index b90847b..6a2c00a 100644 --- a/zeppelin-zengine/pom.xml +++ b/zeppelin-zengine/pom.xml @@ -77,6 +77,30 @@ </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-vfs2</artifactId> + <version>2.0</version> + </dependency> + + <dependency> + <groupId>org.apache.jackrabbit</groupId> + <artifactId>jackrabbit-webdav</artifactId> + <version>1.5.2</version> + <exclusions> + <exclusion> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>commons-httpclient</groupId> + <artifactId>commons-httpclient</artifactId> + 
<version>3.1</version> + </dependency> + + <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.1</version> http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 580860a..bbf46fc 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.commons.configuration.ConfigurationException; import org.apache.commons.configuration.XMLConfiguration; import org.apache.commons.configuration.tree.ConfigurationNode; +import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -328,7 +329,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { } public String getNotebookDir() { - return getRelativeDir(ConfVars.ZEPPELIN_NOTEBOOK_DIR); + return getString(ConfVars.ZEPPELIN_NOTEBOOK_DIR); } public String getInterpreterDir() { @@ -392,6 +393,7 @@ public class ZeppelinConfiguration extends XMLConfiguration { ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), + ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()), ZEPPELIN_INTERPRETER_REMOTE_RUNNER("zeppelin.interpreter.remoterunner", "bin/interpreter.sh"), // Decide when new note is created, interpreter settings will be binded automatically or not. 
ZEPPELIN_NOTEBOOK_AUTO_INTERPRETER_BINDING("zeppelin.notebook.autoInterpreterBinding", true); http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index b5e68a4..46b4c1a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -17,9 +17,6 @@ package org.apache.zeppelin.notebook; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; import java.io.Serializable; import java.util.HashMap; @@ -28,27 +25,21 @@ import java.util.List; import java.util.Map; import java.util.Random; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.utility.IdHashes; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; -import org.apache.zeppelin.scheduler.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - /** * Binded interpreters for a note */ @@ -63,6 +54,7 @@ public class 
Note implements Serializable, JobListener { private transient NoteInterpreterLoader replLoader; private transient ZeppelinConfiguration conf; private transient JobListenerFactory jobListenerFactory; + private transient NotebookRepo repo; /** * note configurations. @@ -78,11 +70,13 @@ public class Note implements Serializable, JobListener { */ private Map<String, Object> info = new HashMap<String, Object>(); + public Note() {} - public Note(ZeppelinConfiguration conf, NoteInterpreterLoader replLoader, - JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched) { - this.conf = conf; + public Note(NotebookRepo repo, + NoteInterpreterLoader replLoader, + JobListenerFactory jobListenerFactory) { + this.repo = repo; this.replLoader = replLoader; this.jobListenerFactory = jobListenerFactory; generateId(); @@ -112,8 +106,20 @@ public class Note implements Serializable, JobListener { this.replLoader = replLoader; } - public void setZeppelinConfiguration(ZeppelinConfiguration conf) { - this.conf = conf; + public JobListenerFactory getJobListenerFactory() { + return jobListenerFactory; + } + + public void setJobListenerFactory(JobListenerFactory jobListenerFactory) { + this.jobListenerFactory = jobListenerFactory; + } + + public NotebookRepo getNotebookRepo() { + return repo; + } + + public void setNotebookRepo(NotebookRepo repo) { + this.repo = repo; } public Map<String, List<AngularObject>> getAngularObjects() { @@ -294,61 +300,11 @@ public class Note implements Serializable, JobListener { } public void persist() throws IOException { - GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.setPrettyPrinting(); - Gson gson = gsonBuilder.create(); - - File dir = new File(conf.getNotebookDir() + "/" + id); - if (!dir.exists()) { - dir.mkdirs(); - } else if (dir.isFile()) { - throw new RuntimeException("File already exists" + dir.toString()); - } - - File file = new File(conf.getNotebookDir() + "/" + id + "/note.json"); - logger().info("Persist note {} into 
{}", id, file.getAbsolutePath()); - - snapshotAngularObjectRegistry(); - String json = gson.toJson(this); - FileOutputStream out = new FileOutputStream(file); - out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING))); - out.close(); + repo.save(this); } public void unpersist() throws IOException { - File dir = new File(conf.getNotebookDir() + "/" + id); - - FileUtils.deleteDirectory(dir); - } - - public static Note load(String id, ZeppelinConfiguration conf, NoteInterpreterLoader replLoader, - Scheduler scheduler, JobListenerFactory jobListenerFactory, org.quartz.Scheduler quartzSched) - throws IOException { - GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.setPrettyPrinting(); - Gson gson = gsonBuilder.create(); - - File file = new File(conf.getNotebookDir() + "/" + id + "/note.json"); - logger().info("Load note {} from {}", id, file.getAbsolutePath()); - - if (!file.isFile()) { - return null; - } - - FileInputStream ins = new FileInputStream(file); - String json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING)); - Note note = gson.fromJson(json, Note.class); - note.setZeppelinConfiguration(conf); - note.setReplLoader(replLoader); - note.jobListenerFactory = jobListenerFactory; - for (Paragraph p : note.paragraphs) { - p.setNote(note); - - if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) { - p.setStatus(Status.ABORT); - } - } - return note; + repo.remove(id()); } public Map<String, Object> getConfig() { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInfo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInfo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInfo.java new file mode 100644 index 0000000..db07e50 --- /dev/null +++ 
b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteInfo.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook; + +import java.util.HashMap; +import java.util.Map; + +/** + * + */ +public class NoteInfo { + String id; + String name; + private Map<String, Object> config = new HashMap<String, Object>(); + + public NoteInfo(String id, String name, Map<String, Object> config) { + super(); + this.id = id; + this.name = name; + this.config = config; + } + + public NoteInfo(Note note) { + id = note.id(); + name = note.getName(); + config = note.getConfig(); + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Map<String, Object> getConfig() { + return config; + } + + public void setConfig(Map<String, Object> config) { + this.config = config; + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 844763f..1b29509 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.notebook; -import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -32,11 +31,9 @@ import java.util.Map; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.display.AngularObject; -import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.interpreter.InterpreterFactory; -import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterSetting; -import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.quartz.CronScheduleBuilder; import org.quartz.CronTrigger; @@ -64,11 +61,14 @@ public class Notebook { private StdSchedulerFactory quertzSchedFact; private org.quartz.Scheduler quartzSched; private JobListenerFactory jobListenerFactory; + private NotebookRepo notebookRepo; - public Notebook(ZeppelinConfiguration conf, SchedulerFactory schedulerFactory, + public Notebook(ZeppelinConfiguration conf, NotebookRepo notebookRepo, + SchedulerFactory schedulerFactory, InterpreterFactory replFactory, JobListenerFactory jobListenerFactory) throws IOException, SchedulerException { this.conf = conf; + this.notebookRepo = notebookRepo; this.schedulerFactory = schedulerFactory; this.replFactory = replFactory; this.jobListenerFactory = jobListenerFactory; @@ -102,7 +102,7 @@ public class Notebook { */ public Note createNote(List<String> interpreterIds) throws IOException { 
NoteInterpreterLoader intpLoader = new NoteInterpreterLoader(replFactory); - Note note = new Note(conf, intpLoader, jobListenerFactory, quartzSched); + Note note = new Note(notebookRepo, intpLoader, jobListenerFactory); intpLoader.setNoteId(note.id()); synchronized (notes) { notes.put(note.id(), note); @@ -111,6 +111,7 @@ public class Notebook { bindInterpretersToNote(note.id(), interpreterIds); } + note.persist(); return note; } @@ -159,79 +160,75 @@ public class Notebook { } } - private void loadAllNotes() throws IOException { - File notebookDir = new File(conf.getNotebookDir()); - File[] dirs = notebookDir.listFiles(); - if (dirs == null) { - return; + private Note loadNoteFromRepo(String id) { + Note note = null; + try { + note = notebookRepo.get(id); + } catch (IOException e) { + logger.error("Failed to load " + id, e); + } + if (note == null) { + return null; } - Map<String, SnapshotAngularObject> angularObjectSnapshot = - new HashMap<String, SnapshotAngularObject>(); + // set NoteInterpreterLoader + NoteInterpreterLoader noteInterpreterLoader = new NoteInterpreterLoader( + replFactory); + note.setReplLoader(noteInterpreterLoader); + noteInterpreterLoader.setNoteId(note.id()); - for (File f : dirs) { - boolean isHidden = f.getName().startsWith("."); - if (f.isDirectory() && !isHidden) { - Scheduler scheduler = - schedulerFactory.createOrGetFIFOScheduler("note_" + System.currentTimeMillis()); - logger.info("Loading note from " + f.getName()); - NoteInterpreterLoader noteInterpreterLoader = new NoteInterpreterLoader(replFactory); - Note note = Note.load(f.getName(), - conf, - noteInterpreterLoader, - scheduler, - jobListenerFactory, quartzSched); - noteInterpreterLoader.setNoteId(note.id()); - - // restore angular object -------------- - Date lastUpdatedDate = new Date(0); - for (Paragraph p : note.getParagraphs()) { - if (p.getDateFinished() != null && - lastUpdatedDate.before(p.getDateFinished())) { - lastUpdatedDate = p.getDateFinished(); - } - } + // set 
JobListenerFactory + note.setJobListenerFactory(jobListenerFactory); - Map<String, List<AngularObject>> savedObjects = note.getAngularObjects(); - - if (savedObjects != null) { - for (String intpGroupName : savedObjects.keySet()) { - List<AngularObject> objectList = savedObjects.get(intpGroupName); - - for (AngularObject savedObject : objectList) { - SnapshotAngularObject snapshot = angularObjectSnapshot.get(savedObject.getName()); - if (snapshot == null || snapshot.getLastUpdate().before(lastUpdatedDate)) { - angularObjectSnapshot.put( - savedObject.getName(), - new SnapshotAngularObject( - intpGroupName, - savedObject, - lastUpdatedDate)); - } - } - } - } + // set notebookRepo + note.setNotebookRepo(notebookRepo); - synchronized (notes) { - notes.put(note.id(), note); - refreshCron(note.id()); - } + Map<String, SnapshotAngularObject> angularObjectSnapshot = + new HashMap<String, SnapshotAngularObject>(); + + // restore angular object -------------- + Date lastUpdatedDate = new Date(0); + for (Paragraph p : note.getParagraphs()) { + p.setNote(note); + if (p.getDateFinished() != null && + lastUpdatedDate.before(p.getDateFinished())) { + lastUpdatedDate = p.getDateFinished(); } } - for (String name : angularObjectSnapshot.keySet()) { - SnapshotAngularObject snapshot = angularObjectSnapshot.get(name); - List<InterpreterSetting> settings = replFactory.get(); - for (InterpreterSetting setting : settings) { - InterpreterGroup intpGroup = setting.getInterpreterGroup(); - if (intpGroup.getId().equals(snapshot.getIntpGroupId())) { - AngularObjectRegistry registry = intpGroup.getAngularObjectRegistry(); - if (registry.get(name) == null) { - registry.add(name, snapshot.getAngularObject().get(), false); + Map<String, List<AngularObject>> savedObjects = note.getAngularObjects(); + + if (savedObjects != null) { + for (String intpGroupName : savedObjects.keySet()) { + List<AngularObject> objectList = savedObjects.get(intpGroupName); + + for (AngularObject savedObject : 
objectList) { + SnapshotAngularObject snapshot = angularObjectSnapshot.get(savedObject.getName()); + if (snapshot == null || snapshot.getLastUpdate().before(lastUpdatedDate)) { + angularObjectSnapshot.put( + savedObject.getName(), + new SnapshotAngularObject( + intpGroupName, + savedObject, + lastUpdatedDate)); } } } } + + synchronized (notes) { + notes.put(note.id(), note); + refreshCron(note.id()); + } + return note; + } + + private void loadAllNotes() throws IOException { + List<NoteInfo> noteInfos = notebookRepo.list(); + + for (NoteInfo info : noteInfos) { + loadNoteFromRepo(info.getId()); + } } class SnapshotAngularObject { http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java new file mode 100644 index 0000000..07e0875 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/NotebookRepo.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import java.io.IOException; +import java.util.List; + +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; + +/** + * Notebook repository (persistence layer) abstraction + */ +public interface NotebookRepo { + public List<NoteInfo> list() throws IOException; + public Note get(String noteId) throws IOException; + public void save(Note note) throws IOException; + public void remove(String noteId) throws IOException; +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java new file mode 100644 index 0000000..3039f80 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.notebook.repo; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.vfs2.FileContent; +import org.apache.commons.vfs2.FileObject; +import org.apache.commons.vfs2.FileSystemManager; +import org.apache.commons.vfs2.FileType; +import org.apache.commons.vfs2.NameScope; +import org.apache.commons.vfs2.Selectors; +import org.apache.commons.vfs2.VFS; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.notebook.Note; +import org.apache.zeppelin.notebook.NoteInfo; +import org.apache.zeppelin.notebook.Paragraph; +import org.apache.zeppelin.scheduler.Job.Status; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** +* +*/ +public class VFSNotebookRepo implements NotebookRepo { + Logger logger = LoggerFactory.getLogger(VFSNotebookRepo.class); + + private FileSystemManager fsManager; + private URI filesystemRoot; + + private ZeppelinConfiguration conf; + + public VFSNotebookRepo(ZeppelinConfiguration conf) throws IOException { + this.conf = conf; + + try { + filesystemRoot = new URI(conf.getNotebookDir()); + } catch (URISyntaxException e1) { + throw new IOException(e1); + } + + if (filesystemRoot.getScheme() == null) { // it is local path + try { + this.filesystemRoot = new URI(new File( + conf.getRelativeDir(filesystemRoot.getPath())).getAbsolutePath()); + } catch (URISyntaxException e) { + throw new IOException(e); + } + } else { + this.filesystemRoot = filesystemRoot; + } + fsManager = VFS.getManager(); + } + + private 
String getPath(String path) { + if (path == null || path.trim().length() == 0) { + return filesystemRoot.toString(); + } + if (path.startsWith("/")) { + return filesystemRoot.toString() + path; + } else { + return filesystemRoot.toString() + "/" + path; + } + } + + private boolean isDirectory(FileObject fo) throws IOException { + if (fo == null) return false; + if (fo.getType() == FileType.FOLDER) { + return true; + } else { + return false; + } + } + + @Override + public List<NoteInfo> list() throws IOException { + FileObject rootDir = getRootDir(); + + FileObject[] children = rootDir.getChildren(); + + List<NoteInfo> infos = new LinkedList<NoteInfo>(); + for (FileObject f : children) { + String fileName = f.getName().getBaseName(); + if (f.isHidden() + || fileName.startsWith(".") + || fileName.startsWith("#") + || fileName.startsWith("~")) { + // skip hidden, temporary files + continue; + } + + if (!isDirectory(f)) { + // currently single note is saved like, [NOTE_ID]/note.json. + // so it must be a directory + continue; + } + + NoteInfo info = null; + + try { + info = getNoteInfo(f); + if (info != null) { + infos.add(info); + } + } catch (IOException e) { + logger.error("Can't read note " + f.getName().toString(), e); + } + } + + return infos; + } + + private Note getNote(FileObject noteDir) throws IOException { + if (!isDirectory(noteDir)) { + throw new IOException(noteDir.getName().toString() + " is not a directory"); + } + + FileObject noteJson = noteDir.resolveFile("note.json", NameScope.CHILD); + if (!noteJson.exists()) { + throw new IOException(noteJson.getName().toString() + " not found"); + } + + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.setPrettyPrinting(); + Gson gson = gsonBuilder.create(); + + FileContent content = noteJson.getContent(); + InputStream ins = content.getInputStream(); + String json = IOUtils.toString(ins, conf.getString(ConfVars.ZEPPELIN_ENCODING)); + ins.close(); + + Note note = gson.fromJson(json, Note.class); +// 
note.setReplLoader(replLoader); +// note.jobListenerFactory = jobListenerFactory; + + for (Paragraph p : note.getParagraphs()) { + if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) { + p.setStatus(Status.ABORT); + } + } + + return note; + } + + private NoteInfo getNoteInfo(FileObject noteDir) throws IOException { + Note note = getNote(noteDir); + return new NoteInfo(note); + } + + @Override + public Note get(String noteId) throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + FileObject noteDir = rootDir.resolveFile(noteId, NameScope.CHILD); + + return getNote(noteDir); + } + + private FileObject getRootDir() throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + + if (!rootDir.exists()) { + throw new IOException("Root path does not exists"); + } + + if (!isDirectory(rootDir)) { + throw new IOException("Root path is not a directory"); + } + + return rootDir; + } + + @Override + public void save(Note note) throws IOException { + GsonBuilder gsonBuilder = new GsonBuilder(); + gsonBuilder.setPrettyPrinting(); + Gson gson = gsonBuilder.create(); + String json = gson.toJson(note); + + FileObject rootDir = getRootDir(); + + FileObject noteDir = rootDir.resolveFile(note.id(), NameScope.CHILD); + + if (!noteDir.exists()) { + noteDir.createFolder(); + } + if (!isDirectory(noteDir)) { + throw new IOException(noteDir.getName().toString() + " is not a directory"); + } + + FileObject noteJson = noteDir.resolveFile("note.json", NameScope.CHILD); + // false means not appending. 
creates file if not exists + OutputStream out = noteJson.getContent().getOutputStream(false); + out.write(json.getBytes(conf.getString(ConfVars.ZEPPELIN_ENCODING))); + out.close(); + } + + @Override + public void remove(String noteId) throws IOException { + FileObject rootDir = fsManager.resolveFile(getPath("/")); + FileObject noteDir = rootDir.resolveFile(noteId, NameScope.CHILD); + + if (!noteDir.exists()) { + // nothing to do + return; + } + + if (!isDirectory(noteDir)) { + // it is not look like zeppelin note savings + throw new IOException("Can not remove " + noteDir.getName().toString()); + } + + noteDir.delete(Selectors.SELECT_SELF_AND_CHILDREN); + } +} http://git-wip-us.apache.org/repos/asf/incubator-zeppelin/blob/4fa3db81/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java index 8d2c65a..0d4d111 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/notebook/NotebookTest.java @@ -32,6 +32,8 @@ import org.apache.zeppelin.interpreter.InterpreterFactory; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.mock.MockInterpreter1; import org.apache.zeppelin.interpreter.mock.MockInterpreter2; +import org.apache.zeppelin.notebook.repo.NotebookRepo; +import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.Job.Status; import org.apache.zeppelin.scheduler.JobListener; @@ -48,6 +50,7 @@ public class NotebookTest implements JobListenerFactory{ private SchedulerFactory schedulerFactory; private File notebookDir; private Notebook notebook; + private NotebookRepo notebookRepo; private 
InterpreterFactory factory; @Before @@ -71,7 +74,8 @@ public class NotebookTest implements JobListenerFactory{ factory = new InterpreterFactory(conf, new InterpreterOption(false), null); - notebook = new Notebook(conf, schedulerFactory, factory, this); + notebookRepo = new VFSNotebookRepo(conf); + notebook = new Notebook(conf, notebookRepo, schedulerFactory, factory, this); } @After @@ -108,7 +112,7 @@ public class NotebookTest implements JobListenerFactory{ p1.setText("hello world"); note.persist(); - Notebook notebook2 = new Notebook(conf, schedulerFactory, new InterpreterFactory(conf, null), this); + Notebook notebook2 = new Notebook(conf, notebookRepo, schedulerFactory, new InterpreterFactory(conf, null), this); assertEquals(1, notebook2.getAllNotes().size()); }
