http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java deleted file mode 100644 index 752b4e2..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ /dev/null @@ -1,459 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter; - -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.zeppelin.dep.Dependency; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; -import com.google.gson.annotations.SerializedName; -import com.google.gson.internal.StringMap; - -import static org.apache.zeppelin.notebook.utility.IdHashes.generateId; - -/** - * Interpreter settings - */ -public class InterpreterSetting { - - private static final Logger logger = LoggerFactory.getLogger(InterpreterSetting.class); - private static final String SHARED_PROCESS = "shared_process"; - private String id; - private String name; - // always be null in case of InterpreterSettingRef - private String group; - private transient Map<String, String> infos; - - // Map of the note and paragraphs which has runtime infos generated by this interpreter setting. - // This map is used to clear the infos in paragraph when the interpretersetting is restarted - private transient Map<String, Set<String>> runtimeInfosToBeCleared; - - /** - * properties can be either Map<String, DefaultInterpreterProperty> or - * Map<String, InterpreterProperty> - * properties should be: - * - Map<String, InterpreterProperty> when Interpreter instances are saved to - * `conf/interpreter.json` file - * - Map<String, DefaultInterpreterProperty> when Interpreters are registered - * : this is needed after https://github.com/apache/zeppelin/pull/1145 - * which changed the way of getting default interpreter setting AKA interpreterSettingsRef - */ - private Object properties; - private Status status; - private String errorReason; - - @SerializedName("interpreterGroup") - private List<InterpreterInfo> interpreterInfos; - private final transient Map<String, InterpreterGroup> interpreterGroupRef = new HashMap<>(); - private List<Dependency> dependencies = new LinkedList<>(); - private InterpreterOption option; - private transient String path; - - @SerializedName("runner") - private InterpreterRunner interpreterRunner; - - @Deprecated - private transient InterpreterGroupFactory interpreterGroupFactory; - - private final transient ReentrantReadWriteLock.ReadLock interpreterGroupReadLock; - private final transient ReentrantReadWriteLock.WriteLock interpreterGroupWriteLock; - - public InterpreterSetting() { - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); - interpreterGroupReadLock = lock.readLock(); - interpreterGroupWriteLock = lock.writeLock(); - } - - public InterpreterSetting(String id, String name, String group, - List<InterpreterInfo> interpreterInfos, Object properties, List<Dependency> dependencies, - InterpreterOption option, String path, InterpreterRunner runner) { - this(); - this.id = id; - this.name = name; - this.group = group; - this.interpreterInfos = interpreterInfos; - this.properties = properties; - this.dependencies = dependencies; - this.option = option; - this.path = path; - this.status = Status.READY; - this.interpreterRunner = runner; - } - - public InterpreterSetting(String name, String group, List<InterpreterInfo> interpreterInfos, - Object properties, List<Dependency> dependencies, InterpreterOption option, String path, - InterpreterRunner runner) { - this(generateId(), name, group, interpreterInfos, properties, dependencies, option, path, - runner); - } - - /** - * Create interpreter from interpreterSettingRef - * - * @param o interpreterSetting from interpreterSettingRef - */ - public InterpreterSetting(InterpreterSetting o) { - this(generateId(), o.getName(), o.getGroup(), o.getInterpreterInfos(), o.getProperties(), - o.getDependencies(), o.getOption(), o.getPath(), o.getInterpreterRunner()); - } - - public String getId() { - return id; - } - - public String getName() { - return name; - } - - public String getGroup() { - return group; - } - - private String getInterpreterProcessKey(String user, String noteId) { - InterpreterOption option = getOption(); - String key; - if (getOption().isExistingProcess) { - key = Constants.EXISTING_PROCESS; - } else if (getOption().isProcess()) { - key = (option.perUserIsolated() ? user : "") + ":" + (option.perNoteIsolated() ? noteId : ""); - } else { - key = SHARED_PROCESS; - } - - //logger.debug("getInterpreterProcessKey: {} for InterpreterSetting Id: {}, Name: {}", - // key, getId(), getName()); - return key; - } - - private boolean isEqualInterpreterKeyProcessKey(String refKey, String processKey) { - InterpreterOption option = getOption(); - int validCount = 0; - if (getOption().isProcess() - && !(option.perUserIsolated() == true && option.perNoteIsolated() == true)) { - - List<String> processList = Arrays.asList(processKey.split(":")); - List<String> refList = Arrays.asList(refKey.split(":")); - - if (refList.size() <= 1 || processList.size() <= 1) { - return refKey.equals(processKey); - } - - if (processList.get(0).equals("") || processList.get(0).equals(refList.get(0))) { - validCount = validCount + 1; - } - - if (processList.get(1).equals("") || processList.get(1).equals(refList.get(1))) { - validCount = validCount + 1; - } - - return (validCount >= 2); - } else { - return refKey.equals(processKey); - } - } - - String getInterpreterSessionKey(String user, String noteId) { - InterpreterOption option = getOption(); - String key; - if (option.isExistingProcess()) { - key = Constants.EXISTING_PROCESS; - } else if (option.perNoteScoped() && option.perUserScoped()) { - key = user + ":" + noteId; - } else if (option.perUserScoped()) { - key = user; - } else if (option.perNoteScoped()) { - key = noteId; - } else { - key = "shared_session"; - } - - logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " + - "{}", key, noteId, user, getName()); - return key; - } - - public InterpreterGroup getInterpreterGroup(String user, String noteId) { - String key = getInterpreterProcessKey(user, noteId); - if (!interpreterGroupRef.containsKey(key)) { - String interpreterGroupId = getId() + ":" + key; - InterpreterGroup intpGroup = - interpreterGroupFactory.createInterpreterGroup(interpreterGroupId, getOption()); - - interpreterGroupWriteLock.lock(); - logger.debug("create interpreter group with groupId:" + interpreterGroupId); - interpreterGroupRef.put(key, intpGroup); - interpreterGroupWriteLock.unlock(); - } - try { - interpreterGroupReadLock.lock(); - return interpreterGroupRef.get(key); - } finally { - interpreterGroupReadLock.unlock(); - } - } - - public Collection<InterpreterGroup> getAllInterpreterGroups() { - try { - interpreterGroupReadLock.lock(); - return new LinkedList<>(interpreterGroupRef.values()); - } finally { - interpreterGroupReadLock.unlock(); - } - } - - void closeAndRemoveInterpreterGroup(String noteId, String user) { - if (user.equals("anonymous")) { - user = ""; - } - String processKey = getInterpreterProcessKey(user, noteId); - String sessionKey = getInterpreterSessionKey(user, noteId); - List<InterpreterGroup> groupToRemove = new LinkedList<>(); - InterpreterGroup groupItem; - for (String intpKey : new HashSet<>(interpreterGroupRef.keySet())) { - if (isEqualInterpreterKeyProcessKey(intpKey, processKey)) { - interpreterGroupWriteLock.lock(); - // TODO(jl): interpreterGroup has two or more sessionKeys inside it. thus we should not - // remove interpreterGroup if it has two or more values. - groupItem = interpreterGroupRef.get(intpKey); - interpreterGroupWriteLock.unlock(); - groupToRemove.add(groupItem); - } - for (InterpreterGroup groupToClose : groupToRemove) { - // TODO(jl): Fix the logic removing session. Now, it's handled into groupToClose.clsose() - groupToClose.close(interpreterGroupRef, intpKey, sessionKey); - } - groupToRemove.clear(); - } - - //Remove session because all interpreters in this session are closed - //TODO(jl): Change all code to handle interpreter one by one or all at once - - } - - void closeAndRemoveAllInterpreterGroups() { - for (String processKey : new HashSet<>(interpreterGroupRef.keySet())) { - InterpreterGroup interpreterGroup = interpreterGroupRef.get(processKey); - for (String sessionKey : new HashSet<>(interpreterGroup.keySet())) { - interpreterGroup.close(interpreterGroupRef, processKey, sessionKey); - } - } - } - - void shutdownAndRemoveAllInterpreterGroups() { - for (InterpreterGroup interpreterGroup : interpreterGroupRef.values()) { - interpreterGroup.shutdown(); - } - } - - public Object getProperties() { - return properties; - } - - public Properties getFlatProperties() { - Properties p = new Properties(); - if (properties != null) { - Map<String, InterpreterProperty> propertyMap = (Map<String, InterpreterProperty>) properties; - for (String key : propertyMap.keySet()) { - InterpreterProperty tmp = propertyMap.get(key); - p.put(tmp.getName() != null ? tmp.getName() : key, - tmp.getValue() != null ? tmp.getValue().toString() : null); - } - } - return p; - } - - public List<Dependency> getDependencies() { - if (dependencies == null) { - return new LinkedList<>(); - } - return dependencies; - } - - public void setDependencies(List<Dependency> dependencies) { - this.dependencies = dependencies; - } - - public InterpreterOption getOption() { - if (option == null) { - option = new InterpreterOption(); - } - - return option; - } - - public void setOption(InterpreterOption option) { - this.option = option; - } - - public String getPath() { - return path; - } - - public void setPath(String path) { - this.path = path; - } - - public List<InterpreterInfo> getInterpreterInfos() { - return interpreterInfos; - } - - void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) { - this.interpreterGroupFactory = interpreterGroupFactory; - } - - void appendDependencies(List<Dependency> dependencies) { - for (Dependency dependency : dependencies) { - if (!this.dependencies.contains(dependency)) { - this.dependencies.add(dependency); - } - } - } - - void setInterpreterOption(InterpreterOption interpreterOption) { - this.option = interpreterOption; - } - - public void setProperties(Map<String, InterpreterProperty> p) { - this.properties = p; - } - - void setGroup(String group) { - this.group = group; - } - - void setName(String name) { - this.name = name; - } - - /*** - * Interpreter status - */ - public enum Status { - DOWNLOADING_DEPENDENCIES, - ERROR, - READY - } - - public Status getStatus() { - return status; - } - - public void setStatus(Status status) { - this.status = status; - } - - public String getErrorReason() { - return errorReason; - } - - public void setErrorReason(String errorReason) { - this.errorReason = errorReason; - } - - public void setInfos(Map<String, String> infos) { - this.infos = infos; - } - - public Map<String, String> getInfos() { - return infos; - } - - public InterpreterRunner getInterpreterRunner() { - return interpreterRunner; - } - - public void setInterpreterRunner(InterpreterRunner interpreterRunner) { - this.interpreterRunner = interpreterRunner; - } - - public void addNoteToPara(String noteId, String paraId) { - if (runtimeInfosToBeCleared == null) { - runtimeInfosToBeCleared = new HashMap<>(); - } - Set<String> paraIdSet = runtimeInfosToBeCleared.get(noteId); - if (paraIdSet == null) { - paraIdSet = new HashSet<>(); - runtimeInfosToBeCleared.put(noteId, paraIdSet); - } - paraIdSet.add(paraId); - } - - public Map<String, Set<String>> getNoteIdAndParaMap() { - return runtimeInfosToBeCleared; - } - - public void clearNoteIdAndParaMap() { - runtimeInfosToBeCleared = null; - } - - // For backward compatibility of interpreter.json format after ZEPPELIN-2654 - public void convertPermissionsFromUsersToOwners(JsonObject jsonObject) { - if (jsonObject != null) { - JsonObject option = jsonObject.getAsJsonObject("option"); - if (option != null) { - JsonArray users = option.getAsJsonArray("users"); - if (users != null) { - if (this.option.getOwners() == null) { - this.option.owners = new LinkedList<>(); - } - for (JsonElement user : users) { - this.option.getOwners().add(user.getAsString()); - } - } - } - } - } - - // For backward compatibility of interpreter.json format after ZEPPELIN-2403 - public void convertFlatPropertiesToPropertiesWithWidgets() { - StringMap newProperties = new StringMap(); - if (properties != null && properties instanceof StringMap) { - StringMap p = (StringMap) properties; - - for (Object o : p.entrySet()) { - Map.Entry entry = (Map.Entry) o; - if (!(entry.getValue() instanceof StringMap)) { - StringMap newProperty = new StringMap(); - newProperty.put("name", entry.getKey()); - newProperty.put("value", entry.getValue()); - newProperty.put("type", InterpreterPropertyType.TEXTAREA.getValue()); - newProperties.put(entry.getKey().toString(), newProperty); - } else { - // already converted - return; - } - } - - this.properties = newProperties; - } - } -}
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java deleted file mode 100644 index 12545d6..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ /dev/null @@ -1,1136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.lang.reflect.Type; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; -import java.nio.charset.StandardCharsets; -import java.nio.file.DirectoryStream.Filter; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.attribute.PosixFilePermission; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.EnumSet; -import java.util.Enumeration; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang.ArrayUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; -import org.apache.zeppelin.dep.Dependency; -import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.interpreter.Interpreter.RegisteredInterpreter; -import org.apache.zeppelin.scheduler.Job; -import org.apache.zeppelin.scheduler.Job.Status; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.sonatype.aether.RepositoryException; -import org.sonatype.aether.repository.Authentication; -import org.sonatype.aether.repository.Proxy; -import org.sonatype.aether.repository.RemoteRepository; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; -import com.google.gson.JsonObject; -import com.google.gson.JsonParser; -import com.google.gson.internal.StringMap; -import com.google.gson.reflect.TypeToken; - -import static java.nio.file.attribute.PosixFilePermission.OWNER_READ; -import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE; - -/** - * TBD - */ -public class InterpreterSettingManager { - - private static final Logger logger = LoggerFactory.getLogger(InterpreterSettingManager.class); - private static final String SHARED_SESSION = "shared_session"; - private static final Map<String, Object> DEFAULT_EDITOR = ImmutableMap.of( - "language", (Object) "text", - "editOnDblClick", false); - - private final ZeppelinConfiguration zeppelinConfiguration; - private final Path interpreterDirPath; - private final Path interpreterBindingPath; - - /** - * This is only references with default settings, name and properties - * key: InterpreterSetting.name - */ - private final Map<String, InterpreterSetting> interpreterSettingsRef; - /** - * This is used by creating and running Interpreters - * key: InterpreterSetting.id <- This is becuase backward compatibility - */ - private final Map<String, InterpreterSetting> interpreterSettings; - private final Map<String, List<String>> interpreterBindings; - - private final DependencyResolver dependencyResolver; - private final List<RemoteRepository> interpreterRepositories; - - private final InterpreterOption defaultOption; - - private final Map<String, URLClassLoader> cleanCl; - - @Deprecated - private String[] interpreterClassList; - private String[] interpreterGroupOrderList; - private InterpreterGroupFactory interpreterGroupFactory; - - private final Gson gson; - - public InterpreterSettingManager(ZeppelinConfiguration zeppelinConfiguration, - DependencyResolver dependencyResolver, InterpreterOption interpreterOption) - throws IOException, RepositoryException { - this.zeppelinConfiguration = zeppelinConfiguration; - this.interpreterDirPath = Paths.get(zeppelinConfiguration.getInterpreterDir()); - logger.debug("InterpreterRootPath: {}", interpreterDirPath); - this.interpreterBindingPath = Paths.get(zeppelinConfiguration.getInterpreterSettingPath()); - logger.debug("InterpreterBindingPath: {}", interpreterBindingPath); - - this.interpreterSettingsRef = Maps.newConcurrentMap(); - this.interpreterSettings = Maps.newConcurrentMap(); - this.interpreterBindings = Maps.newConcurrentMap(); - - this.dependencyResolver = dependencyResolver; - this.interpreterRepositories = dependencyResolver.getRepos(); - - this.defaultOption = interpreterOption; - - this.cleanCl = Collections.synchronizedMap(new HashMap<String, URLClassLoader>()); - - String replsConf = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETERS); - this.interpreterClassList = replsConf.split(","); - String groupOrder = zeppelinConfiguration.getString(ConfVars.ZEPPELIN_INTERPRETER_GROUP_ORDER); - this.interpreterGroupOrderList = groupOrder.split(","); - - GsonBuilder gsonBuilder = new GsonBuilder(); - gsonBuilder.setPrettyPrinting(); - this.gson = gsonBuilder.create(); - - init(); - } - - /** - * Remember this method doesn't keep current connections after being called - */ - private void loadFromFile() { - if (!Files.exists(interpreterBindingPath)) { - // nothing to read - return; - } - InterpreterInfoSaving infoSaving; - try (BufferedReader jsonReader = - Files.newBufferedReader(interpreterBindingPath, StandardCharsets.UTF_8)) { - JsonParser jsonParser = new JsonParser(); - JsonObject jsonObject = jsonParser.parse(jsonReader).getAsJsonObject(); - infoSaving = gson.fromJson(jsonObject.toString(), InterpreterInfoSaving.class); - - for (String k : infoSaving.interpreterSettings.keySet()) { - InterpreterSetting setting = infoSaving.interpreterSettings.get(k); - - setting.convertFlatPropertiesToPropertiesWithWidgets(); - - List<InterpreterInfo> infos = setting.getInterpreterInfos(); - - // Convert json StringMap to Properties - StringMap<StringMap> p = (StringMap<StringMap>) setting.getProperties(); - Map<String, InterpreterProperty> properties = new HashMap(); - for (String key : p.keySet()) { - StringMap<String> fields = (StringMap<String>) p.get(key); - String type = InterpreterPropertyType.TEXTAREA.getValue(); - try { - type = InterpreterPropertyType.byValue(fields.get("type")).getValue(); - } catch (Exception e) { - logger.warn("Incorrect type of property {} in settings {}", key, - setting.getId()); - } - properties.put(key, new InterpreterProperty(key, fields.get("value"), type)); - } - setting.setProperties(properties); - - // Always use separate interpreter process - // While we decided to turn this feature on always (without providing - // enable/disable option on GUI). - // previously created setting should turn this feature on here. - setting.getOption().setRemote(true); - - setting.convertPermissionsFromUsersToOwners( - jsonObject.getAsJsonObject("interpreterSettings").getAsJsonObject(setting.getId())); - - // Update transient information from InterpreterSettingRef - InterpreterSetting interpreterSettingObject = - interpreterSettingsRef.get(setting.getGroup()); - if (interpreterSettingObject == null) { - logger.warn("can't get InterpreterSetting " + - "Information From loaded Interpreter Setting Ref - {} ", setting.getGroup()); - continue; - } - String depClassPath = interpreterSettingObject.getPath(); - setting.setPath(depClassPath); - - for (InterpreterInfo info : infos) { - if (info.getEditor() == null) { - Map<String, Object> editor = getEditorFromSettingByClassName(interpreterSettingObject, - info.getClassName()); - info.setEditor(editor); - } - } - - setting.setInterpreterGroupFactory(interpreterGroupFactory); - - loadInterpreterDependencies(setting); - interpreterSettings.put(k, setting); - } - - interpreterBindings.putAll(infoSaving.interpreterBindings); - - if (infoSaving.interpreterRepositories != null) { - for (RemoteRepository repo : infoSaving.interpreterRepositories) { - if (!dependencyResolver.getRepos().contains(repo)) { - this.interpreterRepositories.add(repo); - } - } - } - } catch (IOException e) { - e.printStackTrace(); - } - } - - public void saveToFile() throws IOException { - String jsonString; - - synchronized (interpreterSettings) { - InterpreterInfoSaving info = new InterpreterInfoSaving(); - info.interpreterBindings = interpreterBindings; - info.interpreterSettings = interpreterSettings; - info.interpreterRepositories = interpreterRepositories; - - jsonString = info.toJson(); - } - - if (!Files.exists(interpreterBindingPath)) { - Files.createFile(interpreterBindingPath); - - try { - Set<PosixFilePermission> permissions = EnumSet.of(OWNER_READ, OWNER_WRITE); - Files.setPosixFilePermissions(interpreterBindingPath, permissions); - } catch (UnsupportedOperationException e) { - // File system does not support Posix file permissions (likely windows) - continue anyway. - logger.warn("unable to setPosixFilePermissions on '{}'.", interpreterBindingPath); - } - } - - FileOutputStream fos = new FileOutputStream(interpreterBindingPath.toFile(), false); - OutputStreamWriter out = new OutputStreamWriter(fos); - out.append(jsonString); - out.close(); - fos.close(); - } - - //TODO(jl): Fix it to remove InterpreterGroupFactory - public void setInterpreterGroupFactory(InterpreterGroupFactory interpreterGroupFactory) { - for (InterpreterSetting setting : interpreterSettings.values()) { - setting.setInterpreterGroupFactory(interpreterGroupFactory); - } - this.interpreterGroupFactory = interpreterGroupFactory; - } - - private void init() throws InterpreterException, IOException, RepositoryException { - String interpreterJson = zeppelinConfiguration.getInterpreterJson(); - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - - if (Files.exists(interpreterDirPath)) { - for (Path interpreterDir : Files - .newDirectoryStream(interpreterDirPath, new Filter<Path>() { - @Override - public boolean accept(Path entry) throws IOException { - return Files.exists(entry) && Files.isDirectory(entry); - } - })) { - String interpreterDirString = interpreterDir.toString(); - - /** - * Register interpreter by the following ordering - * 1. Register it from path {ZEPPELIN_HOME}/interpreter/{interpreter_name}/ - * interpreter-setting.json - * 2. Register it from interpreter-setting.json in classpath - * {ZEPPELIN_HOME}/interpreter/{interpreter_name} - * 3. Register it by Interpreter.register - */ - if (!registerInterpreterFromPath(interpreterDirString, interpreterJson)) { - if (!registerInterpreterFromResource(cl, interpreterDirString, interpreterJson)) { - /* - * TODO(jongyoul) - * - Remove these codes below because of legacy code - * - Support ThreadInterpreter - */ - URLClassLoader ccl = new URLClassLoader( - recursiveBuildLibList(interpreterDir.toFile()), cl); - for (String className : interpreterClassList) { - try { - // Load classes - Class.forName(className, true, ccl); - Set<String> interpreterKeys = Interpreter.registeredInterpreters.keySet(); - for (String interpreterKey : interpreterKeys) { - if (className - .equals(Interpreter.registeredInterpreters.get(interpreterKey) - .getClassName())) { - Interpreter.registeredInterpreters.get(interpreterKey) - .setPath(interpreterDirString); - logger.info("Interpreter " + interpreterKey + " found. class=" + className); - cleanCl.put(interpreterDirString, ccl); - } - } - } catch (Throwable t) { - // nothing to do - } - } - } - } - } - } - - for (RegisteredInterpreter registeredInterpreter : Interpreter.registeredInterpreters - .values()) { - logger - .debug("Registered: {} -> {}. Properties: {}", registeredInterpreter.getInterpreterKey(), - registeredInterpreter.getClassName(), registeredInterpreter.getProperties()); - } - - // RegisteredInterpreters -> interpreterSettingRef - InterpreterInfo interpreterInfo; - for (RegisteredInterpreter r : Interpreter.registeredInterpreters.values()) { - interpreterInfo = - new InterpreterInfo(r.getClassName(), r.getName(), r.isDefaultInterpreter(), - r.getEditor()); - add(r.getGroup(), interpreterInfo, r.getProperties(), defaultOption, r.getPath(), - r.getRunner()); - } - - for (String settingId : interpreterSettingsRef.keySet()) { - InterpreterSetting setting = interpreterSettingsRef.get(settingId); - logger.info("InterpreterSettingRef name {}", setting.getName()); - } - - loadFromFile(); - - // if no interpreter settings are loaded, create default set - if (0 == interpreterSettings.size()) { - Map<String, InterpreterSetting> temp = new HashMap<>(); - InterpreterSetting interpreterSetting; - for (InterpreterSetting setting : interpreterSettingsRef.values()) { - interpreterSetting = createFromInterpreterSettingRef(setting); - temp.put(setting.getName(), interpreterSetting); - } - - for (String group : interpreterGroupOrderList) { - if (null != (interpreterSetting = temp.remove(group))) { - interpreterSettings.put(interpreterSetting.getId(), interpreterSetting); - } - } - - for (InterpreterSetting setting : temp.values()) { - interpreterSettings.put(setting.getId(), setting); - } - - saveToFile(); - } - - for (String settingId : interpreterSettings.keySet()) { - InterpreterSetting setting = interpreterSettings.get(settingId); - logger.info("InterpreterSetting group {} : id={}, name={}", setting.getGroup(), settingId, - setting.getName()); - } - } - - private boolean registerInterpreterFromResource(ClassLoader cl, String interpreterDir, - String interpreterJson) throws IOException, RepositoryException { - URL[] urls = recursiveBuildLibList(new File(interpreterDir)); - ClassLoader tempClassLoader = new URLClassLoader(urls, cl); - - Enumeration<URL> interpreterSettings = tempClassLoader.getResources(interpreterJson); - if (!interpreterSettings.hasMoreElements()) { - return false; - } - for (URL url : Collections.list(interpreterSettings)) { - try (InputStream inputStream = url.openStream()) { - logger.debug("Reading {} from {}", interpreterJson, url); - List<RegisteredInterpreter> registeredInterpreterList = - getInterpreterListFromJson(inputStream); - registerInterpreters(registeredInterpreterList, interpreterDir); - } - } - return true; - } - - private boolean registerInterpreterFromPath(String interpreterDir, String interpreterJson) - throws IOException, RepositoryException { - - Path interpreterJsonPath = Paths.get(interpreterDir, interpreterJson); - if (Files.exists(interpreterJsonPath)) { - logger.debug("Reading {}", interpreterJsonPath); - List<RegisteredInterpreter> registeredInterpreterList = - getInterpreterListFromJson(interpreterJsonPath); - registerInterpreters(registeredInterpreterList, interpreterDir); - return true; - } - return false; - } - - private List<RegisteredInterpreter> getInterpreterListFromJson(Path filename) - throws FileNotFoundException { - return getInterpreterListFromJson(new FileInputStream(filename.toFile())); - } - - private List<RegisteredInterpreter> getInterpreterListFromJson(InputStream stream) { - Type registeredInterpreterListType = new TypeToken<List<RegisteredInterpreter>>() { - }.getType(); - return gson.fromJson(new InputStreamReader(stream), registeredInterpreterListType); - } - - private void registerInterpreters(List<RegisteredInterpreter> registeredInterpreters, - String absolutePath) throws IOException, RepositoryException { - - for (RegisteredInterpreter registeredInterpreter : registeredInterpreters) { - InterpreterInfo interpreterInfo = - new InterpreterInfo(registeredInterpreter.getClassName(), registeredInterpreter.getName(), - registeredInterpreter.isDefaultInterpreter(), registeredInterpreter.getEditor()); - // use defaultOption if it is not specified in interpreter-setting.json - InterpreterOption option = registeredInterpreter.getOption() == null ? defaultOption : - registeredInterpreter.getOption(); - add(registeredInterpreter.getGroup(), interpreterInfo, registeredInterpreter.getProperties(), - option, absolutePath, registeredInterpreter.getRunner()); - } - - } - - public InterpreterSetting getDefaultInterpreterSetting(List<InterpreterSetting> settings) { - if (settings == null || settings.isEmpty()) { - return null; - } - return settings.get(0); - } - - public InterpreterSetting getDefaultInterpreterSetting(String noteId) { - return getDefaultInterpreterSetting(getInterpreterSettings(noteId)); - } - - public List<InterpreterSetting> getInterpreterSettings(String noteId) { - List<String> interpreterSettingIds = getNoteInterpreterSettingBinding(noteId); - LinkedList<InterpreterSetting> settings = new LinkedList<>(); - - Iterator<String> iter = interpreterSettingIds.iterator(); - while (iter.hasNext()) { - String id = iter.next(); - InterpreterSetting setting = get(id); - if (setting == null) { - // interpreter setting is removed from factory. remove id from here, too - iter.remove(); - } else { - settings.add(setting); - } - } - return settings; - } - - private List<String> getNoteInterpreterSettingBinding(String noteId) { - LinkedList<String> bindings = new LinkedList<>(); - List<String> settingIds = interpreterBindings.get(noteId); - if (settingIds != null) { - bindings.addAll(settingIds); - } - return bindings; - } - - private InterpreterSetting createFromInterpreterSettingRef(String name) { - Preconditions.checkNotNull(name, "reference name should be not null"); - InterpreterSetting settingRef = interpreterSettingsRef.get(name); - return createFromInterpreterSettingRef(settingRef); - } - - private InterpreterSetting createFromInterpreterSettingRef(InterpreterSetting o) { - // should return immutable objects - List<InterpreterInfo> infos = (null == o.getInterpreterInfos()) ? - new ArrayList<InterpreterInfo>() : new ArrayList<>(o.getInterpreterInfos()); - List<Dependency> deps = (null == o.getDependencies()) ? - new ArrayList<Dependency>() : new ArrayList<>(o.getDependencies()); - Map<String, InterpreterProperty> props = - convertInterpreterProperties((Map<String, DefaultInterpreterProperty>) o.getProperties()); - InterpreterOption option = InterpreterOption.fromInterpreterOption(o.getOption()); - - InterpreterSetting setting = new InterpreterSetting(o.getName(), o.getName(), - infos, props, deps, option, o.getPath(), o.getInterpreterRunner()); - setting.setInterpreterGroupFactory(interpreterGroupFactory); - return setting; - } - - private Map<String, InterpreterProperty> convertInterpreterProperties( - Map<String, DefaultInterpreterProperty> defaultProperties) { - Map<String, InterpreterProperty> properties = new HashMap<>(); - - for (String key : defaultProperties.keySet()) { - DefaultInterpreterProperty defaultInterpreterProperty = defaultProperties.get(key); - properties.put(key, new InterpreterProperty(key, defaultInterpreterProperty.getValue(), - defaultInterpreterProperty.getType())); - } - return properties; - } - - public Map<String, Object> getEditorSetting(Interpreter interpreter, String user, String noteId, - String replName) { - Map<String, Object> editor = DEFAULT_EDITOR; - String group = StringUtils.EMPTY; - try { - String defaultSettingName = getDefaultInterpreterSetting(noteId).getName(); - List<InterpreterSetting> intpSettings = getInterpreterSettings(noteId); - for (InterpreterSetting intpSetting : intpSettings) { - String[] replNameSplit = replName.split("\\."); - if (replNameSplit.length == 2) { - group = replNameSplit[0]; - } - // when replName is 'name' of interpreter - if (defaultSettingName.equals(intpSetting.getName())) { - editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName()); - } - // when replName is 'alias name' of interpreter or 'group' of interpreter - if (replName.equals(intpSetting.getName()) || group.equals(intpSetting.getName())) { - editor = getEditorFromSettingByClassName(intpSetting, interpreter.getClassName()); - break; - } - } - } catch (NullPointerException e) { - // Use `debug` level because this log occurs frequently - logger.debug("Couldn't get interpreter editor setting"); - } - return editor; - } - - public Map<String, Object> getEditorFromSettingByClassName(InterpreterSetting intpSetting, - String className) { - List<InterpreterInfo> intpInfos = intpSetting.getInterpreterInfos(); - for (InterpreterInfo intpInfo : intpInfos) { - - if (className.equals(intpInfo.getClassName())) { - if (intpInfo.getEditor() == null) { - break; - } - return intpInfo.getEditor(); - } - } - return DEFAULT_EDITOR; - } - - private void loadInterpreterDependencies(final InterpreterSetting setting) { - setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES); - setting.setErrorReason(null); - interpreterSettings.put(setting.getId(), setting); - synchronized (interpreterSettings) { - final Thread t = new Thread() { - public void run() { - try { - // dependencies to prevent library conflict - File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" + - setting.getId()); - if (localRepoDir.exists()) { - try { - FileUtils.forceDelete(localRepoDir); - } catch (FileNotFoundException e) { - logger.info("A file that does not exist cannot be deleted, nothing to worry", e); - } - } - - // load dependencies - List<Dependency> deps = setting.getDependencies(); - if (deps != null) { - for (Dependency d : deps) { - File destDir = new File( - zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO)); - - if (d.getExclusions() != null) { - dependencyResolver.load(d.getGroupArtifactVersion(), d.getExclusions(), - new File(destDir, setting.getId())); - } else { - dependencyResolver - .load(d.getGroupArtifactVersion(), new File(destDir, setting.getId())); - } - } - } - - setting.setStatus(InterpreterSetting.Status.READY); - setting.setErrorReason(null); - } catch (Exception e) { - logger.error(String.format("Error while downloading repos for interpreter group : %s," + - " go to interpreter setting page click on edit and save it again to make " + - "this interpreter work properly. : %s", - setting.getGroup(), e.getLocalizedMessage()), e); - setting.setErrorReason(e.getLocalizedMessage()); - setting.setStatus(InterpreterSetting.Status.ERROR); - } finally { - interpreterSettings.put(setting.getId(), setting); - } - } - }; - t.start(); - } - } - - /** - * Overwrite dependency jar under local-repo/{interpreterId} - * if jar file in original path is changed - */ - private void copyDependenciesFromLocalPath(final InterpreterSetting setting) { - setting.setStatus(InterpreterSetting.Status.DOWNLOADING_DEPENDENCIES); - interpreterSettings.put(setting.getId(), setting); - synchronized (interpreterSettings) { - final Thread t = new Thread() { - public void run() { - try { - List<Dependency> deps = setting.getDependencies(); - if (deps != null) { - for (Dependency d : deps) { - File destDir = new File( - zeppelinConfiguration.getRelativeDir(ConfVars.ZEPPELIN_DEP_LOCALREPO)); - - int numSplits = d.getGroupArtifactVersion().split(":").length; - if (!(numSplits >= 3 && numSplits <= 6)) { - dependencyResolver.copyLocalDependency(d.getGroupArtifactVersion(), - new File(destDir, setting.getId())); - } - } - } - setting.setStatus(InterpreterSetting.Status.READY); - } catch (Exception e) { - logger.error(String.format("Error while copying deps for interpreter group : %s," + - " go to interpreter setting page click on edit and save it again to make " + - "this interpreter work properly.", - setting.getGroup()), e); - setting.setErrorReason(e.getLocalizedMessage()); - setting.setStatus(InterpreterSetting.Status.ERROR); - } finally { - interpreterSettings.put(setting.getId(), setting); - } - } - }; - t.start(); - } - } - - /** - * Return ordered interpreter setting list. - * The list does not contain more than one setting from the same interpreter class. - * Order by InterpreterClass (order defined by ZEPPELIN_INTERPRETERS), Interpreter setting name - */ - public List<String> getDefaultInterpreterSettingList() { - // this list will contain default interpreter setting list - List<String> defaultSettings = new LinkedList<>(); - - // to ignore the same interpreter group - Map<String, Boolean> interpreterGroupCheck = new HashMap<>(); - - List<InterpreterSetting> sortedSettings = get(); - - for (InterpreterSetting setting : sortedSettings) { - if (defaultSettings.contains(setting.getId())) { - continue; - } - - if (!interpreterGroupCheck.containsKey(setting.getName())) { - defaultSettings.add(setting.getId()); - interpreterGroupCheck.put(setting.getName(), true); - } - } - return defaultSettings; - } - - List<RegisteredInterpreter> getRegisteredInterpreterList() { - return new ArrayList<>(Interpreter.registeredInterpreters.values()); - } - - - private boolean findDefaultInterpreter(List<InterpreterInfo> infos) { - for (InterpreterInfo interpreterInfo : infos) { - if (interpreterInfo.isDefaultInterpreter()) { - return true; - } - } - return false; - } - - public InterpreterSetting createNewSetting(String name, String group, - List<Dependency> dependencies, InterpreterOption option, Map<String, InterpreterProperty> p) - throws IOException { - if (name.indexOf(".") >= 0) { - throw new IOException("'.' is invalid for InterpreterSetting name."); - } - InterpreterSetting setting = createFromInterpreterSettingRef(group); - setting.setName(name); - setting.setGroup(group); - setting.appendDependencies(dependencies); - setting.setInterpreterOption(option); - setting.setProperties(p); - setting.setInterpreterGroupFactory(interpreterGroupFactory); - interpreterSettings.put(setting.getId(), setting); - loadInterpreterDependencies(setting); - saveToFile(); - return setting; - } - - private InterpreterSetting add(String group, InterpreterInfo interpreterInfo, - Map<String, DefaultInterpreterProperty> interpreterProperties, InterpreterOption option, - String path, InterpreterRunner runner) - throws InterpreterException, IOException, RepositoryException { - ArrayList<InterpreterInfo> infos = new ArrayList<>(); - infos.add(interpreterInfo); - return add(group, infos, new ArrayList<Dependency>(), option, interpreterProperties, path, - runner); - } - - /** - * @param group InterpreterSetting reference name - */ - public InterpreterSetting add(String group, ArrayList<InterpreterInfo> interpreterInfos, - List<Dependency> dependencies, InterpreterOption option, - Map<String, DefaultInterpreterProperty> interpreterProperties, String path, - InterpreterRunner runner) { - Preconditions.checkNotNull(group, "name should not be null"); - Preconditions.checkNotNull(interpreterInfos, "interpreterInfos should not be null"); - Preconditions.checkNotNull(dependencies, "dependencies should not be null"); - Preconditions.checkNotNull(option, "option should not be null"); - Preconditions.checkNotNull(interpreterProperties, "properties should not be null"); - - InterpreterSetting interpreterSetting; - - synchronized (interpreterSettingsRef) { - if (interpreterSettingsRef.containsKey(group)) { - interpreterSetting = interpreterSettingsRef.get(group); - - // Append InterpreterInfo - List<InterpreterInfo> infos = interpreterSetting.getInterpreterInfos(); - boolean hasDefaultInterpreter = findDefaultInterpreter(infos); - for (InterpreterInfo interpreterInfo : interpreterInfos) { - if (!infos.contains(interpreterInfo)) { - if (!hasDefaultInterpreter && interpreterInfo.isDefaultInterpreter()) { - hasDefaultInterpreter = true; - infos.add(0, interpreterInfo); - } else { - infos.add(interpreterInfo); - } - } - } - - // Append dependencies - List<Dependency> dependencyList = interpreterSetting.getDependencies(); - for (Dependency dependency : dependencies) { - if (!dependencyList.contains(dependency)) { - dependencyList.add(dependency); - } - } - - // Append properties - Map<String, DefaultInterpreterProperty> properties = - (Map<String, DefaultInterpreterProperty>) interpreterSetting.getProperties(); - for (String key : interpreterProperties.keySet()) { - if (!properties.containsKey(key)) { - properties.put(key, interpreterProperties.get(key)); - } - } - - } else { - interpreterSetting = - new InterpreterSetting(group, null, interpreterInfos, interpreterProperties, - dependencies, option, path, runner); - interpreterSettingsRef.put(group, interpreterSetting); - } - } - - if (dependencies.size() > 0) { - loadInterpreterDependencies(interpreterSetting); - } - - interpreterSetting.setInterpreterGroupFactory(interpreterGroupFactory); - return interpreterSetting; - } - - /** - * map interpreter ids into noteId - * - * @param noteId note id - * @param ids InterpreterSetting id list - */ - public void setInterpreters(String user, String noteId, List<String> ids) throws IOException { - putNoteInterpreterSettingBinding(user, noteId, ids); - } - - private void putNoteInterpreterSettingBinding(String user, String noteId, - List<String> settingList) throws IOException { - List<String> unBindedSettings = new LinkedList<>(); - - synchronized (interpreterSettings) { - List<String> oldSettings = interpreterBindings.get(noteId); - if (oldSettings != null) { - for (String oldSettingId : oldSettings) { - if (!settingList.contains(oldSettingId)) { - unBindedSettings.add(oldSettingId); - } - } - } - interpreterBindings.put(noteId, settingList); - saveToFile(); - - for (String settingId : unBindedSettings) { - InterpreterSetting setting = get(settingId); - removeInterpretersForNote(setting, user, noteId); - } - } - } - - public void removeInterpretersForNote(InterpreterSetting interpreterSetting, String user, - String noteId) { - //TODO(jl): This is only for hotfix. You should fix it as a beautiful way - InterpreterOption interpreterOption = interpreterSetting.getOption(); - if (!(InterpreterOption.SHARED.equals(interpreterOption.perNote) - && InterpreterOption.SHARED.equals(interpreterOption.perUser))) { - interpreterSetting.closeAndRemoveInterpreterGroup(noteId, ""); - } - } - - public String getInterpreterSessionKey(String user, String noteId, InterpreterSetting setting) { - InterpreterOption option = setting.getOption(); - String key; - if (option.isExistingProcess()) { - key = Constants.EXISTING_PROCESS; - } else if (option.perNoteScoped() && option.perUserScoped()) { - key = user + ":" + noteId; - } else if (option.perUserScoped()) { - key = user; - } else if (option.perNoteScoped()) { - key = noteId; - } else { - key = SHARED_SESSION; - } - - logger.debug("Interpreter session key: {}, for note: {}, user: {}, InterpreterSetting Name: " + - "{}", key, noteId, user, setting.getName()); - return key; - } - - - public List<String> getInterpreters(String noteId) { - return getNoteInterpreterSettingBinding(noteId); - } - - public void closeNote(String user, String noteId) { - // close interpreters in this note session - List<InterpreterSetting> settings = getInterpreterSettings(noteId); - if (settings == null || settings.size() == 0) { - return; - } - - logger.info("closeNote: {}", noteId); - for (InterpreterSetting setting : settings) { - removeInterpretersForNote(setting, user, noteId); - } - } - - public Map<String, InterpreterSetting> getAvailableInterpreterSettings() { - return interpreterSettingsRef; - } - - private URL[] recursiveBuildLibList(File path) throws MalformedURLException { - URL[] urls = new URL[0]; - if (path == null || !path.exists()) { - return urls; - } else if (path.getName().startsWith(".")) { - return urls; - } else if (path.isDirectory()) { - File[] files = path.listFiles(); - if (files != null) { - for (File f : files) { - urls = (URL[]) ArrayUtils.addAll(urls, recursiveBuildLibList(f)); - } - } - return urls; - } else { - return new URL[]{path.toURI().toURL()}; - } - } - - public List<RemoteRepository> getRepositories() { - return this.interpreterRepositories; - } - - public void addRepository(String id, String url, boolean snapshot, Authentication auth, - Proxy proxy) throws IOException { - dependencyResolver.addRepo(id, url, snapshot, auth, proxy); - saveToFile(); - } - - public void removeRepository(String id) throws IOException { - dependencyResolver.delRepo(id); - saveToFile(); - } - - public void removeNoteInterpreterSettingBinding(String user, String noteId) throws IOException { - List<String> settingIds = interpreterBindings.remove(noteId); - if (settingIds != null) { - for (String settingId : settingIds) { - InterpreterSetting setting = get(settingId); - if (setting != null) { - this.removeInterpretersForNote(setting, user, noteId); - } - } - } - saveToFile(); - } - - /** - * Change interpreter property and restart - */ - public void setPropertyAndRestart(String id, InterpreterOption option, - Map<String, InterpreterProperty> properties, - List<Dependency> dependencies) throws IOException { - synchronized (interpreterSettings) { - InterpreterSetting intpSetting = interpreterSettings.get(id); - if (intpSetting != null) { - try { - stopJobAllInterpreter(intpSetting); - - intpSetting.closeAndRemoveAllInterpreterGroups(); - intpSetting.setOption(option); - intpSetting.setProperties(properties); - intpSetting.setDependencies(dependencies); - loadInterpreterDependencies(intpSetting); - - saveToFile(); - } catch (Exception e) { - loadFromFile(); - throw e; - } - } else { - throw new InterpreterException("Interpreter setting id " + id + " not found"); - } - } - } - - public void restart(String settingId, String noteId, String user) { - InterpreterSetting intpSetting = interpreterSettings.get(settingId); - Preconditions.checkNotNull(intpSetting); - synchronized (interpreterSettings) { - intpSetting = interpreterSettings.get(settingId); - // Check if dependency in specified path is changed - // If it did, overwrite old dependency jar with new one - if (intpSetting != null) { - //clean up metaInfos - intpSetting.setInfos(null); - copyDependenciesFromLocalPath(intpSetting); - - stopJobAllInterpreter(intpSetting); - if (user.equals("anonymous")) { - intpSetting.closeAndRemoveAllInterpreterGroups(); - } else { - intpSetting.closeAndRemoveInterpreterGroup(noteId, user); - } - - } else { - throw new InterpreterException("Interpreter setting id " + settingId + " not found"); - } - } - } - - public void restart(String id) { - restart(id, "", "anonymous"); - } - - private void stopJobAllInterpreter(InterpreterSetting intpSetting) { - if (intpSetting != null) { - for (InterpreterGroup intpGroup : intpSetting.getAllInterpreterGroups()) { - for (List<Interpreter> interpreters : intpGroup.values()) { - for (Interpreter intp : interpreters) { - for (Job job : intp.getScheduler().getJobsRunning()) { - job.abort(); - job.setStatus(Status.ABORT); - logger.info("Job " + job.getJobName() + " aborted "); - } - for (Job job : intp.getScheduler().getJobsWaiting()) { - job.abort(); - job.setStatus(Status.ABORT); - logger.info("Job " + job.getJobName() + " aborted "); - } - } - } - } - } - } - - public InterpreterSetting get(String name) { - synchronized (interpreterSettings) { - return interpreterSettings.get(name); - } - } - - public void remove(String id) throws IOException { - synchronized (interpreterSettings) { - if (interpreterSettings.containsKey(id)) { - InterpreterSetting intp = interpreterSettings.get(id); - intp.closeAndRemoveAllInterpreterGroups(); - - interpreterSettings.remove(id); - for (List<String> settings : interpreterBindings.values()) { - Iterator<String> it = settings.iterator(); - while (it.hasNext()) { - String settingId = it.next(); - if (settingId.equals(id)) { - it.remove(); - } - } - } - saveToFile(); - } - } - - File localRepoDir = new File(zeppelinConfiguration.getInterpreterLocalRepoPath() + "/" + id); - FileUtils.deleteDirectory(localRepoDir); - } - - /** - * Get interpreter settings - */ - public List<InterpreterSetting> get() { - synchronized (interpreterSettings) { - List<InterpreterSetting> orderedSettings = new LinkedList<>(); - - Map<String, List<InterpreterSetting>> nameInterpreterSettingMap = new HashMap<>(); - for (InterpreterSetting interpreterSetting : interpreterSettings.values()) { - String group = interpreterSetting.getGroup(); - if (!nameInterpreterSettingMap.containsKey(group)) { - nameInterpreterSettingMap.put(group, new ArrayList<InterpreterSetting>()); - } - nameInterpreterSettingMap.get(group).add(interpreterSetting); - } - - for (String groupName : interpreterGroupOrderList) { - List<InterpreterSetting> interpreterSettingList = - nameInterpreterSettingMap.remove(groupName); - if (null != interpreterSettingList) { - for (InterpreterSetting interpreterSetting : interpreterSettingList) { - orderedSettings.add(interpreterSetting); - } - } - } - - List<InterpreterSetting> settings = new ArrayList<>(); - - for (List<InterpreterSetting> interpreterSettingList : nameInterpreterSettingMap.values()) { - for (InterpreterSetting interpreterSetting : interpreterSettingList) { - settings.add(interpreterSetting); - } - } - - Collections.sort(settings, new Comparator<InterpreterSetting>() { - @Override - public int compare(InterpreterSetting o1, InterpreterSetting o2) { - return o1.getName().compareTo(o2.getName()); - } - }); - - orderedSettings.addAll(settings); - - return orderedSettings; - } - } - - public void close(InterpreterSetting interpreterSetting) { - interpreterSetting.closeAndRemoveAllInterpreterGroups(); - } - - public void close() { - List<Thread> closeThreads = new LinkedList<>(); - synchronized (interpreterSettings) { - Collection<InterpreterSetting> intpSettings = interpreterSettings.values(); - for (final InterpreterSetting intpSetting : intpSettings) { - Thread t = new Thread() { - public void run() { - intpSetting.closeAndRemoveAllInterpreterGroups(); - } - }; - t.start(); - closeThreads.add(t); - } - } - - for (Thread t : closeThreads) { - try { - t.join(); - } catch (InterruptedException e) { - logger.error("Can't close interpreterGroup", e); - } - } - } - - public void shutdown() { - List<Thread> closeThreads = new LinkedList<>(); - synchronized (interpreterSettings) { - Collection<InterpreterSetting> intpSettings = interpreterSettings.values(); - for (final InterpreterSetting intpSetting : intpSettings) { - Thread t = new Thread() { - public void run() { - intpSetting.shutdownAndRemoveAllInterpreterGroups(); - } - }; - t.start(); - closeThreads.add(t); - } - } - - for (Thread t : closeThreads) { - try { - t.join(); - } catch (InterruptedException e) { - logger.error("Can't close interpreterGroup", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java deleted file mode 100644 index 3838f63..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java +++ /dev/null @@ -1,290 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.zeppelin.interpreter.install; - -import org.apache.commons.io.FileUtils; -import org.apache.log4j.ConsoleAppender; -import org.apache.log4j.Logger; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.dep.DependencyResolver; -import org.apache.zeppelin.util.Util; -import org.sonatype.aether.RepositoryException; -import java.io.File; -import java.io.IOException; -import java.net.URL; -import java.util.LinkedList; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Commandline utility to install interpreter from maven repository - */ -public class InstallInterpreter { - private final File interpreterListFile; - private final File interpreterBaseDir; - private final List<AvailableInterpreterInfo> availableInterpreters; - private final String localRepoDir; - private URL proxyUrl; - private String proxyUser; - private String proxyPassword; - - /** - * - * @param interpreterListFile - * @param interpreterBaseDir interpreter directory for installing binaries - * @throws IOException - */ - public InstallInterpreter(File interpreterListFile, File interpreterBaseDir, String localRepoDir) - throws IOException { - this.interpreterListFile = interpreterListFile; - this.interpreterBaseDir = interpreterBaseDir; - this.localRepoDir = localRepoDir; - availableInterpreters = new LinkedList<>(); - readAvailableInterpreters(); - } - - - /** - * Information for available informations - */ - private static class AvailableInterpreterInfo { - public final String name; - public final String artifact; - public final String description; - - public AvailableInterpreterInfo(String name, String artifact, String description) { - this.name = name; - this.artifact = artifact; - this.description = description; - } - } - - private void readAvailableInterpreters() throws IOException { - if (!interpreterListFile.isFile()) { - System.err.println("Can't find interpreter list " + interpreterListFile.getAbsolutePath()); - return; - } - String text = FileUtils.readFileToString(interpreterListFile); - String[] lines = text.split("\n"); - - Pattern pattern = Pattern.compile("(\\S+)\\s+(\\S+)\\s+(.*)"); - - int lineNo = 0; - for (String line : lines) { - lineNo++; - if (line == null || line.length() == 0 || line.startsWith("#")) { - continue; - } - - Matcher match = pattern.matcher(line); - if (match.groupCount() != 3) { - System.err.println("Error on line " + lineNo + ", " + line); - continue; - } - - match.find(); - - String name = match.group(1); - String artifact = match.group(2); - String description = match.group(3); - - availableInterpreters.add(new AvailableInterpreterInfo(name, artifact, description)); - } - } - - public List<AvailableInterpreterInfo> list() { - for (AvailableInterpreterInfo info : availableInterpreters) { - System.out.println(info.name + "\t\t\t" + info.description); - } - - return availableInterpreters; - } - - public void installAll() { - for (AvailableInterpreterInfo info : availableInterpreters) { - install(info.name, info.artifact); - } - } - - public void install(String [] names) { - for (String name : names) { - install(name); - } - } - - public void install(String name) { - // find artifact name - for (AvailableInterpreterInfo info : availableInterpreters) { - if (name.equals(info.name)) { - install(name, info.artifact); - return; - } - } - - throw new RuntimeException("Can't find interpreter '" + name + "'"); - } - - public void install(String [] names, String [] artifacts) { - if (names.length != artifacts.length) { - throw new RuntimeException("Length of given names and artifacts are different"); - } - - for (int i = 0; i < names.length; i++) { - install(names[i], artifacts[i]); - } - } - - public void install(String name, String artifact) { - DependencyResolver depResolver = new DependencyResolver(localRepoDir); - if (proxyUrl != null) { - depResolver.setProxy(proxyUrl, proxyUser, proxyPassword); - } - - File installDir = new File(interpreterBaseDir, name); - if (installDir.exists()) { - System.err.println("Directory " + installDir.getAbsolutePath() - + " already exists" - + "\n\nSkipped"); - return; - } - - System.out.println("Install " + name + "(" + artifact + ") to " - + installDir.getAbsolutePath() + " ... "); - - try { - depResolver.load(artifact, installDir); - System.out.println("Interpreter " + name + " installed under " + - installDir.getAbsolutePath() + "."); - startTip(); - } catch (RepositoryException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - } - - public void setProxy(URL proxyUrl, String proxyUser, String proxyPassword) { - this.proxyUrl = proxyUrl; - this.proxyUser = proxyUser; - this.proxyPassword = proxyPassword; - } - - public static void usage() { - System.out.println("Options"); - System.out.println(" -l, --list List available interpreters"); - System.out.println(" -a, --all Install all available interpreters"); - System.out.println(" -n, --name [NAMES] Install interpreters (comma separated " + - "list)" + - "e.g. md,shell,jdbc,python,angular"); - System.out.println(" -t, --artifact [ARTIFACTS] (Optional with -n) custom artifact names" + - ". " + - "(comma separated list correspond to --name) " + - "e.g. customGroup:customArtifact:customVersion"); - System.out.println(" --proxy-url [url] (Optional) proxy url. http(s)://host:port"); - System.out.println(" --proxy-user [user] (Optional) proxy user"); - System.out.println(" --proxy-password [password] (Optional) proxy password"); - } - - public static void main(String [] args) throws IOException { - if (args.length == 0) { - usage(); - return; - } - - ZeppelinConfiguration conf = ZeppelinConfiguration.create(); - InstallInterpreter installer = new InstallInterpreter( - new File(conf.getInterpreterListPath()), - new File(conf.getInterpreterDir()), - conf.getInterpreterLocalRepoPath()); - - String names = null; - String artifacts = null; - URL proxyUrl = null; - String proxyUser = null; - String proxyPassword = null; - boolean all = false; - - for (int i = 0; i < args.length; i++) { - String arg = args[i].toLowerCase(Locale.US); - switch (arg) { - case "--list": - case "-l": - installer.list(); - System.exit(0); - break; - case "--all": - case "-a": - all = true; - break; - case "--name": - case "-n": - names = args[++i]; - break; - case "--artifact": - case "-t": - artifacts = args[++i]; - break; - case "--version": - case "-v": - Util.getVersion(); - break; - case "--proxy-url": - proxyUrl = new URL(args[++i]); - break; - case "--proxy-user": - proxyUser = args[++i]; - break; - case "--proxy-password": - proxyPassword = args[++i]; - break; - case "--help": - case "-h": - usage(); - System.exit(0); - break; - default: - System.out.println("Unknown option " + arg); - } - } - - if (proxyUrl != null) { - installer.setProxy(proxyUrl, proxyUser, proxyPassword); - } - - if (all) { - installer.installAll(); - System.exit(0); - } - - if (names != null) { - if (artifacts != null) { - installer.install(names.split(","), artifacts.split(",")); - } else { - installer.install(names.split(",")); - } - } - } - - private static void startTip() { - System.out.println("\n1. Restart Zeppelin" - + "\n2. Create interpreter setting in 'Interpreter' menu on Zeppelin GUI" - + "\n3. Then you can bind the interpreter on your note"); - } -} http://git-wip-us.apache.org/repos/asf/zeppelin/blob/8d4902e7/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java ---------------------------------------------------------------------- diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java deleted file mode 100644 index 0ac7116..0000000 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.remote; - -import java.util.List; - -import org.apache.thrift.TException; -import org.apache.zeppelin.display.AngularObject; -import org.apache.zeppelin.display.AngularObjectRegistry; -import org.apache.zeppelin.display.AngularObjectRegistryListener; -import org.apache.zeppelin.interpreter.InterpreterGroup; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.Gson; - -/** - * Proxy for AngularObjectRegistry that exists in remote interpreter process - */ -public class RemoteAngularObjectRegistry extends AngularObjectRegistry { - Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class); - private InterpreterGroup interpreterGroup; - - public RemoteAngularObjectRegistry(String interpreterId, - AngularObjectRegistryListener listener, - InterpreterGroup interpreterGroup) { - super(interpreterId, listener); - this.interpreterGroup = interpreterGroup; - } - - private RemoteInterpreterProcess getRemoteInterpreterProcess() { - return interpreterGroup.getRemoteInterpreterProcess(); - } - - /** - * When ZeppelinServer side code want to add angularObject to the registry, - * this method should be used instead of add() - * @param name - * @param o - * @param noteId - * @return - */ - public AngularObject addAndNotifyRemoteProcess(String name, Object o, String noteId, String - paragraphId) { - Gson gson = new Gson(); - RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); - if (!remoteInterpreterProcess.isRunning()) { - return super.add(name, o, noteId, paragraphId, true); - } - - Client client = null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); - return super.add(name, o, noteId, paragraphId, true); - } catch (TException e) { - broken = true; - logger.error("Error", e); - } catch (Exception e) { - logger.error("Error", e); - } finally { - if (client != null) { - remoteInterpreterProcess.releaseClient(client, broken); - } - } - return null; - } - - /** - * When ZeppelinServer side code want to remove angularObject from the registry, - * this method should be used instead of remove() - * @param name - * @param noteId - * @param paragraphId - * @return - */ - public AngularObject removeAndNotifyRemoteProcess(String name, String noteId, String - paragraphId) { - RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); - if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) { - return super.remove(name, noteId, paragraphId); - } - - Client client = null; - boolean broken = false; - try { - client = remoteInterpreterProcess.getClient(); - client.angularObjectRemove(name, noteId, paragraphId); - return super.remove(name, noteId, paragraphId); - } catch (TException e) { - broken = true; - logger.error("Error", e); - } catch (Exception e) { - logger.error("Error", e); - } finally { - if (client != null) { - remoteInterpreterProcess.releaseClient(client, broken); - } - } - return null; - } - - public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) { - List<AngularObject> all = getAll(noteId, paragraphId); - for (AngularObject ao : all) { - removeAndNotifyRemoteProcess(ao.getName(), noteId, paragraphId); - } - } - - @Override - protected AngularObject createNewAngularObject(String name, Object o, String noteId, String - paragraphId) { - return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup, - getAngularObjectListener()); - } -}