http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index 5b7223c..a535c96 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -19,6 +19,15 @@ package org.apache.zeppelin.interpreter; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; @@ -35,10 +44,10 @@ import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.apache.zeppelin.interpreter.thrift.AppOutputAppendEvent; import org.apache.zeppelin.interpreter.thrift.AppOutputUpdateEvent; import org.apache.zeppelin.interpreter.thrift.AppStatusUpdateEvent; -import org.apache.zeppelin.interpreter.thrift.RegisterInfo; import org.apache.zeppelin.interpreter.thrift.OutputAppendEvent; import org.apache.zeppelin.interpreter.thrift.OutputUpdateAllEvent; import org.apache.zeppelin.interpreter.thrift.OutputUpdateEvent; +import org.apache.zeppelin.interpreter.thrift.RegisterInfo; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterEventService; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; @@ -51,16 +60,6 @@ import org.apache.zeppelin.resource.ResourceSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - public class RemoteInterpreterEventServer implements RemoteInterpreterEventService.Iface { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterEventServer.class); @@ -79,8 +78,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi private final ApplicationEventListener appListener; private final Gson gson = new Gson(); - public RemoteInterpreterEventServer(ZeppelinConfiguration zConf, - InterpreterSettingManager interpreterSettingManager) { + public RemoteInterpreterEventServer( + ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) { this.portRange = zConf.getZeppelinServerRPCPortRange(); this.interpreterSettingManager = interpreterSettingManager; this.listener = interpreterSettingManager.getRemoteInterpreterProcessListener(); @@ -100,32 +99,33 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi LOGGER.info("InterpreterEventServer will start. Port: {}", port); RemoteInterpreterEventService.Processor processor = new RemoteInterpreterEventService.Processor(this); - this.thriftServer = new TThreadPoolServer( - new TThreadPoolServer.Args(tSocket).processor(processor)); + this.thriftServer = + new TThreadPoolServer(new TThreadPoolServer.Args(tSocket).processor(processor)); this.thriftServer.serve(); } public void start() throws IOException { - Thread startingThread = new Thread() { - @Override - public void run() { - TServerSocket tSocket = null; - try { - tSocket = RemoteInterpreterUtils.createTServerSocket(portRange); - port = tSocket.getServerSocket().getLocalPort(); - host = RemoteInterpreterUtils.findAvailableHostAddress(); - } catch (IOException e1) { - throw new RuntimeException(e1); - } + Thread startingThread = + new Thread() { + @Override + public void run() { + TServerSocket tSocket = null; + try { + tSocket = RemoteInterpreterUtils.createTServerSocket(portRange); + port = tSocket.getServerSocket().getLocalPort(); + host = RemoteInterpreterUtils.findAvailableHostAddress(); + } catch (IOException e1) { + throw new RuntimeException(e1); + } - LOGGER.info("InterpreterEventServer will start. Port: {}", port); - RemoteInterpreterEventService.Processor processor = - new RemoteInterpreterEventService.Processor(RemoteInterpreterEventServer.this); - thriftServer = new TThreadPoolServer( - new TThreadPoolServer.Args(tSocket).processor(processor)); - thriftServer.serve(); - } - }; + LOGGER.info("InterpreterEventServer will start. Port: {}", port); + RemoteInterpreterEventService.Processor processor = + new RemoteInterpreterEventService.Processor(RemoteInterpreterEventServer.this); + thriftServer = + new TThreadPoolServer(new TThreadPoolServer.Args(tSocket).processor(processor)); + thriftServer.serve(); + } + }; startingThread.start(); long start = System.currentTimeMillis(); while ((System.currentTimeMillis() - start) < 30 * 1000) { @@ -145,8 +145,9 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi LOGGER.info("InterpreterEventServer is started"); runner = new AppendOutputRunner(listener); - appendFuture = appendService.scheduleWithFixedDelay( - runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); + appendFuture = + appendService.scheduleWithFixedDelay( + runner, 0, AppendOutputRunner.BUFFER_TIME_MS, TimeUnit.MILLISECONDS); } public void stop() { @@ -158,7 +159,6 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi } } - public int getPort() { return port; } @@ -178,8 +178,9 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi RemoteInterpreterProcess interpreterProcess = ((ManagedInterpreterGroup) interpreterGroup).getInterpreterProcess(); if (interpreterProcess == null) { - LOGGER.warn("Interpreter process does not existed yet for InterpreterGroup: " + - registerInfo.getInterpreterGroupId()); + LOGGER.warn( + "Interpreter process does not existed yet for InterpreterGroup: " + + registerInfo.getInterpreterGroupId()); } ((RemoteInterpreterManagedProcess) interpreterProcess) .processStarted(registerInfo.port, registerInfo.host); @@ -191,19 +192,32 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi runner.appendBuffer( event.getNoteId(), event.getParagraphId(), event.getIndex(), event.getData()); } else { - appListener.onOutputAppend(event.getNoteId(), event.getParagraphId(), event.getIndex(), - event.getAppId(), event.getData()); + appListener.onOutputAppend( + event.getNoteId(), + event.getParagraphId(), + event.getIndex(), + event.getAppId(), + event.getData()); } } @Override public void updateOutput(OutputUpdateEvent event) throws TException { if (event.getAppId() == null) { - listener.onOutputUpdated(event.getNoteId(), event.getParagraphId(), event.getIndex(), - InterpreterResult.Type.valueOf(event.getType()), event.getData()); + listener.onOutputUpdated( + event.getNoteId(), + event.getParagraphId(), + event.getIndex(), + InterpreterResult.Type.valueOf(event.getType()), + event.getData()); } else { - appListener.onOutputUpdated(event.getNoteId(), event.getParagraphId(), event.getIndex(), - event.getAppId(), InterpreterResult.Type.valueOf(event.getType()), event.getData()); + appListener.onOutputUpdated( + event.getNoteId(), + event.getParagraphId(), + event.getIndex(), + event.getAppId(), + InterpreterResult.Type.valueOf(event.getType()), + event.getData()); } } @@ -212,21 +226,30 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi listener.onOutputClear(event.getNoteId(), event.getParagraphId()); for (int i = 0; i < event.getMsg().size(); i++) { RemoteInterpreterResultMessage msg = event.getMsg().get(i); - listener.onOutputUpdated(event.getNoteId(), event.getParagraphId(), i, - InterpreterResult.Type.valueOf(msg.getType()), msg.getData()); + listener.onOutputUpdated( + event.getNoteId(), + event.getParagraphId(), + i, + InterpreterResult.Type.valueOf(msg.getType()), + msg.getData()); } } @Override public void appendAppOutput(AppOutputAppendEvent event) throws TException { - appListener.onOutputAppend(event.noteId, event.paragraphId, event.index, event.appId, - event.data); + appListener.onOutputAppend( + event.noteId, event.paragraphId, event.index, event.appId, event.data); } @Override public void updateAppOutput(AppOutputUpdateEvent event) throws TException { - appListener.onOutputUpdated(event.noteId, event.paragraphId, event.index, event.appId, - InterpreterResult.Type.valueOf(event.type), event.data); + appListener.onOutputUpdated( + event.noteId, + event.paragraphId, + event.index, + event.appId, + InterpreterResult.Type.valueOf(event.type), + event.data); } @Override @@ -237,11 +260,14 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi @Override public void runParagraphs(RunParagraphsEvent event) throws TException { try { - listener.runParagraphs(event.getNoteId(), event.getParagraphIndices(), - event.getParagraphIds(), event.getCurParagraphId()); + listener.runParagraphs( + event.getNoteId(), + event.getParagraphIndices(), + event.getParagraphIds(), + event.getCurParagraphId()); if (InterpreterContext.get() != null) { - LOGGER.info("complete runParagraphs." + InterpreterContext.get().getParagraphId() + " " - + event); + LOGGER.info( + "complete runParagraphs." + InterpreterContext.get().getParagraphId() + " " + event); } else { LOGGER.info("complete runParagraphs." + event); } @@ -258,8 +284,13 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi if (interpreterGroup == null) { throw new TException("Invalid InterpreterGroupId: " + intpGroupId); } - interpreterGroup.getAngularObjectRegistry().add(angularObject.getName(), - angularObject.get(), angularObject.getNoteId(), angularObject.getParagraphId()); + interpreterGroup + .getAngularObjectRegistry() + .add( + angularObject.getName(), + angularObject.get(), + angularObject.getNoteId(), + angularObject.getParagraphId()); } @Override @@ -270,22 +301,22 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi if (interpreterGroup == null) { throw new TException("Invalid InterpreterGroupId: " + intpGroupId); } - AngularObject localAngularObject = interpreterGroup.getAngularObjectRegistry().get( - angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId()); + AngularObject localAngularObject = + interpreterGroup + .getAngularObjectRegistry() + .get( + angularObject.getName(), angularObject.getNoteId(), angularObject.getParagraphId()); if (localAngularObject instanceof RemoteAngularObject) { // to avoid ping-pong loop - ((RemoteAngularObject) localAngularObject).set( - angularObject.get(), true, false); + ((RemoteAngularObject) localAngularObject).set(angularObject.get(), true, false); } else { localAngularObject.set(angularObject.get()); } } @Override - public void removeAngularObject(String intpGroupId, - String noteId, - String paragraphId, - String name) throws TException { + public void removeAngularObject( + String intpGroupId, String noteId, String paragraphId, String name) throws TException { InterpreterGroup interpreterGroup = interpreterSettingManager.getInterpreterGroupById(intpGroupId); if (interpreterGroup == null) { @@ -302,13 +333,11 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi throw new TException("Invalid InterpreterGroupId: " + intpGroupId); } - Map<String, String> paraInfos = gson.fromJson(json, - new TypeToken<Map<String, String>>() { - }.getType()); + Map<String, String> paraInfos = + gson.fromJson(json, new TypeToken<Map<String, String>>() {}.getType()); String noteId = paraInfos.get("noteId"); String paraId = paraInfos.get("paraId"); - String settingId = RemoteInterpreterUtils. - getInterpreterSettingId(interpreterGroup.getId()); + String settingId = RemoteInterpreterUtils.getInterpreterSettingId(interpreterGroup.getId()); if (noteId != null && paraId != null && settingId != null) { listener.onParaInfosReceived(noteId, paraId, settingId, paraInfos); } @@ -359,8 +388,8 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi return obj; } - private Object invokeResourceMethod(String intpGroupId, - final InvokeResourceMethodEventMessage message) { + private Object invokeResourceMethod( + String intpGroupId, final InvokeResourceMethodEventMessage message) { final ResourceId resourceId = message.resourceId; ManagedInterpreterGroup intpGroup = interpreterSettingManager.getInterpreterGroupById(resourceId.getResourcePoolId()); @@ -393,21 +422,26 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi LOGGER.error("no resource pool"); return null; } - } else if (interpreterSettingManager.getInterpreterGroupById(intpGroupId) - .getInterpreterProcess().isRunning()) { - ByteBuffer res = interpreterSettingManager.getInterpreterGroupById(intpGroupId) - .getInterpreterProcess().callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { - @Override - public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception { - return client.resourceInvokeMethod( - resourceId.getNoteId(), - resourceId.getParagraphId(), - resourceId.getName(), - message.toJson()); - } - } - ); + } else if (interpreterSettingManager + .getInterpreterGroupById(intpGroupId) + .getInterpreterProcess() + .isRunning()) { + ByteBuffer res = + interpreterSettingManager + .getInterpreterGroupById(intpGroupId) + .getInterpreterProcess() + .callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { + @Override + public ByteBuffer call(RemoteInterpreterService.Client client) + throws Exception { + return client.resourceInvokeMethod( + resourceId.getNoteId(), + resourceId.getParagraphId(), + resourceId.getName(), + message.toJson()); + } + }); try { return Resource.deserializeObject(res); @@ -420,23 +454,21 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi } private Object getResource(final ResourceId resourceId) { - ManagedInterpreterGroup intpGroup = interpreterSettingManager - .getInterpreterGroupById(resourceId.getResourcePoolId()); + ManagedInterpreterGroup intpGroup = + interpreterSettingManager.getInterpreterGroupById(resourceId.getResourcePoolId()); if (intpGroup == null) { return null; } RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); - ByteBuffer buffer = remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { - @Override - public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception { - return client.resourceGet( - resourceId.getNoteId(), - resourceId.getParagraphId(), - resourceId.getName()); - } - } - ); + ByteBuffer buffer = + remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<ByteBuffer>() { + @Override + public ByteBuffer call(RemoteInterpreterService.Client client) throws Exception { + return client.resourceGet( + resourceId.getNoteId(), resourceId.getParagraphId(), resourceId.getName()); + } + }); try { Object o = Resource.deserializeObject(buffer); @@ -461,14 +493,15 @@ public class RemoteInterpreterEventServer implements RemoteInterpreterEventServi resourceSet.addAll(localPool.getAll()); } } else if (remoteInterpreterProcess.isRunning()) { - List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<List<String>>() { - @Override - public List<String> call(RemoteInterpreterService.Client client) throws Exception { - return client.resourcePoolGetAll(); - } - } - ); + List<String> resourceList = + remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<String>>() { + @Override + public List<String> call(RemoteInterpreterService.Client client) + throws Exception { + return client.resourcePoolGetAll(); + } + }); for (String res : resourceList) { resourceSet.add(RemoteResource.fromJson(res)); }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java index e303ee6..4dd0eb2 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SessionConfInterpreter.java @@ -17,24 +17,24 @@ package org.apache.zeppelin.interpreter; -import org.apache.commons.lang.exception.ExceptionUtils; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; import java.io.StringReader; import java.util.List; import java.util.Properties; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class SessionConfInterpreter extends ConfInterpreter { private static Logger LOGGER = LoggerFactory.getLogger(SessionConfInterpreter.class); - public SessionConfInterpreter(Properties properties, - String sessionId, - String interpreterGroupId, - InterpreterSetting interpreterSetting) { + public SessionConfInterpreter( + Properties properties, + String sessionId, + String interpreterGroupId, + InterpreterSetting interpreterSetting) { super(properties, sessionId, interpreterGroupId, interpreterSetting); } @@ -56,7 +56,8 @@ public class SessionConfInterpreter extends ConfInterpreter { if (intp instanceof RemoteInterpreter) { RemoteInterpreter remoteInterpreter = (RemoteInterpreter) intp; if (remoteInterpreter.isOpened()) { - return new InterpreterResult(InterpreterResult.Code.ERROR, + return new InterpreterResult( + InterpreterResult.Code.ERROR, "Can not change interpreter session properties after this session is started"); } remoteInterpreter.setProperties(finalProperties); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java index 9bef4d9..a336bcd 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java @@ -12,10 +12,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Utility class for downloading spark. This is used for spark integration test. - * - */ +/** Utility class for downloading spark. This is used for spark integration test. */ public class SparkDownloadUtils { private static Logger LOGGER = LoggerFactory.getLogger(SparkDownloadUtils.class); @@ -29,7 +26,6 @@ public class SparkDownloadUtils { } } - public static String downloadSpark(String version) { File targetSparkHomeFolder = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6"); if (targetSparkHomeFolder.exists()) { @@ -40,11 +36,19 @@ public class SparkDownloadUtils { boolean downloaded = false; for (int i = 0; i < 3; i++) { try { - String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); + String preferredMirror = + IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz"); - String downloadURL = preferredMirror + "/spark/spark-" + version + "/spark-" + version + "-bin-hadoop2.6.tgz"; + String downloadURL = + preferredMirror + + "/spark/spark-" + + version + + "/spark-" + + version + + "-bin-hadoop2.6.tgz"; runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder}); - runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); + runShellCommand( + new String[] {"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); downloaded = true; break; } catch (Exception e) { @@ -82,11 +86,20 @@ public class SparkDownloadUtils { // Try mirrors a few times until one succeeds for (int i = 0; i < 3; i++) { try { - String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); - File downloadFile = new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"); - String downloadURL = preferredMirror + "/flink/flink-" + version + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"; + String preferredMirror = + IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true")); + File downloadFile = + new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"); + String downloadURL = + preferredMirror + + "/flink/flink-" + + version + + "/flink-" + + version + + "-bin-hadoop27-scala_2.11.tgz"; runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder}); - runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); + runShellCommand( + new String[] {"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder}); downloaded = true; break; } catch (Exception e) { @@ -96,7 +109,8 @@ public class SparkDownloadUtils { // fallback to use apache archive if (!downloaded) { - File downloadFile = new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"); + File downloadFile = + new File(downloadFolder + "/flink-" + version + "-bin-hadoop27-scala_2.11.tgz"); String downloadURL = "https://archive.apache.org/dist/flink/flink-" + version @@ -141,7 +155,7 @@ public class SparkDownloadUtils { BufferedReader br = new BufferedReader(isr); String line = null; long startTime = System.currentTimeMillis(); - while ( (line = br.readLine()) != null) { + while ((line = br.readLine()) != null) { // logging per 5 seconds if ((System.currentTimeMillis() - startTime) > 5000) { LOGGER.info(line); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java index 0817595..c5d330b 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java @@ -16,12 +16,6 @@ */ package org.apache.zeppelin.interpreter.install; -import org.apache.commons.io.FileUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.util.Util; -import org.sonatype.aether.RepositoryException; - import java.io.File; import java.io.IOException; import java.net.URL; @@ -30,10 +24,13 @@ import java.util.List; import java.util.Locale; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.io.FileUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.util.Util; +import org.sonatype.aether.RepositoryException; -/** - * Commandline utility to install interpreter from maven repository - */ +/** Commandline utility to install interpreter from maven repository */ public class InstallInterpreter { private final File interpreterListFile; private final File interpreterBaseDir; @@ -44,7 +41,6 @@ public class InstallInterpreter { private String proxyPassword; /** - * * @param interpreterListFile * @param interpreterBaseDir interpreter directory for installing binaries * @throws IOException @@ -58,10 +54,7 @@ public class InstallInterpreter { readAvailableInterpreters(); } - - /** - * Information for available informations - */ + /** Information for available informations */ private static class AvailableInterpreterInfo { public final String name; public final String artifact; @@ -121,7 +114,7 @@ public class InstallInterpreter { } } - public void install(String [] names) { + public void install(String[] names) { for (String name : names) { install(name); } @@ -139,7 +132,7 @@ public class InstallInterpreter { throw new RuntimeException("Can't find interpreter '" + name + "'"); } - public void install(String [] names, String [] artifacts) { + public void install(String[] names, String[] artifacts) { if (names.length != artifacts.length) { throw new RuntimeException("Length of given names and artifacts are different"); } @@ -157,19 +150,18 @@ public class InstallInterpreter { File installDir = new File(interpreterBaseDir, name); if (installDir.exists()) { - System.err.println("Directory " + installDir.getAbsolutePath() - + " already exists" - + "\n\nSkipped"); + System.err.println( + "Directory " + installDir.getAbsolutePath() + " already exists" + "\n\nSkipped"); return; } - System.out.println("Install " + name + "(" + artifact + ") to " - + installDir.getAbsolutePath() + " ... "); + System.out.println( + "Install " + name + "(" + artifact + ") to " + installDir.getAbsolutePath() + " ... "); try { depResolver.load(artifact, installDir); - System.out.println("Interpreter " + name + " installed under " + - installDir.getAbsolutePath() + "."); + System.out.println( + "Interpreter " + name + " installed under " + installDir.getAbsolutePath() + "."); startTip(); } catch (RepositoryException e) { e.printStackTrace(); @@ -188,29 +180,32 @@ public class InstallInterpreter { System.out.println("Options"); System.out.println(" -l, --list List available interpreters"); System.out.println(" -a, --all Install all available interpreters"); - System.out.println(" -n, --name [NAMES] Install interpreters (comma separated " + - "list)" + - "e.g. md,shell,jdbc,python,angular"); - System.out.println(" -t, --artifact [ARTIFACTS] (Optional with -n) custom artifact names" + - ". " + - "(comma separated list correspond to --name) " + - "e.g. customGroup:customArtifact:customVersion"); + System.out.println( + " -n, --name [NAMES] Install interpreters (comma separated " + + "list)" + + "e.g. md,shell,jdbc,python,angular"); + System.out.println( + " -t, --artifact [ARTIFACTS] (Optional with -n) custom artifact names" + + ". " + + "(comma separated list correspond to --name) " + + "e.g. customGroup:customArtifact:customVersion"); System.out.println(" --proxy-url [url] (Optional) proxy url. http(s)://host:port"); System.out.println(" --proxy-user [user] (Optional) proxy user"); System.out.println(" --proxy-password [password] (Optional) proxy password"); } - public static void main(String [] args) throws IOException { + public static void main(String[] args) throws IOException { if (args.length == 0) { usage(); return; } ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - InstallInterpreter installer = new InstallInterpreter( - new File(conf.getInterpreterListPath()), - new File(conf.getInterpreterDir()), - conf.getInterpreterLocalRepoPath()); + InstallInterpreter installer = + new InstallInterpreter( + new File(conf.getInterpreterListPath()), + new File(conf.getInterpreterDir()), + conf.getInterpreterLocalRepoPath()); String names = null; String artifacts = null; @@ -281,8 +276,9 @@ public class InstallInterpreter { } private static void startTip() { - System.out.println("\n1. Restart Zeppelin" - + "\n2. Create interpreter setting in 'Interpreter' menu on Zeppelin GUI" - + "\n3. Then you can bind the interpreter on your note"); + System.out.println( + "\n1. Restart Zeppelin" + + "\n2. Create interpreter setting in 'Interpreter' menu on Zeppelin GUI" + + "\n3. Then you can bind the interpreter on your note"); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java index 5a62d22..3baa05c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java @@ -15,29 +15,20 @@ * limitations under the License. */ - package org.apache.zeppelin.interpreter.lifecycle; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; -/** - * Do nothing for the lifecycle of interpreter. User need to explicitly start/stop interpreter. - */ +/** Do nothing for the lifecycle of interpreter. User need to explicitly start/stop interpreter. */ public class NullLifecycleManager implements LifecycleManager { - public NullLifecycleManager(ZeppelinConfiguration zConf) { - - } + public NullLifecycleManager(ZeppelinConfiguration zConf) {} @Override - public void onInterpreterProcessStarted(ManagedInterpreterGroup interpreterGroup) { - - } + public void onInterpreterProcessStarted(ManagedInterpreterGroup interpreterGroup) {} @Override - public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) { - - } + public void onInterpreterUse(ManagedInterpreterGroup interpreterGroup, String sessionId) {} } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java index 90f3f55..b580c49 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java @@ -1,24 +1,22 @@ package org.apache.zeppelin.interpreter.lifecycle; +import java.util.Map; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.Timer; -import java.util.TimerTask; -import java.util.concurrent.ConcurrentHashMap; - - /** * This lifecycle manager would close interpreter after it is timeout. By default, it is timeout * after no using in 1 hour. * - * For now, this class only manage the lifecycle of interpreter group (will close interpreter - * process after timeout). Managing the lifecycle of interpreter session could be done in future - * if necessary. + * <p>For now, this class only manage the lifecycle of interpreter group (will close interpreter + * process after timeout). Managing the lifecycle of interpreter session could be done in future if + * necessary. */ public class TimeoutLifecycleManager implements LifecycleManager { @@ -33,28 +31,38 @@ public class TimeoutLifecycleManager implements LifecycleManager { private Timer checkTimer; public TimeoutLifecycleManager(ZeppelinConfiguration zConf) { - this.checkInterval = zConf.getLong(ZeppelinConfiguration.ConfVars - .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL); - this.timeoutThreshold = zConf.getLong( - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD); + this.checkInterval = + zConf.getLong( + ZeppelinConfiguration.ConfVars + .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL); + this.timeoutThreshold = + zConf.getLong( + ZeppelinConfiguration.ConfVars + .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD); this.checkTimer = new Timer(true); - this.checkTimer.scheduleAtFixedRate(new TimerTask() { - @Override - public void run() { - long now = System.currentTimeMillis(); - for (Map.Entry<ManagedInterpreterGroup, Long> entry : interpreterGroups.entrySet()) { - ManagedInterpreterGroup interpreterGroup = entry.getKey(); - Long lastTimeUsing = entry.getValue(); - if ((now - lastTimeUsing) > timeoutThreshold ) { - LOGGER.info("InterpreterGroup {} is timeout.", interpreterGroup.getId()); - interpreterGroup.close(); - interpreterGroups.remove(entry.getKey()); + this.checkTimer.scheduleAtFixedRate( + new TimerTask() { + @Override + public void run() { + long now = System.currentTimeMillis(); + for (Map.Entry<ManagedInterpreterGroup, Long> entry : interpreterGroups.entrySet()) { + ManagedInterpreterGroup interpreterGroup = entry.getKey(); + Long lastTimeUsing = entry.getValue(); + if ((now - lastTimeUsing) > timeoutThreshold) { + LOGGER.info("InterpreterGroup {} is timeout.", interpreterGroup.getId()); + interpreterGroup.close(); + interpreterGroups.remove(entry.getKey()); + } + } } - } - } - }, checkInterval, checkInterval); - LOGGER.info("TimeoutLifecycleManager is started with checkinterval: " + checkInterval - + ", timeoutThreshold: " + timeoutThreshold); + }, + checkInterval, + checkInterval); + LOGGER.info( + "TimeoutLifecycleManager is started with checkinterval: " + + checkInterval + + ", timeoutThreshold: " + + timeoutThreshold); } @Override http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java index 4dffff1..e854702 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java @@ -17,6 +17,11 @@ package org.apache.zeppelin.interpreter.recovery; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.Path; import org.apache.zeppelin.conf.ZeppelinConfiguration; @@ -30,18 +35,10 @@ import org.apache.zeppelin.notebook.FileSystemStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - - /** * Hadoop compatible FileSystem based RecoveryStorage implementation. * - * Save InterpreterProcess in the format of: - * InterpreterGroupId host:port + * <p>Save InterpreterProcess in the format of: InterpreterGroupId host:port */ public class FileSystemRecoveryStorage extends RecoveryStorage { @@ -51,15 +48,15 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { private FileSystemStorage fs; private Path recoveryDir; - public FileSystemRecoveryStorage(ZeppelinConfiguration zConf, - InterpreterSettingManager interpreterSettingManager) + public FileSystemRecoveryStorage( + ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) throws IOException { super(zConf); this.interpreterSettingManager = interpreterSettingManager; this.zConf = zConf; this.fs = new FileSystemStorage(zConf, zConf.getRecoveryDir()); - LOGGER.info("Creating FileSystem: " + this.fs.getFs().getClass().getName() + - " for Zeppelin Recovery."); + LOGGER.info( + "Creating FileSystem: " + this.fs.getFs().getClass().getName() + " for Zeppelin Recovery."); this.recoveryDir = this.fs.makeQualified(new Path(zConf.getRecoveryDir())); LOGGER.info("Using folder {} to store recovery data", recoveryDir); this.fs.tryMkDir(recoveryDir); @@ -82,8 +79,12 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { for (ManagedInterpreterGroup interpreterGroup : interpreterSetting.getAllInterpreterGroups()) { RemoteInterpreterProcess interpreterProcess = interpreterGroup.getInterpreterProcess(); if (interpreterProcess != null) { - recoveryContent.add(interpreterGroup.getId() + "\t" + interpreterProcess.getHost() + ":" + - interpreterProcess.getPort()); + recoveryContent.add( + interpreterGroup.getId() + + "\t" + + interpreterProcess.getHost() + + ":" + + interpreterProcess.getPort()); } } LOGGER.debug("Updating recovery data for interpreterSetting: " + interpreterSettingName); @@ -99,8 +100,8 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { for (Path path : paths) { String fileName = path.getName(); - String interpreterSettingName = fileName.substring(0, - fileName.length() - ".recovery".length()); + String interpreterSettingName = + fileName.substring(0, fileName.length() - ".recovery".length()); String recoveryContent = fs.readFile(path); if (!StringUtils.isBlank(recoveryContent)) { for (String line : recoveryContent.split(System.lineSeparator())) { @@ -109,8 +110,12 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { String[] hostPort = tokens[1].split(":"); int connectTimeout = zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); - RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess( - interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); + RemoteInterpreterRunningProcess client = + new RemoteInterpreterRunningProcess( + interpreterSettingName, + connectTimeout, + hostPort[0], + Integer.parseInt(hostPort[1])); // interpreterSettingManager may be null when this class is used when it is used // stop-interpreter.sh clients.put(groupId, client); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java index 3a7d12c..c0bcdca 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java @@ -17,35 +17,26 @@ package org.apache.zeppelin.interpreter.recovery; +import java.io.IOException; +import java.util.Map; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.interpreter.launcher.InterpreterClient; -import java.io.IOException; -import java.util.Map; - - -/** - * RecoveryStorage that do nothing, used when recovery is not enabled. - * - */ +/** RecoveryStorage that do nothing, used when recovery is not enabled. */ public class NullRecoveryStorage extends RecoveryStorage { - public NullRecoveryStorage(ZeppelinConfiguration zConf, - InterpreterSettingManager interpreterSettingManager) + public NullRecoveryStorage( + ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) throws IOException { super(zConf); } @Override - public void onInterpreterClientStart(InterpreterClient client) throws IOException { - - } + public void onInterpreterClientStart(InterpreterClient client) throws IOException {} @Override - public void onInterpreterClientStop(InterpreterClient client) throws IOException { - - } + public void onInterpreterClientStop(InterpreterClient client) throws IOException {} @Override public Map<String, InterpreterClient> restore() throws IOException { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java index d74b162..7808bf2 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java @@ -1,5 +1,7 @@ package org.apache.zeppelin.interpreter.recovery; +import java.io.IOException; +import java.util.Map; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.interpreter.launcher.InterpreterClient; @@ -7,14 +9,10 @@ import org.apache.zeppelin.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.Map; - - /** - * Utility class for stopping interpreter in the case that you want to stop all the - * interpreter process even when you enable recovery, or you want to kill interpreter process - * to avoid orphan process. + * Utility class for stopping interpreter in the case that you want to stop all the interpreter + * process even when you enable recovery, or you want to kill interpreter process to avoid orphan + * process. */ public class StopInterpreter { @@ -24,9 +22,11 @@ public class StopInterpreter { ZeppelinConfiguration zConf = ZeppelinConfiguration.create(); RecoveryStorage recoveryStorage = null; - recoveryStorage = ReflectionUtils.createClazzInstance(zConf.getRecoveryStorageClass(), - new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class}, - new Object[] {zConf, null}); + recoveryStorage = + ReflectionUtils.createClazzInstance( + zConf.getRecoveryStorageClass(), + new Class[] {ZeppelinConfiguration.class, InterpreterSettingManager.class}, + new Object[] {zConf, null}); LOGGER.info("Using RecoveryStorage: " + recoveryStorage.getClass().getName()); Map<String, InterpreterClient> restoredClients = recoveryStorage.restore(); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java index b139404..e6f08da 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java @@ -17,10 +17,7 @@ package org.apache.zeppelin.interpreter.remote; -/** - * This element stores the buffered - * append-data of paragraph's output. - */ +/** This element stores the buffered append-data of paragraph's output. */ public class AppendOutputBuffer { private String noteId; @@ -50,5 +47,4 @@ public class AppendOutputBuffer { public String getData() { return data; } - } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java index 2a88dc2..54a75ac 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -17,26 +17,22 @@ package org.apache.zeppelin.interpreter.remote; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * This thread sends paragraph's append-data - * periodically, rather than continously, with - * a period of BUFFER_TIME_MS. It handles append-data - * for all paragraphs across all notebooks. + * This thread sends paragraph's append-data periodically, rather than continously, with a period of + * BUFFER_TIME_MS. It handles append-data for all paragraphs across all notebooks. */ public class AppendOutputRunner implements Runnable { - private static final Logger logger = - LoggerFactory.getLogger(AppendOutputRunner.class); + private static final Logger logger = LoggerFactory.getLogger(AppendOutputRunner.class); public static final Long BUFFER_TIME_MS = new Long(100); private static final Long SAFE_PROCESSING_TIME = new Long(10); private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000); @@ -70,14 +66,16 @@ public class AppendOutputRunner implements Runnable { Long processingStartTime = System.currentTimeMillis(); queue.drainTo(list); - for (AppendOutputBuffer buffer: list) { + for (AppendOutputBuffer buffer : list) { String noteId = buffer.getNoteId(); String paragraphId = buffer.getParagraphId(); int index = buffer.getIndex(); String stringBufferKey = noteId + ":" + paragraphId + ":" + index; - StringBuilder builder = stringBufferMap.containsKey(stringBufferKey) ? - stringBufferMap.get(stringBufferKey) : new StringBuilder(); + StringBuilder builder = + stringBufferMap.containsKey(stringBufferKey) + ? stringBufferMap.get(stringBufferKey) + : new StringBuilder(); builder.append(buffer.getData()); stringBufferMap.put(stringBufferKey, builder); @@ -85,11 +83,12 @@ public class AppendOutputRunner implements Runnable { Long processingTime = System.currentTimeMillis() - processingStartTime; if (processingTime > SAFE_PROCESSING_TIME) { - logger.warn("Processing time for buffered append-output is high: " + - processingTime + " milliseconds."); + logger.warn( + "Processing time for buffered append-output is high: " + + processingTime + + " milliseconds."); } else { - logger.debug("Processing time for append-output took " - + processingTime + " milliseconds"); + logger.debug("Processing time for append-output took " + processingTime + " milliseconds"); } Long sizeProcessed = new Long(0); @@ -101,16 +100,14 @@ public class AppendOutputRunner implements Runnable { } if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) { - logger.warn("Processing size for buffered append-output is high: " + - sizeProcessed + " characters."); + logger.warn( + "Processing size for buffered append-output is high: " + sizeProcessed + " characters."); } else { - logger.debug("Processing size for append-output is " + - sizeProcessed + " characters"); + logger.debug("Processing size for append-output is " + sizeProcessed + " characters"); } } public void appendBuffer(String noteId, String paragraphId, int index, String outputToAppend) { queue.offer(new AppendOutputBuffer(noteId, paragraphId, index, outputToAppend)); } - } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java index b2cb78f..346bb26 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java @@ -19,7 +19,6 @@ package org.apache.zeppelin.interpreter.remote; import java.util.HashMap; import java.util.Map; - import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; @@ -31,10 +30,8 @@ import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -/** - * - */ -public class ClientFactory extends BasePooledObjectFactory<Client>{ +/** */ +public class ClientFactory extends BasePooledObjectFactory<Client> { private String host; private int port; Map<Client, TSocket> clientSocketMap = new HashMap<>(); @@ -53,7 +50,7 @@ public class ClientFactory extends BasePooledObjectFactory<Client>{ throw new InterpreterException(e); } - TProtocol protocol = new TBinaryProtocol(transport); + TProtocol protocol = new TBinaryProtocol(transport); Client client = new RemoteInterpreterService.Client(protocol); synchronized (clientSocketMap) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java index 62c8efd..896ec47 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java @@ -19,26 +19,27 @@ package org.apache.zeppelin.interpreter.remote; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectListener; -import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; -/** - * Proxy for AngularObject that exists in remote interpreter process - */ +/** Proxy for AngularObject that exists in remote interpreter process */ public class RemoteAngularObject extends AngularObject { private transient ManagedInterpreterGroup interpreterGroup; - RemoteAngularObject(String name, Object o, String noteId, String paragraphId, - ManagedInterpreterGroup interpreterGroup, - AngularObjectListener listener) { + RemoteAngularObject( + String name, + Object o, + String noteId, + String paragraphId, + ManagedInterpreterGroup interpreterGroup, + AngularObjectListener listener) { super(name, o, noteId, paragraphId, listener); this.interpreterGroup = interpreterGroup; } @Override public void set(Object o, boolean emit) { - set(o, emit, true); + set(o, emit, true); } public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) { @@ -46,9 +47,9 @@ public class RemoteAngularObject extends AngularObject { if (emitRemoteProcess) { // send updated value to remote interpreter - interpreterGroup.getRemoteInterpreterProcess(). - updateRemoteAngularObject( - getName(), getNoteId(), getParagraphId(), o); + interpreterGroup + .getRemoteInterpreterProcess() + .updateRemoteAngularObject(getName(), getNoteId(), getParagraphId(), o); } } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java index 7458ce5..db7a330 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -18,28 +18,24 @@ package org.apache.zeppelin.interpreter.remote; import com.google.gson.Gson; -import org.apache.thrift.TException; +import java.util.List; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; -import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; - -/** - * Proxy for AngularObjectRegistry that exists in remote interpreter process - */ +/** Proxy for AngularObjectRegistry that exists in remote interpreter process */ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class); private ManagedInterpreterGroup interpreterGroup; - public RemoteAngularObjectRegistry(String interpreterId, - AngularObjectRegistryListener listener, - ManagedInterpreterGroup interpreterGroup) { + public RemoteAngularObjectRegistry( + String interpreterId, + AngularObjectRegistryListener listener, + ManagedInterpreterGroup interpreterGroup) { super(interpreterId, listener); this.interpreterGroup = interpreterGroup; } @@ -49,17 +45,16 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { } /** - * When ZeppelinServer side code want to add angularObject to the registry, - * this method should be used instead of add() + * When ZeppelinServer side code want to add angularObject to the registry, this method should be + * used instead of add() + * * @param name * @param o * @param noteId * @return */ - public AngularObject addAndNotifyRemoteProcess(final String name, - final Object o, - final String noteId, - final String paragraphId) { + public AngularObject addAndNotifyRemoteProcess( + final String name, final Object o, final String noteId, final String paragraphId) { RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); if (!remoteInterpreterProcess.isRunning()) { @@ -74,41 +69,38 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); return null; } - } - ); + }); return super.add(name, o, noteId, paragraphId, true); - } /** - * When ZeppelinServer side code want to remove angularObject from the registry, - * this method should be used instead of remove() + * When ZeppelinServer side code want to remove angularObject from the registry, this method + * should be used instead of remove() + * * @param name * @param noteId * @param paragraphId * @return */ - public AngularObject removeAndNotifyRemoteProcess(final String name, - final String noteId, - final String paragraphId) { + public AngularObject removeAndNotifyRemoteProcess( + final String name, final String noteId, final String paragraphId) { RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) { return super.remove(name, noteId, paragraphId); } remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - client.angularObjectRemove(name, noteId, paragraphId); - return null; - } - } - ); + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.angularObjectRemove(name, noteId, paragraphId); + return null; + } + }); return super.remove(name, noteId, paragraphId); } - + public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) { List<AngularObject> all = getAll(noteId, paragraphId); for (AngularObject ao : all) { @@ -117,9 +109,9 @@ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { } @Override - protected AngularObject createNewAngularObject(String name, Object o, String noteId, String - paragraphId) { - return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup, - getAngularObjectListener()); + protected AngularObject createNewAngularObject( + String name, Object o, String noteId, String paragraphId) { + return new RemoteAngularObject( + name, o, noteId, paragraphId, interpreterGroup, getAngularObjectListener()); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 6f9f81f..795b02f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -20,6 +20,10 @@ package org.apache.zeppelin.interpreter.remote; import com.google.common.annotations.VisibleForTesting; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Properties; import org.apache.thrift.TException; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; @@ -45,20 +49,11 @@ import org.apache.zeppelin.scheduler.SchedulerFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * Proxy for Interpreter instance that runs on separate process - */ +/** Proxy for Interpreter instance that runs on separate process */ public class RemoteInterpreter extends Interpreter { private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreter.class); private static final Gson gson = new Gson(); - private String className; private String sessionId; private FormType formType; @@ -69,14 +64,13 @@ public class RemoteInterpreter extends Interpreter { private LifecycleManager lifecycleManager; - /** - * Remote interpreter and manage interpreter process - */ - public RemoteInterpreter(Properties properties, - String sessionId, - String className, - String userName, - LifecycleManager lifecycleManager) { + /** Remote interpreter and manage interpreter process */ + public RemoteInterpreter( + Properties properties, + String sessionId, + String className, + String userName, + LifecycleManager lifecycleManager) { super(properties); this.sessionId = sessionId; this.className = className; @@ -124,8 +118,8 @@ public class RemoteInterpreter extends Interpreter { // The why we we create all the interpreter of the session is because some interpreter // depends on other interpreter. e.g. PySparkInterpreter depends on SparkInterpreter. // also see method Interpreter.getInterpreterInTheSameSessionByClassName - for (Interpreter interpreter : getInterpreterGroup() - .getOrCreateSession(this.getUserName(), sessionId)) { + for (Interpreter interpreter : + getInterpreterGroup().getOrCreateSession(this.getUserName(), sessionId)) { try { if (!(interpreter instanceof ConfInterpreter)) { ((RemoteInterpreter) interpreter).internal_create(); @@ -135,22 +129,23 @@ public class RemoteInterpreter extends Interpreter { } } - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - LOGGER.info("Open RemoteInterpreter {}", getClassName()); - // open interpreter here instead of in the jobRun method in RemoteInterpreterServer - // client.open(sessionId, className); - // Push angular object loaded from JSON file to remote interpreter - synchronized (getInterpreterGroup()) { - if (!getInterpreterGroup().isAngularRegistryPushed()) { - pushAngularObjectRegistryToRemote(client); - getInterpreterGroup().setAngularRegistryPushed(true); + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + LOGGER.info("Open RemoteInterpreter {}", getClassName()); + // open interpreter here instead of in the jobRun method in RemoteInterpreterServer + // client.open(sessionId, className); + // Push angular object loaded from JSON file to remote interpreter + synchronized (getInterpreterGroup()) { + if (!getInterpreterGroup().isAngularRegistryPushed()) { + pushAngularObjectRegistryToRemote(client); + getInterpreterGroup().setAngularRegistryPushed(true); + } + } + return null; } - } - return null; - } - }); + }); isOpened = true; this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); } @@ -161,21 +156,25 @@ public class RemoteInterpreter extends Interpreter { synchronized (this) { if (!isCreated) { this.interpreterProcess = getOrCreateInterpreterProcess(); - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - LOGGER.info("Create RemoteInterpreter {}", getClassName()); - client.createInterpreter(getInterpreterGroup().getId(), sessionId, - className, (Map) getProperties(), getUserName()); - return null; - } - }); + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + LOGGER.info("Create RemoteInterpreter {}", getClassName()); + client.createInterpreter( + getInterpreterGroup().getId(), + sessionId, + className, + (Map) getProperties(), + getUserName()); + return null; + } + }); isCreated = true; } } } - @Override public void close() throws InterpreterException { if (isOpened) { @@ -185,13 +184,14 @@ public class RemoteInterpreter extends Interpreter { } catch (IOException e) { throw new InterpreterException(e); } - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - client.close(sessionId, className); - return null; - } - }); + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.close(sessionId, className); + return null; + } + }); isOpened = false; this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); } else { @@ -219,11 +219,13 @@ public class RemoteInterpreter extends Interpreter { @Override public InterpreterResult call(Client client) throws Exception { - RemoteInterpreterResult remoteResult = client.interpret( - sessionId, className, st, convert(context)); - Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson( - remoteResult.getConfig(), new TypeToken<Map<String, Object>>() { - }.getType()); + RemoteInterpreterResult remoteResult = + client.interpret(sessionId, className, st, convert(context)); + Map<String, Object> remoteConfig = + (Map<String, Object>) + gson.fromJson( + remoteResult.getConfig(), + new TypeToken<Map<String, Object>>() {}.getType()); context.getConfig().clear(); if (remoteConfig != null) { context.getConfig().putAll(remoteConfig); @@ -251,9 +253,7 @@ public class RemoteInterpreter extends Interpreter { InterpreterResult result = convert(remoteResult); return result; } - } - ); - + }); } @Override @@ -269,13 +269,14 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e); } this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction<Void>() { - @Override - public Void call(Client client) throws Exception { - client.cancel(sessionId, className, convert(context)); - return null; - } - }); + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + @Override + public Void call(Client client) throws Exception { + client.cancel(sessionId, className, convert(context)); + return null; + } + }); } @Override @@ -297,18 +298,18 @@ public class RemoteInterpreter extends Interpreter { throw new InterpreterException(e); } this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); - FormType type = interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction<FormType>() { - @Override - public FormType call(Client client) throws Exception { - formType = FormType.valueOf(client.getFormType(sessionId, className)); - return formType; - } - }); + FormType type = + interpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<FormType>() { + @Override + public FormType call(Client client) throws Exception { + formType = FormType.valueOf(client.getFormType(sessionId, className)); + return formType; + } + }); return type; } - @Override public int getProgress(final InterpreterContext context) throws InterpreterException { if (!isOpened) { @@ -331,10 +332,9 @@ public class RemoteInterpreter extends Interpreter { }); } - @Override - public List<InterpreterCompletion> completion(final String buf, final int cursor, - final InterpreterContext interpreterContext) + public List<InterpreterCompletion> completion( + final String buf, final int cursor, final InterpreterContext interpreterContext) throws InterpreterException { if (!isOpened) { open(); @@ -350,8 +350,8 @@ public class RemoteInterpreter extends Interpreter { new RemoteInterpreterProcess.RemoteFunction<List<InterpreterCompletion>>() { @Override public List<InterpreterCompletion> call(Client client) throws Exception { - return client.completion(sessionId, className, buf, cursor, - convert(interpreterContext)); + return client.completion( + sessionId, className, buf, cursor, convert(interpreterContext)); } }); } @@ -377,36 +377,48 @@ public class RemoteInterpreter extends Interpreter { }); } - @Override public Scheduler getScheduler() { - int maxConcurrency = Integer.parseInt( - getProperty("zeppelin.interpreter.max.poolsize", - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + "")); + int maxConcurrency = + Integer.parseInt( + getProperty( + "zeppelin.interpreter.max.poolsize", + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + + "")); // one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs // running under the scheduler of this session will be aborted. - Scheduler s = new RemoteScheduler( - RemoteInterpreter.class.getSimpleName() + "-" + getInterpreterGroup().getId() + "-" - + sessionId, - SchedulerFactory.singleton().getExecutor(), - sessionId, - this, - SchedulerFactory.singleton(), - maxConcurrency); + Scheduler s = + new RemoteScheduler( + RemoteInterpreter.class.getSimpleName() + + "-" + + getInterpreterGroup().getId() + + "-" + + sessionId, + SchedulerFactory.singleton().getExecutor(), + sessionId, + this, + SchedulerFactory.singleton(), + maxConcurrency); return SchedulerFactory.singleton().createOrGetScheduler(s); } private RemoteInterpreterContext convert(InterpreterContext ic) { - return new RemoteInterpreterContext(ic.getNoteId(), ic.getNoteName(), ic.getParagraphId(), - ic.getReplName(), ic.getParagraphTitle(), ic.getParagraphText(), - gson.toJson(ic.getAuthenticationInfo()), gson.toJson(ic.getConfig()), ic.getGui().toJson(), + return new RemoteInterpreterContext( + ic.getNoteId(), + ic.getNoteName(), + ic.getParagraphId(), + ic.getReplName(), + ic.getParagraphTitle(), + ic.getParagraphText(), + gson.toJson(ic.getAuthenticationInfo()), + gson.toJson(ic.getConfig()), + ic.getGui().toJson(), gson.toJson(ic.getNoteGui()), ic.getLocalProperties()); } private InterpreterResult convert(RemoteInterpreterResult result) { - InterpreterResult r = new InterpreterResult( - InterpreterResult.Code.valueOf(result.getCode())); + InterpreterResult r = new InterpreterResult(InterpreterResult.Code.valueOf(result.getCode())); for (RemoteInterpreterResultMessage m : result.getMsg()) { r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData()); @@ -416,21 +428,20 @@ public class RemoteInterpreter extends Interpreter { } /** - * Push local angular object registry to - * remote interpreter. This method should be - * call ONLY once when the first Interpreter is created + * Push local angular object registry to remote interpreter. This method should be call ONLY once + * when the first Interpreter is created */ private void pushAngularObjectRegistryToRemote(Client client) throws TException { - final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup() - .getAngularObjectRegistry(); + final AngularObjectRegistry angularObjectRegistry = + this.getInterpreterGroup().getAngularObjectRegistry(); if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) { - final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry - .getRegistry(); - LOGGER.info("Push local angular object registry from ZeppelinServer to" + - " remote interpreter group {}", this.getInterpreterGroup().getId()); - final java.lang.reflect.Type registryType = new TypeToken<Map<String, - Map<String, AngularObject>>>() { - }.getType(); + final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry.getRegistry(); + LOGGER.info( + "Push local angular object registry from ZeppelinServer to" + + " remote interpreter group {}", + this.getInterpreterGroup().getId()); + final java.lang.reflect.Type registryType = + new TypeToken<Map<String, Map<String, AngularObject>>>() {}.getType(); client.angularRegistryPush(gson.toJson(registry, registryType)); } } http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index db6d263..1572fc2 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -18,6 +18,11 @@ package org.apache.zeppelin.interpreter.remote; import com.google.common.annotations.VisibleForTesting; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.exec.CommandLine; import org.apache.commons.exec.DefaultExecutor; import org.apache.commons.exec.ExecuteException; @@ -30,19 +35,11 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -/** - * This class manages start / stop of remote interpreter process - */ +/** This class manages start / stop of remote interpreter process */ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess implements ExecuteResultHandler { - private static final Logger logger = LoggerFactory.getLogger( - RemoteInterpreterManagedProcess.class); + private static final Logger logger = + LoggerFactory.getLogger(RemoteInterpreterManagedProcess.class); private final String interpreterRunner; private final int zeppelinServerRPCPort; @@ -147,11 +144,15 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } } if (!running.get()) { - throw new IOException(new String( - String.format("Interpreter Process creation is time out in %d seconds", - getConnectTimeout()/1000) + "\n" + "You can increase timeout threshold via " + - "setting zeppelin.interpreter.connect.timeout of this interpreter.\n" + - cmdOut.toString())); + throw new IOException( + new String( + String.format( + "Interpreter Process creation is time out in %d seconds", + getConnectTimeout() / 1000) + + "\n" + + "You can increase timeout threshold via " + + "setting zeppelin.interpreter.connect.timeout of this interpreter.\n" + + cmdOut.toString())); } } catch (InterruptedException e) { logger.error("Remote interpreter is not accessible"); @@ -163,13 +164,14 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess if (isRunning()) { logger.info("Kill interpreter process"); try { - callRemoteFunction(new RemoteFunction<Void>() { - @Override - public Void call(RemoteInterpreterService.Client client) throws Exception { - client.shutdown(); - return null; - } - }); + callRemoteFunction( + new RemoteFunction<Void>() { + @Override + public Void call(RemoteInterpreterService.Client client) throws Exception { + client.shutdown(); + return null; + } + }); } catch (Exception e) { logger.warn("ignore the exception when shutting down"); } @@ -186,7 +188,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess public void onProcessComplete(int exitValue) { logger.info("Interpreter process exited {}", exitValue); running.set(false); - } // called by RemoteInterpreterServer to notify that RemoteInterpreter Process is started @@ -253,7 +254,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } @Override - public void write(byte [] b) throws IOException { + public void write(byte[] b) throws IOException { super.write(b); if (out != null) { @@ -266,7 +267,7 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess } @Override - public void write(byte [] b, int offset, int len) throws IOException { + public void write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); if (out != null) { http://git-wip-us.apache.org/repos/asf/zeppelin/blob/55f6c91c/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index e8b3482..5e50265 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -24,17 +24,14 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** - * Abstract class for interpreter process - */ +/** Abstract class for interpreter process */ public abstract class RemoteInterpreterProcess implements InterpreterClient { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); private GenericObjectPool<Client> clientPool; private int connectTimeout; - public RemoteInterpreterProcess( - int connectTimeout) { + public RemoteInterpreterProcess(int connectTimeout) { this.connectTimeout = connectTimeout; } @@ -74,8 +71,8 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { } /** - * Called when angular object is updated in client side to propagate - * change to the remote process + * Called when angular object is updated in client side to propagate change to the remote process + * * @param name * @param o */ @@ -85,8 +82,10 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { client = getClient(); } catch (NullPointerException e) { // remote process not started - logger.info("NullPointerException in RemoteInterpreterProcess while " + - "updateRemoteAngularObject getClient, remote process not started", e); + logger.info( + "NullPointerException in RemoteInterpreterProcess while " + + "updateRemoteAngularObject getClient, remote process not started", + e); return; } catch (Exception e) { logger.error("Can't update angular object", e); @@ -130,10 +129,7 @@ public abstract class RemoteInterpreterProcess implements InterpreterClient { return null; } - /** - * - * @param <T> - */ + /** @param <T> */ public interface RemoteFunction<T> { T call(Client client) throws Exception; }
