http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index 12545d6..585a58a 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -17,212 +17,205 @@ package org.apache.zeppelin.interpreter; -import java.io.BufferedReader; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.zeppelin.conf.ZeppelinConfiguration; +import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; +import org.apache.zeppelin.dep.Dependency; +import org.apache.zeppelin.dep.DependencyResolver; +import org.apache.zeppelin.display.AngularObjectRegistryListener; +import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.apache.zeppelin.resource.Resource; +import org.apache.zeppelin.resource.ResourcePool; +import org.apache.zeppelin.resource.ResourceSet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.sonatype.aether.repository.Authentication; +import org.sonatype.aether.repository.Proxy; +import org.sonatype.aether.repository.RemoteRepository; + import java.io.File; import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.io.OutputStreamWriter; import java.lang.reflect.Type; import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; -import java.nio.charset.StandardCharsets; import java.nio.file.DirectoryStream.Filter; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.attribute.PosixFilePermission; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.EnumSet; -import java.util.Enumeration; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Set; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; -import org.apache.zeppelin.dep.Dependency; -import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; -import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.Job.Status; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.sonatype.aether.RepositoryException; -import org.sonatype.aether.repository.Authentication; -import org.sonatype.aether.repository.Proxy; -import org.sonatype.aether.repository.RemoteRepository; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.internal.StringMap; -import com.google.gson.reflect.TypeToken; - -import static java.nio.file.attribute.PosixFilePermission.OWNER_READ; -import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE; /** - * TBD + * InterpreterSettingManager is the component which manage all the interpreter settings. + * (load/create/update/remove/get) + * Besides that InterpreterSettingManager also manage the interpreter setting binding. + * TODO(zjffdu) We could move it into another separated component. */ public class InterpreterSettingManager { - private static final Logger logger = LoggerFactory.getLogger(InterpreterSettingManager.class); - private static final String SHARED_SESSION = "shared_session"; + private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterSettingManager.class); private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of( "language", (Object) "text", "editOnDblClick", false); - private final ZeppelinConfiguration zeppelinConfiguration; + private final ZeppelinConfiguration conf; private final Path interpreterDirPath; - private final Path interpreterBindingPath; + private final Path interpreterSettingPath; /** - * This is only references with default settings, name and properties - * key: InterpreterSetting.name + * This is only InterpreterSetting templates with default name and properties + * name --> InterpreterSetting */ - private final Map<String, InterpreterSetting> interpreterSettingsRef; + private final Map<String, InterpreterSetting> interpreterSettingTemplates = + Maps.newConcurrentMap(); /** * This is used by creating and running Interpreters - * key: InterpreterSetting.id <- This is becuase backward compatibility + * id --> InterpreterSetting + * TODO(zjffdu) change it to name --> InterpreterSetting */ - private final Map<String, InterpreterSetting> interpreterSettings; - private final Map<String, List<String>> interpreterBindings; - - private final DependencyResolver dependencyResolver; - private final List<RemoteRepository> interpreterRepositories; + private final Map<String, InterpreterSetting> interpreterSettings = + Maps.newConcurrentMap(); - private final InterpreterOption defaultOption; + /** + * noteId --> list of InterpreterSettingId + */ + private final Map<String, List<String>> interpreterBindings = + Maps.newConcurrentMap(); - private final Map<String, URLClassLoader> cleanCl; + private final List<RemoteRepository> interpreterRepositories; + private InterpreterOption defaultOption; + private List<String> interpreterGroupOrderList; + private final Gson gson; - @Deprecated - private String[] interpreterClassList; - private String[] interpreterGroupOrderList; - private InterpreterGroupFactory interpreterGroupFactory; + private AngularObjectRegistryListener angularObjectRegistryListener; + private RemoteInterpreterProcessListener remoteInterpreterProcessListener; + private ApplicationEventListener appEventListener; + private DependencyResolver dependencyResolver; - private final Gson gson; public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration, - DependencyResolver dependencyResolver, InterpreterOption interpreterOption) - throws IOException, RepositoryException { - this.zeppelinConfiguration = zeppelinConfiguration; - this.interpreterDirPath = Paths.get(zeppelinConfiguration.getInterpreterDir()); - logger.debug("InterpreterRootPath: {}", interpreterDirPath); - this.interpreterBindingPath = Paths.get(zeppelinConfiguration.getInterpreterSettingPath()); - logger.debug("InterpreterBindingPath: {}", interpreterBindingPath); - - this.interpreterSettingsRef = Maps.newConcurrentMap(); - this.interpreterSettings = Maps.newConcurrentMap(); - this.interpreterBindings = Maps.newConcurrentMap(); - - this.dependencyResolver = dependencyResolver; + AngularObjectRegistryListener angularObjectRegistryListener, + RemoteInterpreterProcessListener + remoteInterpreterProcessListener, + ApplicationEventListener appEventListener) + throws IOException { + this(zeppelinConfiguration, new InterpreterOption(true), + angularObjectRegistryListener, + remoteInterpreterProcessListener, + appEventListener); + } + + @VisibleForTesting + public InterpreterSettingManager(ZeppelinConfiguration conf, + InterpreterOption defaultOption, + AngularObjectRegistryListener angularObjectRegistryListener, + RemoteInterpreterProcessListener + remoteInterpreterProcessListener, + ApplicationEventListener appEventListener) throws IOException { + this.conf = conf; + this.defaultOption = defaultOption; + this.interpreterDirPath = Paths.get(conf.getInterpreterDir()); + LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath); + this.interpreterSettingPath = Paths.get(conf.getInterpreterSettingPath()); + LOGGER.debug("InterpreterBindingPath: {}", interpreterSettingPath); + this.dependencyResolver = new DependencyResolver( + conf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO)); this.interpreterRepositories = dependencyResolver.getRepos(); + this.interpreterGroupOrderList = Arrays.asList(conf.getString( + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER).split(",")); + this.gson = new GsonBuilder().setPrettyPrinting().create(); - this.defaultOption = interpreterOption; - - this.cleanCl = Collections.synchronizedMap(new HashMap<String, URLClassLoader>()); - - String replsConf = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETERS); - this.interpreterClassList = replsConf.split(","); - String groupOrder = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER); - this.interpreterGroupOrderList = groupOrder.split(","); - - GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.setPrettyPrinting(); - this.gson = gsonBuilder.create(); - + this.angularObjectRegistryListener = angularObjectRegistryListener; + this.remoteInterpreterProcessListener = remoteInterpreterProcessListener; + this.appEventListener = appEventListener; init(); } /** - * Remember this method doesn't keep current connections after being called + * Load interpreter setting from interpreter-setting.json */ private void loadFromFile() { - if (!Files.exists(interpreterBindingPath)) { + if (!Files.exists(interpreterSettingPath)) { // nothing to read + LOGGER.warn("Interpreter Setting file {} doesn't exist", interpreterSettingPath); return; } - InterpreterInfoSaving infoSaving; - try (BufferedReader jsonReader = - Files.newBufferedReader(interpreterBindingPath, StandardCharsets.UTF_8)) { - JsonParser jsonParser = new JsonParser(); - JsonObject jsonObject = jsonParser.parse(jsonReader).getAsJsonObject(); - infoSaving = gson.fromJson(jsonObject.toString(), InterpreterInfoSaving.class); - - for (String k : infoSaving.interpreterSettings.keySet()) { - InterpreterSetting setting = infoSaving.interpreterSettings.get(k); - - setting.convertFlatPropertiesToPropertiesWithWidgets(); - - List<InterpreterInfo> infos = setting.getInterpreterInfos(); - - // Convert json StringMap to Properties - StringMap<StringMap> p = (StringMap<StringMap>) setting.getProperties(); - Map<String, InterpreterProperty> properties = new HashMap(); - for (String key : p.keySet()) { - StringMap<String> fields = (StringMap<String>) p.get(key); - String type = InterpreterPropertyType.TEXTAREA.getValue(); - try { - type = InterpreterPropertyType.byValue(fields.get("type")).getValue(); - } catch (Exception e) { - logger.warn("Incorrect type of property {} in settings {}", key, - setting.getId()); - } - properties.put(key, new InterpreterProperty(key, fields.get("value"), type)); - } - setting.setProperties(properties); - - // Always use separate interpreter process - // While we decided to turn this feature on always (without providing - // enable/disable option on GUI). - // previously created setting should turn this feature on here. - setting.getOption().setRemote(true); - - setting.convertPermissionsFromUsersToOwners( - jsonObject.getAsJsonObject("interpreterSettings").getAsJsonObject(setting.getId())); - - // Update transient information from InterpreterSettingRef - InterpreterSetting interpreterSettingObject = - interpreterSettingsRef.get(setting.getGroup()); - if (interpreterSettingObject == null) { - logger.warn("can't get InterpreterSetting " + - "Information From loaded Interpreter Setting Ref - {} ", setting.getGroup()); - continue; + + try { + InterpreterInfoSaving infoSaving = InterpreterInfoSaving.loadFromFile(interpreterSettingPath); + //TODO(zjffdu) still ugly (should move all to InterpreterInfoSaving) + for (InterpreterSetting savedInterpreterSetting : infoSaving.interpreterSettings.values()) { + savedInterpreterSetting.setConf(conf); + savedInterpreterSetting.setInterpreterSettingManager(this); + savedInterpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener); + savedInterpreterSetting.setRemoteInterpreterProcessListener( + remoteInterpreterProcessListener); + savedInterpreterSetting.setAppEventListener(appEventListener); + savedInterpreterSetting.setDependencyResolver(dependencyResolver); + savedInterpreterSetting.setProperties(InterpreterSetting.convertInterpreterProperties( + savedInterpreterSetting.getProperties() + )); + + InterpreterSetting interpreterSettingTemplate = + interpreterSettingTemplates.get(savedInterpreterSetting.getGroup()); + // InterpreterSettingTemplate is from interpreter-setting.json which represent the latest + // InterpreterSetting, while InterpreterSetting is from interpreter.json which represent + // the user saved interpreter setting + if (interpreterSettingTemplate != null) { + savedInterpreterSetting.setInterpreterDir(interpreterSettingTemplate.getInterpreterDir()); + // merge properties from interpreter-setting.json and interpreter.json + Map<String, InterpreterProperty> mergedProperties = + new HashMap<>(InterpreterSetting.convertInterpreterProperties( + interpreterSettingTemplate.getProperties())); + mergedProperties.putAll(InterpreterSetting.convertInterpreterProperties( + savedInterpreterSetting.getProperties())); + savedInterpreterSetting.setProperties(mergedProperties); + // merge InterpreterInfo + savedInterpreterSetting.setInterpreterInfos( + interpreterSettingTemplate.getInterpreterInfos()); + savedInterpreterSetting.setInterpreterRunner( + interpreterSettingTemplate.getInterpreterRunner()); + } else { + LOGGER.warn("No InterpreterSetting Template found for InterpreterSetting: " + + savedInterpreterSetting.getGroup()); } - String depClassPath = interpreterSettingObject.getPath(); - setting.setPath(depClassPath); - - for (InterpreterInfo info : infos) { - if (info.getEditor() == null) { - Map<String, Object> editor = getEditorFromSettingByClassName(interpreterSettingObject, - info.getClassName()); - info.setEditor(editor); + + // Overwrite the default InterpreterSetting we registered from InterpreterSetting Templates + // remove it first + for (InterpreterSetting setting : interpreterSettings.values()) { + if (setting.getName().equals(savedInterpreterSetting.getName())) { + interpreterSettings.remove(setting.getId()); } } - - setting.setInterpreterGroupFactory(interpreterGroupFactory); - - loadInterpreterDependencies(setting); - interpreterSettings.put(k, setting); + savedInterpreterSetting.postProcessing(); + LOGGER.info("Create Interpreter Setting {} from interpreter.json", + savedInterpreterSetting.getName()); + interpreterSettings.put(savedInterpreterSetting.getId(), savedInterpreterSetting); } interpreterBindings.putAll(infoSaving.interpreterBindings); @@ -235,53 +228,27 @@ public class InterpreterSettingManager { } } } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("Fail to load interpreter setting configuration file: " + + interpreterSettingPath, e); } } public void saveToFile() throws IOException { - String jsonString; - synchronized (interpreterSettings) { InterpreterInfoSaving info = new InterpreterInfoSaving(); info.interpreterBindings = interpreterBindings; info.interpreterSettings = interpreterSettings; info.interpreterRepositories = interpreterRepositories; - - jsonString = info.toJson(); + info.saveToFile(interpreterSettingPath); } - - if (!Files.exists(interpreterBindingPath)) { - Files.createFile(interpreterBindingPath); - - try { - Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE); - Files.setPosixFilePermissions(interpreterBindingPath, permissions); - } catch (UnsupportedOperationException e) { - // File system does not support Posix file permissions (likely windows) - continue anyway. - logger.warn("unable to setPosixFilePermissions on '{}'.", interpreterBindingPath); - } - } - - FileOutputStream fos = new FileOutputStream(interpreterBindingPath.toFile(), false); - OutputStreamWriter out = new OutputStreamWriter(fos); - out.append(jsonString); - out.close(); - fos.close(); } - //TODO(jl): Fix it to remove InterpreterGroupFactory - public void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) { - for (InterpreterSetting setting : interpreterSettings.values()) { - setting.setInterpreterGroupFactory(interpreterGroupFactory); - } - this.interpreterGroupFactory = interpreterGroupFactory; - } + private void init() throws IOException { - private void init() throws InterpreterException, IOException, RepositoryException { - String interpreterJson = zeppelinConfiguration.getInterpreterJson(); + // 1. detect interpreter setting via interpreter-setting.json in each interpreter folder + // 2. detect interpreter setting in interpreter.json that is saved before + String interpreterJson = conf.getInterpreterJson(); ClassLoader cl = Thread.currentThread().getContextClassLoader(); - if (Files.exists(interpreterDirPath)) { for (Path interpreterDir : Files .newDirectoryStream(interpreterDirPath, new Filter<Path>() { @@ -298,227 +265,144 @@ public class InterpreterSettingManager { * interpreter-setting.json * 2. Register it from interpreter-setting.json in classpath * {ZEPPELIN_HOME}/interpreter/{interpreter_name} - * 3. Register it by Interpreter.register */ if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) { if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) { - /* - * TODO(jongyoul) - * - Remove these codes below because of legacy code - * - Support ThreadInterpreter - */ - URLClassLoader ccl = new URLClassLoader( - recursiveBuildLibList(interpreterDir.toFile()), cl); - for (String className : interpreterClassList) { - try { - // Load classes - Class.forName(className, true, ccl); - Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet(); - for (String interpreterKey : interpreterKeys) { - if (className - .equals(Interpreter.registeredInterpreters.get(interpreterKey) - .getClassName())) { - Interpreter.registeredInterpreters.get(interpreterKey) - .setPath(interpreterDirString); - logger.info("Interpreter " + interpreterKey + " found. class=" + className); - cleanCl.put(interpreterDirString, ccl); - } - } - } catch (Throwable t) { - // nothing to do - } - } + LOGGER.warn("No interpreter-setting.json found in " + interpreterDirPath); } } } - } - - for (RegisteredInterpreter registeredInterpreter : Interpreter.registeredInterpreters - .values()) { - logger - .debug("Registered: {} -> {}. Properties: {}", registeredInterpreter.getInterpreterKey(), - registeredInterpreter.getClassName(), registeredInterpreter.getProperties()); - } - - // RegisteredInterpreters -> interpreterSettingRef - InterpreterInfo interpreterInfo; - for (RegisteredInterpreter r : Interpreter.registeredInterpreters.values()) { - interpreterInfo = - new InterpreterInfo(r.getClassName(), r.getName(), r.isDefaultInterpreter(), - r.getEditor()); - add(r.getGroup(), interpreterInfo, r.getProperties(), defaultOption, r.getPath(), - r.getRunner()); - } - - for (String settingId : interpreterSettingsRef.keySet()) { - InterpreterSetting setting = interpreterSettingsRef.get(settingId); - logger.info("InterpreterSettingRef name {}", setting.getName()); + } else { + LOGGER.warn("InterpreterDir {} doesn't exist", interpreterDirPath); } loadFromFile(); - - // if no interpreter settings are loaded, create default set - if (0 == interpreterSettings.size()) { - Map<String, InterpreterSetting> temp = new HashMap<>(); - InterpreterSetting interpreterSetting; - for (InterpreterSetting setting : interpreterSettingsRef.values()) { - interpreterSetting = createFromInterpreterSettingRef(setting); - temp.put(setting.getName(), interpreterSetting); - } - - for (String group : interpreterGroupOrderList) { - if (null != (interpreterSetting = temp.remove(group))) { - interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); - } - } - - for (InterpreterSetting setting : temp.values()) { - interpreterSettings.put(setting.getId(), setting); - } - - saveToFile(); - } - - for (String settingId : interpreterSettings.keySet()) { - InterpreterSetting setting = interpreterSettings.get(settingId); - logger.info("InterpreterSetting group {} : id={}, name={}", setting.getGroup(), settingId, - setting.getName()); - } + saveToFile(); } private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir, - String interpreterJson) throws IOException, RepositoryException { + String interpreterJson) throws IOException { URL[] urls = recursiveBuildLibList(new File(interpreterDir)); ClassLoader tempClassLoader = new URLClassLoader(urls, cl); - Enumeration<URL> interpreterSettings = tempClassLoader.getResources(interpreterJson); - if (!interpreterSettings.hasMoreElements()) { + URL url = tempClassLoader.getResource(interpreterJson); + if (url == null) { return false; } - for (URL url : Collections.list(interpreterSettings)) { - try (InputStream inputStream = url.openStream()) { - logger.debug("Reading {} from {}", interpreterJson, url); - List<RegisteredInterpreter> registeredInterpreterList = - getInterpreterListFromJson(inputStream); - registerInterpreters(registeredInterpreterList, interpreterDir); - } - } + + LOGGER.debug("Reading interpreter-setting.json from {} as Resource", url); + List<RegisteredInterpreter> registeredInterpreterList = + getInterpreterListFromJson(url.openStream()); + registerInterpreterSetting(registeredInterpreterList, interpreterDir); return true; } private boolean registerInterpreterFromPath(String interpreterDir, String interpreterJson) - throws IOException, RepositoryException { + throws IOException { Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson); if (Files.exists(interpreterJsonPath)) { - logger.debug("Reading {}", interpreterJsonPath); + LOGGER.debug("Reading interpreter-setting.json from file {}", interpreterJsonPath); List<RegisteredInterpreter> registeredInterpreterList = - getInterpreterListFromJson(interpreterJsonPath); - registerInterpreters(registeredInterpreterList, interpreterDir); + getInterpreterListFromJson(new FileInputStream(interpreterJsonPath.toFile())); + registerInterpreterSetting(registeredInterpreterList, interpreterDir); return true; } return false; } - private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename) - throws FileNotFoundException { - return getInterpreterListFromJson(new FileInputStream(filename.toFile())); - } - private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) { Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() { }.getType(); return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType); } - private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters, - String absolutePath) throws IOException, RepositoryException { + private void registerInterpreterSetting(List<RegisteredInterpreter> registeredInterpreters, + String interpreterDir) throws IOException { + Map<String, DefaultInterpreterProperty> properties = new HashMap<>(); + List<InterpreterInfo> interpreterInfos = new ArrayList<>(); + InterpreterOption option = defaultOption; + String group = null; + InterpreterRunner runner = null; for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) { + //TODO(zjffdu) merge RegisteredInterpreter & InterpreterInfo InterpreterInfo interpreterInfo = new InterpreterInfo(registeredInterpreter.getClassName(), registeredInterpreter.getName(), registeredInterpreter.isDefaultInterpreter(), registeredInterpreter.getEditor()); + group = registeredInterpreter.getGroup(); + runner = registeredInterpreter.getRunner(); // use defaultOption if it is not specified in interpreter-setting.json - InterpreterOption option = registeredInterpreter.getOption() == null ? defaultOption : - registeredInterpreter.getOption(); - add(registeredInterpreter.getGroup(), interpreterInfo, registeredInterpreter.getProperties(), - option, absolutePath, registeredInterpreter.getRunner()); - } - - } - - public InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) { - if (settings == null || settings.isEmpty()) { - return null; - } - return settings.get(0); - } - + if (registeredInterpreter.getOption() != null) { + option = registeredInterpreter.getOption(); + } + properties.putAll(registeredInterpreter.getProperties()); + interpreterInfos.add(interpreterInfo); + } + + InterpreterSetting interpreterSettingTemplate = new InterpreterSetting.Builder() + .setGroup(group) + .setName(group) + .setInterpreterInfos(interpreterInfos) + .setProperties(properties) + .setDependencies(new ArrayList<Dependency>()) + .setOption(option) + .setRunner(runner) + .setInterpreterDir(interpreterDir) + .setRunner(runner) + .setConf(conf) + .setIntepreterSettingManager(this) + .create(); + + LOGGER.info("Register InterpreterSettingTemplate & InterpreterSetting: {}", + interpreterSettingTemplate.getName()); + interpreterSettingTemplates.put(interpreterSettingTemplate.getName(), + interpreterSettingTemplate); + + InterpreterSetting interpreterSetting = new InterpreterSetting(interpreterSettingTemplate); + interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener); + interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener); + interpreterSetting.setAppEventListener(appEventListener); + interpreterSetting.setDependencyResolver(dependencyResolver); + interpreterSetting.setInterpreterSettingManager(this); + interpreterSetting.postProcessing(); + interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); + } + + @VisibleForTesting public InterpreterSetting getDefaultInterpreterSetting(String noteId) { - return getDefaultInterpreterSetting(getInterpreterSettings(noteId)); + return getInterpreterSettings(noteId).get(0); } public List<InterpreterSetting> getInterpreterSettings(String noteId) { - List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId); - LinkedList<InterpreterSetting> settings = new LinkedList<>(); - - Iterator<String> iter = interpreterSettingIds.iterator(); - while (iter.hasNext()) { - String id = iter.next(); - InterpreterSetting setting = get(id); - if (setting == null) { - // interpreter setting is removed from factory. remove id from here, too - iter.remove(); - } else { - settings.add(setting); + List<InterpreterSetting> settings = new ArrayList<>(); + synchronized (interpreterSettings) { + List<String> interpreterSettingIds = interpreterBindings.get(noteId); + if (interpreterSettingIds != null) { + for (String settingId : interpreterSettingIds) { + if (interpreterSettings.containsKey(settingId)) { + settings.add(interpreterSettings.get(settingId)); + } else { + LOGGER.warn("InterpreterSetting {} has been removed, but note {} still bind to it.", + settingId, noteId); + } + } } } return settings; } - private List<String> getNoteInterpreterSettingBinding(String noteId) { - LinkedList<String> bindings = new LinkedList<>(); - List<String> settingIds = interpreterBindings.get(noteId); - if (settingIds != null) { - bindings.addAll(settingIds); - } - return bindings; - } - - private InterpreterSetting createFromInterpreterSettingRef(String name) { - Preconditions.checkNotNull(name, "reference name should be not null"); - InterpreterSetting settingRef = interpreterSettingsRef.get(name); - return createFromInterpreterSettingRef(settingRef); - } - - private InterpreterSetting createFromInterpreterSettingRef(InterpreterSetting o) { - // should return immutable objects - List<InterpreterInfo> infos = (null == o.getInterpreterInfos()) ? - new ArrayList<InterpreterInfo>() : new ArrayList<>(o.getInterpreterInfos()); - List<Dependency> deps = (null == o.getDependencies()) ? - new ArrayList<Dependency>() : new ArrayList<>(o.getDependencies()); - Map<String, InterpreterProperty> props = - convertInterpreterProperties((Map<String, DefaultInterpreterProperty>) o.getProperties()); - InterpreterOption option = InterpreterOption.fromInterpreterOption(o.getOption()); - - InterpreterSetting setting = new InterpreterSetting(o.getName(), o.getName(), - infos, props, deps, option, o.getPath(), o.getInterpreterRunner()); - setting.setInterpreterGroupFactory(interpreterGroupFactory); - return setting; - } - - private Map<String, InterpreterProperty> convertInterpreterProperties( - Map<String, DefaultInterpreterProperty> defaultProperties) { - Map<String, InterpreterProperty> properties = new HashMap<>(); - - for (String key : defaultProperties.keySet()) { - DefaultInterpreterProperty defaultInterpreterProperty = defaultProperties.get(key); - properties.put(key, new InterpreterProperty(key, defaultInterpreterProperty.getValue(), - defaultInterpreterProperty.getType())); + public ManagedInterpreterGroup getInterpreterGroupById(String groupId) { + for (InterpreterSetting setting : interpreterSettings.values()) { + ManagedInterpreterGroup interpreterGroup = setting.getInterpreterGroup(groupId); + if (interpreterGroup != null) { + return interpreterGroup; + } } - return properties; + return null; } + //TODO(zjffdu) logic here is a little ugly public Map<String, Object> getEditorSetting(Interpreter interpreter, String user, String noteId, String replName) { Map<String, Object> editor = DEFAULT_EDITOR; @@ -533,97 +417,133 @@ public class InterpreterSettingManager { } // when replName is 'name' of interpreter if (defaultSettingName.equals(intpSetting.getName())) { - editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName()); + editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName()); } // when replName is 'alias name' of interpreter or 'group' of interpreter if (replName.equals(intpSetting.getName()) || group.equals(intpSetting.getName())) { - editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName()); + editor = intpSetting.getEditorFromSettingByClassName(interpreter.getClassName()); break; } } } catch (NullPointerException e) { // Use `debug` level because this log occurs frequently - logger.debug("Couldn't get interpreter editor setting"); + LOGGER.debug("Couldn't get interpreter editor setting"); } return editor; } - public Map<String, Object> getEditorFromSettingByClassName(InterpreterSetting intpSetting, - String className) { - List<InterpreterInfo> intpInfos = intpSetting.getInterpreterInfos(); - for (InterpreterInfo intpInfo : intpInfos) { + public List<ManagedInterpreterGroup> getAllInterpreterGroup() { + List<ManagedInterpreterGroup> interpreterGroups = new ArrayList<>(); + for (InterpreterSetting interpreterSetting : interpreterSettings.values()) { + interpreterGroups.addAll(interpreterSetting.getAllInterpreterGroups()); + } + return interpreterGroups; + } - if (className.equals(intpInfo.getClassName())) { - if (intpInfo.getEditor() == null) { - break; + //TODO(zjffdu) move Resource related api to ResourceManager + public ResourceSet getAllResources() { + return getAllResourcesExcept(null); + } + + private ResourceSet getAllResourcesExcept(String interpreterGroupExcludsion) { + ResourceSet resourceSet = new ResourceSet(); + for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) { + if (interpreterGroupExcludsion != null && + intpGroup.getId().equals(interpreterGroupExcludsion)) { + continue; + } + + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if (localPool != null) { + resourceSet.addAll(localPool.getAll()); + } + } else if (remoteInterpreterProcess.isRunning()) { + List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<String>>() { + @Override + public List<String> call(RemoteInterpreterService.Client client) throws Exception { + return client.resourcePoolGetAll(); + } + }); + for (String res : resourceList) { + resourceSet.add(Resource.fromJson(res)); } - return intpInfo.getEditor(); } } - return DEFAULT_EDITOR; + return resourceSet; } - private void loadInterpreterDependencies(final InterpreterSetting setting) { - setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES); - setting.setErrorReason(null); - interpreterSettings.put(setting.getId(), setting); - synchronized (interpreterSettings) { - final Thread t = new Thread() { - public void run() { - try { - // dependencies to prevent library conflict - File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" + - setting.getId()); - if (localRepoDir.exists()) { - try { - FileUtils.forceDelete(localRepoDir); - } catch (FileNotFoundException e) { - logger.info("A file that does not exist cannot be deleted, nothing to worry", e); + public void removeResourcesBelongsToParagraph(String noteId, String paragraphId) { + for (ManagedInterpreterGroup intpGroup : getAllInterpreterGroup()) { + ResourceSet resourceSet = new ResourceSet(); + RemoteInterpreterProcess remoteInterpreterProcess = intpGroup.getRemoteInterpreterProcess(); + if (remoteInterpreterProcess == null) { + ResourcePool localPool = intpGroup.getResourcePool(); + if (localPool != null) { + resourceSet.addAll(localPool.getAll()); + } + if (noteId != null) { + resourceSet = resourceSet.filterByNoteId(noteId); + } + if (paragraphId != null) { + resourceSet = resourceSet.filterByParagraphId(paragraphId); + } + + for (Resource r : resourceSet) { + localPool.remove( + r.getResourceId().getNoteId(), + r.getResourceId().getParagraphId(), + r.getResourceId().getName()); + } + } else if (remoteInterpreterProcess.isRunning()) { + List<String> resourceList = remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<List<String>>() { + @Override + public List<String> call(RemoteInterpreterService.Client client) throws Exception { + return client.resourcePoolGetAll(); } - } + }); + for (String res : resourceList) { + resourceSet.add(Resource.fromJson(res)); + } - // load dependencies - List<Dependency> deps = setting.getDependencies(); - if (deps != null) { - for (Dependency d : deps) { - File destDir = new File( - zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO)); + if (noteId != null) { + resourceSet = resourceSet.filterByNoteId(noteId); + } + if (paragraphId != null) { + resourceSet = resourceSet.filterByParagraphId(paragraphId); + } - if (d.getExclusions() != null) { - dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(), - new File(destDir, setting.getId())); - } else { - dependencyResolver - .load(d.getGroupArtifactVersion(), new File(destDir, setting.getId())); + for (final Resource r : resourceSet) { + remoteInterpreterProcess.callRemoteFunction( + new RemoteInterpreterProcess.RemoteFunction<Void>() { + + @Override + public Void call(RemoteInterpreterService.Client client) throws Exception { + client.resourceRemove( + r.getResourceId().getNoteId(), + r.getResourceId().getParagraphId(), + r.getResourceId().getName()); + return null; } - } - } - - setting.setStatus(InterpreterSetting.Status.READY); - setting.setErrorReason(null); - } catch (Exception e) { - logger.error(String.format("Error while downloading repos for interpreter group : %s," + - " go to interpreter setting page click on edit and save it again to make " + - "this interpreter work properly. : %s", - setting.getGroup(), e.getLocalizedMessage()), e); - setting.setErrorReason(e.getLocalizedMessage()); - setting.setStatus(InterpreterSetting.Status.ERROR); - } finally { - interpreterSettings.put(setting.getId(), setting); - } + }); } - }; - t.start(); + } } } + public void removeResourcesBelongsToNote(String noteId) { + removeResourcesBelongsToParagraph(noteId, null); + } + /** * Overwrite dependency jar under local-repo/{interpreterId} * if jar file in original path is changed */ private void copyDependenciesFromLocalPath(final InterpreterSetting setting) { setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES); - interpreterSettings.put(setting.getId(), setting); synchronized (interpreterSettings) { final Thread t = new Thread() { public void run() { @@ -632,7 +552,7 @@ public class InterpreterSettingManager { if (deps != null) { for (Dependency d : deps) { File destDir = new File( - zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO)); + conf.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO)); int numSplits = d.getGroupArtifactVersion().split(":").length; if (!(numSplits >= 3 && numSplits <= 6)) { @@ -643,14 +563,14 @@ public class InterpreterSettingManager { } setting.setStatus(InterpreterSetting.Status.READY); } catch (Exception e) { - logger.error(String.format("Error while copying deps for interpreter group : %s," + + LOGGER.error(String.format("Error while copying deps for interpreter group : %s," + " go to interpreter setting page click on edit and save it again to make " + "this interpreter work properly.", setting.getGroup()), e); setting.setErrorReason(e.getLocalizedMessage()); setting.setStatus(InterpreterSetting.Status.ERROR); } finally { - interpreterSettings.put(setting.getId(), setting); + } } }; @@ -663,220 +583,107 @@ public class InterpreterSettingManager { * The list does not contain more than one setting from the same interpreter class. * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name */ - public List<String> getDefaultInterpreterSettingList() { - // this list will contain default interpreter setting list - List<String> defaultSettings = new LinkedList<>(); - - // to ignore the same interpreter group - Map<String, Boolean> interpreterGroupCheck = new HashMap<>(); - - List<InterpreterSetting> sortedSettings = get(); - - for (InterpreterSetting setting : sortedSettings) { - if (defaultSettings.contains(setting.getId())) { - continue; - } - - if (!interpreterGroupCheck.containsKey(setting.getName())) { - defaultSettings.add(setting.getId()); - interpreterGroupCheck.put(setting.getName(), true); - } - } - return defaultSettings; - } - - List<RegisteredInterpreter> getRegisteredInterpreterList() { - return new ArrayList<>(Interpreter.registeredInterpreters.values()); - } - - - private boolean findDefaultInterpreter(List<InterpreterInfo> infos) { - for (InterpreterInfo interpreterInfo : infos) { - if (interpreterInfo.isDefaultInterpreter()) { - return true; - } + public List<String> getInterpreterSettingIds() { + List<String> settingIdList = new ArrayList<>(); + for (InterpreterSetting interpreterSetting : get()) { + settingIdList.add(interpreterSetting.getId()); } - return false; + return settingIdList; } public InterpreterSetting createNewSetting(String name, String group, List<Dependency> dependencies, InterpreterOption option, Map<String, InterpreterProperty> p) throws IOException { + if (name.indexOf(".") >= 0) { throw new IOException("'.' is invalid for InterpreterSetting name."); } - InterpreterSetting setting = createFromInterpreterSettingRef(group); + // check if name is existed + for (InterpreterSetting interpreterSetting : interpreterSettings.values()) { + if (interpreterSetting.getName().equals(name)) { + throw new IOException("Interpreter " + name + " already existed"); + } + } + InterpreterSetting setting = new InterpreterSetting(interpreterSettingTemplates.get(group)); setting.setName(name); setting.setGroup(group); + //TODO(zjffdu) Should use setDependencies setting.appendDependencies(dependencies); setting.setInterpreterOption(option); setting.setProperties(p); - setting.setInterpreterGroupFactory(interpreterGroupFactory); + setting.setAppEventListener(appEventListener); + setting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener); + setting.setDependencyResolver(dependencyResolver); + setting.setAngularObjectRegistryListener(angularObjectRegistryListener); + setting.setInterpreterSettingManager(this); + setting.postProcessing(); interpreterSettings.put(setting.getId(), setting); - loadInterpreterDependencies(setting); saveToFile(); return setting; } - private InterpreterSetting add(String group, InterpreterInfo interpreterInfo, - Map<String, DefaultInterpreterProperty> interpreterProperties, InterpreterOption option, - String path, InterpreterRunner runner) - throws InterpreterException, IOException, RepositoryException { - ArrayList<InterpreterInfo> infos = new ArrayList<>(); - infos.add(interpreterInfo); - return add(group, infos, new ArrayList<Dependency>(), option, interpreterProperties, path, - runner); - } - - /** - * @param group InterpreterSetting reference name - */ - public InterpreterSetting add(String group, ArrayList<InterpreterInfo> interpreterInfos, - List<Dependency> dependencies, InterpreterOption option, - Map<String, DefaultInterpreterProperty> interpreterProperties, String path, - InterpreterRunner runner) { - Preconditions.checkNotNull(group, "name should not be null"); - Preconditions.checkNotNull(interpreterInfos, "interpreterInfos should not be null"); - Preconditions.checkNotNull(dependencies, "dependencies should not be null"); - Preconditions.checkNotNull(option, "option should not be null"); - Preconditions.checkNotNull(interpreterProperties, "properties should not be null"); - - InterpreterSetting interpreterSetting; - - synchronized (interpreterSettingsRef) { - if (interpreterSettingsRef.containsKey(group)) { - interpreterSetting = interpreterSettingsRef.get(group); - - // Append InterpreterInfo - List<InterpreterInfo> infos = interpreterSetting.getInterpreterInfos(); - boolean hasDefaultInterpreter = findDefaultInterpreter(infos); - for (InterpreterInfo interpreterInfo : interpreterInfos) { - if (!infos.contains(interpreterInfo)) { - if (!hasDefaultInterpreter && interpreterInfo.isDefaultInterpreter()) { - hasDefaultInterpreter = true; - infos.add(0, interpreterInfo); - } else { - infos.add(interpreterInfo); - } - } - } - - // Append dependencies - List<Dependency> dependencyList = interpreterSetting.getDependencies(); - for (Dependency dependency : dependencies) { - if (!dependencyList.contains(dependency)) { - dependencyList.add(dependency); - } - } - - // Append properties - Map<String, DefaultInterpreterProperty> properties = - (Map<String, DefaultInterpreterProperty>) interpreterSetting.getProperties(); - for (String key : interpreterProperties.keySet()) { - if (!properties.containsKey(key)) { - properties.put(key, interpreterProperties.get(key)); - } - } - - } else { - interpreterSetting = - new InterpreterSetting(group, null, interpreterInfos, interpreterProperties, - dependencies, option, path, runner); - interpreterSettingsRef.put(group, interpreterSetting); - } - } - - if (dependencies.size() > 0) { - loadInterpreterDependencies(interpreterSetting); - } - - interpreterSetting.setInterpreterGroupFactory(interpreterGroupFactory); - return interpreterSetting; + @VisibleForTesting + public void addInterpreterSetting(InterpreterSetting interpreterSetting) { + interpreterSettingTemplates.put(interpreterSetting.getName(), interpreterSetting); + interpreterSetting.setAppEventListener(appEventListener); + interpreterSetting.setDependencyResolver(dependencyResolver); + interpreterSetting.setAngularObjectRegistryListener(angularObjectRegistryListener); + interpreterSetting.setRemoteInterpreterProcessListener(remoteInterpreterProcessListener); + interpreterSetting.setInterpreterSettingManager(this); + interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); } /** * map interpreter ids into noteId * + * @param user user name * @param noteId note id - * @param ids InterpreterSetting id list + * @param settingIdList InterpreterSetting id list */ - public void setInterpreters(String user, String noteId, List<String> ids) throws IOException { - putNoteInterpreterSettingBinding(user, noteId, ids); - } - - private void putNoteInterpreterSettingBinding(String user, String noteId, - List<String> settingList) throws IOException { - List<String> unBindedSettings = new LinkedList<>(); + public void setInterpreterBinding(String user, String noteId, List<String> settingIdList) + throws IOException { + List<String> unBindedSettingIdList = new LinkedList<>(); synchronized (interpreterSettings) { - List<String> oldSettings = interpreterBindings.get(noteId); - if (oldSettings != null) { - for (String oldSettingId : oldSettings) { - if (!settingList.contains(oldSettingId)) { - unBindedSettings.add(oldSettingId); + List<String> oldSettingIdList = interpreterBindings.get(noteId); + if (oldSettingIdList != null) { + for (String oldSettingId : oldSettingIdList) { + if (!settingIdList.contains(oldSettingId)) { + unBindedSettingIdList.add(oldSettingId); } } } - interpreterBindings.put(noteId, settingList); + interpreterBindings.put(noteId, settingIdList); saveToFile(); - for (String settingId : unBindedSettings) { - InterpreterSetting setting = get(settingId); - removeInterpretersForNote(setting, user, noteId); + for (String settingId : unBindedSettingIdList) { + InterpreterSetting interpreterSetting = interpreterSettings.get(settingId); + //TODO(zjffdu) Add test for this scenario + //only close Interpreters when it is note scoped + if (interpreterSetting.getOption().perNoteIsolated() || + interpreterSetting.getOption().perNoteScoped()) { + interpreterSetting.closeInterpreters(user, noteId); + } } } } - public void removeInterpretersForNote(InterpreterSetting interpreterSetting, String user, - String noteId) { - //TODO(jl): This is only for hotfix. You should fix it as a beautiful way - InterpreterOption interpreterOption = interpreterSetting.getOption(); - if (!(InterpreterOption.SHARED.equals(interpreterOption.perNote) - && InterpreterOption.SHARED.equals(interpreterOption.perUser))) { - interpreterSetting.closeAndRemoveInterpreterGroup(noteId, ""); - } - } - - public String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) { - InterpreterOption option = setting.getOption(); - String key; - if (option.isExistingProcess()) { - key = Constants.EXISTING_PROCESS; - } else if (option.perNoteScoped() && option.perUserScoped()) { - key = user + ":" + noteId; - } else if (option.perUserScoped()) { - key = user; - } else if (option.perNoteScoped()) { - key = noteId; - } else { - key = SHARED_SESSION; - } - - logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " + - "{}", key, noteId, user, setting.getName()); - return key; - } - - - public List<String> getInterpreters(String noteId) { - return getNoteInterpreterSettingBinding(noteId); + public List<String> getInterpreterBinding(String noteId) { + return interpreterBindings.get(noteId); } + @VisibleForTesting public void closeNote(String user, String noteId) { // close interpreters in this note session + LOGGER.info("Close Note: {}", noteId); List<InterpreterSetting> settings = getInterpreterSettings(noteId); - if (settings == null || settings.size() == 0) { - return; - } - - logger.info("closeNote: {}", noteId); for (InterpreterSetting setting : settings) { - removeInterpretersForNote(setting, user, noteId); + setting.closeInterpreters(user, noteId); } } - public Map<String, InterpreterSetting> getAvailableInterpreterSettings() { - return interpreterSettingsRef; + public Map<String, InterpreterSetting> getInterpreterSettingTemplates() { + return interpreterSettingTemplates; } private URL[] recursiveBuildLibList(File path) throws MalformedURLException { @@ -914,36 +721,25 @@ public class InterpreterSettingManager { } public void removeNoteInterpreterSettingBinding(String user, String noteId) throws IOException { - List<String> settingIds = interpreterBindings.remove(noteId); - if (settingIds != null) { - for (String settingId : settingIds) { - InterpreterSetting setting = get(settingId); - if (setting != null) { - this.removeInterpretersForNote(setting, user, noteId); - } - } - } - saveToFile(); + setInterpreterBinding(user, noteId, new ArrayList<String>()); + interpreterBindings.remove(noteId); } /** * Change interpreter property and restart */ public void setPropertyAndRestart(String id, InterpreterOption option, - Map<String, InterpreterProperty> properties, - List<Dependency> dependencies) throws IOException { + Map<String, InterpreterProperty> properties, + List<Dependency> dependencies) throws IOException { synchronized (interpreterSettings) { InterpreterSetting intpSetting = interpreterSettings.get(id); if (intpSetting != null) { try { - stopJobAllInterpreter(intpSetting); - - intpSetting.closeAndRemoveAllInterpreterGroups(); + intpSetting.close(); intpSetting.setOption(option); intpSetting.setProperties(properties); intpSetting.setDependencies(dependencies); - loadInterpreterDependencies(intpSetting); - + intpSetting.postProcessing(); saveToFile(); } catch (Exception e) { loadFromFile(); @@ -955,6 +751,7 @@ public class InterpreterSettingManager { } } + // restart in note page public void restart(String settingId, String noteId, String user) { InterpreterSetting intpSetting = interpreterSettings.get(settingId); Preconditions.checkNotNull(intpSetting); @@ -967,11 +764,10 @@ public class InterpreterSettingManager { intpSetting.setInfos(null); copyDependenciesFromLocalPath(intpSetting); - stopJobAllInterpreter(intpSetting); if (user.equals("anonymous")) { - intpSetting.closeAndRemoveAllInterpreterGroups(); + intpSetting.close(); } else { - intpSetting.closeAndRemoveInterpreterGroup(noteId, user); + intpSetting.closeInterpreters(user, noteId); } } else { @@ -984,39 +780,33 @@ public class InterpreterSettingManager { restart(id, "", "anonymous"); } - private void stopJobAllInterpreter(InterpreterSetting intpSetting) { - if (intpSetting != null) { - for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) { - for (List<Interpreter> interpreters : intpGroup.values()) { - for (Interpreter intp : interpreters) { - for (Job job : intp.getScheduler().getJobsRunning()) { - job.abort(); - job.setStatus(Status.ABORT); - logger.info("Job " + job.getJobName() + " aborted "); - } - for (Job job : intp.getScheduler().getJobsWaiting()) { - job.abort(); - job.setStatus(Status.ABORT); - logger.info("Job " + job.getJobName() + " aborted "); - } - } - } - } + public InterpreterSetting get(String id) { + synchronized (interpreterSettings) { + return interpreterSettings.get(id); } } - public InterpreterSetting get(String name) { - synchronized (interpreterSettings) { - return interpreterSettings.get(name); + @VisibleForTesting + public InterpreterSetting getByName(String name) { + for (InterpreterSetting interpreterSetting : interpreterSettings.values()) { + if (interpreterSetting.getName().equals(name)) { + return interpreterSetting; + } } + throw new RuntimeException("No InterpreterSetting: " + name); } public void remove(String id) throws IOException { + // 1. close interpreter groups of this interpreter setting + // 2. remove this interpreter setting + // 3. remove this interpreter setting from note binding + // 4. clean local repo directory + LOGGER.info("Remove interpreter setting: " + id); synchronized (interpreterSettings) { if (interpreterSettings.containsKey(id)) { - InterpreterSetting intp = interpreterSettings.get(id); - intp.closeAndRemoveAllInterpreterGroups(); + InterpreterSetting intp = interpreterSettings.get(id); + intp.close(); interpreterSettings.remove(id); for (List<String> settings : interpreterBindings.values()) { Iterator<String> it = settings.iterator(); @@ -1031,7 +821,7 @@ public class InterpreterSettingManager { } } - File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" + id); + File localRepoDir = new File(conf.getInterpreterLocalRepoPath() + "/" + id); FileUtils.deleteDirectory(localRepoDir); } @@ -1040,84 +830,58 @@ public class InterpreterSettingManager { */ public List<InterpreterSetting> get() { synchronized (interpreterSettings) { - List<InterpreterSetting> orderedSettings = new LinkedList<>(); - - Map<String, List<InterpreterSetting>> nameInterpreterSettingMap = new HashMap<>(); - for (InterpreterSetting interpreterSetting : interpreterSettings.values()) { - String group = interpreterSetting.getGroup(); - if (!nameInterpreterSettingMap.containsKey(group)) { - nameInterpreterSettingMap.put(group, new ArrayList<InterpreterSetting>()); - } - nameInterpreterSettingMap.get(group).add(interpreterSetting); - } - - for (String groupName : interpreterGroupOrderList) { - List<InterpreterSetting> interpreterSettingList = - nameInterpreterSettingMap.remove(groupName); - if (null != interpreterSettingList) { - for (InterpreterSetting interpreterSetting : interpreterSettingList) { - orderedSettings.add(interpreterSetting); - } - } - } - - List<InterpreterSetting> settings = new ArrayList<>(); - - for (List<InterpreterSetting> interpreterSettingList : nameInterpreterSettingMap.values()) { - for (InterpreterSetting interpreterSetting : interpreterSettingList) { - settings.add(interpreterSetting); - } - } - - Collections.sort(settings, new Comparator<InterpreterSetting>() { + List<InterpreterSetting> orderedSettings = new ArrayList<>(interpreterSettings.values()); + Collections.sort(orderedSettings, new Comparator<InterpreterSetting>() { @Override public int compare(InterpreterSetting o1, InterpreterSetting o2) { - return o1.getName().compareTo(o2.getName()); + int i = interpreterGroupOrderList.indexOf(o1.getGroup()); + int j = interpreterGroupOrderList.indexOf(o2.getGroup()); + if (i < 0) { + LOGGER.warn("InterpreterGroup " + o1.getGroup() + + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); + // move the unknown interpreter to last + i = Integer.MAX_VALUE; + } + if (j < 0) { + LOGGER.warn("InterpreterGroup " + o2.getGroup() + + " is not specified in " + ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER.getVarName()); + // move the unknown interpreter to last + j = Integer.MAX_VALUE; + } + if (i < j) { + return -1; + } else if (i > j) { + return 1; + } else { + return 0; + } } }); - - orderedSettings.addAll(settings); - return orderedSettings; } } - public void close(InterpreterSetting interpreterSetting) { - interpreterSetting.closeAndRemoveAllInterpreterGroups(); - } - - public void close() { - List<Thread> closeThreads = new LinkedList<>(); - synchronized (interpreterSettings) { - Collection<InterpreterSetting> intpSettings = interpreterSettings.values(); - for (final InterpreterSetting intpSetting : intpSettings) { - Thread t = new Thread() { - public void run() { - intpSetting.closeAndRemoveAllInterpreterGroups(); - } - }; - t.start(); - closeThreads.add(t); - } + @VisibleForTesting + public List<String> getSettingIds() { + List<String> settingIds = new ArrayList<>(); + for (InterpreterSetting interpreterSetting : get()) { + settingIds.add(interpreterSetting.getId()); } + return settingIds; + } - for (Thread t : closeThreads) { - try { - t.join(); - } catch (InterruptedException e) { - logger.error("Can't close interpreterGroup", e); - } - } + public void close(String settingId) { + get(settingId).close(); } - public void shutdown() { + public void close() { List<Thread> closeThreads = new LinkedList<>(); synchronized (interpreterSettings) { Collection<InterpreterSetting> intpSettings = interpreterSettings.values(); for (final InterpreterSetting intpSetting : intpSettings) { Thread t = new Thread() { public void run() { - intpSetting.shutdownAndRemoveAllInterpreterGroups(); + intpSetting.close(); } }; t.start(); @@ -1129,8 +893,9 @@ public class InterpreterSettingManager { try { t.join(); } catch (InterruptedException e) { - logger.error("Can't close interpreterGroup", e); + LOGGER.error("Can't close interpreterGroup", e); } } } + }
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java new file mode 100644 index 0000000..1d7d916 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/ManagedInterpreterGroup.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.zeppelin.interpreter; + +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.scheduler.Job; +import org.apache.zeppelin.scheduler.Scheduler; +import org.apache.zeppelin.scheduler.SchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.List; + +/** + * ManagedInterpreterGroup runs under zeppelin server + */ +public class ManagedInterpreterGroup extends InterpreterGroup { + + private static final Logger LOGGER = LoggerFactory.getLogger(ManagedInterpreterGroup.class); + + private InterpreterSetting interpreterSetting; + private RemoteInterpreterProcess remoteInterpreterProcess; // attached remote interpreter process + + /** + * Create InterpreterGroup with given id and interpreterSetting, used in ZeppelinServer + * @param id + * @param interpreterSetting + */ + ManagedInterpreterGroup(String id, InterpreterSetting interpreterSetting) { + super(id); + this.interpreterSetting = interpreterSetting; + } + + public InterpreterSetting getInterpreterSetting() { + return interpreterSetting; + } + + public synchronized RemoteInterpreterProcess getOrCreateInterpreterProcess() { + if (remoteInterpreterProcess == null) { + LOGGER.info("Create InterperterProcess for InterpreterGroup: " + getId()); + remoteInterpreterProcess = interpreterSetting.createInterpreterProcess(); + } + return remoteInterpreterProcess; + } + + public RemoteInterpreterProcess getRemoteInterpreterProcess() { + return remoteInterpreterProcess; + } + + + /** + * Close all interpreter instances in this group + */ + public synchronized void close() { + LOGGER.info("Close InterpreterGroup: " + id); + for (String sessionId : sessions.keySet()) { + close(sessionId); + } + } + + /** + * Close all interpreter instances in this session + * @param sessionId + */ + public synchronized void close(String sessionId) { + LOGGER.info("Close Session: " + sessionId); + close(sessions.remove(sessionId)); + //TODO(zjffdu) whether close InterpreterGroup if there's no session left in Zeppelin Server + if (sessions.isEmpty() && interpreterSetting != null) { + LOGGER.info("Remove this InterpreterGroup {} as all the sessions are closed", id); + interpreterSetting.removeInterpreterGroup(id); + if (remoteInterpreterProcess != null) { + LOGGER.info("Kill RemoteIntetrpreterProcess"); + remoteInterpreterProcess.stop(); + remoteInterpreterProcess = null; + } + } + } + + private void close(Collection<Interpreter> interpreters) { + if (interpreters == null) { + return; + } + + for (Interpreter interpreter : interpreters) { + Scheduler scheduler = interpreter.getScheduler(); + for (Job job : scheduler.getJobsRunning()) { + job.abort(); + job.setStatus(Job.Status.ABORT); + LOGGER.info("Job " + job.getJobName() + " aborted "); + } + for (Job job : scheduler.getJobsWaiting()) { + job.abort(); + job.setStatus(Job.Status.ABORT); + LOGGER.info("Job " + job.getJobName() + " aborted "); + } + + interpreter.close(); + //TODO(zjffdu) move the close of schedule to Interpreter + if (null != scheduler) { + SchedulerFactory.singleton().removeScheduler(scheduler.getName()); + } + } + } + + public synchronized List<Interpreter> getOrCreateSession(String user, String sessionId) { + if (sessions.containsKey(sessionId)) { + return sessions.get(sessionId); + } else { + List<Interpreter> interpreters = interpreterSetting.createInterpreters(user, sessionId); + for (Interpreter interpreter : interpreters) { + interpreter.setInterpreterGroup(this); + } + LOGGER.info("Create Session {} in InterpreterGroup {} for user {}", sessionId, id, user); + sessions.put(sessionId, interpreters); + return interpreters; + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java index 3838f63..0817595 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java @@ -17,19 +17,17 @@ package org.apache.zeppelin.interpreter.install; import org.apache.commons.io.FileUtils; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Logger; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.util.Util; import org.sonatype.aether.RepositoryException; + import java.io.File; import java.io.IOException; import java.net.URL; import java.util.LinkedList; import java.util.List; import java.util.Locale; -import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java new file mode 100644 index 0000000..b139404 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputBuffer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +/** + * This element stores the buffered + * append-data of paragraph's output. + */ +public class AppendOutputBuffer { + + private String noteId; + private String paragraphId; + private int index; + private String data; + + public AppendOutputBuffer(String noteId, String paragraphId, int index, String data) { + this.noteId = noteId; + this.paragraphId = paragraphId; + this.index = index; + this.data = data; + } + + public String getNoteId() { + return noteId; + } + + public String getParagraphId() { + return paragraphId; + } + + public int getIndex() { + return index; + } + + public String getData() { + return data; + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java new file mode 100644 index 0000000..2a88dc2 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/AppendOutputRunner.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +/** + * This thread sends paragraph's append-data + * periodically, rather than continously, with + * a period of BUFFER_TIME_MS. It handles append-data + * for all paragraphs across all notebooks. + */ +public class AppendOutputRunner implements Runnable { + + private static final Logger logger = + LoggerFactory.getLogger(AppendOutputRunner.class); + public static final Long BUFFER_TIME_MS = new Long(100); + private static final Long SAFE_PROCESSING_TIME = new Long(10); + private static final Long SAFE_PROCESSING_STRING_SIZE = new Long(100000); + + private final BlockingQueue<AppendOutputBuffer> queue = new LinkedBlockingQueue<>(); + private final RemoteInterpreterProcessListener listener; + + public AppendOutputRunner(RemoteInterpreterProcessListener listener) { + this.listener = listener; + } + + @Override + public void run() { + + Map<String, StringBuilder> stringBufferMap = new HashMap<>(); + List<AppendOutputBuffer> list = new LinkedList<>(); + + /* "drainTo" method does not wait for any element + * to be present in the queue, and thus this loop would + * continuosly run (with period of BUFFER_TIME_MS). "take()" method + * waits for the queue to become non-empty and then removes + * one element from it. Rest elements from queue (if present) are + * removed using "drainTo" method. Thus we save on some un-necessary + * cpu-cycles. + */ + try { + list.add(queue.take()); + } catch (InterruptedException e) { + logger.error("Wait for OutputBuffer queue interrupted: " + e.getMessage()); + } + Long processingStartTime = System.currentTimeMillis(); + queue.drainTo(list); + + for (AppendOutputBuffer buffer: list) { + String noteId = buffer.getNoteId(); + String paragraphId = buffer.getParagraphId(); + int index = buffer.getIndex(); + String stringBufferKey = noteId + ":" + paragraphId + ":" + index; + + StringBuilder builder = stringBufferMap.containsKey(stringBufferKey) ? + stringBufferMap.get(stringBufferKey) : new StringBuilder(); + + builder.append(buffer.getData()); + stringBufferMap.put(stringBufferKey, builder); + } + Long processingTime = System.currentTimeMillis() - processingStartTime; + + if (processingTime > SAFE_PROCESSING_TIME) { + logger.warn("Processing time for buffered append-output is high: " + + processingTime + " milliseconds."); + } else { + logger.debug("Processing time for append-output took " + + processingTime + " milliseconds"); + } + + Long sizeProcessed = new Long(0); + for (String stringBufferKey : stringBufferMap.keySet()) { + StringBuilder buffer = stringBufferMap.get(stringBufferKey); + sizeProcessed += buffer.length(); + String[] keys = stringBufferKey.split(":"); + listener.onOutputAppend(keys[0], keys[1], Integer.parseInt(keys[2]), buffer.toString()); + } + + if (sizeProcessed > SAFE_PROCESSING_STRING_SIZE) { + logger.warn("Processing size for buffered append-output is high: " + + sizeProcessed + " characters."); + } else { + logger.debug("Processing size for append-output is " + + sizeProcessed + " characters"); + } + } + + public void appendBuffer(String noteId, String paragraphId, int index, String outputToAppend) { + queue.offer(new AppendOutputBuffer(noteId, paragraphId, index, outputToAppend)); + } + +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java new file mode 100644 index 0000000..b2cb78f --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.pool2.BasePooledObjectFactory; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; + +/** + * + */ +public class ClientFactory extends BasePooledObjectFactory<Client>{ + private String host; + private int port; + Map<Client, TSocket> clientSocketMap = new HashMap<>(); + + public ClientFactory(String host, int port) { + this.host = host; + this.port = port; + } + + @Override + public Client create() throws Exception { + TSocket transport = new TSocket(host, port); + try { + transport.open(); + } catch (TTransportException e) { + throw new InterpreterException(e); + } + + TProtocol protocol = new TBinaryProtocol(transport); + Client client = new RemoteInterpreterService.Client(protocol); + + synchronized (clientSocketMap) { + clientSocketMap.put(client, transport); + } + return client; + } + + @Override + public PooledObject<Client> wrap(Client client) { + return new DefaultPooledObject<>(client); + } + + @Override + public void destroyObject(PooledObject<Client> p) { + synchronized (clientSocketMap) { + if (clientSocketMap.containsKey(p.getObject())) { + clientSocketMap.get(p.getObject()).close(); + clientSocketMap.remove(p.getObject()); + } + } + } + + @Override + public boolean validateObject(PooledObject<Client> p) { + return p.getObject().getOutputProtocol().getTransport().isOpen(); + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java new file mode 100644 index 0000000..064abd5 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/InterpreterContextRunnerPool.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.zeppelin.interpreter.InterpreterContextRunner; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class InterpreterContextRunnerPool { + Logger logger = LoggerFactory.getLogger(InterpreterContextRunnerPool.class); + private Map<String, List<InterpreterContextRunner>> interpreterContextRunners; + + public InterpreterContextRunnerPool() { + interpreterContextRunners = new HashMap<>(); + + } + + // add runner + public void add(String noteId, InterpreterContextRunner runner) { + synchronized (interpreterContextRunners) { + if (!interpreterContextRunners.containsKey(noteId)) { + interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>()); + } + + interpreterContextRunners.get(noteId).add(runner); + } + } + + // replace all runners to noteId + public void addAll(String noteId, List<InterpreterContextRunner> runners) { + synchronized (interpreterContextRunners) { + if (!interpreterContextRunners.containsKey(noteId)) { + interpreterContextRunners.put(noteId, new LinkedList<InterpreterContextRunner>()); + } + + interpreterContextRunners.get(noteId).addAll(runners); + } + } + + public void clear(String noteId) { + synchronized (interpreterContextRunners) { + interpreterContextRunners.remove(noteId); + } + } + + + public void run(String noteId, String paragraphId) { + synchronized (interpreterContextRunners) { + List<InterpreterContextRunner> list = interpreterContextRunners.get(noteId); + if (list != null) { + for (InterpreterContextRunner r : list) { + if (noteId.equals(r.getNoteId()) && paragraphId.equals(r.getParagraphId())) { + logger.info("run paragraph {} on note {} from InterpreterContext", + r.getParagraphId(), r.getNoteId()); + r.run(); + return; + } + } + } + + throw new InterpreterException("Can not run paragraph " + paragraphId + " on " + noteId); + } + } +} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6203c51/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java new file mode 100644 index 0000000..62c8efd --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import org.apache.zeppelin.display.AngularObject; +import org.apache.zeppelin.display.AngularObjectListener; +import org.apache.zeppelin.interpreter.InterpreterGroup; +import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; + +/** + * Proxy for AngularObject that exists in remote interpreter process + */ +public class RemoteAngularObject extends AngularObject { + + private transient ManagedInterpreterGroup interpreterGroup; + + RemoteAngularObject(String name, Object o, String noteId, String paragraphId, + ManagedInterpreterGroup interpreterGroup, + AngularObjectListener listener) { + super(name, o, noteId, paragraphId, listener); + this.interpreterGroup = interpreterGroup; + } + + @Override + public void set(Object o, boolean emit) { + set(o, emit, true); + } + + public void set(Object o, boolean emitWeb, boolean emitRemoteProcess) { + super.set(o, emitWeb); + + if (emitRemoteProcess) { + // send updated value to remote interpreter + interpreterGroup.getRemoteInterpreterProcess(). + updateRemoteAngularObject( + getName(), getNoteId(), getParagraphId(), o); + } + } +}