http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java index 42ebe48..df0c210 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java @@ -21,9 +21,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; +import java.net.URISyntaxException; import java.net.URL; +import java.util.Collections; import java.util.LinkedList; import java.util.List; +import java.util.concurrent.ConcurrentHashMap; /** * InterpreterOutput is OutputStream that supposed to print content on notebook @@ -32,6 +35,7 @@ import java.util.List; public class InterpreterOutput extends OutputStream { Logger logger = LoggerFactory.getLogger(InterpreterOutput.class); private final int NEW_LINE_CHAR = '\n'; + private List<String> resourceSearchPaths = Collections.synchronizedList(new LinkedList<String>()); ByteArrayOutputStream buffer = new ByteArrayOutputStream(); @@ -61,7 +65,6 @@ public class InterpreterOutput extends OutputStream { public void setType(InterpreterResult.Type type) { if (this.type != type) { clear(); - flushListener.onUpdate(this, new byte[]{}); this.type = type; } } @@ -74,6 +77,8 @@ public class InterpreterOutput extends OutputStream { if (watcher != null) { watcher.clear(); } + + flushListener.onUpdate(this, new byte[]{}); } } @@ -149,33 +154,33 @@ public class InterpreterOutput extends OutputStream { * @throws IOException */ public void write(URL url) throws IOException { - if ("file".equals(url.getProtocol())) { - write(new File(url.getPath())); - } else { - 
outList.add(url); - } + outList.add(url); + } + + public void addResourceSearchPath(String path) { + resourceSearchPaths.add(path); } public void writeResource(String resourceName) throws IOException { - // search file under resource dir first for dev mode - File mainResource = new File("./src/main/resources/" + resourceName); - File testResource = new File("./src/test/resources/" + resourceName); - if (mainResource.isFile()) { - write(mainResource); - } else if (testResource.isFile()) { - write(testResource); - } else { - // search from classpath - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - if (cl == null) { - cl = this.getClass().getClassLoader(); - } - if (cl == null) { - cl = ClassLoader.getSystemClassLoader(); + // search file under provided paths first, for dev mode + for (String path : resourceSearchPaths) { + File res = new File(path + "/" + resourceName); + if (res.isFile()) { + write(res); + return; } + } - write(cl.getResource(resourceName)); + // search from classpath + ClassLoader cl = Thread.currentThread().getContextClassLoader(); + if (cl == null) { + cl = this.getClass().getClassLoader(); } + if (cl == null) { + cl = ClassLoader.getSystemClassLoader(); + } + + write(cl.getResource(resourceName)); } public byte[] toByteArray() throws IOException {
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/DevInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/DevInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/DevInterpreter.java new file mode 100644 index 0000000..a972b91 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/DevInterpreter.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.zeppelin.interpreter.dev; + +import java.util.LinkedList; +import java.util.List; +import java.util.Properties; + +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterPropertyBuilder; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; + +/** + * Dummy interpreter to support development mode for Zeppelin app + */ +public class DevInterpreter extends Interpreter { + static { + Interpreter.register( + "dev", + "dev", + DevInterpreter.class.getName(), + new InterpreterPropertyBuilder().build()); + } + + private InterpreterEvent interpreterEvent; + private InterpreterContext context; + + public static boolean isInterpreterName(String replName) { + return replName.equals("dev"); + } + + /** + * event handler for ZeppelinApplicationDevServer + */ + public static interface InterpreterEvent { + public InterpreterResult interpret(String st, InterpreterContext context); + } + + public DevInterpreter(Properties property) { + super(property); + } + + public DevInterpreter(Properties property, InterpreterEvent interpreterEvent) { + super(property); + this.interpreterEvent = interpreterEvent; + } + + @Override + public void open() { + } + + @Override + public void close() { + } + + public void rerun() { + for (InterpreterContextRunner r : context.getRunners()) { + if (context.getParagraphId().equals(r.getParagraphId())) { + r.run(); + } + } + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + this.context = context; + try { + return interpreterEvent.interpret(st, context); + } catch (Exception e) { + throw new InterpreterException(e); + } + } + + @Override + public void cancel(InterpreterContext 
context) { + } + + @Override + public FormType getFormType() { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) { + return 0; + } + + @Override + public List<InterpreterCompletion> completion(String buf, int cursor) { + return new LinkedList<InterpreterCompletion>(); + } + + public InterpreterContext getLastInterpretContext() { + return context; + } + + public void setInterpreterEvent(InterpreterEvent event) { + this.interpreterEvent = event; + } + + public InterpreterEvent getInterpreterEvent() { + return interpreterEvent; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinApplicationDevServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinApplicationDevServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinApplicationDevServer.java new file mode 100644 index 0000000..941fdfe --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinApplicationDevServer.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.dev; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.net.URL; + +import com.google.gson.Gson; +import org.apache.log4j.ConsoleAppender; +import org.apache.log4j.Level; +import org.apache.log4j.PatternLayout; +import org.apache.zeppelin.helium.*; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; +import org.apache.zeppelin.resource.ResourceSet; +import org.apache.zeppelin.resource.WellKnownResourceName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Run this server for development mode. + */ +public class ZeppelinApplicationDevServer extends ZeppelinDevServer { + final Logger logger = LoggerFactory.getLogger(ZeppelinApplicationDevServer.class); + + private final String className; + private final ResourceSet resourceSet; + private Application app; + private InterpreterOutput out; + + public ZeppelinApplicationDevServer(final String className, ResourceSet resourceSet) throws + Exception { + this(ZeppelinDevServer.DEFAULT_TEST_INTERPRETER_PORT, className, resourceSet); + } + + public ZeppelinApplicationDevServer(int port, String className, ResourceSet resourceSet) throws + Exception { + super(port); + this.className = className; + this.resourceSet = resourceSet; + setLogger(); + }; + + void setLogger() { + ConsoleAppender console = new ConsoleAppender(); //create appender + //configure the appender + String PATTERN = "%d [%p|%c|%C{1}] %m%n"; + console.setLayout(new PatternLayout(PATTERN)); + console.setThreshold(Level.DEBUG); + console.activateOptions(); + //add appender to any Logger (here is root) + org.apache.log4j.Logger.getRootLogger().addAppender(console); + } + + + 
@Override + public InterpreterResult interpret(String st, InterpreterContext context) { + if (app == null) { + logger.info("Create instance " + className); + try { + Class<?> appClass = ClassLoader.getSystemClassLoader().loadClass(className); + Constructor<?> constructor = appClass.getConstructor(ApplicationContext.class); + + // classPath will be ..../target/classes in dev mode most cases + String classPath = appClass.getProtectionDomain().getCodeSource().getLocation().getPath(); + + context.out.addResourceSearchPath(classPath + "../../src/main/resources/"); + context.out.addResourceSearchPath(classPath + "../../src/test/resources/"); + + ApplicationContext appContext = getApplicationContext(context); + app = (Application) constructor.newInstance(appContext); + } catch (Exception e) { + logger.error(e.getMessage(), e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + } + + try { + logger.info("Run " + className); + app.context().out.clear(); + app.context().out.setType(InterpreterResult.Type.ANGULAR); + transferTableResultDataToFrontend(); + app.run(resourceSet); + } catch (IOException | ApplicationException e) { + logger.error(e.getMessage(), e); + return new InterpreterResult(Code.ERROR, e.getMessage()); + } + return new InterpreterResult(Code.SUCCESS, ""); + } + + private void transferTableResultDataToFrontend() throws IOException { + ResourceSet results = resourceSet.filterByClassname(InterpreterResult.class.getName()); + if (results.size() == 0) { + return; + } + + InterpreterResult result = (InterpreterResult) results.get(0).get(); + Gson gson = new Gson(); + String resultJson = gson.toJson(result); + StringBuffer transferResult = new StringBuffer(); + transferResult.append("$z.result = " + resultJson + ";\n"); + if (result.type() == InterpreterResult.Type.TABLE) { + transferResult.append("$z.scope.loadTableData($z.result);\n"); + } + transferResult.append("$z.scope._devmodeResult = $z.result;\n"); + 
app.printStringAsJavascript(transferResult.toString()); + } + + ApplicationContext getApplicationContext(InterpreterContext interpreterContext) { + return new ApplicationContext( + interpreterContext.getNoteId(), + interpreterContext.getParagraphId(), + "app_" + this.hashCode(), + new HeliumAppAngularObjectRegistry( + interpreterContext.getAngularObjectRegistry(), + interpreterContext.getNoteId(), + interpreterContext.getParagraphId()), + interpreterContext.out); + } + + @Override + protected InterpreterOutput createInterpreterOutput( + final String noteId, final String paragraphId) { + if (out == null) { + final RemoteInterpreterEventClient eventClient = getEventClient(); + try { + out = new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line)); + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output)); + } + }, this); + } catch (IOException e) { + return null; + } + } + + return out; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinDevServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinDevServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinDevServer.java new file mode 100644 index 0000000..9f40923 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/dev/ZeppelinDevServer.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.dev; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.Properties; + +import org.apache.thrift.TException; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.dev.DevInterpreter.InterpreterEvent; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Interpreter development server + */ +public class ZeppelinDevServer extends + RemoteInterpreterServer implements InterpreterEvent, InterpreterOutputChangeListener { + final Logger logger = LoggerFactory.getLogger(ZeppelinDevServer.class); + public static final int DEFAULT_TEST_INTERPRETER_PORT = 29914; + + DevInterpreter interpreter = null; + InterpreterOutput out; + public ZeppelinDevServer(int port) throws TException { + super(port); + } + + @Override + protected Interpreter getInterpreter(String noteId, String className) throws TException { + synchronized (this) { + InterpreterGroup interpreterGroup = getInterpreterGroup(); + if (interpreterGroup == null) { + createInterpreter( + "dev", + noteId, + DevInterpreter.class.getName(), + new HashMap<String, String>()); + + Interpreter intp = super.getInterpreter(noteId, className); + interpreter = 
(DevInterpreter) ( + ((LazyOpenInterpreter) intp).getInnerInterpreter()); + interpreter.setInterpreterEvent(this); + notify(); + } + } + return super.getInterpreter(noteId, className); + } + + @Override + protected InterpreterOutput createInterpreterOutput( + final String noteId, final String paragraphId) { + if (out == null) { + final RemoteInterpreterEventClient eventClient = getEventClient(); + try { + out = new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + eventClient.onInterpreterOutputAppend(noteId, paragraphId, new String(line)); + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + eventClient.onInterpreterOutputUpdate(noteId, paragraphId, new String(output)); + } + }, this); + } catch (IOException e) { + return null; + } + } + + out.clear(); + return out; + } + + @Override + public void fileChanged(File file) { + refresh(); + } + + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + waitForConnected(); + return new InterpreterResult(InterpreterResult.Code.SUCCESS, ""); + } + + public void refresh() { + interpreter.rerun(); + } + + /** + * Wait until %dev paragraph is executed and connected to this process + */ + public void waitForConnected() { + synchronized (this) { + while (!isConnected()) { + try { + this.wait(10 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + + public boolean isConnected() { + return !(interpreter == null || interpreter.getLastInterpretContext() == null); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java index 6126f75..809c574 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java @@ -76,4 +76,9 @@ public class ClientFactory extends BasePooledObjectFactory<Client>{ } } } + + @Override + public boolean validateObject(PooledObject<Client> p) { + return p.getObject().getOutputProtocol().getTransport().isOpen(); + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java index b80a252..0ac7116 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -23,9 +23,7 @@ import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; -import org.apache.zeppelin.interpreter.Interpreter; import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -96,7 +94,7 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String 
paragraphId) { RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); - if (!remoteInterpreterProcess.isRunning()) { + if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) { return super.remove(name, noteId, paragraphId); } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index e18edbd..db740f4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -23,6 +23,7 @@ import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; +import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.display.Input; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Type; @@ -43,6 +44,7 @@ import com.google.gson.reflect.TypeToken; */ public class RemoteInterpreter extends Interpreter { private final RemoteInterpreterProcessListener remoteInterpreterProcessListener; + private final ApplicationEventListener applicationEventListener; Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class); Gson gson = new Gson(); private String interpreterRunner; @@ -55,17 +57,22 @@ public class RemoteInterpreter extends Interpreter { private Map<String, String> env; private int connectTimeout; private int maxPoolSize; - private static String schedulerName; + private String host; + private int port; + /** 
+ * Create a remote interpreter and manage its interpreter process + */ public RemoteInterpreter(Properties property, - String noteId, - String className, - String interpreterRunner, - String interpreterPath, - String localRepoPath, - int connectTimeout, - int maxPoolSize, - RemoteInterpreterProcessListener remoteInterpreterProcessListener) { + String noteId, + String className, + String interpreterRunner, + String interpreterPath, + String localRepoPath, + int connectTimeout, + int maxPoolSize, + RemoteInterpreterProcessListener remoteInterpreterProcessListener, + ApplicationEventListener appListener) { super(property); this.noteId = noteId; this.className = className; @@ -77,9 +84,39 @@ public class RemoteInterpreter extends Interpreter { this.connectTimeout = connectTimeout; this.maxPoolSize = maxPoolSize; this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + this.applicationEventListener = appListener; } - public RemoteInterpreter(Properties property, + + /** + * Connect to existing process + */ + public RemoteInterpreter( + Properties property, + String noteId, + String className, + String host, + int port, + int connectTimeout, + int maxPoolSize, + RemoteInterpreterProcessListener remoteInterpreterProcessListener, + ApplicationEventListener appListener) { + super(property); + this.noteId = noteId; + this.className = className; + initialized = false; + this.host = host; + this.port = port; + this.connectTimeout = connectTimeout; + this.maxPoolSize = maxPoolSize; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + this.applicationEventListener = appListener; + } + + + // VisibleForTesting + public RemoteInterpreter( + Properties property, String noteId, String className, String interpreterRunner, @@ -87,7 +124,8 @@ public class RemoteInterpreter extends Interpreter { String localRepoPath, Map<String, String> env, int connectTimeout, - RemoteInterpreterProcessListener remoteInterpreterProcessListener) { + 
RemoteInterpreterProcessListener remoteInterpreterProcessListener, + ApplicationEventListener appListener) { super(property); this.className = className; this.noteId = noteId; @@ -99,6 +137,7 @@ public class RemoteInterpreter extends Interpreter { this.connectTimeout = connectTimeout; this.maxPoolSize = 10; this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + this.applicationEventListener = appListener; } private Map<String, String> getEnvFromInterpreterProperty(Properties property) { @@ -124,6 +163,10 @@ public class RemoteInterpreter extends Interpreter { return className; } + private boolean connectToExistingProcess() { + return host != null && port > 0; + } + public RemoteInterpreterProcess getInterpreterProcess() { InterpreterGroup intpGroup = getInterpreterGroup(); if (intpGroup == null) { @@ -132,10 +175,20 @@ public class RemoteInterpreter extends Interpreter { synchronized (intpGroup) { if (intpGroup.getRemoteInterpreterProcess() == null) { - // create new remote process - RemoteInterpreterProcess remoteProcess = new RemoteInterpreterProcess( - interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout, - remoteInterpreterProcessListener); + RemoteInterpreterProcess remoteProcess; + if (connectToExistingProcess()) { + remoteProcess = new RemoteInterpreterRunningProcess( + connectTimeout, + remoteInterpreterProcessListener, + applicationEventListener, + host, + port); + } else { + // create new remote process + remoteProcess = new RemoteInterpreterManagedProcess( + interpreterRunner, interpreterPath, localRepoPath, env, connectTimeout, + remoteInterpreterProcessListener, applicationEventListener); + } intpGroup.setRemoteInterpreterProcess(remoteProcess); } @@ -168,7 +221,9 @@ public class RemoteInterpreter extends Interpreter { boolean broken = false; try { logger.info("Create remote interpreter {}", getClassName()); - property.put("zeppelin.interpreter.localRepo", localRepoPath); + if (localRepoPath != null) { + 
property.put("zeppelin.interpreter.localRepo", localRepoPath); + } client.createInterpreter(groupId, noteId, getClassName(), (Map) property); @@ -462,4 +517,12 @@ public class RemoteInterpreter extends Interpreter { client.angularRegistryPush(gson.toJson(registry, registryType)); } } + + public Map<String, String> getEnv() { + return env; + } + + public void setEnv(Map<String, String> env) { + this.env = env; + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java index 158f145..6f26ffd 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventClient.java @@ -242,4 +242,55 @@ public class RemoteInterpreterEventClient implements ResourcePoolConnector { } } + public void onAppOutputAppend(String noteId, String paragraphId, String appId, String output) { + Map<String, String> appendOutput = new HashMap<String, String>(); + appendOutput.put("noteId", noteId); + appendOutput.put("paragraphId", paragraphId); + appendOutput.put("appId", appId); + appendOutput.put("data", output); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.OUTPUT_APPEND, + gson.toJson(appendOutput))); + } + + + public void onAppOutputUpdate(String noteId, String paragraphId, String appId, String output) { + Map<String, String> appendOutput = new HashMap<String, String>(); + appendOutput.put("noteId", noteId); + appendOutput.put("paragraphId", paragraphId); + appendOutput.put("appId", appId); + 
appendOutput.put("data", output); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.OUTPUT_UPDATE, + gson.toJson(appendOutput))); + } + + public void onAppStatusUpdate(String noteId, String paragraphId, String appId, String status) { + Map<String, String> appendOutput = new HashMap<String, String>(); + appendOutput.put("noteId", noteId); + appendOutput.put("paragraphId", paragraphId); + appendOutput.put("appId", appId); + appendOutput.put("status", status); + + sendEvent(new RemoteInterpreterEvent( + RemoteInterpreterEventType.APP_STATUS_UPDATE, + gson.toJson(appendOutput))); + } + + /** + * Wait until eventQueue becomes empty + */ + public void waitForEventQueueBecomesEmpty() { + synchronized (eventQueue) { + while (!eventQueue.isEmpty()) { + try { + eventQueue.wait(100); + } catch (InterruptedException e) { + // ignore exception + } + } + } + } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index 48e79de..48c14d5 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -22,9 +22,9 @@ import com.google.gson.reflect.TypeToken; import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.InterpreterContextRunner; import org.apache.zeppelin.interpreter.InterpreterGroup; 
-import org.apache.zeppelin.interpreter.InterpreterOutputListener; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEvent; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventType; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; @@ -36,7 +36,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -47,14 +46,18 @@ import java.util.Map; public class RemoteInterpreterEventPoller extends Thread { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterEventPoller.class); private final RemoteInterpreterProcessListener listener; + private final ApplicationEventListener appListener; private volatile boolean shutdown; private RemoteInterpreterProcess interpreterProcess; private InterpreterGroup interpreterGroup; - public RemoteInterpreterEventPoller(RemoteInterpreterProcessListener listener) { + public RemoteInterpreterEventPoller( + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener) { this.listener = listener; + this.appListener = appListener; shutdown = false; } @@ -70,7 +73,17 @@ public class RemoteInterpreterEventPoller extends Thread { public void run() { Client client = null; - while (!shutdown && interpreterProcess.isRunning()) { + while (!shutdown) { + // wait and retry + if (!interpreterProcess.isRunning()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // nothing to do + } + continue; + } + try { client = interpreterProcess.getClient(); } catch (Exception e1) { @@ -141,17 +154,38 @@ public class RemoteInterpreterEventPoller extends Thread { String noteId = outputAppend.get("noteId"); String paragraphId = outputAppend.get("paragraphId"); String outputToAppend = outputAppend.get("data"); + String appId = outputAppend.get("appId"); - listener.onOutputAppend(noteId, paragraphId, 
outputToAppend); + if (appId == null) { + listener.onOutputAppend(noteId, paragraphId, outputToAppend); + } else { + appListener.onOutputAppend(noteId, paragraphId, appId, outputToAppend); + } } else if (event.getType() == RemoteInterpreterEventType.OUTPUT_UPDATE) { // on output update Map<String, String> outputAppend = gson.fromJson( - event.getData(), new TypeToken<Map<String, String>>() {}.getType()); + event.getData(), new TypeToken<Map<String, String>>() {}.getType()); String noteId = outputAppend.get("noteId"); String paragraphId = outputAppend.get("paragraphId"); String outputToUpdate = outputAppend.get("data"); + String appId = outputAppend.get("appId"); + + if (appId == null) { + listener.onOutputUpdated(noteId, paragraphId, outputToUpdate); + } else { + appListener.onOutputUpdated(noteId, paragraphId, appId, outputToUpdate); + } + } else if (event.getType() == RemoteInterpreterEventType.APP_STATUS_UPDATE) { + // on output update + Map<String, String> appStatusUpdate = gson.fromJson( + event.getData(), new TypeToken<Map<String, String>>() {}.getType()); + + String noteId = appStatusUpdate.get("noteId"); + String paragraphId = appStatusUpdate.get("paragraphId"); + String appId = appStatusUpdate.get("appId"); + String status = appStatusUpdate.get("status"); - listener.onOutputUpdated(noteId, paragraphId, outputToUpdate); + appListener.onStatusChange(noteId, paragraphId, appId, status); } logger.debug("Event from remoteproceess {}", event.getType()); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java new file mode 
100644 index 0000000..098a9d4 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.commons.exec.*; +import org.apache.commons.exec.environment.EnvironmentUtils; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; + +/** + * This class manages start / stop of remote interpreter process + */ +public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess + implements ExecuteResultHandler { + private static final Logger logger = LoggerFactory.getLogger( + RemoteInterpreterManagedProcess.class); + private final String interpreterRunner; + + private DefaultExecutor executor; + private ExecuteWatchdog watchdog; + boolean running = false; + private int port = -1; + private final String interpreterDir; + private final String localRepoDir; + + private Map<String, String> env; + + public 
RemoteInterpreterManagedProcess( + String intpRunner, + String intpDir, + String localRepoDir, + Map<String, String> env, + int connectTimeout, + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener) { + super(new RemoteInterpreterEventPoller(listener, appListener), + connectTimeout); + this.interpreterRunner = intpRunner; + this.env = env; + this.interpreterDir = intpDir; + this.localRepoDir = localRepoDir; + } + + RemoteInterpreterManagedProcess(String intpRunner, + String intpDir, + String localRepoDir, + Map<String, String> env, + RemoteInterpreterEventPoller remoteInterpreterEventPoller, + int connectTimeout) { + super(remoteInterpreterEventPoller, + connectTimeout); + this.interpreterRunner = intpRunner; + this.env = env; + this.interpreterDir = intpDir; + this.localRepoDir = localRepoDir; + } + + @Override + public String getHost() { + return "localhost"; + } + + @Override + public int getPort() { + return port; + } + + @Override + public void start() { + // start server process + try { + port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); + } catch (IOException e1) { + throw new InterpreterException(e1); + } + + CommandLine cmdLine = CommandLine.parse(interpreterRunner); + cmdLine.addArgument("-d", false); + cmdLine.addArgument(interpreterDir, false); + cmdLine.addArgument("-p", false); + cmdLine.addArgument(Integer.toString(port), false); + cmdLine.addArgument("-l", false); + cmdLine.addArgument(localRepoDir, false); + + executor = new DefaultExecutor(); + + watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); + executor.setWatchdog(watchdog); + + try { + Map procEnv = EnvironmentUtils.getProcEnvironment(); + procEnv.putAll(env); + + logger.info("Run interpreter process {}", cmdLine); + executor.execute(cmdLine, procEnv, this); + running = true; + } catch (IOException e) { + running = false; + throw new InterpreterException(e); + } + + + long startTime = System.currentTimeMillis(); + while 
(System.currentTimeMillis() - startTime < getConnectTimeout()) { + if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { + break; + } else { + try { + Thread.sleep(500); + } catch (InterruptedException e) { + logger.error("Exception in RemoteInterpreterProcess while synchronized reference " + + "Thread.sleep", e); + } + } + } + } + + public void stop() { + if (isRunning()) { + logger.info("kill interpreter process"); + watchdog.destroyProcess(); + } + + executor = null; + watchdog = null; + running = false; + logger.info("Remote process terminated"); + } + + @Override + public void onProcessComplete(int exitValue) { + logger.info("Interpreter process exited {}", exitValue); + running = false; + + } + + @Override + public void onProcessFailed(ExecuteException e) { + logger.info("Interpreter process failed {}", e); + running = false; + } + + public boolean isRunning() { + return running; + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 05baf62..aef6c2b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -14,160 +14,72 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.zeppelin.interpreter.remote; import com.google.gson.Gson; import org.apache.commons.exec.*; -import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.thrift.TException; +import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.Constants; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.Properties; /** - * + * Abstract class for interpreter process */ -public class RemoteInterpreterProcess implements ExecuteResultHandler { +public abstract class RemoteInterpreterProcess { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); private final AtomicInteger referenceCount; - private DefaultExecutor executor; private ExecuteWatchdog watchdog; - boolean running = false; - private int port = -1; - private final String interpreterRunner; - private final String interpreterDir; - private final String localRepoDir; private GenericObjectPool<Client> clientPool; - private Map<String, String> env; private final RemoteInterpreterEventPoller remoteInterpreterEventPoller; private final InterpreterContextRunnerPool interpreterContextRunnerPool; private int connectTimeout; String host = "localhost"; boolean isInterpreterAlreadyExecuting = false; - public RemoteInterpreterProcess(String intpRunner, - String intpDir, - String localRepoDir, - Map<String, String> env, + public RemoteInterpreterProcess( int connectTimeout, - RemoteInterpreterProcessListener listener) { - this(intpRunner, - intpDir, - localRepoDir, - env, - new RemoteInterpreterEventPoller(listener), + 
RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener) { + this(new RemoteInterpreterEventPoller(listener, appListener), connectTimeout); } - RemoteInterpreterProcess(String intpRunner, - String intpDir, - String localRepoDir, - Map<String, String> env, - RemoteInterpreterEventPoller remoteInterpreterEventPoller, - int connectTimeout) { - this.interpreterRunner = intpRunner; - this.interpreterDir = intpDir; - this.localRepoDir = localRepoDir; - this.env = env; + RemoteInterpreterProcess(RemoteInterpreterEventPoller remoteInterpreterEventPoller, + int connectTimeout) { this.interpreterContextRunnerPool = new InterpreterContextRunnerPool(); referenceCount = new AtomicInteger(0); this.remoteInterpreterEventPoller = remoteInterpreterEventPoller; this.connectTimeout = connectTimeout; } + public abstract String getHost(); + public abstract int getPort(); + public abstract void start(); + public abstract void stop(); + public abstract boolean isRunning(); - public int getPort() { - return port; + public int getConnectTimeout() { + return connectTimeout; } public int reference(InterpreterGroup interpreterGroup) { synchronized (referenceCount) { - if (executor == null) { - if (interpreterGroup.containsKey(Constants.EXISTING_PROCESS)) { - Properties properties = interpreterGroup.getProperty(); - isInterpreterAlreadyExecuting = true; - if (isInterpreterAlreadyExecuting) { - if (properties.containsKey(Constants.ZEPPELIN_INTERPRETER_HOST)) { - host = properties.getProperty(Constants.ZEPPELIN_INTERPRETER_HOST); - - } else { - throw new InterpreterException("Can't find value for option Host." - + "Please specify the host on which interpreter is executing"); - } - if (properties.containsKey(Constants.ZEPPELIN_INTERPRETER_PORT)) { - port = Integer.parseInt( - interpreterGroup.getProperty().getProperty(Constants.ZEPPELIN_INTERPRETER_PORT)); - } else { - throw new InterpreterException("Can't find value for option Port." 
- + "Please specify the port on which interpreter is listening"); - } - } - running = true; - } - - if (!isInterpreterAlreadyExecuting) { - try { - port = RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(); - } catch (IOException e1) { - throw new InterpreterException(e1); - } - CommandLine cmdLine = CommandLine.parse(interpreterRunner); - cmdLine.addArgument("-d", false); - cmdLine.addArgument(interpreterDir, false); - cmdLine.addArgument("-p", false); - cmdLine.addArgument(Integer.toString(port), false); - cmdLine.addArgument("-l", false); - cmdLine.addArgument(localRepoDir, false); - - executor = new DefaultExecutor(); - - watchdog = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT); - executor.setWatchdog(watchdog); - - running = true; - try { - Map procEnv = EnvironmentUtils.getProcEnvironment(); - procEnv.putAll(env); - - logger.info("Run interpreter process {}", cmdLine); - executor.execute(cmdLine, procEnv, this); - - } catch (IOException e) { - running = false; - throw new InterpreterException(e); - } - - } else { - logger.info( - "Not starting interpreter as \"isExistingProcess\" is enabled"); - } - - long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < connectTimeout) { - if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(host, port)) { - break; - } else { - try { - Thread.sleep(500); - } catch (InterruptedException e) { - logger.error("Exception in RemoteInterpreterProcess while synchronized reference " - + "Thread.sleep", e); - } - } - } + if (!isRunning()) { + start(); + } - clientPool = new GenericObjectPool<Client>(new ClientFactory(host, port)); + if (clientPool == null) { + clientPool = new GenericObjectPool<Client>(new ClientFactory(getHost(), getPort())); + clientPool.setTestOnBorrow(true); remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup); remoteInterpreterEventPoller.setInterpreterProcess(this); @@ -250,16 +162,6 @@ public class RemoteInterpreterProcess 
implements ExecuteResultHandler { break; } } - - if (isRunning()) { - logger.info("kill interpreter process"); - watchdog.destroyProcess(); - } - - executor = null; - watchdog = null; - running = false; - logger.info("Remote process terminated"); } return r; } @@ -271,23 +173,6 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { } } - @Override - public void onProcessComplete(int exitValue) { - logger.info("Interpreter process exited {}", exitValue); - running = false; - - } - - @Override - public void onProcessFailed(ExecuteException e) { - logger.info("Interpreter process failed {}", e); - running = false; - } - - public boolean isRunning() { - return running; - } - public int getNumActiveClient() { if (clientPool == null) { return 0; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java new file mode 100644 index 0000000..42e6250 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class connects to existing process + */ +public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { + private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); + private final String host; + private final int port; + + public RemoteInterpreterRunningProcess( + int connectTimeout, + RemoteInterpreterProcessListener listener, + ApplicationEventListener appListener, + String host, + int port + ) { + super(connectTimeout, listener, appListener); + this.host = host; + this.port = port; + } + + @Override + public String getHost() { + return host; + } + + @Override + public int getPort() { + return port; + } + + @Override + public void start() { + // assume process is externally managed. nothing to do + } + + @Override + public void stop() { + // assume process is externally managed. 
nothing to do + } + + @Override + public boolean isRunning() { + return RemoteInterpreterUtils.checkIfRemoteEndpointAccessible(getHost(), getPort()); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 6b4edc4..1730265 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -29,7 +29,9 @@ import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; +import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.display.*; +import org.apache.zeppelin.helium.*; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.thrift.*; @@ -58,6 +60,8 @@ public class RemoteInterpreterServer InterpreterGroup interpreterGroup; AngularObjectRegistry angularObjectRegistry; DistributedResourcePool resourcePool; + private ApplicationLoader appLoader; + Gson gson = new Gson(); RemoteInterpreterService.Processor<RemoteInterpreterServer> processor; @@ -66,6 +70,10 @@ public class RemoteInterpreterServer private TThreadPoolServer server; RemoteInterpreterEventClient eventClient = new RemoteInterpreterEventClient(); + private DependencyResolver depLoader; + + private final Map<String, RunningApplication> runningApplications = + Collections.synchronizedMap(new HashMap<String, 
RunningApplication>()); public RemoteInterpreterServer(int port) throws TTransportException { this.port = port; @@ -84,6 +92,7 @@ public class RemoteInterpreterServer @Override public void shutdown() throws TException { + eventClient.waitForEventQueueBecomesEmpty(); if (interpreterGroup != null) { interpreterGroup.close(); interpreterGroup.destroy(); @@ -134,14 +143,17 @@ public class RemoteInterpreterServer @Override public void createInterpreter(String interpreterGroupId, String noteId, String - className, - Map<String, String> properties) throws TException { + className, Map<String, String> properties) throws TException { if (interpreterGroup == null) { interpreterGroup = new InterpreterGroup(interpreterGroupId); angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), this); resourcePool = new DistributedResourcePool(interpreterGroup.getId(), eventClient); interpreterGroup.setAngularObjectRegistry(angularObjectRegistry); interpreterGroup.setResourcePool(resourcePool); + + String localRepoPath = properties.get("zeppelin.interpreter.localRepo"); + depLoader = new DependencyResolver(localRepoPath); + appLoader = new ApplicationLoader(resourcePool, depLoader); } try { @@ -176,6 +188,18 @@ public class RemoteInterpreterServer } } + protected InterpreterGroup getInterpreterGroup() { + return interpreterGroup; + } + + protected ResourcePool getResourcePool() { + return resourcePool; + } + + protected RemoteInterpreterEventClient getEventClient() { + return eventClient; + } + private void setSystemProperty(Properties properties) { for (Object key : properties.keySet()) { if (!RemoteInterpreter.isEnvString((String) key)) { @@ -189,7 +213,7 @@ public class RemoteInterpreterServer } } - private Interpreter getInterpreter(String noteId, String className) throws TException { + protected Interpreter getInterpreter(String noteId, String className) throws TException { if (interpreterGroup == null) { throw new TException( new InterpreterException("Interpreter 
instance " + className + " not created")); @@ -218,6 +242,24 @@ public class RemoteInterpreterServer @Override public void close(String noteId, String className) throws TException { + // unload all applications + for (String appId : runningApplications.keySet()) { + RunningApplication appInfo = runningApplications.get(appId); + + // see NoteInterpreterLoader.SHARED_SESSION + if (appInfo.noteId.equals(noteId) || noteId.equals("shared_session")) { + try { + logger.info("Unload App {} ", appInfo.pkg.getName()); + appInfo.app.unload(); + // see ApplicationState.Status.UNLOADED + eventClient.onAppStatusUpdate(appInfo.noteId, appInfo.paragraphId, appId, "UNLOADED"); + } catch (ApplicationException e) { + logger.error(e.getMessage(), e); + } + } + } + + // close interpreters synchronized (interpreterGroup) { List<Interpreter> interpreters = interpreterGroup.get(noteId); if (interpreters != null) { @@ -346,7 +388,6 @@ public class RemoteInterpreterServer context.out.flush(); InterpreterResult.Type outputType = context.out.getType(); byte[] interpreterOutput = context.out.toByteArray(); - context.out.clear(); if (interpreterOutput != null && interpreterOutput.length > 0) { message = new String(interpreterOutput); @@ -363,11 +404,13 @@ public class RemoteInterpreterServer } // put result into resource pool - context.getResourcePool().put( - context.getNoteId(), - context.getParagraphId(), - WellKnownResourceName.ParagraphResult.toString(), - combinedResult); + if (combinedResult.type() == InterpreterResult.Type.TABLE) { + context.getResourcePool().put( + context.getNoteId(), + context.getParagraphId(), + WellKnownResourceName.ZeppelinTableResult.toString(), + combinedResult); + } return combinedResult; } finally { InterpreterContext.remove(); @@ -392,7 +435,7 @@ public class RemoteInterpreterServer if (job != null) { job.setStatus(Status.ABORT); } else { - intp.cancel(convert(interpreterContext)); + intp.cancel(convert(interpreterContext, null)); } } @@ -401,7 +444,7 @@ 
public class RemoteInterpreterServer RemoteInterpreterContext interpreterContext) throws TException { Interpreter intp = getInterpreter(noteId, className); - return intp.getProgress(convert(interpreterContext)); + return intp.getProgress(convert(interpreterContext, null)); } @@ -421,6 +464,10 @@ public class RemoteInterpreterServer } private InterpreterContext convert(RemoteInterpreterContext ric) { + return convert(ric, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId())); + } + + private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutput output) { List<InterpreterContextRunner> contextRunners = new LinkedList<InterpreterContextRunner>(); List<InterpreterContextRunner> runners = gson.fromJson(ric.getRunners(), new TypeToken<List<RemoteInterpreterContextRunner>>() { @@ -441,11 +488,12 @@ public class RemoteInterpreterServer gson.fromJson(ric.getGui(), GUI.class), interpreterGroup.getAngularObjectRegistry(), interpreterGroup.getResourcePool(), - contextRunners, createInterpreterOutput(ric.getNoteId(), ric.getParagraphId())); + contextRunners, output); } - private InterpreterOutput createInterpreterOutput(final String noteId, final String paragraphId) { + protected InterpreterOutput createInterpreterOutput(final String noteId, final String + paragraphId) { return new InterpreterOutput(new InterpreterOutputListener() { @Override public void onAppend(InterpreterOutput out, byte[] line) { @@ -660,9 +708,14 @@ public class RemoteInterpreterServer @Override public List<String> resourcePoolGetAll() throws TException { logger.debug("Request getAll from ZeppelinServer"); + List<String> result = new LinkedList<String>(); + + if (resourcePool == null) { + return result; + } ResourceSet resourceSet = resourcePool.getAll(false); - List<String> result = new LinkedList<String>(); + Gson gson = new Gson(); for (Resource r : resourceSet) { @@ -708,4 +761,134 @@ public class RemoteInterpreterServer logger.info("Exception in RemoteInterpreterServer 
while angularRegistryPush, nolock", e); } } + + protected InterpreterOutput createAppOutput(final String noteId, + final String paragraphId, + final String appId) { + return new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + eventClient.onAppOutputAppend(noteId, paragraphId, appId, new String(line)); + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + eventClient.onAppOutputUpdate(noteId, paragraphId, appId, new String(output)); + } + }); + } + + private ApplicationContext getApplicationContext( + HeliumPackage packageInfo, String noteId, String paragraphId, String applicationInstanceId) { + InterpreterOutput out = createAppOutput(noteId, paragraphId, applicationInstanceId); + return new ApplicationContext( + noteId, + paragraphId, + applicationInstanceId, + new HeliumAppAngularObjectRegistry(angularObjectRegistry, noteId, applicationInstanceId), + out); + } + + @Override + public RemoteApplicationResult loadApplication( + String applicationInstanceId, String packageInfo, String noteId, String paragraphId) + throws TException { + if (runningApplications.containsKey(applicationInstanceId)) { + logger.warn("Application instance {} is already running"); + return new RemoteApplicationResult(true, ""); + } + HeliumPackage pkgInfo = gson.fromJson(packageInfo, HeliumPackage.class); + ApplicationContext context = getApplicationContext( + pkgInfo, noteId, paragraphId, applicationInstanceId); + try { + Application app = null; + logger.info( + "Loading application {}({}), artifact={}, className={} into note={}, paragraph={}", + pkgInfo.getName(), + applicationInstanceId, + pkgInfo.getArtifact(), + pkgInfo.getClassName(), + noteId, + paragraphId); + app = appLoader.load(pkgInfo, context); + runningApplications.put( + applicationInstanceId, + new RunningApplication(pkgInfo, app, noteId, paragraphId)); + return new RemoteApplicationResult(true, ""); + } catch (Exception 
e) { + logger.error(e.getMessage(), e); + return new RemoteApplicationResult(false, e.getMessage()); + } + } + + @Override + public RemoteApplicationResult unloadApplication(String applicationInstanceId) + throws TException { + RunningApplication runningApplication = runningApplications.remove(applicationInstanceId); + if (runningApplication != null) { + try { + logger.info("Unloading application {}", applicationInstanceId); + runningApplication.app.unload(); + } catch (ApplicationException e) { + logger.error(e.getMessage(), e); + return new RemoteApplicationResult(false, e.getMessage()); + } + } + return new RemoteApplicationResult(true, ""); + } + + @Override + public RemoteApplicationResult runApplication(String applicationInstanceId) + throws TException { + logger.info("run application {}", applicationInstanceId); + + RunningApplication runningApp = runningApplications.get(applicationInstanceId); + if (runningApp == null) { + logger.error("Application instance {} not exists", applicationInstanceId); + return new RemoteApplicationResult(false, "Application instance does not exists"); + } else { + ApplicationContext context = runningApp.app.context(); + try { + context.out.clear(); + context.out.setType(InterpreterResult.Type.ANGULAR); + ResourceSet resource = appLoader.findRequiredResourceSet( + runningApp.pkg.getResources(), + context.getNoteId(), + context.getParagraphId()); + for (Resource res : resource) { + System.err.println("Resource " + res.get()); + } + runningApp.app.run(resource); + String output = new String(context.out.toByteArray()); + eventClient.onAppOutputUpdate( + context.getNoteId(), + context.getParagraphId(), + applicationInstanceId, + output); + return new RemoteApplicationResult(true, ""); + } catch (ApplicationException | IOException e) { + return new RemoteApplicationResult(false, e.getMessage()); + } + } + + + + } + + private static class RunningApplication { + public final Application app; + public final HeliumPackage pkg; + public 
final String noteId; + public final String paragraphId; + + public RunningApplication(HeliumPackage pkg, + Application app, + String noteId, + String paragraphId) { + this.app = app; + this.pkg = pkg; + this.noteId = noteId; + this.paragraphId = paragraphId; + } + }; } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java new file mode 100644 index 0000000..a192899 --- /dev/null +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteApplicationResult.java @@ -0,0 +1,518 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +/** + * Autogenerated by Thrift Compiler (0.9.2) + * + * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING + * @generated + */ +package org.apache.zeppelin.interpreter.thrift; + +import org.apache.thrift.scheme.IScheme; +import org.apache.thrift.scheme.SchemeFactory; +import org.apache.thrift.scheme.StandardScheme; + +import org.apache.thrift.scheme.TupleScheme; +import org.apache.thrift.protocol.TTupleProtocol; +import org.apache.thrift.protocol.TProtocolException; +import org.apache.thrift.EncodingUtils; +import org.apache.thrift.TException; +import org.apache.thrift.async.AsyncMethodCallback; +import org.apache.thrift.server.AbstractNonblockingServer.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.EnumMap; +import java.util.Set; +import java.util.HashSet; +import java.util.EnumSet; +import java.util.Collections; +import java.util.BitSet; +import java.nio.ByteBuffer; +import java.util.Arrays; +import javax.annotation.Generated; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"}) +@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2016-5-7") +public class RemoteApplicationResult implements org.apache.thrift.TBase<RemoteApplicationResult, RemoteApplicationResult._Fields>, java.io.Serializable, Cloneable, Comparable<RemoteApplicationResult> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("RemoteApplicationResult"); + + private static final org.apache.thrift.protocol.TField SUCCESS_FIELD_DESC = new org.apache.thrift.protocol.TField("success", org.apache.thrift.protocol.TType.BOOL, (short)1); + private static final org.apache.thrift.protocol.TField MSG_FIELD_DESC = new org.apache.thrift.protocol.TField("msg", org.apache.thrift.protocol.TType.STRING, (short)2); + + private static final Map<Class<? 
extends IScheme>, SchemeFactory> schemes = new HashMap<Class<? extends IScheme>, SchemeFactory>(); + static { + schemes.put(StandardScheme.class, new RemoteApplicationResultStandardSchemeFactory()); + schemes.put(TupleScheme.class, new RemoteApplicationResultTupleSchemeFactory()); + } + + public boolean success; // required + public String msg; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + SUCCESS((short)1, "success"), + MSG((short)2, "msg"); + + private static final Map<String, _Fields> byName = new HashMap<String, _Fields>(); + + static { + for (_Fields field : EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // SUCCESS + return SUCCESS; + case 2: // MSG + return MSG; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found. 
+ */ + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + private static final int __SUCCESS_ISSET_ID = 0; + private byte __isset_bitfield = 0; + public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.SUCCESS, new org.apache.thrift.meta_data.FieldMetaData("success", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL))); + tmpMap.put(_Fields.MSG, new org.apache.thrift.meta_data.FieldMetaData("msg", org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(RemoteApplicationResult.class, metaDataMap); + } + + public RemoteApplicationResult() { + } + + public RemoteApplicationResult( + boolean success, + String msg) + { + this(); + this.success = success; + setSuccessIsSet(true); + this.msg = msg; + } + + /** + * Performs a deep copy on <i>other</i>. 
+ */ + public RemoteApplicationResult(RemoteApplicationResult other) { + __isset_bitfield = other.__isset_bitfield; + this.success = other.success; + if (other.isSetMsg()) { + this.msg = other.msg; + } + } + + public RemoteApplicationResult deepCopy() { + return new RemoteApplicationResult(this); + } + + @Override + public void clear() { + setSuccessIsSet(false); + this.success = false; + this.msg = null; + } + + public boolean isSuccess() { + return this.success; + } + + public RemoteApplicationResult setSuccess(boolean success) { + this.success = success; + setSuccessIsSet(true); + return this; + } + + public void unsetSuccess() { + __isset_bitfield = EncodingUtils.clearBit(__isset_bitfield, __SUCCESS_ISSET_ID); + } + + /** Returns true if field success is set (has been assigned a value) and false otherwise */ + public boolean isSetSuccess() { + return EncodingUtils.testBit(__isset_bitfield, __SUCCESS_ISSET_ID); + } + + public void setSuccessIsSet(boolean value) { + __isset_bitfield = EncodingUtils.setBit(__isset_bitfield, __SUCCESS_ISSET_ID, value); + } + + public String getMsg() { + return this.msg; + } + + public RemoteApplicationResult setMsg(String msg) { + this.msg = msg; + return this; + } + + public void unsetMsg() { + this.msg = null; + } + + /** Returns true if field msg is set (has been assigned a value) and false otherwise */ + public boolean isSetMsg() { + return this.msg != null; + } + + public void setMsgIsSet(boolean value) { + if (!value) { + this.msg = null; + } + } + + public void setFieldValue(_Fields field, Object value) { + switch (field) { + case SUCCESS: + if (value == null) { + unsetSuccess(); + } else { + setSuccess((Boolean)value); + } + break; + + case MSG: + if (value == null) { + unsetMsg(); + } else { + setMsg((String)value); + } + break; + + } + } + + public Object getFieldValue(_Fields field) { + switch (field) { + case SUCCESS: + return Boolean.valueOf(isSuccess()); + + case MSG: + return getMsg(); + + } + throw new 
IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case SUCCESS: + return isSetSuccess(); + case MSG: + return isSetMsg(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof RemoteApplicationResult) + return this.equals((RemoteApplicationResult)that); + return false; + } + + public boolean equals(RemoteApplicationResult that) { + if (that == null) + return false; + + boolean this_present_success = true; + boolean that_present_success = true; + if (this_present_success || that_present_success) { + if (!(this_present_success && that_present_success)) + return false; + if (this.success != that.success) + return false; + } + + boolean this_present_msg = true && this.isSetMsg(); + boolean that_present_msg = true && that.isSetMsg(); + if (this_present_msg || that_present_msg) { + if (!(this_present_msg && that_present_msg)) + return false; + if (!this.msg.equals(that.msg)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + List<Object> list = new ArrayList<Object>(); + + boolean present_success = true; + list.add(present_success); + if (present_success) + list.add(success); + + boolean present_msg = true && (isSetMsg()); + list.add(present_msg); + if (present_msg) + list.add(msg); + + return list.hashCode(); + } + + @Override + public int compareTo(RemoteApplicationResult other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetSuccess()).compareTo(other.isSetSuccess()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetSuccess()) { + lastComparison = 
org.apache.thrift.TBaseHelper.compareTo(this.success, other.success); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetMsg()).compareTo(other.isSetMsg()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetMsg()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.msg, other.msg); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + schemes.get(iprot.getScheme()).getScheme().read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + schemes.get(oprot.getScheme()).getScheme().write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("RemoteApplicationResult("); + boolean first = true; + + sb.append("success:"); + sb.append(this.success); + first = false; + if (!first) sb.append(", "); + sb.append("msg:"); + if (this.msg == null) { + sb.append("null"); + } else { + sb.append(this.msg); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor. 
+ __isset_bitfield = 0; + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class RemoteApplicationResultStandardSchemeFactory implements SchemeFactory { + public RemoteApplicationResultStandardScheme getScheme() { + return new RemoteApplicationResultStandardScheme(); + } + } + + private static class RemoteApplicationResultStandardScheme extends StandardScheme<RemoteApplicationResult> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, RemoteApplicationResult struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // SUCCESS + if (schemeField.type == org.apache.thrift.protocol.TType.BOOL) { + struct.success = iprot.readBool(); + struct.setSuccessIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // MSG + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.msg = iprot.readString(); + struct.setMsgIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, RemoteApplicationResult struct) throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + oprot.writeFieldBegin(SUCCESS_FIELD_DESC); + oprot.writeBool(struct.success); + 
oprot.writeFieldEnd(); + if (struct.msg != null) { + oprot.writeFieldBegin(MSG_FIELD_DESC); + oprot.writeString(struct.msg); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } + + } + + private static class RemoteApplicationResultTupleSchemeFactory implements SchemeFactory { + public RemoteApplicationResultTupleScheme getScheme() { + return new RemoteApplicationResultTupleScheme(); + } + } + + private static class RemoteApplicationResultTupleScheme extends TupleScheme<RemoteApplicationResult> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, RemoteApplicationResult struct) throws org.apache.thrift.TException { + TTupleProtocol oprot = (TTupleProtocol) prot; + BitSet optionals = new BitSet(); + if (struct.isSetSuccess()) { + optionals.set(0); + } + if (struct.isSetMsg()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetSuccess()) { + oprot.writeBool(struct.success); + } + if (struct.isSetMsg()) { + oprot.writeString(struct.msg); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, RemoteApplicationResult struct) throws org.apache.thrift.TException { + TTupleProtocol iprot = (TTupleProtocol) prot; + BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.success = iprot.readBool(); + struct.setSuccessIsSet(true); + } + if (incoming.get(1)) { + struct.msg = iprot.readString(); + struct.setMsgIsSet(true); + } + } + } + +} + http://git-wip-us.apache.org/repos/asf/zeppelin/blob/9463fb85/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java ---------------------------------------------------------------------- diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java index 66631d2..9554619 100644 --- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/thrift/RemoteInterpreterEventType.java @@ -38,7 +38,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { RESOURCE_GET(7), OUTPUT_APPEND(8), OUTPUT_UPDATE(9), - ANGULAR_REGISTRY_PUSH(10); + ANGULAR_REGISTRY_PUSH(10), + APP_STATUS_UPDATE(11); private final int value; @@ -79,6 +80,8 @@ public enum RemoteInterpreterEventType implements org.apache.thrift.TEnum { return OUTPUT_UPDATE; case 10: return ANGULAR_REGISTRY_PUSH; + case 11: + return APP_STATUS_UPDATE; default: return null; }