http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java new file mode 100644 index 0000000..6759f97 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumApplicationFactory.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +import com.google.gson.Gson; +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.InterpreterSetting; +import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.thrift.RemoteApplicationResult; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.apache.zeppelin.notebook.*; +import org.apache.zeppelin.scheduler.ExecutorFactory; +import org.apache.zeppelin.scheduler.Job; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.ExecutorService; + +/** + * HeliumApplicationFactory + */ +public class HeliumApplicationFactory implements ApplicationEventListener, NotebookEventListener { + private final Logger logger = LoggerFactory.getLogger(HeliumApplicationFactory.class); + private final ExecutorService executor; + private final Gson gson = new Gson(); + private Notebook notebook; + private ApplicationEventListener applicationEventListener; + + public HeliumApplicationFactory() { + executor = ExecutorFactory.singleton().createOrGet( + HeliumApplicationFactory.class.getName(), 10); + } + + private boolean isRemote(InterpreterGroup group) { + return group.getAngularObjectRegistry() instanceof RemoteAngularObjectRegistry; + } + + + /** + * Load pkg and run task + */ + public String loadAndRun(HeliumPackage pkg, Paragraph paragraph) { + ApplicationState appState = paragraph.createOrGetApplicationState(pkg); + onLoad(paragraph.getNote().getId(), paragraph.getId(), appState.getId(), + appState.getHeliumPackage()); + executor.submit(new LoadApplication(appState, pkg, paragraph)); + return appState.getId(); + } + + /** + * Load application and run in the remote process + */ + private class LoadApplication implements Runnable { + private final HeliumPackage pkg; + private final Paragraph paragraph; + private final ApplicationState appState; + + public LoadApplication(ApplicationState appState, HeliumPackage pkg, Paragraph paragraph) { + this.appState = appState; + this.pkg = pkg; + this.paragraph = paragraph; + } + + @Override + public void run() { + try { + // get interpreter process + Interpreter intp = paragraph.getRepl(paragraph.getRequiredReplName()); + InterpreterGroup intpGroup = intp.getInterpreterGroup(); + RemoteInterpreterProcess intpProcess = intpGroup.getRemoteInterpreterProcess(); + if (intpProcess == null) { + throw new ApplicationException("Target interpreter process is not running"); + } + + // load application + load(intpProcess, appState); + + // run application + RunApplication runTask = new RunApplication(paragraph, appState.getId()); + runTask.run(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + + if (appState != null) { + appStatusChange(paragraph, appState.getId(), ApplicationState.Status.ERROR); + appState.setOutput(e.getMessage()); + } + } + } + + private void load(RemoteInterpreterProcess intpProcess, ApplicationState appState) + throws Exception { + + RemoteInterpreterService.Client client = null; + + synchronized (appState) { + if (appState.getStatus() == ApplicationState.Status.LOADED) { + // already loaded + return; + } + + try { + appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADING); + String pkgInfo = gson.toJson(pkg); + String appId = appState.getId(); + + client = intpProcess.getClient(); + RemoteApplicationResult ret = client.loadApplication( + appId, + pkgInfo, + paragraph.getNote().getId(), + paragraph.getId()); + + if (ret.isSuccess()) { + appStatusChange(paragraph, appState.getId(), ApplicationState.Status.LOADED); + } else { + throw new ApplicationException(ret.getMsg()); + } + } catch (TException e) { + intpProcess.releaseBrokenClient(client); + throw e; + } finally { + if (client != null) { + intpProcess.releaseClient(client); + } + } + } + } + } + + /** + * Get ApplicationState + * @param paragraph + * @param appId + * @return + */ + public ApplicationState get(Paragraph paragraph, String appId) { + return paragraph.getApplicationState(appId); + } + + /** + * Unload application + * It does not remove ApplicationState + * + * @param paragraph + * @param appId + */ + public void unload(Paragraph paragraph, String appId) { + executor.execute(new UnloadApplication(paragraph, appId)); + } + + /** + * Unload application task + */ + private class UnloadApplication implements Runnable { + private final Paragraph paragraph; + private final String appId; + + public UnloadApplication(Paragraph paragraph, String appId) { + this.paragraph = paragraph; + this.appId = appId; + } + + @Override + public void run() { + ApplicationState appState = null; + try { + appState = paragraph.getApplicationState(appId); + + if (appState == null) { + logger.warn("Can not find {} to unload from {}", appId, paragraph.getId()); + return; + } + if (appState.getStatus() == ApplicationState.Status.UNLOADED) { + // not loaded + return; + } + unload(appState); + } catch (Exception e) { + logger.error(e.getMessage(), e); + if (appState != null) { + appStatusChange(paragraph, appId, ApplicationState.Status.ERROR); + appState.setOutput(e.getMessage()); + } + } + } + + private void unload(ApplicationState appsToUnload) throws ApplicationException { + synchronized (appsToUnload) { + if (appsToUnload.getStatus() != ApplicationState.Status.LOADED) { + throw new ApplicationException( + "Can't unload application status " + appsToUnload.getStatus()); + } + appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADING); + Interpreter intp = paragraph.getCurrentRepl(); + if (intp == null) { + throw new ApplicationException("No interpreter found"); + } + + RemoteInterpreterProcess intpProcess = + intp.getInterpreterGroup().getRemoteInterpreterProcess(); + if (intpProcess == null) { + throw new ApplicationException("Target interpreter process is not running"); + } + + RemoteInterpreterService.Client client; + try { + client = intpProcess.getClient(); + } catch (Exception e) { + throw new ApplicationException(e); + } + + try { + RemoteApplicationResult ret = client.unloadApplication(appsToUnload.getId()); + + if (ret.isSuccess()) { + appStatusChange(paragraph, appsToUnload.getId(), ApplicationState.Status.UNLOADED); + } else { + throw new ApplicationException(ret.getMsg()); + } + } catch (TException e) { + intpProcess.releaseBrokenClient(client); + throw new ApplicationException(e); + } finally { + intpProcess.releaseClient(client); + } + } + } + } + + /** + * Run application + * It does not remove ApplicationState + * + * @param paragraph + * @param appId + */ + public void run(Paragraph paragraph, String appId) { + executor.execute(new RunApplication(paragraph, appId)); + } + + /** + * Run application task + */ + private class RunApplication implements Runnable { + private final Paragraph paragraph; + private final String appId; + + public RunApplication(Paragraph paragraph, String appId) { + this.paragraph = paragraph; + this.appId = appId; + } + + @Override + public void run() { + ApplicationState appState = null; + try { + appState = paragraph.getApplicationState(appId); + + if (appState == null) { + logger.warn("Can not find {} to unload from {}", appId, paragraph.getId()); + return; + } + + run(appState); + } catch (Exception e) { + logger.error(e.getMessage(), e); + if (appState != null) { + appStatusChange(paragraph, appId, ApplicationState.Status.UNLOADED); + appState.setOutput(e.getMessage()); + } + } + } + + private void run(ApplicationState app) throws ApplicationException { + synchronized (app) { + if (app.getStatus() != ApplicationState.Status.LOADED) { + throw new ApplicationException( + "Can't run application status " + app.getStatus()); + } + + Interpreter intp = paragraph.getCurrentRepl(); + if (intp == null) { + throw new ApplicationException("No interpreter found"); + } + + RemoteInterpreterProcess intpProcess = + intp.getInterpreterGroup().getRemoteInterpreterProcess(); + if (intpProcess == null) { + throw new ApplicationException("Target interpreter process is not running"); + } + RemoteInterpreterService.Client client = null; + try { + client = intpProcess.getClient(); + } catch (Exception e) { + throw new ApplicationException(e); + } + + try { + RemoteApplicationResult ret = client.runApplication(app.getId()); + + if (ret.isSuccess()) { + // success + } else { + throw new ApplicationException(ret.getMsg()); + } + } catch (TException e) { + intpProcess.releaseBrokenClient(client); + client = null; + throw new ApplicationException(e); + } finally { + if (client != null) { + intpProcess.releaseClient(client); + } + } + } + } + } + + @Override + public void onOutputAppend(String noteId, String paragraphId, String appId, String output) { + ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId); + + if (appToUpdate != null) { + appToUpdate.appendOutput(output); + } else { + logger.error("Can't find app {}", appId); + } + + if (applicationEventListener != null) { + applicationEventListener.onOutputAppend(noteId, paragraphId, appId, output); + } + } + + @Override + public void onOutputUpdated(String noteId, String paragraphId, String appId, String output) { + ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId); + + if (appToUpdate != null) { + appToUpdate.setOutput(output); + } else { + logger.error("Can't find app {}", appId); + } + + if (applicationEventListener != null) { + applicationEventListener.onOutputUpdated(noteId, paragraphId, appId, output); + } + } + + @Override + public void onLoad(String noteId, String paragraphId, String appId, HeliumPackage pkg) { + if (applicationEventListener != null) { + applicationEventListener.onLoad(noteId, paragraphId, appId, pkg); + } + } + + @Override + public void onStatusChange(String noteId, String paragraphId, String appId, String status) { + ApplicationState appToUpdate = getAppState(noteId, paragraphId, appId); + if (appToUpdate != null) { + appToUpdate.setStatus(ApplicationState.Status.valueOf(status)); + } + + if (applicationEventListener != null) { + applicationEventListener.onStatusChange(noteId, paragraphId, appId, status); + } + } + + private void appStatusChange(Paragraph paragraph, + String appId, + ApplicationState.Status status) { + ApplicationState app = paragraph.getApplicationState(appId); + app.setStatus(status); + onStatusChange(paragraph.getNote().getId(), paragraph.getId(), appId, status.toString()); + } + + private ApplicationState getAppState(String noteId, String paragraphId, String appId) { + if (notebook == null) { + return null; + } + + Note note = notebook.getNote(noteId); + if (note == null) { + logger.error("Can't get note {}", noteId); + return null; + } + Paragraph paragraph = note.getParagraph(paragraphId); + if (paragraph == null) { + logger.error("Can't get paragraph {}", paragraphId); + return null; + } + + ApplicationState appFound = paragraph.getApplicationState(appId); + + return appFound; + } + + public Notebook getNotebook() { + return notebook; + } + + public void setNotebook(Notebook notebook) { + this.notebook = notebook; + } + + public ApplicationEventListener getApplicationEventListener() { + return applicationEventListener; + } + + public void setApplicationEventListener(ApplicationEventListener applicationEventListener) { + this.applicationEventListener = applicationEventListener; + } + + @Override + public void onNoteRemove(Note note) { + } + + @Override + public void onNoteCreate(Note note) { + + } + + @Override + public void onUnbindInterpreter(Note note, InterpreterSetting setting) { + for (Paragraph p : note.getParagraphs()) { + Interpreter currentInterpreter = p.getCurrentRepl(); + List<InterpreterSetting.InterpreterInfo> infos = setting.getInterpreterInfos(); + for (InterpreterSetting.InterpreterInfo info : infos) { + if (info.getClassName().equals(currentInterpreter.getClassName())) { + onParagraphRemove(p); + break; + } + } + } + } + + @Override + public void onParagraphRemove(Paragraph paragraph) { + List<ApplicationState> appStates = paragraph.getAllApplicationStates(); + for (ApplicationState app : appStates) { + UnloadApplication unloadJob = new UnloadApplication(paragraph, app.getId()); + unloadJob.run(); + } + } + + @Override + public void onParagraphCreate(Paragraph p) { + + } + + @Override + public void onParagraphStatusChange(Paragraph p, Job.Status status) { + if (status == Job.Status.FINISHED) { + // refresh application + List<ApplicationState> appStates = p.getAllApplicationStates(); + + for (ApplicationState app : appStates) { + loadAndRun(app.getHeliumPackage(), p); + } + } + } +}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumConf.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumConf.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumConf.java new file mode 100644 index 0000000..2a93caa --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumConf.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +import java.util.LinkedList; +import java.util.List; + +/** + * Helium config. This object will be persisted to conf/heliumc.conf + */ +public class HeliumConf { + List<HeliumRegistry> registry = new LinkedList<HeliumRegistry>(); + + public List<HeliumRegistry> getRegistry() { + return registry; + } + + public void setRegistry(List<HeliumRegistry> registry) { + this.registry = registry; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumLocalRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumLocalRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumLocalRegistry.java new file mode 100644 index 0000000..ef28835 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumLocalRegistry.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +import com.google.gson.Gson; +import com.google.gson.stream.JsonReader; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.StringReader; +import java.net.URI; +import java.util.LinkedList; +import java.util.List; + +/** + * Simple Helium registry on local filesystem + */ +public class HeliumLocalRegistry extends HeliumRegistry { + Logger logger = LoggerFactory.getLogger(HeliumLocalRegistry.class); + + private final Gson gson; + + public HeliumLocalRegistry(String name, String uri) { + super(name, uri); + gson = new Gson(); + + } + + + @Override + public synchronized List<HeliumPackage> getAll() throws IOException { + List<HeliumPackage> result = new LinkedList<HeliumPackage>(); + + File file = new File(uri()); + File [] files = file.listFiles(); + if (files == null) { + return result; + } + + for (File f : files) { + if (f.getName().startsWith(".")) { + continue; + } + + HeliumPackage pkgInfo = readPackageInfo(f); + if (pkgInfo != null) { + result.add(pkgInfo); + } + } + return result; + } + + private HeliumPackage readPackageInfo(File f) { + try { + JsonReader reader = new JsonReader(new StringReader(FileUtils.readFileToString(f))); + reader.setLenient(true); + + return gson.fromJson(reader, HeliumPackage.class); + } catch (IOException e) { + logger.error(e.getMessage(), e); + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSearchResult.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSearchResult.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSearchResult.java new file mode 100644 index 0000000..57a9d45 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSearchResult.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +/** + * search result + */ +public class HeliumPackageSearchResult { + private final String registry; + private final HeliumPackage pkg; + + /** + * Create search result item + * @param registry registry name + * @param pkg package information + */ + public HeliumPackageSearchResult(String registry, HeliumPackage pkg) { + this.registry = registry; + this.pkg = pkg; + } + + public String getRegistry() { + return registry; + } + + public HeliumPackage getPkg() { + return pkg; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSuggestion.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSuggestion.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSuggestion.java new file mode 100644 index 0000000..45c1640 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumPackageSuggestion.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; + +/** + * Suggested apps + */ +public class HeliumPackageSuggestion { + private final List<HeliumPackageSearchResult> available = + new LinkedList<HeliumPackageSearchResult>(); + + /* + * possible future improvement + * provides n - 'favorite' list, based on occurrence of apps in notebook + */ + + public HeliumPackageSuggestion() { + + } + + public void addAvailablePackage(HeliumPackageSearchResult r) { + available.add(r); + + } + + public void sort() { + Collections.sort(available, new Comparator<HeliumPackageSearchResult>() { + @Override + public int compare(HeliumPackageSearchResult o1, HeliumPackageSearchResult o2) { + return o1.getPkg().getName().compareTo(o2.getPkg().getName()); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistry.java new file mode 100644 index 0000000..125ad92 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistry.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +import java.io.IOException; +import java.net.URI; +import java.util.List; + +/** + * Helium package registry + */ +public abstract class HeliumRegistry { + private final String name; + private final String uri; + + public HeliumRegistry(String name, String uri) { + this.name = name; + this.uri = uri; + } + public String name() { + return name; + } + public String uri() { + return uri; + } + public abstract List<HeliumPackage> getAll() throws IOException; +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistrySerializer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistrySerializer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistrySerializer.java new file mode 100644 index 0000000..3abcb9f --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/helium/HeliumRegistrySerializer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +import com.google.gson.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Type; +import java.net.URI; +import java.net.URISyntaxException; + +/** + * HeliumRegistrySerializer (and deserializer) for gson + */ +public class HeliumRegistrySerializer + implements JsonSerializer<HeliumRegistry>, JsonDeserializer<HeliumRegistry> { + Logger logger = LoggerFactory.getLogger(HeliumRegistrySerializer.class); + + @Override + public HeliumRegistry deserialize(JsonElement json, + Type type, + JsonDeserializationContext jsonDeserializationContext) + throws JsonParseException { + JsonObject jsonObject = json.getAsJsonObject(); + String className = jsonObject.get("class").getAsString(); + String uri = jsonObject.get("uri").getAsString(); + String name = jsonObject.get("name").getAsString(); + + try { + logger.info("Restore helium registry {} {} {}", name, className, uri); + Class<HeliumRegistry> cls = + (Class<HeliumRegistry>) getClass().getClassLoader().loadClass(className); + Constructor<HeliumRegistry> constructor = cls.getConstructor(String.class, String.class); + HeliumRegistry registry = constructor.newInstance(name, uri); + return registry; + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | + InstantiationException | InvocationTargetException e) { + logger.error(e.getMessage(), e); + return null; + } + } + + @Override + public JsonElement serialize(HeliumRegistry heliumRegistry, + Type type, + JsonSerializationContext jsonSerializationContext) { + JsonObject json = new JsonObject(); + json.addProperty("class", heliumRegistry.getClass().getName()); + json.addProperty("uri", heliumRegistry.uri()); + json.addProperty("name", heliumRegistry.name()); + return json; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index 5595c14..47a4325 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -30,7 +30,10 @@ import org.apache.zeppelin.dep.Dependency; import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.zeppelin.interpreter.dev.DevInterpreter; +import org.apache.zeppelin.interpreter.dev.ZeppelinDevServer; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; @@ -82,22 +85,29 @@ public class InterpreterFactory implements InterpreterGroupFactory { private AngularObjectRegistryListener angularObjectRegistryListener; private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; + private final ApplicationEventListener appEventListener; private DependencyResolver depResolver; + private Map<String, String> env = new HashMap<String, String>(); + + private Interpreter devInterpreter; + public InterpreterFactory(ZeppelinConfiguration conf, AngularObjectRegistryListener angularObjectRegistryListener, RemoteInterpreterProcessListener remoteInterpreterProcessListener, + ApplicationEventListener appEventListener, DependencyResolver depResolver) throws InterpreterException, IOException, RepositoryException { this(conf, new InterpreterOption(true), angularObjectRegistryListener, - remoteInterpreterProcessListener, depResolver); + remoteInterpreterProcessListener, appEventListener, depResolver); } public InterpreterFactory(ZeppelinConfiguration conf, InterpreterOption defaultOption, AngularObjectRegistryListener angularObjectRegistryListener, RemoteInterpreterProcessListener remoteInterpreterProcessListener, + ApplicationEventListener appEventListener, DependencyResolver depResolver) throws InterpreterException, IOException, RepositoryException { this.conf = conf; @@ -106,6 +116,7 @@ public class InterpreterFactory implements InterpreterGroupFactory { this.depResolver = depResolver; this.interpreterRepositories = depResolver.getRepos(); this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + this.appEventListener = appEventListener; String replsConf = conf.getString(ConfVars.ZEPPELIN_INTERPRETERS); interpreterClassList = replsConf.split(","); String groupOrder = conf.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER); @@ -547,8 +558,18 @@ public class InterpreterFactory implements InterpreterGroupFactory { Interpreter intp; if (option.isRemote()) { - intp = createRemoteRepl(info.getPath(), key, info.getClassName(), properties, - interpreterSetting.id()); + if (option.isConnectExistingProcess()) { + intp = connectToRemoteRepl( + noteId, + info.getClassName(), + option.getHost(), option.getPort(), properties); + } else { + intp = createRemoteRepl(info.getPath(), + key, + info.getClassName(), + properties, + interpreterSetting.id()); + } } else { intp = createRepl(info.getPath(), info.getClassName(), properties); } @@ -850,6 +871,26 @@ public class InterpreterFactory implements InterpreterGroupFactory { } } + private Interpreter connectToRemoteRepl(String noteId, + String className, + String host, + int port, + Properties property) { + int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + int maxPoolSize = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE); + LazyOpenInterpreter intp = new LazyOpenInterpreter( + new RemoteInterpreter( + property, + noteId, + className, + host, + port, + connectTimeout, + maxPoolSize, + remoteInterpreterProcessListener, + appEventListener)); + return intp; + } private Interpreter createRemoteRepl(String interpreterPath, String noteId, String className, Properties property, String interpreterSettingId) { @@ -859,10 +900,14 @@ public class InterpreterFactory implements InterpreterGroupFactory { updatePropertiesFromRegisteredInterpreter(property, className); - LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter(property, noteId, - className, conf.getInterpreterRemoteRunnerPath(), interpreterPath, localRepoPath, - connectTimeout, maxPoolSize, remoteInterpreterProcessListener)); - return intp; + + RemoteInterpreter remoteInterpreter = new RemoteInterpreter( + property, noteId, className, conf.getInterpreterRemoteRunnerPath(), + interpreterPath, localRepoPath, connectTimeout, + maxPoolSize, remoteInterpreterProcessListener, appEventListener); + remoteInterpreter.setEnv(env); + + return new LazyOpenInterpreter(remoteInterpreter); } private Properties updatePropertiesFromRegisteredInterpreter(Properties properties, @@ -1037,6 +1082,11 @@ public class InterpreterFactory implements InterpreterGroupFactory { } } + // dev interpreter + if (DevInterpreter.isInterpreterName(replName)) { + return getDevInterpreter(); + } + return null; } @@ -1073,4 +1123,34 @@ public class InterpreterFactory implements InterpreterGroupFactory { depResolver.delRepo(id); saveToFile(); } + + public Map<String, String> getEnv() { + return env; + } + + public void setEnv(Map<String, String> env) { + this.env = env; + } + + + public Interpreter getDevInterpreter() { + if (devInterpreter == null) { + InterpreterOption option = new InterpreterOption(); + option.setRemote(true); + + InterpreterGroup interpreterGroup = createInterpreterGroup("dev", option); + + devInterpreter = connectToRemoteRepl("dev", DevInterpreter.class.getName(), + "localhost", + ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT, + new Properties()); + + LinkedList<Interpreter> intpList = new LinkedList<Interpreter>(); + intpList.add(devInterpreter); + interpreterGroup.put("dev", intpList); + + devInterpreter.setInterpreterGroup(interpreterGroup); + } + return devInterpreter; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java index f9e43ab..7aac781 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java @@ -22,16 +22,13 @@ package org.apache.zeppelin.interpreter; */ public class InterpreterOption { boolean remote; + String host = null; + int port = -1; boolean perNoteSession; boolean perNoteProcess; boolean isExistingProcess; - String host; - String port; - - - public boolean isExistingProcess() { return isExistingProcess; @@ -41,18 +38,10 @@ public class InterpreterOption { this.isExistingProcess = isExistingProcess; } - public String getPort() { - return port; - } - - public void setPort(String port) { + public void setPort(int port) { this.port = port; } - public String getHost() { - return host; - } - public void setHost(String host) { this.host = host; } @@ -82,6 +71,18 @@ public class InterpreterOption { this.perNoteSession = perNoteSession; } + public boolean isConnectExistingProcess() { + return (host != null && port != -1); + } + + public String getHost() { + return host; + } + + public int getPort() { + return port; + } + public boolean isPerNoteProcess() { return perNoteProcess; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java new file mode 100644 index 0000000..1505db9 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/ApplicationState.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook; + +import org.apache.zeppelin.helium.HeliumPackage; +import org.apache.zeppelin.interpreter.InterpreterGroup; + +/** + * Current state of application + */ +public class ApplicationState { + + /** + * Status of Application + */ + public static enum Status { + LOADING, + LOADED, + UNLOADING, + UNLOADED, + ERROR + }; + + Status status = Status.UNLOADED; + + String id; // unique id for this instance. Similar to note id or paragraph id + HeliumPackage pkg; + String output; + + public ApplicationState(String id, HeliumPackage pkg) { + this.id = id; + this.pkg = pkg; + } + + /** + * After ApplicationState is restored from NotebookRepo, + * such as after Zeppelin daemon starts or Notebook import, + * Application status need to be reset. + */ + public void resetStatus() { + if (status != Status.ERROR) { + status = Status.UNLOADED; + } + } + + + @Override + public boolean equals(Object o) { + String compareName; + if (o instanceof ApplicationState) { + return pkg.equals(((ApplicationState) o).getHeliumPackage()); + } else if (o instanceof HeliumPackage) { + return pkg.equals((HeliumPackage) o); + } else { + return false; + } + } + + @Override + public int hashCode() { + return pkg.hashCode(); + } + + public String getId() { + return id; + } + + public void setStatus(Status status) { + this.status = status; + } + + public Status getStatus() { + return status; + } + + public String getOutput() { + return output; + } + + public void setOutput(String output) { + this.output = output; + } + + public synchronized void appendOutput(String output) { + if (this.output == null) { + this.output = output; + } else { + this.output += output; + } + } + + public HeliumPackage getHeliumPackage() { + return pkg; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java index e57ed9b..0a8a45d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Note.java @@ -30,6 +30,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.Input; +import org.apache.zeppelin.helium.HeliumApplicationFactory; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -51,7 +52,7 @@ import org.slf4j.LoggerFactory; /** * Binded interpreters for a note */ -public class Note implements Serializable, JobListener { +public class Note implements Serializable, ParagraphJobListener { static Logger logger = LoggerFactory.getLogger(Note.class); private static final long serialVersionUID = 7920699076577612429L; @@ -78,6 +79,7 @@ public class Note implements Serializable, JobListener { private transient NotebookRepo repo; private transient SearchService index; private transient ScheduledFuture delayedPersist; + private transient NoteEventListener noteEventListener; private transient Credentials credentials; /** @@ -98,11 +100,13 @@ public class Note implements Serializable, JobListener { public Note() {} public Note(NotebookRepo repo, InterpreterFactory factory, - JobListenerFactory jlFactory, SearchService noteIndex, Credentials credentials) { + JobListenerFactory jlFactory, SearchService noteIndex, Credentials credentials, + NoteEventListener noteEventListener) { this.repo = repo; this.factory = factory; this.jobListenerFactory = jlFactory; this.index = noteIndex; + this.noteEventListener = noteEventListener; this.credentials = credentials; generateId(); } @@ -202,6 +206,9 @@ public class Note implements Serializable, JobListener { synchronized (paragraphs) { paragraphs.add(p); } + if (noteEventListener != null) { + noteEventListener.onParagraphCreate(p); + } return p; } @@ -239,6 +246,9 @@ public class Note implements Serializable, JobListener { synchronized (paragraphs) { paragraphs.add(newParagraph); } + if (noteEventListener != null) { + noteEventListener.onParagraphCreate(newParagraph); + } } /** @@ -252,6 +262,9 @@ public class Note implements Serializable, JobListener { synchronized (paragraphs) { paragraphs.add(index, p); } + if (noteEventListener != null) { + noteEventListener.onParagraphCreate(p); + } return p; } @@ -287,12 +300,14 @@ public class Note implements Serializable, JobListener { if (p.getId().equals(paragraphId)) { index.deleteIndexDoc(this, p); i.remove(); + + if (noteEventListener != null) { + noteEventListener.onParagraphRemove(p); + } return p; } } } - - return null; } @@ -431,9 +446,11 @@ public class Note implements Serializable, JobListener { AuthenticationInfo authenticationInfo = new AuthenticationInfo(); authenticationInfo.setUser(cronExecutingUser); p.setAuthenticationInfo(authenticationInfo); + p.setInterpreterFactory(factory); p.setListener(jobListenerFactory.getParagraphJobListener(this)); Interpreter intp = factory.getInterpreter(getId(), p.getRequiredReplName()); + intp.getScheduler().submit(p); } } @@ -450,6 +467,7 @@ public class Note implements Serializable, JobListener { p.setListener(jobListenerFactory.getParagraphJobListener(this)); String requiredReplName = p.getRequiredReplName(); Interpreter intp = factory.getInterpreter(getId(), requiredReplName); + if (intp == null) { // TODO(jongyoul): Make "%jdbc" configurable from JdbcInterpreter if (conf.getUseJdbcAlias() && null != (intp = factory.getInterpreter(getId(), "jdbc"))) { @@ -526,8 +544,25 @@ public class Note implements Serializable, JobListener { if (registry instanceof RemoteAngularObjectRegistry) { // remove paragraph scope object ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, paragraphId); + + // remove app scope object + List<ApplicationState> appStates = getParagraph(paragraphId).getAllApplicationStates(); + if (appStates != null) { + for (ApplicationState app : appStates) { + ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess( + id, app.getId()); + } + } } else { registry.removeAll(id, paragraphId); + + // remove app scope object + List<ApplicationState> appStates = getParagraph(paragraphId).getAllApplicationStates(); + if (appStates != null) { + for (ApplicationState app : appStates) { + registry.removeAll(id, app.getId()); + } + } } } } @@ -624,12 +659,67 @@ public class Note implements Serializable, JobListener { @Override public void beforeStatusChange(Job job, Status before, Status after) { + if (jobListenerFactory != null) { + ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this); + if (listener != null) { + listener.beforeStatusChange(job, before, after); + } + } } @Override public void afterStatusChange(Job job, Status before, Status after) { + if (jobListenerFactory != null) { + ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this); + if (listener != null) { + listener.afterStatusChange(job, before, after); + } + } + + if (noteEventListener != null) { + noteEventListener.onParagraphStatusChange((Paragraph) job, after); + } } @Override - public void onProgressUpdate(Job job, int progress) {} + public void onProgressUpdate(Job job, int progress) { + if (jobListenerFactory != null) { + ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this); + if (listener != null) { + listener.onProgressUpdate(job, progress); + } + } + } + + + @Override + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) { + if (jobListenerFactory != null) { + ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this); + if (listener != null) { + listener.onOutputAppend(paragraph, out, output); + } + } + } + + @Override + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) { + if (jobListenerFactory != null) { + ParagraphJobListener listener = jobListenerFactory.getParagraphJobListener(this); + if (listener != null) { + listener.onOutputUpdate(paragraph, out, output); + } + } + } + + + + public NoteEventListener getNoteEventListener() { + return noteEventListener; + } + + public void setNoteEventListener(NoteEventListener noteEventListener) { + this.noteEventListener = noteEventListener; + } + } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventListener.java new file mode 100644 index 0000000..5f98f70 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NoteEventListener.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook; + +import org.apache.zeppelin.scheduler.Job; + +/** + * NoteEventListener + */ +public interface NoteEventListener { + public void onParagraphRemove(Paragraph p); + public void onParagraphCreate(Paragraph p); + public void onParagraphStatusChange(Paragraph p, Job.Status status); +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java index 3243ba7..ab2ce5d 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Notebook.java @@ -41,6 +41,7 @@ import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.notebook.repo.NotebookRepo; import org.apache.zeppelin.notebook.repo.NotebookRepoSync; import org.apache.zeppelin.resource.ResourcePoolUtils; +import org.apache.zeppelin.scheduler.Job; import org.apache.zeppelin.scheduler.SchedulerFactory; import org.apache.zeppelin.search.SearchService; import org.apache.zeppelin.user.AuthenticationInfo; @@ -64,7 +65,7 @@ import com.google.gson.stream.JsonReader; /** * Collection of Notes. */ -public class Notebook { +public class Notebook implements NoteEventListener { static Logger logger = LoggerFactory.getLogger(Notebook.class); @SuppressWarnings("unused") @Deprecated //TODO(bzz): remove unused @@ -80,6 +81,8 @@ public class Notebook { private NotebookRepo notebookRepo; private SearchService notebookIndex; private NotebookAuthorization notebookAuthorization; + private final List<NotebookEventListener> notebookEventListeners = + Collections.synchronizedList(new LinkedList<NotebookEventListener>()); private Credentials credentials; /** @@ -151,7 +154,13 @@ public class Notebook { */ public Note createNote(List<String> interpreterIds, AuthenticationInfo subject) throws IOException { - Note note = new Note(notebookRepo, replFactory, jobListenerFactory, notebookIndex, credentials); + Note note = new Note( + notebookRepo, + replFactory, + jobListenerFactory, + notebookIndex, + credentials, + this); synchronized (notes) { notes.put(note.id(), note); } @@ -162,6 +171,7 @@ public class Notebook { notebookIndex.addIndexDoc(note); note.persist(subject); + fireNoteCreateEvent(note); return note; } @@ -216,7 +226,7 @@ public class Notebook { logger.error(e.toString(), e); throw e; } - + return newNote; } @@ -256,6 +266,13 @@ public class Notebook { List<String> interpreterSettingIds) throws IOException { Note note = getNote(id); if (note != null) { + List<InterpreterSetting> currentBindings = replFactory.getInterpreterSettings(id); + for (InterpreterSetting setting : currentBindings) { + if (!interpreterSettingIds.contains(setting.id())) { + fireUnbindInterpreter(note, setting); + } + } + replFactory.setInterpreters(note.getId(), interpreterSettingIds); // comment out while note.getNoteReplLoader().setInterpreters(...) do the same // replFactory.putNoteInterpreterSettingBinding(id, interpreterSettingIds); @@ -303,6 +320,15 @@ public class Notebook { // remove paragraph scope object for (Paragraph p : note.getParagraphs()) { ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, p.getId()); + + // remove app scope object + List<ApplicationState> appStates = p.getAllApplicationStates(); + if (appStates != null) { + for (ApplicationState app : appStates) { + ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess( + id, app.getId()); + } + } } // remove notebook scope object ((RemoteAngularObjectRegistry) registry).removeAllAndNotifyRemoteProcess(id, null); @@ -310,6 +336,14 @@ public class Notebook { // remove paragraph scope object for (Paragraph p : note.getParagraphs()) { registry.removeAll(id, p.getId()); + + // remove app scope object + List<ApplicationState> appStates = p.getAllApplicationStates(); + if (appStates != null) { + for (ApplicationState app : appStates) { + registry.removeAll(id, app.getId()); + } + } } // remove notebook scope object registry.removeAll(id, null); @@ -318,6 +352,8 @@ public class Notebook { ResourcePoolUtils.removeResourcesBelongsToNote(id); + fireNoteRemoveEvent(note); + try { note.unpersist(subject); } catch (IOException e) { @@ -379,6 +415,8 @@ public class Notebook { } } + note.setNoteEventListener(this); + synchronized (notes) { notes.put(note.id(), note); refreshCron(note.id()); @@ -402,6 +440,7 @@ public class Notebook { } } } + return note; } @@ -737,4 +776,46 @@ public class Notebook { this.notebookIndex.close(); } + public void addNotebookEventListener(NotebookEventListener listener) { + notebookEventListeners.add(listener); + } + + private void fireNoteCreateEvent(Note note) { + for (NotebookEventListener listener : notebookEventListeners) { + listener.onNoteCreate(note); + } + } + + private void fireNoteRemoveEvent(Note note) { + for (NotebookEventListener listener : notebookEventListeners) { + listener.onNoteRemove(note); + } + } + + private void fireUnbindInterpreter(Note note, InterpreterSetting setting) { + for (NotebookEventListener listener : notebookEventListeners) { + listener.onUnbindInterpreter(note, setting); + } + } + + @Override + public void onParagraphRemove(Paragraph p) { + for (NotebookEventListener listener : notebookEventListeners) { + listener.onParagraphRemove(p); + } + } + + @Override + public void onParagraphCreate(Paragraph p) { + for (NotebookEventListener listener : notebookEventListeners) { + listener.onParagraphCreate(p); + } + } + + @Override + public void onParagraphStatusChange(Paragraph p, Job.Status status) { + for (NotebookEventListener listener : notebookEventListeners) { + listener.onParagraphStatusChange(p, status); + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java new file mode 100644 index 0000000..904eba0 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/NotebookEventListener.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.notebook; + +import org.apache.zeppelin.interpreter.InterpreterSetting; + +/** + * Notebook event + */ +public interface NotebookEventListener extends NoteEventListener { + public void onNoteRemove(Note note); + public void onNoteCreate(Note note); + + public void onUnbindInterpreter(Note note, InterpreterSetting setting); +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java index df4765d..d1a7824 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/Paragraph.java @@ -19,6 +19,7 @@ package org.apache.zeppelin.notebook; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.helium.HeliumPackage; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.user.AuthenticationInfo; import org.apache.zeppelin.user.Credentials; @@ -61,6 +62,11 @@ public class Paragraph extends Job implements Serializable, Cloneable { private Map<String, Object> config; // paragraph configs like isOpen, colWidth, etc public final GUI settings; // form and parameter settings + /** + * Applicaiton states in this paragraph + */ + private final List<ApplicationState> apps = new LinkedList<ApplicationState>(); + @VisibleForTesting Paragraph() { super(generateId(), null); @@ -230,7 +236,7 @@ public class Paragraph extends Job implements Serializable, Cloneable { String replName = getRequiredReplName(); Interpreter repl = getRepl(replName); if (repl != null) { - return repl.getProgress(getInterpreterContext()); + return repl.getProgress(getInterpreterContext(null)); } else { return 0; } @@ -283,7 +289,6 @@ public class Paragraph extends Job implements Serializable, Cloneable { context.out.flush(); InterpreterResult.Type outputType = context.out.getType(); byte[] interpreterOutput = context.out.toByteArray(); - context.out.clear(); if (interpreterOutput != null && interpreterOutput.length > 0) { message = new String(interpreterOutput); @@ -323,12 +328,44 @@ public class Paragraph extends Job implements Serializable, Cloneable { if (job != null) { job.setStatus(Status.ABORT); } else { - repl.cancel(getInterpreterContext()); + repl.cancel(getInterpreterContext(null)); } return true; } private InterpreterContext getInterpreterContext() { + final Paragraph self = this; + + return getInterpreterContext(new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + updateParagraphResult(out); + ((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line)); + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + updateParagraphResult(out); + ((ParagraphJobListener) getListener()).onOutputUpdate(self, out, + new String(output)); + } + + private void updateParagraphResult(InterpreterOutput out) { + // update paragraph result + Throwable t = null; + String message = null; + try { + message = new String(out.toByteArray()); + } catch (IOException e) { + logger().error(e.getMessage(), e); + t = e; + } + setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t); + } + })); + } + + private InterpreterContext getInterpreterContext(InterpreterOutput output) { AngularObjectRegistry registry = null; ResourcePool resourcePool = null; @@ -363,33 +400,7 @@ public class Paragraph extends Job implements Serializable, Cloneable { registry, resourcePool, runners, - new InterpreterOutput(new InterpreterOutputListener() { - @Override - public void onAppend(InterpreterOutput out, byte[] line) { - updateParagraphResult(out); - ((ParagraphJobListener) getListener()).onOutputAppend(self, out, new String(line)); - } - - @Override - public void onUpdate(InterpreterOutput out, byte[] output) { - updateParagraphResult(out); - ((ParagraphJobListener) getListener()).onOutputUpdate(self, out, - new String(output)); - } - - private void updateParagraphResult(InterpreterOutput out) { - // update paragraph result - Throwable t = null; - String message = null; - try { - message = new String(out.toByteArray()); - } catch (IOException e) { - logger().error(e.getMessage(), e); - t = e; - } - setReturn(new InterpreterResult(Code.SUCCESS, out.getType(), message), t); - } - })); + output); return interpreterContext; } @@ -433,6 +444,44 @@ public class Paragraph extends Job implements Serializable, Cloneable { return paraClone; } + private String getApplicationId(HeliumPackage pkg) { + return "app_" + getNote().getId() + "-" + getId() + pkg.getName().replaceAll("\\.", "_"); + } + + public ApplicationState createOrGetApplicationState(HeliumPackage pkg) { + synchronized (apps) { + for (ApplicationState as : apps) { + if (as.equals(pkg)) { + return as; + } + } + + String appId = getApplicationId(pkg); + ApplicationState appState = new ApplicationState(appId, pkg); + apps.add(appState); + return appState; + } + } + + + public ApplicationState getApplicationState(String appId) { + synchronized (apps) { + for (ApplicationState as : apps) { + if (as.getId().equals(appId)) { + return as; + } + } + } + + return null; + } + + public List<ApplicationState> getAllApplicationStates() { + synchronized (apps) { + return new LinkedList<ApplicationState>(apps); + } + } + String extractVariablesFromAngularRegistry(String scriptBody, Map<String, Input> inputs, AngularObjectRegistry angularRegistry) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java index 152e087..a74e6c7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/repo/VFSNotebookRepo.java @@ -36,6 +36,7 @@ import org.apache.commons.vfs2.Selectors; import org.apache.commons.vfs2.VFS; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.notebook.ApplicationState; import org.apache.zeppelin.notebook.Note; import org.apache.zeppelin.notebook.NoteInfo; import org.apache.zeppelin.notebook.Paragraph; @@ -172,6 +173,15 @@ public class VFSNotebookRepo implements NotebookRepo { if (p.getStatus() == Status.PENDING || p.getStatus() == Status.RUNNING) { p.setStatus(Status.ABORT); } + + List<ApplicationState> appStates = p.getAllApplicationStates(); + if (appStates != null) { + for (ApplicationState app : appStates) { + if (app.getStatus() != ApplicationState.Status.ERROR) { + app.setStatus(ApplicationState.Status.UNLOADED); + } + } + } } return note; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java index 320709e..08b3235 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/notebook/socket/Message.java @@ -111,9 +111,15 @@ public class Message { CONFIGURATIONS_INFO, // [s-c] all key/value pairs of configurations // @param settings serialized Map<String, String> object - CHECKPOINT_NOTEBOOK, // [c-s] checkpoint notebook to storage repository - // @param noteId - // @param checkpointName + CHECKPOINT_NOTEBOOK, // [c-s] checkpoint notebook to storage repository + // @param noteId + // @param checkpointName + + APP_APPEND_OUTPUT, // [s-c] append output + APP_UPDATE_OUTPUT, // [s-c] update (replace) output + APP_LOAD, // [s-c] on app load + APP_STATUS_CHANGE, // [s-c] on app status change + LIST_NOTEBOOK_JOBS, // [c-s] get notebook job management infomations LIST_UPDATE_NOTEBOOK_JOBS // [c-s] get job management informations for until unixtime // @param unixTime http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java new file mode 100644 index 0000000..602f384 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumApplicationFactoryTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.mock.MockInterpreter1; +import org.apache.zeppelin.interpreter.mock.MockInterpreter2; +import org.apache.zeppelin.notebook.*; +import org.apache.zeppelin.notebook.repo.VFSNotebookRepo; +import org.apache.zeppelin.scheduler.ExecutorFactory; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.apache.zeppelin.search.SearchService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +public class HeliumApplicationFactoryTest implements JobListenerFactory { + private File tmpDir; + private File notebookDir; + private ZeppelinConfiguration conf; + private SchedulerFactory schedulerFactory; + private DependencyResolver depResolver; + private InterpreterFactory factory; + private VFSNotebookRepo notebookRepo; + private Notebook notebook; + private HeliumApplicationFactory heliumAppFactory; + + @Before + public void setUp() throws Exception { + tmpDir = new File(System.getProperty("java.io.tmpdir")+"/ZeppelinLTest_"+System.currentTimeMillis()); + tmpDir.mkdirs(); + File confDir = new File(tmpDir, "conf"); + confDir.mkdirs(); + notebookDir = new File(tmpDir + "/notebook"); + notebookDir.mkdirs(); + + File home = new File(getClass().getClassLoader().getResource("note").getFile()) // zeppelin/zeppelin-zengine/target/test-classes/note + .getParentFile() // zeppelin/zeppelin-zengine/target/test-classes + .getParentFile() // zeppelin/zeppelin-zengine/target + .getParentFile() // zeppelin/zeppelin-zengine + .getParentFile(); // zeppelin + + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), home.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), tmpDir.getAbsolutePath() + "/conf"); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETERS.getVarName(), "org.apache.zeppelin.interpreter.mock.MockInterpreter1,org.apache.zeppelin.interpreter.mock.MockInterpreter2"); + + conf = new ZeppelinConfiguration(); + + this.schedulerFactory = new SchedulerFactory(); + + MockInterpreter1.register("mock1", "org.apache.zeppelin.interpreter.mock.MockInterpreter1"); + MockInterpreter2.register("mock2", "org.apache.zeppelin.interpreter.mock.MockInterpreter2"); + + + heliumAppFactory = new HeliumApplicationFactory(); + depResolver = new DependencyResolver(tmpDir.getAbsolutePath() + "/local-repo"); + factory = new InterpreterFactory(conf, + new InterpreterOption(true), null, null, heliumAppFactory, depResolver); + HashMap<String, String> env = new HashMap<String, String>(); + env.put("ZEPPELIN_CLASSPATH", new File("./target/test-classes").getAbsolutePath()); + factory.setEnv(env); + + SearchService search = mock(SearchService.class); + notebookRepo = new VFSNotebookRepo(conf); + NotebookAuthorization notebookAuthorization = new NotebookAuthorization(conf); + notebook = new Notebook( + conf, + notebookRepo, + schedulerFactory, + factory, + this, + search, + notebookAuthorization, + null); + + heliumAppFactory.setNotebook(notebook); + + notebook.addNotebookEventListener(heliumAppFactory); + } + + @After + public void tearDown() throws Exception { + List<InterpreterSetting> settings = factory.get(); + for (InterpreterSetting setting : settings) { + for (InterpreterGroup intpGroup : setting.getAllInterpreterGroups()) { + intpGroup.close(); + } + } + + FileUtils.deleteDirectory(tmpDir); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), + ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getStringValue()); + } + + + @Test + public void testLoadRunUnloadApplication() + throws IOException, ApplicationException, InterruptedException { + // given + HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION, + "name1", + "desc1", + "", + HeliumTestApplication.class.getName(), + new String[][]{}); + + Note note1 = notebook.createNote(null); + factory.setInterpreters(note1.getId(),factory.getDefaultInterpreterSettingList()); + + Paragraph p1 = note1.addParagraph(); + + // make sure interpreter process running + p1.setText("%mock1 job"); + note1.run(p1.getId()); + while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield(); + + assertEquals("repl1: job", p1.getResult().message()); + + // when + assertEquals(0, p1.getAllApplicationStates().size()); + String appId = heliumAppFactory.loadAndRun(pkg1, p1); + assertEquals(1, p1.getAllApplicationStates().size()); + ApplicationState app = p1.getApplicationState(appId); + Thread.sleep(500); // wait for enough time + + // then + assertEquals("Hello world 1", app.getOutput()); + + // when + heliumAppFactory.run(p1, appId); + Thread.sleep(500); // wait for enough time + + // then + assertEquals("Hello world 2", app.getOutput()); + + // clean + heliumAppFactory.unload(p1, appId); + notebook.removeNote(note1.getId(), null); + } + + @Test + public void testUnloadOnParagraphRemove() throws IOException { + // given + HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION, + "name1", + "desc1", + "", + HeliumTestApplication.class.getName(), + new String[][]{}); + + Note note1 = notebook.createNote(null); + factory.setInterpreters(note1.id(), factory.getDefaultInterpreterSettingList()); + + Paragraph p1 = note1.addParagraph(); + + // make sure interpreter process running + p1.setText("%mock1 job"); + note1.run(p1.getId()); + while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield(); + + assertEquals(0, p1.getAllApplicationStates().size()); + String appId = heliumAppFactory.loadAndRun(pkg1, p1); + ApplicationState app = p1.getApplicationState(appId); + while (app.getStatus() != ApplicationState.Status.LOADED) { + Thread.yield(); + } + + // when remove paragraph + note1.removeParagraph(p1.getId()); + + // then + assertEquals(ApplicationState.Status.UNLOADED, app.getStatus()); + + // clean + notebook.removeNote(note1.getId(), null); + } + + + @Test + public void testUnloadOnInterpreterUnbind() throws IOException { + // given + HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION, + "name1", + "desc1", + "", + HeliumTestApplication.class.getName(), + new String[][]{}); + + Note note1 = notebook.createNote(null); + notebook.bindInterpretersToNote(note1.id(), factory.getDefaultInterpreterSettingList()); + + Paragraph p1 = note1.addParagraph(); + + // make sure interpreter process running + p1.setText("%mock1 job"); + note1.run(p1.getId()); + while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield(); + + assertEquals(0, p1.getAllApplicationStates().size()); + String appId = heliumAppFactory.loadAndRun(pkg1, p1); + ApplicationState app = p1.getApplicationState(appId); + while (app.getStatus() != ApplicationState.Status.LOADED) { + Thread.yield(); + } + + // when unbind interpreter + notebook.bindInterpretersToNote(note1.id(), new LinkedList<String>()); + + // then + assertEquals(ApplicationState.Status.UNLOADED, app.getStatus()); + + // clean + notebook.removeNote(note1.getId(), null); + } + + + @Test + public void testUnloadOnInterpreterRestart() throws IOException { + // given + HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION, + "name1", + "desc1", + "", + HeliumTestApplication.class.getName(), + new String[][]{}); + + Note note1 = notebook.createNote(null); + notebook.bindInterpretersToNote(note1.id(), factory.getDefaultInterpreterSettingList()); + String mock1IntpSettingId = null; + for (InterpreterSetting setting : notebook.getBindedInterpreterSettings(note1.id())) { + if (setting.getName().equals("mock1")) { + mock1IntpSettingId = setting.id(); + break; + } + } + + Paragraph p1 = note1.addParagraph(); + + // make sure interpreter process running + p1.setText("%mock1 job"); + note1.run(p1.getId()); + while(p1.isTerminated()==false || p1.getResult()==null) Thread.yield(); + assertEquals(0, p1.getAllApplicationStates().size()); + String appId = heliumAppFactory.loadAndRun(pkg1, p1); + ApplicationState app = p1.getApplicationState(appId); + while (app.getStatus() != ApplicationState.Status.LOADED) { + Thread.yield(); + } + // wait until application is executed + while (!"Hello world 1".equals(app.getOutput())) { + Thread.yield(); + } + // when restart interpreter + factory.restart(mock1IntpSettingId); + while (app.getStatus() == ApplicationState.Status.LOADED) { + Thread.yield(); + } + // then + assertEquals(ApplicationState.Status.UNLOADED, app.getStatus()); + + // clean + notebook.removeNote(note1.getId(), null); + } + + @Override + public ParagraphJobListener getParagraphJobListener(Note note) { + return new ParagraphJobListener() { + @Override + public void onOutputAppend(Paragraph paragraph, InterpreterOutput out, String output) { + } + + @Override + public void onOutputUpdate(Paragraph paragraph, InterpreterOutput out, String output) { + + } + + @Override + public void onProgressUpdate(Job job, int progress) { + + } + + @Override + public void beforeStatusChange(Job job, Job.Status before, Job.Status after) { + + } + + @Override + public void afterStatusChange(Job job, Job.Status before, Job.Status after) { + + } + }; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumLocalRegistryTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumLocalRegistryTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumLocalRegistryTest.java new file mode 100644 index 0000000..e90cbc3 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumLocalRegistryTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +import com.google.gson.Gson; +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +public class HeliumLocalRegistryTest { + private File tmpDir; + + @Before + public void setUp() throws Exception { + tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); + tmpDir.mkdirs(); + } + + @After + public void tearDown() throws IOException { + FileUtils.deleteDirectory(tmpDir); + } + + @Test + public void testGetAllPackage() throws IOException { + // given + File r1Path = new File(tmpDir, "r1"); + HeliumLocalRegistry r1 = new HeliumLocalRegistry("r1", r1Path.getAbsolutePath()); + assertEquals(0, r1.getAll().size()); + + // when + Gson gson = new Gson(); + HeliumPackage pkg1 = new HeliumPackage(HeliumPackage.Type.APPLICATION, + "app1", + "desc1", + "artifact1", + "classname1", + new String[][]{}); + FileUtils.writeStringToFile(new File(r1Path, "pkg1.json"), gson.toJson(pkg1)); + + // then + assertEquals(1, r1.getAll().size()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java new file mode 100644 index 0000000..1ed31c5 --- /dev/null +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/helium/HeliumTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.helium; + +import org.apache.commons.io.FileUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class HeliumTest { + private File tmpDir; + private File localRegistryPath; + + @Before + public void setUp() throws Exception { + tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); + tmpDir.mkdirs(); + localRegistryPath = new File(tmpDir, "helium"); + localRegistryPath.mkdirs(); + } + + @After + public void tearDown() throws IOException { + FileUtils.deleteDirectory(tmpDir); + } + + @Test + public void testSaveLoadConf() throws IOException, URISyntaxException { + // given + File heliumConf = new File(tmpDir, "helium.conf"); + Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath()); + assertFalse(heliumConf.exists()); + HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1"); + helium.addRegistry(registry1); + assertEquals(2, helium.getAllRegistry().size()); + assertEquals(0, helium.getAllPackageInfo().size()); + + // when + helium.save(); + + // then + assertTrue(heliumConf.exists()); + + // then + Helium heliumRestored = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath()); + assertEquals(2, heliumRestored.getAllRegistry().size()); + } + + @Test + public void testRestoreRegistryInstances() throws IOException, URISyntaxException { + File heliumConf = new File(tmpDir, "helium.conf"); + Helium helium = new Helium(heliumConf.getAbsolutePath(), localRegistryPath.getAbsolutePath()); + HeliumTestRegistry registry1 = new HeliumTestRegistry("r1", "r1"); + HeliumTestRegistry registry2 = new HeliumTestRegistry("r2", "r2"); + helium.addRegistry(registry1); + helium.addRegistry(registry2); + + // when + registry1.add(new HeliumPackage( + HeliumPackage.Type.APPLICATION, + "name1", + "desc1", + "artifact1", + "className1", + new String[][]{})); + + registry2.add(new HeliumPackage( + HeliumPackage.Type.APPLICATION, + "name2", + "desc2", + "artifact2", + "className2", + new String[][]{})); + + // then + assertEquals(2, helium.getAllPackageInfo().size()); + } +}
