This is an automated email from the ASF dual-hosted git repository. jongyoul pushed a commit to branch ZEPPELIN-6400-remove-zepconf-from-interpreter in repository https://gitbox.apache.org/repos/asf/zeppelin.git
commit a3f6c6730b3706876cd3e109a6bd2847713be574 Author: Jongyoul Lee <[email protected]> AuthorDate: Fri Mar 6 17:48:53 2026 +0900 [ZEPPELIN-6400] Remove ZeppelinConfiguration dependency from zeppelin-interpreter module Move ZeppelinConfiguration to zeppelin-zengine so it is no longer included in the shaded interpreter JAR. This prevents the Maven shade plugin from corrupting config string literals, which caused classpath-order-dependent configuration loading failures. Key changes: - Replace ZeppelinConfiguration usage in zeppelin-interpreter with Properties-based configuration - Update InterpreterLauncher, LifecycleManager, RecoveryStorage, DependencyResolver to accept Properties instead of ZeppelinConfiguration - Update all launcher plugins (Docker, K8s, YARN, Flink) accordingly - Move ZeppelinConfiguration.java and ZeppelinLocationStrategy.java from zeppelin-interpreter to zeppelin-zengine - Update callers in zeppelin-zengine, zeppelin-server, flink, and markdown interpreters Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../apache/zeppelin/flink/FlinkInterpreter.java | 5 +-- .../zeppelin/flink/FlinkScala212Interpreter.scala | 6 +-- .../zeppelin/flink/FlinkScalaInterpreter.scala | 11 +++-- .../apache/zeppelin/markdown/FlexmarkParser.java | 5 +-- .../org/apache/zeppelin/markdown/Markdown.java | 4 +- .../zeppelin/markdown/FlexmarkParserTest.java | 2 - .../zeppelin/dep/AbstractDependencyResolver.java | 23 +++++----- .../main/java/org/apache/zeppelin/dep/Booter.java | 5 +-- .../apache/zeppelin/dep/DependencyResolver.java | 10 ++--- .../apache/zeppelin/interpreter/Interpreter.java | 6 --- .../zeppelin/interpreter/InterpreterOption.java | 12 +++--- .../zeppelin/interpreter/LazyOpenInterpreter.java | 6 --- .../zeppelin/interpreter/LifecycleManager.java | 9 ++-- .../interpreter/launcher/InterpreterLauncher.java | 43 ++++++++++++------- .../lifecycle/NullLifecycleManager.java | 7 ++-- .../lifecycle/TimeoutLifecycleManager.java | 21 ++++++---- 
.../interpreter/recovery/RecoveryStorage.java | 7 +--- .../remote/RemoteInterpreterServer.java | 34 ++++++++------- .../zeppelin/scheduler/SchedulerFactory.java | 16 +++++-- .../zeppelin/conf/ZeppelinConfigurationTest.java | 49 ---------------------- .../java/org/apache/zeppelin/dep/BooterTest.java | 5 +-- .../zeppelin/dep/DependencyResolverTest.java | 3 +- .../zeppelin/helium/ApplicationLoaderTest.java | 3 +- .../remote/RemoteInterpreterServerTest.java | 2 +- .../launcher/DockerInterpreterLauncher.java | 16 +++---- .../launcher/DockerInterpreterProcess.java | 31 +++++++------- .../launcher/DockerInterpreterProcessTest.java | 29 ++++++++----- .../launcher/FlinkInterpreterLauncher.java | 7 ++-- .../launcher/K8sStandardInterpreterLauncher.java | 22 +++++----- .../zeppelin/interpreter/launcher/K8sUtils.java | 10 ++--- .../K8sStandardInterpreterLauncherTest.java | 12 ++++-- .../launcher/YarnInterpreterLauncher.java | 8 ++-- .../launcher/YarnRemoteInterpreterProcess.java | 15 ++++--- .../zeppelin/service/InterpreterService.java | 8 +++- .../zeppelin/service/InterpreterServiceTest.java | 2 +- .../zeppelin/conf/ZeppelinConfiguration.java | 0 .../zeppelin/conf/ZeppelinLocationStrategy.java | 0 .../zeppelin/interpreter/InterpreterSetting.java | 4 +- .../interpreter/InterpreterSettingManager.java | 5 ++- .../interpreter/install/InstallInterpreter.java | 3 +- .../launcher/SparkInterpreterLauncher.java | 33 ++++++++------- .../launcher/StandardInterpreterLauncher.java | 20 +++++---- .../recovery/FileSystemRecoveryStorage.java | 4 +- .../interpreter/recovery/LocalRecoveryStorage.java | 7 +++- .../interpreter/recovery/NullRecoveryStorage.java | 2 +- .../interpreter/remote/RemoteInterpreter.java | 1 - .../org/apache/zeppelin/plugin/PluginManager.java | 20 +++++++-- .../interpreter/InterpreterSettingTest.java | 2 +- .../launcher/SparkInterpreterLauncherTest.java | 41 +++++++++++------- .../launcher/StandardInterpreterLauncherTest.java | 20 ++++++--- 50 files changed, 320 
insertions(+), 296 deletions(-) diff --git a/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java index 244d00f49c..fa37e0fb30 100644 --- a/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java +++ b/flink/flink-scala-2.12/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java @@ -22,7 +22,6 @@ import org.apache.flink.api.scala.ExecutionEnvironment; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment; import org.apache.flink.table.api.TableEnvironment; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.slf4j.Logger; @@ -91,8 +90,8 @@ public class FlinkInterpreter extends Interpreter { Class<?> clazz = Class.forName(innerIntpClassName); return (FlinkScalaInterpreter) - clazz.getConstructor(Properties.class, ClassLoader.class, ZeppelinConfiguration.class) - .newInstance(getProperties(), flinkScalaClassLoader, zConf); + clazz.getConstructor(Properties.class, ClassLoader.class) + .newInstance(getProperties(), flinkScalaClassLoader); } @Override diff --git a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScala212Interpreter.scala b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScala212Interpreter.scala index 070daf6a2b..b7af4ad3b1 100644 --- a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScala212Interpreter.scala +++ b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScala212Interpreter.scala @@ -24,15 +24,13 @@ import java.util.Properties import org.apache.zeppelin.interpreter.InterpreterContext import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion -import org.apache.zeppelin.conf.ZeppelinConfiguration import 
scala.tools.nsc.Settings import scala.tools.nsc.interpreter.{IMain, JPrintWriter} class FlinkScala212Interpreter(override val properties: Properties, - override val flinkScalaClassLoader: ClassLoader, - override val zConf: ZeppelinConfiguration) - extends FlinkScalaInterpreter(properties, flinkScalaClassLoader, zConf) { + override val flinkScalaClassLoader: ClassLoader) + extends FlinkScalaInterpreter(properties, flinkScalaClassLoader) { override def completion(buf: String, cursor: Int, diff --git a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala index c2ceb60360..a11003b8bc 100644 --- a/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala +++ b/flink/flink-scala-2.12/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala @@ -44,7 +44,6 @@ import org.apache.flink.table.catalog.hive.HiveCatalog import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableAggregateFunction, TableFunction} import org.apache.flink.table.module.hive.HiveModule import org.apache.flink.yarn.cli.FlinkYarnSessionCli -import org.apache.zeppelin.conf.ZeppelinConfiguration import org.apache.zeppelin.dep.DependencyResolver import org.apache.zeppelin.flink.internal.FlinkShell import org.apache.zeppelin.flink.internal.FlinkShell._ @@ -66,8 +65,7 @@ import scala.tools.nsc.interpreter.{Completion, IMain, IR, JPrintWriter, Results * @param properties */ abstract class FlinkScalaInterpreter(val properties: Properties, - val flinkScalaClassLoader: ClassLoader, - val zConf: ZeppelinConfiguration) { + val flinkScalaClassLoader: ClassLoader) { private lazy val LOGGER: Logger = LoggerFactory.getLogger(getClass) @@ -800,7 +798,12 @@ abstract class FlinkScalaInterpreter(val properties: Properties, val flinkPackageJars = if (!StringUtils.isBlank(properties.getProperty("flink.execution.packages", ""))) 
{ val packages = properties.getProperty("flink.execution.packages") - val dependencyResolver = new DependencyResolver(System.getProperty("user.home") + "/.m2/repository", zConf) + val dependencyResolver = new DependencyResolver( + System.getProperty("user.home") + "/.m2/repository", + properties.getProperty("zeppelin.proxy.url"), + properties.getProperty("zeppelin.proxy.user"), + properties.getProperty("zeppelin.proxy.password"), + properties.getProperty("zeppelin.interpreter.dep.mvnRepo")) packages.split(",") .flatMap(e => JavaConversions.asScalaBuffer(dependencyResolver.load(e))) .map(e => e.getAbsolutePath).toSeq diff --git a/markdown/src/main/java/org/apache/zeppelin/markdown/FlexmarkParser.java b/markdown/src/main/java/org/apache/zeppelin/markdown/FlexmarkParser.java index 82c5b9d354..47fc6b3951 100644 --- a/markdown/src/main/java/org/apache/zeppelin/markdown/FlexmarkParser.java +++ b/markdown/src/main/java/org/apache/zeppelin/markdown/FlexmarkParser.java @@ -28,7 +28,6 @@ import com.vladsch.flexmark.util.ast.Node; import com.vladsch.flexmark.html.HtmlRenderer; import com.vladsch.flexmark.parser.Parser; import com.vladsch.flexmark.util.data.MutableDataSet; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +43,7 @@ public class FlexmarkParser implements MarkdownParser { private Parser parser; private HtmlRenderer renderer; - public FlexmarkParser(ZeppelinConfiguration zConf) { + public FlexmarkParser(boolean escapeHtml) { MutableDataSet options = new MutableDataSet(); options.set(Parser.EXTENSIONS, Arrays.asList(StrikethroughExtension.create(), TablesExtension.create(), @@ -55,7 +54,7 @@ public class FlexmarkParser implements MarkdownParser { EmojiExtension.create())); options.set(HtmlRenderer.SOFT_BREAK, "<br />\n"); options.set(EmojiExtension.USE_IMAGE_TYPE, UNICODE_ONLY); - options.set(HtmlRenderer.ESCAPE_HTML, zConf.isZeppelinNotebookMarkdownEscapeHtml()); + 
options.set(HtmlRenderer.ESCAPE_HTML, escapeHtml); parser = Parser.builder(options).build(); renderer = HtmlRenderer.builder(options).build(); } diff --git a/markdown/src/main/java/org/apache/zeppelin/markdown/Markdown.java b/markdown/src/main/java/org/apache/zeppelin/markdown/Markdown.java index 46733e43a0..8d51fa8140 100644 --- a/markdown/src/main/java/org/apache/zeppelin/markdown/Markdown.java +++ b/markdown/src/main/java/org/apache/zeppelin/markdown/Markdown.java @@ -74,7 +74,9 @@ public class Markdown extends Interpreter { LOGGER.debug("Creating {} markdown interpreter", parserType); if (MarkdownParserType.FLEXMARK.toString().equals(parserType)) { - return new FlexmarkParser(zConf); + boolean escapeHtml = Boolean.parseBoolean( + getProperty("zeppelin.notebook.markdown.escape.html", "true")); + return new FlexmarkParser(escapeHtml); } else { // default parser return new Markdown4jParser(); diff --git a/markdown/src/test/java/org/apache/zeppelin/markdown/FlexmarkParserTest.java b/markdown/src/test/java/org/apache/zeppelin/markdown/FlexmarkParserTest.java index 01540b09c4..ef9327e89e 100644 --- a/markdown/src/test/java/org/apache/zeppelin/markdown/FlexmarkParserTest.java +++ b/markdown/src/test/java/org/apache/zeppelin/markdown/FlexmarkParserTest.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.markdown; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterResult; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -44,7 +43,6 @@ class FlexmarkParserTest { Properties props = new Properties(); props.put(Markdown.MARKDOWN_PARSER_TYPE, Markdown.PARSER_TYPE_FLEXMARK); md = new Markdown(props); - md.setZeppelinConfiguration(ZeppelinConfiguration.load()); md.open(); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java index 
b44e096e4a..1b3693a20e 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/AbstractDependencyResolver.java @@ -24,7 +24,6 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.eclipse.aether.RepositorySystem; import org.eclipse.aether.RepositorySystemSession; import org.eclipse.aether.repository.Authentication; @@ -49,26 +48,28 @@ public abstract class AbstractDependencyResolver { protected RepositorySystemSession session; private Proxy proxy = null; - protected AbstractDependencyResolver(String localRepoPath, ZeppelinConfiguration zConf) { - if (zConf.getZeppelinProxyUrl() != null) { + protected AbstractDependencyResolver(String localRepoPath, + String proxyUrl, String proxyUser, String proxyPassword, String mvnRepoUrl) { + if (proxyUrl != null) { try { - URL proxyUrl = new URL(zConf.getZeppelinProxyUrl()); - Authentication auth = new AuthenticationBuilder().addUsername(zConf.getZeppelinProxyUser()).addPassword(zConf.getZeppelinProxyPassword()).build(); - proxy = new Proxy(proxyUrl.getProtocol(), proxyUrl.getHost(), proxyUrl.getPort(), auth); + URL parsedProxyUrl = new URL(proxyUrl); + Authentication auth = new AuthenticationBuilder() + .addUsername(proxyUser).addPassword(proxyPassword).build(); + proxy = new Proxy(parsedProxyUrl.getProtocol(), parsedProxyUrl.getHost(), + parsedProxyUrl.getPort(), auth); } catch (MalformedURLException e) { - LOGGER.error("Proxy Url {} is not valid - skipping Proxy config", zConf.getZeppelinProxyUrl(), e); + LOGGER.error("Proxy Url {} is not valid - skipping Proxy config", proxyUrl, e); } } session = Booter.newRepositorySystemSession(system, localRepoPath); - repos.addAll(Booter.newCentralRepositorys(proxy, zConf)); // add maven central + repos.addAll(Booter.newCentralRepositorys(proxy, mvnRepoUrl)); // add maven central 
repos.add(Booter.newLocalRepository()); } - protected AbstractDependencyResolver(String localRepoPath, Proxy proxy, - ZeppelinConfiguration zConf) { + protected AbstractDependencyResolver(String localRepoPath, Proxy proxy, String mvnRepoUrl) { this.proxy = proxy; session = Booter.newRepositorySystemSession(system, localRepoPath); - repos.addAll(Booter.newCentralRepositorys(proxy, zConf)); // add maven central + repos.addAll(Booter.newCentralRepositorys(proxy, mvnRepoUrl)); // add maven central repos.add(Booter.newLocalRepository()); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java index b0e89b96a4..16822c6ae7 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/Booter.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.dep; import org.apache.commons.lang3.Validate; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.maven.repository.internal.MavenRepositorySystemUtils; import org.eclipse.aether.DefaultRepositorySystemSession; import org.eclipse.aether.RepositorySystem; @@ -83,10 +82,10 @@ public class Booter { } public static List<RemoteRepository> newCentralRepositorys(Proxy proxy, - ZeppelinConfiguration zConf) { + String mvnRepoUrl) { String mvnRepoEnv = System.getenv("ZEPPELIN_INTERPRETER_DEP_MVNREPO"); if (mvnRepoEnv == null) { - mvnRepoEnv = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO); + mvnRepoEnv = mvnRepoUrl; } if (mvnRepoEnv == null) { mvnRepoEnv = "https://repo1.maven.org/maven2/"; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java index cd7eea5a05..e513ac2b66 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java +++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/dep/DependencyResolver.java @@ -27,7 +27,6 @@ import java.util.List; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.eclipse.aether.RepositoryException; import org.eclipse.aether.artifact.Artifact; import org.eclipse.aether.artifact.DefaultArtifact; @@ -56,12 +55,13 @@ public class DependencyResolver extends AbstractDependencyResolver { "org.apache.zeppelin:zeppelin-interpreter", "org.apache.zeppelin:zeppelin-server"}; - public DependencyResolver(String localRepoPath, ZeppelinConfiguration zConf) { - super(localRepoPath, zConf); + public DependencyResolver(String localRepoPath, + String proxyUrl, String proxyUser, String proxyPassword, String mvnRepoUrl) { + super(localRepoPath, proxyUrl, proxyUser, proxyPassword, mvnRepoUrl); } - public DependencyResolver(String localRepoPath, Proxy proxy, ZeppelinConfiguration zConf) { - super(localRepoPath, proxy, zConf); + public DependencyResolver(String localRepoPath, Proxy proxy, String mvnRepoUrl) { + super(localRepoPath, proxy, mvnRepoUrl); } public List<File> load(String artifact) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java index 9c3c9e59f5..bf09cb7106 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/Interpreter.java @@ -22,7 +22,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.zeppelin.annotation.Experimental; import org.apache.zeppelin.annotation.ZeppelinApi; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; import 
org.apache.zeppelin.scheduler.SchedulerFactory; @@ -150,7 +149,6 @@ public abstract class Interpreter { private URL[] classloaderUrls; protected Properties properties; protected String userName; - protected ZeppelinConfiguration zConf; @ZeppelinApi public Interpreter(Properties properties) { @@ -200,10 +198,6 @@ public abstract class Interpreter { return this.userName; } - public void setZeppelinConfiguration(ZeppelinConfiguration zConf) { - this.zConf = zConf; - } - public void setInterpreterGroup(InterpreterGroup interpreterGroup) { this.interpreterGroup = interpreterGroup; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java index f7b17e362e..116d260488 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOption.java @@ -20,8 +20,6 @@ package org.apache.zeppelin.interpreter; import java.util.ArrayList; import java.util.List; -import org.apache.zeppelin.conf.ZeppelinConfiguration; - /** * */ @@ -29,7 +27,7 @@ public class InterpreterOption { public static final transient String SHARED = "shared"; public static final transient String SCOPED = "scoped"; public static final transient String ISOLATED = "isolated"; - private transient ZeppelinConfiguration zConf; + private transient boolean usernameForceLowerCase; // always set it as true, keep this field just for backward compatibility boolean remote = true; @@ -60,8 +58,8 @@ public class InterpreterOption { this.host = host; } - public void setConf(ZeppelinConfiguration zConf) { - this.zConf = zConf; + public void setUsernameForceLowerCase(boolean usernameForceLowerCase) { + this.usernameForceLowerCase = usernameForceLowerCase; } public boolean permissionIsSet() { @@ -73,7 +71,7 @@ public class InterpreterOption { } public List<String> 
getOwners() { - if (null != owners && zConf.isUsernameForceLowerCase()) { + if (null != owners && usernameForceLowerCase) { List<String> lowerCaseUsers = new ArrayList<>(); for (String owner : owners) { lowerCaseUsers.add(owner.toLowerCase()); @@ -117,7 +115,7 @@ public class InterpreterOption { option.setPermission = other.setPermission; option.owners = (null == other.owners) ? new ArrayList<>() : new ArrayList<>(other.owners); - option.zConf = other.zConf; + option.usernameForceLowerCase = other.usernameForceLowerCase; return option; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java index ec236ce1f2..f794a58283 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LazyOpenInterpreter.java @@ -21,7 +21,6 @@ import java.net.URL; import java.util.List; import java.util.Properties; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.scheduler.Scheduler; @@ -205,11 +204,6 @@ public class LazyOpenInterpreter this.intp.setUserName(userName); } - @Override - public void setZeppelinConfiguration(ZeppelinConfiguration zConf) { - this.intp.setZeppelinConfiguration(zConf); - } - @Override public String getUserName() { return this.intp.getUserName(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LifecycleManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LifecycleManager.java index 8f44b2f962..71528adf6d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LifecycleManager.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/LifecycleManager.java @@ -19,19 +19,20 @@ package 
org.apache.zeppelin.interpreter; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; +import java.util.Properties; + /** * Abstract base class for managing the lifecycle of interpreters */ public abstract class LifecycleManager { - protected ZeppelinConfiguration zConf; + protected Properties properties; protected RemoteInterpreterServer remoteInterpreterServer; - public LifecycleManager(ZeppelinConfiguration zConf, RemoteInterpreterServer remoteInterpreterServer) { - this.zConf = zConf; + public LifecycleManager(Properties properties, RemoteInterpreterServer remoteInterpreterServer) { + this.properties = properties; this.remoteInterpreterServer = remoteInterpreterServer; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java index 7190bea489..79accd19b4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java @@ -20,26 +20,33 @@ package org.apache.zeppelin.interpreter.launcher; import java.io.IOException; import java.util.Properties; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE; - /** * Component to Launch interpreter process. 
*/ public abstract class InterpreterLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(InterpreterLauncher.class); - private static final String SPECIAL_CHARACTER="{}()<>&*β|=?;[]$β#~!.\"%/\\:+,`"; + private static final String SPECIAL_CHARACTER="{}()<>&*'|=?;[]$β#~!.\"%/\\:+,`"; + + private static final String CONNECT_TIMEOUT_KEY = "zeppelin.interpreter.connect.timeout"; + private static final long DEFAULT_CONNECT_TIMEOUT = 600000L; + + private static final String CONNECTION_POOL_SIZE_KEY = "zeppelin.interpreter.connection.poolsize"; + private static final int DEFAULT_CONNECTION_POOL_SIZE = 100; - protected final ZeppelinConfiguration zConf; + private static final String RECOVERY_ENABLED_KEY = "zeppelin.recovery.storage.class"; + private static final String NULL_RECOVERY_CLASS = + "org.apache.zeppelin.interpreter.recovery.NullRecoveryStorage"; + + protected final Properties zProperties; protected final RecoveryStorage recoveryStorage; - protected InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { - this.zConf = zConf; + protected InterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage) { + this.zProperties = zProperties; this.recoveryStorage = recoveryStorage; } @@ -49,21 +56,20 @@ public abstract class InterpreterLauncher { * @return */ protected int getConnectTimeout(InterpreterLaunchContext context) { - int connectTimeout = - (int) zConf.getTime(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + int connectTimeout = (int) Long.parseLong( + zProperties.getProperty(CONNECT_TIMEOUT_KEY, + String.valueOf(DEFAULT_CONNECT_TIMEOUT))); Properties properties = context.getProperties(); - if (properties != null && properties.containsKey( - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName())) { - connectTimeout = Integer.parseInt(properties.getProperty( - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName())); + if 
(properties != null && properties.containsKey(CONNECT_TIMEOUT_KEY)) { + connectTimeout = Integer.parseInt(properties.getProperty(CONNECT_TIMEOUT_KEY)); } return connectTimeout; } protected int getConnectPoolSize(InterpreterLaunchContext context) { return Integer.parseInt(context.getProperties().getProperty( - ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getVarName(), - ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE.getIntValue() + "")); + CONNECTION_POOL_SIZE_KEY, + String.valueOf(DEFAULT_CONNECTION_POOL_SIZE))); } public static String escapeSpecialCharacter(String command) { @@ -86,7 +92,7 @@ public abstract class InterpreterLauncher { */ public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { // try to recover it first - if (zConf.isRecoveryEnabled()) { + if (isRecoveryEnabled()) { InterpreterClient recoveredClient = recoveryStorage.getInterpreterClient(context.getInterpreterGroupId()); if (recoveredClient != null) { @@ -106,6 +112,11 @@ public abstract class InterpreterLauncher { return launchDirectly(context); } + private boolean isRecoveryEnabled() { + String recoveryClass = zProperties.getProperty(RECOVERY_ENABLED_KEY, NULL_RECOVERY_CLASS); + return !NULL_RECOVERY_CLASS.equals(recoveryClass); + } + /** * launch interpreter process directly without recovering. 
* diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java index da09c0fbd6..3360982687 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/NullLifecycleManager.java @@ -18,18 +18,19 @@ package org.apache.zeppelin.interpreter.lifecycle; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; +import java.util.Properties; + /** * Do nothing for the lifecycle of interpreter. User need to explicitly start/stop interpreter. */ public class NullLifecycleManager extends LifecycleManager { - public NullLifecycleManager(ZeppelinConfiguration zConf, + public NullLifecycleManager(Properties properties, RemoteInterpreterServer remoteInterpreterServer) { - super(zConf, remoteInterpreterServer); + super(properties, remoteInterpreterServer); } @Override diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java index e6bb97c7b0..a8b163813d 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/lifecycle/TimeoutLifecycleManager.java @@ -18,13 +18,13 @@ package org.apache.zeppelin.interpreter.lifecycle; import org.apache.thrift.TException; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.LifecycleManager; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer; import 
org.apache.zeppelin.scheduler.ExecutorFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Properties; import java.util.concurrent.ScheduledExecutorService; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -41,15 +41,20 @@ public class TimeoutLifecycleManager extends LifecycleManager { private static final Logger LOGGER = LoggerFactory.getLogger(TimeoutLifecycleManager.class); + private static final long DEFAULT_CHECK_INTERVAL = 60000L; + private static final long DEFAULT_TIMEOUT_THRESHOLD = 3600000L; + private long lastBusyTimeInMillis; - public TimeoutLifecycleManager(ZeppelinConfiguration zConf, + public TimeoutLifecycleManager(Properties properties, RemoteInterpreterServer remoteInterpreterServer) { - super(zConf, remoteInterpreterServer); - long checkInterval = zConf.getTime(ZeppelinConfiguration.ConfVars - .ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL); - long timeoutThreshold = zConf.getTime( - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD); + super(properties, remoteInterpreterServer); + long checkInterval = Long.parseLong(properties.getProperty( + "zeppelin.interpreter.lifecyclemanager.timeout.checkinterval", + String.valueOf(DEFAULT_CHECK_INTERVAL))); + long timeoutThreshold = Long.parseLong(properties.getProperty( + "zeppelin.interpreter.lifecyclemanager.timeout.threshold", + String.valueOf(DEFAULT_TIMEOUT_THRESHOLD))); ScheduledExecutorService checkScheduler = ExecutorFactory.singleton() .createOrGetScheduled("TimeoutLifecycleManager", 1); checkScheduler.scheduleAtFixedRate(() -> { @@ -64,7 +69,7 @@ public class TimeoutLifecycleManager extends LifecycleManager { LOGGER.debug("Check idle time of interpreter"); } }, checkInterval, checkInterval, MILLISECONDS); - LOGGER.info("TimeoutLifecycleManager is started with checkInterval: {}, timeoutThreshold: ΒΈ{}", checkInterval, + LOGGER.info("TimeoutLifecycleManager is started with checkInterval: {}, 
timeoutThreshold: {}", checkInterval, timeoutThreshold); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java index b7c87c0753..7ca2618c67 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.interpreter.recovery; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.launcher.InterpreterClient; import java.io.IOException; @@ -32,13 +31,9 @@ import java.util.Map; */ public abstract class RecoveryStorage { - protected ZeppelinConfiguration zConf; protected Map<String, InterpreterClient> restoredClients; - // TODO(zjffdu) The constructor is inconsistent between base class and its implementation. - // The implementation actually use InterpreterSettingManager, the interface should also use it. 
- public RecoveryStorage(ZeppelinConfiguration zConf) { - this.zConf = zConf; + public RecoveryStorage() { } /** diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 6728e7f441..e733fd57f8 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -25,7 +25,6 @@ import org.apache.thrift.TException; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.apache.thrift.transport.TTransportException; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; @@ -144,7 +143,7 @@ public class RemoteInterpreterServer extends Thread // pod (RemoteInterpreterServer) private boolean isForceShutdown = true; - private ZeppelinConfiguration zConf; + private Properties zProperties; private static Thread shutdownThread; @@ -200,10 +199,8 @@ public class RemoteInterpreterServer extends Thread @Override public void init(Map<String, String> properties) throws InterpreterRPCException, TException { - this.zConf = ZeppelinConfiguration.load(); - for (Map.Entry<String, String> entry : properties.entrySet()) { - this.zConf.setProperty(entry.getKey(), entry.getValue()); - } + this.zProperties = new Properties(); + this.zProperties.putAll(properties); try { lifecycleManager = createLifecycleManager(); @@ -213,8 +210,8 @@ public class RemoteInterpreterServer extends Thread } if (!isTest) { - int connectionPoolSize = - this.zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE); + int connectionPoolSize = 
Integer.parseInt( + zProperties.getProperty("zeppelin.interpreter.connection.poolsize", "100")); LOGGER.info("Creating RemoteInterpreterEventClient with connection pool size: {}", connectionPoolSize); intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort, @@ -249,8 +246,8 @@ public class RemoteInterpreterServer extends Thread shutDownThread.start(); } - public ZeppelinConfiguration getConf() { - return this.zConf; + public Properties getProperties() { + return this.zProperties; } public LifecycleManager getLifecycleManager() { @@ -270,11 +267,13 @@ public class RemoteInterpreterServer extends Thread } private LifecycleManager createLifecycleManager() throws Exception { - String lifecycleManagerClass = zConf.getLifecycleManagerClass(); + String lifecycleManagerClass = zProperties.getProperty( + "zeppelin.interpreter.lifecyclemanager.class", + "org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager"); Class<?> clazz = Class.forName(lifecycleManagerClass); LOGGER.info("Creating interpreter lifecycle manager: {}", lifecycleManagerClass); - return (LifecycleManager) clazz.getConstructor(ZeppelinConfiguration.class, RemoteInterpreterServer.class) - .newInstance(zConf, this); + return (LifecycleManager) clazz.getConstructor(Properties.class, RemoteInterpreterServer.class) + .newInstance(zProperties, this); } public static void main(String[] args) throws Exception { @@ -335,7 +334,11 @@ public class RemoteInterpreterServer extends Thread properties.get("zeppelin.interpreter.output.limit")); } - depLoader = new DependencyResolver(localRepoPath, zConf); + depLoader = new DependencyResolver(localRepoPath, + zProperties.getProperty("zeppelin.proxy.url"), + zProperties.getProperty("zeppelin.proxy.user"), + zProperties.getProperty("zeppelin.proxy.password"), + zProperties.getProperty("zeppelin.interpreter.dep.mvnRepo")); appLoader = new ApplicationLoader(resourcePool, depLoader); resultCacheInSeconds = @@ -361,7 +364,6 @@ public 
class RemoteInterpreterServer extends Thread interpreter.setInterpreterGroup(interpreterGroup); interpreter.setUserName(userName); - interpreter.setZeppelinConfiguration(zConf); interpreterGroup.addInterpreterToSession(new LazyOpenInterpreter(interpreter), sessionId); @@ -484,7 +486,7 @@ public class RemoteInterpreterServer extends Thread this.intpEventServerHost = host; this.intpEventServerPort = port; intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort, - this.zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECTION_POOL_SIZE)); + Integer.parseInt(zProperties.getProperty("zeppelin.interpreter.connection.poolsize", "100"))); intpEventClient.setIntpGroupId(interpreterGroupId); this.angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java index e3213467fc..a4178704b5 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/SchedulerFactory.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.scheduler; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.util.ExecutorUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,12 +49,23 @@ public class SchedulerFactory { } private SchedulerFactory() { - int threadPoolSize = ZeppelinConfiguration - .getStaticInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE); + int threadPoolSize = getSchedulerPoolSize(); LOGGER.info("Scheduler Thread Pool Size: {}", threadPoolSize); executor = ExecutorFactory.singleton().createOrGet(SCHEDULER_EXECUTOR_NAME, threadPoolSize); } + private static int getSchedulerPoolSize() { + String envValue = 
System.getenv("ZEPPELIN_INTERPRETER_SCHEDULER_POOL_SIZE"); + if (envValue != null) { + return Integer.parseInt(envValue); + } + String propValue = System.getProperty("zeppelin.scheduler.threadpool.size"); + if (propValue != null) { + return Integer.parseInt(propValue); + } + return 100; + } + public void destroy() { LOGGER.info("Destroy all executors"); synchronized (schedulers) { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/conf/ZeppelinConfigurationTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/conf/ZeppelinConfigurationTest.java deleted file mode 100644 index 55c68d48a2..0000000000 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/conf/ZeppelinConfigurationTest.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.zeppelin.conf; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.time.format.DateTimeParseException; - -import org.junit.jupiter.api.Test; - -class ZeppelinConfigurationTest { - - @Test - void testTimeUnitToMill() { - assertEquals(10L, ZeppelinConfiguration.timeUnitToMill("10ms")); - assertEquals(2000L, ZeppelinConfiguration.timeUnitToMill("2s")); - assertEquals(60000L, ZeppelinConfiguration.timeUnitToMill("1m")); - assertEquals(3600000L, ZeppelinConfiguration.timeUnitToMill("1h")); - } - - @Test - void testTimeUnitToMill_WithoutUnit_1() { - assertThrows(DateTimeParseException.class, () -> { - ZeppelinConfiguration.timeUnitToMill("60000"); - }); - } - - @Test - void testTimeUnitToMill_WithoutUnit_2() { - assertThrows(DateTimeParseException.class, () -> { - ZeppelinConfiguration.timeUnitToMill("0"); - }); - } -} diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/BooterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/BooterTest.java index a2619db308..7ea565d528 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/BooterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/BooterTest.java @@ -17,7 +17,6 @@ package org.apache.zeppelin.dep; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.eclipse.aether.repository.RemoteRepository; import org.junit.jupiter.api.Test; @@ -56,8 +55,8 @@ class BooterTest { @Test void getInterpreterMvnRepoPathTest() { - ZeppelinConfiguration zConf = ZeppelinConfiguration.load("zeppelin-site-test.xml"); - List<RemoteRepository> remoteRepositories = Booter.newCentralRepositorys(null, zConf); + String mvnRepoUrl = "https://repo1.maven.org/maven2/,https://repo2.maven.org/maven2/"; + List<RemoteRepository> remoteRepositories = Booter.newCentralRepositorys(null, mvnRepoUrl); assertNotNull(remoteRepositories); assertEquals(2, 
remoteRepositories.size()); assertEquals("https://repo1.maven.org/maven2/", remoteRepositories.get(0).getUrl()); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java index f51b683a75..1a41ab4858 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/dep/DependencyResolverTest.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.dep; import org.apache.commons.io.FileUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.eclipse.aether.RepositoryException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -45,7 +44,7 @@ class DependencyResolverTest { System.currentTimeMillis()); testPath = tmpDir.getAbsolutePath() + "/test-repo"; testCopyPath = new File(tmpDir, "test-copy-repo"); - resolver = new DependencyResolver(testPath, ZeppelinConfiguration.load()); + resolver = new DependencyResolver(testPath, null, null, null, null); } @AfterAll diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/helium/ApplicationLoaderTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/helium/ApplicationLoaderTest.java index 3344a38338..39b392eb4a 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/helium/ApplicationLoaderTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/helium/ApplicationLoaderTest.java @@ -18,7 +18,6 @@ package org.apache.zeppelin.helium; import org.apache.commons.io.FileUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.dep.DependencyResolver; import org.apache.zeppelin.interpreter.InterpreterOutput; import org.apache.zeppelin.resource.LocalResourcePool; @@ -54,7 +53,7 @@ class ApplicationLoaderTest { // given LocalResourcePool resourcePool = new LocalResourcePool("pool1"); 
DependencyResolver dep = - new DependencyResolver(tmpDir.getAbsolutePath(), ZeppelinConfiguration.load()); + new DependencyResolver(tmpDir.getAbsolutePath(), null, null, null, null); ApplicationLoader appLoader = new ApplicationLoader(resourcePool, dep); HeliumPackage pkg1 = createPackageInfo(MockApplication1.class.getName(), "artifact1"); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java index c1700cdef1..58ef866a74 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java @@ -82,7 +82,7 @@ class RemoteInterpreterServerTest { server.getPort())); server.init(new HashMap<>()); - assertNotNull(server.getConf()); + assertNotNull(server.getProperties()); assertNotNull(server.getLifecycleManager()); } diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java index 91cc7a3a5c..c3fb24080a 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterLauncher.java @@ -16,13 +16,13 @@ */ package org.apache.zeppelin.interpreter.launcher; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; +import java.util.Properties; /** * Interpreter Launcher which use shell script to launch the 
interpreter process. @@ -32,9 +32,9 @@ public class DockerInterpreterLauncher extends InterpreterLauncher { private InterpreterLaunchContext context; - public DockerInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) + public DockerInterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage) throws IOException { - super(zConf, recoveryStorage); + super(zProperties, recoveryStorage); } @Override @@ -53,17 +53,17 @@ public class DockerInterpreterLauncher extends InterpreterLauncher { StandardInterpreterLauncher interpreterLauncher = null; if (isSpark()) { - interpreterLauncher = new SparkInterpreterLauncher(zConf, recoveryStorage); + interpreterLauncher = new SparkInterpreterLauncher(zProperties, recoveryStorage); } else if (isFlink()) { - interpreterLauncher = new FlinkInterpreterLauncher(zConf, recoveryStorage); + interpreterLauncher = new FlinkInterpreterLauncher(zProperties, recoveryStorage); } else { - interpreterLauncher = new StandardInterpreterLauncher(zConf, recoveryStorage); + interpreterLauncher = new StandardInterpreterLauncher(zProperties, recoveryStorage); } Map<String, String> env = interpreterLauncher.buildEnvFromProperties(context); return new DockerInterpreterProcess( - zConf, - zConf.getDockerContainerImage(), + zProperties, + zProperties.getProperty("zeppelin.docker.container.image", "apache/zeppelin"), context.getInterpreterGroupId(), context.getInterpreterSettingGroup(), context.getInterpreterSettingName(), diff --git a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java index 643afb2061..f56ec49859 100644 --- a/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java +++ 
b/zeppelin-plugins/launcher/docker/src/main/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcess.java @@ -53,8 +53,6 @@ import com.spotify.docker.client.messages.ProgressMessage; import org.apache.commons.io.FileUtils; import org.apache.commons.io.filefilter.FileFilterUtils; import org.apache.commons.lang.StringUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars; import org.apache.zeppelin.interpreter.launcher.utils.TarFileEntry; import org.apache.zeppelin.interpreter.launcher.utils.TarUtils; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; @@ -63,7 +61,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.zeppelin.conf.ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB; public class DockerInterpreterProcess extends RemoteInterpreterProcess { private static final Logger LOGGER = LoggerFactory.getLogger(DockerInterpreterProcess.class); @@ -93,7 +90,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { @VisibleForTesting boolean uploadLocalLibToContainter = true; - private ZeppelinConfiguration zConf; + private Properties zProperties; private String zeppelinHome; private final String containerZeppelinHome; @@ -107,7 +104,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { private static final String CONTAINER_UPLOAD_TAR_DIR = "/tmp/zeppelin-tar"; public DockerInterpreterProcess( - ZeppelinConfiguration zConf, + Properties zProperties, String containerImage, String interpreterGroupId, String interpreterGroupName, @@ -128,20 +125,22 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { this.properties = properties; this.envs = new HashMap<>(envs); - this.zConf = zConf; + this.zProperties = zProperties; this.containerName = interpreterGroupId.toLowerCase(); - containerZeppelinHome = 
zConf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_HOME); - containerSparkHome = zConf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME); - uploadLocalLibToContainter = zConf.getBoolean( - ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER); + containerZeppelinHome = zProperties.getProperty( + "zeppelin.docker.container.home", "/opt/zeppelin"); + containerSparkHome = zProperties.getProperty( + "zeppelin.docker.container.spark.home", "/opt/spark"); + uploadLocalLibToContainter = Boolean.parseBoolean( + zProperties.getProperty("zeppelin.docker.upload.local.lib.to.container", "true")); try { this.zeppelinHome = getZeppelinHome(); } catch (IOException e) { LOGGER.error(e.getMessage(), e); } - dockerHost = zConf.getString(ConfVars.ZEPPELIN_DOCKER_HOST); + dockerHost = zProperties.getProperty("zeppelin.docker.host", "http://0.0.0.0:2375"); } @Override @@ -328,7 +327,8 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { envs.remove("PATH"); // set container time zone - envs.put("TZ", zConf.getString(ConfVars.ZEPPELIN_DOCKER_TIME_ZONE)); + envs.put("TZ", zProperties.getProperty("zeppelin.docker.time.zone", + java.util.TimeZone.getDefault().getID())); List<String> listEnv = new ArrayList<>(); for (Map.Entry<String, String> entry : this.envs.entrySet()) { @@ -498,7 +498,7 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { copyFiles.putIfAbsent(intpKeytab, intpKeytab); } // 3.5) zeppelin server keytab file - String zeppelinServerKeytab = zConf.getString(ZEPPELIN_SERVER_KERBEROS_KEYTAB); + String zeppelinServerKeytab = zProperties.getProperty("zeppelin.server.kerberos.keytab", ""); if (!StringUtils.isBlank(zeppelinServerKeytab)) { copyFiles.putIfAbsent(zeppelinServerKeytab, zeppelinServerKeytab); } @@ -636,9 +636,10 @@ public class DockerInterpreterProcess extends RemoteInterpreterProcess { private String getZeppelinHome() throws IOException { // check zeppelinHome is exist - File fileZeppelinHome = new 
File(zConf.getZeppelinHome()); + String zeppelinHomePath = zProperties.getProperty("zeppelin.home", "./"); + File fileZeppelinHome = new File(zeppelinHomePath); if (fileZeppelinHome.exists() && fileZeppelinHome.isDirectory()) { - return zConf.getZeppelinHome(); + return zeppelinHomePath; } throw new IOException("Can't find zeppelin home path!"); diff --git a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java index 1e9ad7d0bd..d9767ec58e 100644 --- a/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java +++ b/zeppelin-plugins/launcher/docker/src/test/java/org/apache/zeppelin/interpreter/launcher/DockerInterpreterProcessTest.java @@ -32,7 +32,6 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; class DockerInterpreterProcessTest { @@ -40,8 +39,10 @@ class DockerInterpreterProcessTest { @Test void testCreateIntpProcess() throws IOException { + Properties zProperties = new Properties(); + zProperties.putAll(zConf.getCompleteConfiguration()); DockerInterpreterLauncher launcher - = new DockerInterpreterLauncher(zConf, null); + = new DockerInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty( ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000"); @@ -62,21 +63,25 @@ class DockerInterpreterProcessTest { @Test void testEnv() throws IOException { - when(zConf.getString(ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME)) - .thenReturn("my-spark-home"); - 
when(zConf.getBoolean(ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER)) - .thenReturn(false); - when(zConf.getString(ConfVars.ZEPPELIN_DOCKER_HOST)) - .thenReturn("http://my-docker-host:2375"); - Properties properties = new Properties(); properties.setProperty( ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "5000"); HashMap<String, String> envs = new HashMap<String, String>(); envs.put("MY_ENV1", "V1"); + Properties zProps = new Properties(); + zProps.putAll(zConf.getCompleteConfiguration()); + zProps.setProperty( + ConfVars.ZEPPELIN_DOCKER_CONTAINER_SPARK_HOME.getVarName(), + "my-spark-home"); + zProps.setProperty( + ConfVars.ZEPPELIN_DOCKER_UPLOAD_LOCAL_LIB_TO_CONTAINTER.getVarName(), + "false"); + zProps.setProperty( + ConfVars.ZEPPELIN_DOCKER_HOST.getVarName(), + "http://my-docker-host:2375"); DockerInterpreterProcess intp = spy(new DockerInterpreterProcess( - zConf, + zProps, "interpreter-container:1.0", "shared_process", "sh", @@ -101,8 +106,10 @@ class DockerInterpreterProcessTest { HashMap<String, String> envs = new HashMap<String, String>(); envs.put("MY_ENV1", "V1"); + Properties zProps = new Properties(); + zProps.putAll(zConf.getCompleteConfiguration()); DockerInterpreterProcess intp = new DockerInterpreterProcess( - zConf, + zProps, "interpreter-container:1.0", "shared_process", "sh", diff --git a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java index 847cef2bff..d06bd4b5ed 100644 --- a/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/flink/src/main/java/org/apache/zeppelin/interpreter/launcher/FlinkInterpreterLauncher.java @@ -20,7 +20,6 @@ package org.apache.zeppelin.interpreter.launcher; import 
com.google.common.base.CharMatcher; import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,8 +42,8 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { "local", "remote", "yarn", "yarn-application", "kubernetes-application"); - public FlinkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { - super(zConf, recoveryStorage); + public FlinkInterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage) { + super(zProperties, recoveryStorage); } @Override @@ -157,7 +156,7 @@ public class FlinkInterpreterLauncher extends StandardInterpreterLauncher { } final String flinkScalaVersion = scalaVersion; File flinkInterpreterFolder = - new File(zConf.getInterpreterDir(), "flink"); + new File(zProperties.getProperty("zeppelin.interpreter.dir", "interpreter"), "flink"); List<File> flinkScalaJars = Arrays.stream(flinkInterpreterFolder .listFiles(file -> file.getName().endsWith(".jar"))) diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java index 7a1ce48275..b08e7848de 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncher.java @@ -23,8 +23,8 @@ import java.net.InetAddress; import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; +import java.util.Properties; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import 
org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; @@ -41,8 +41,8 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(K8sStandardInterpreterLauncher.class); private final KubernetesClient client; - public K8sStandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { - super(zConf, recoveryStorage); + public K8sStandardInterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage) { + super(zProperties, recoveryStorage); client = new DefaultKubernetesClient(); } @@ -68,7 +68,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { //The namespace of zeppelin server can only be read from Config.KUBERNETES_NAMESPACE_PATH while it runs in k8s cluster, it may be different from the namespace of interpreter String serverNamespace = K8sUtils.getCurrentK8sNamespace(); return String.format("%s.%s.svc", - zConf.getK8sServiceName(), + zProperties.getProperty("zeppelin.k8s.service.name", "zeppelin-server"), serverNamespace); } else { return context.getIntpEventServerHost(); @@ -96,7 +96,7 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { * Only if a spark interpreter process is running, user impersonation should be possible for --proxy-user */ private boolean isUserImpersonateForSparkInterpreter(InterpreterLaunchContext context) { - return zConf.getZeppelinImpersonateSparkProxyUser() && + return Boolean.parseBoolean(zProperties.getProperty("zeppelin.impersonate.spark.proxy.user", "true")) && context.getOption().isUserImpersonate() && "spark".equalsIgnoreCase(context.getInterpreterSettingGroup()); } @@ -107,9 +107,9 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { return new K8sRemoteInterpreterProcess( client, - K8sUtils.getInterpreterNamespace(context.getProperties(), 
zConf), - new File(zConf.getK8sTemplatesDir(), "interpreter"), - zConf.getK8sContainerImage(), + K8sUtils.getInterpreterNamespace(context.getProperties(), zProperties), + new File(zProperties.getProperty("zeppelin.k8s.template.dir", "k8s"), "interpreter"), + zProperties.getProperty("zeppelin.k8s.interpreter.container.image"), context.getInterpreterGroupId(), context.getInterpreterSettingGroup(), context.getInterpreterSettingName(), @@ -117,12 +117,12 @@ public class K8sStandardInterpreterLauncher extends InterpreterLauncher { buildEnvFromProperties(context), getZeppelinService(context), getZeppelinServiceRpcPort(context), - zConf.getK8sPortForward(), - zConf.getK8sSparkContainerImage(), + Boolean.parseBoolean(zProperties.getProperty("zeppelin.k8s.portforward", "false")), + zProperties.getProperty("zeppelin.k8s.spark.container.image"), getConnectTimeout(context), getConnectPoolSize(context), isUserImpersonateForSparkInterpreter(context), - zConf.getK8sTimeoutDuringPending()); + Boolean.parseBoolean(zProperties.getProperty("zeppelin.k8s.timeout.during.pending", "true"))); } protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java index f064842753..af417e58c6 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java @@ -29,8 +29,6 @@ import java.util.regex.Pattern; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; - import io.fabric8.kubernetes.client.Config; public class K8sUtils { @@ -104,13 +102,13 @@ public class K8sUtils { /** * 
Get the namespace of the interpreter. - * Check Order: zeppelin.k8s.interpreter.namespace -> getCurrentK8sNamespace() -> zConf.getK8sNamepsace() + * Check Order: zeppelin.k8s.interpreter.namespace -> getCurrentK8sNamespace() -> zProperties.getProperty("zeppelin.k8s.namespace") * @param properties - * @param zConf + * @param zProperties * @return the interpreter namespace * @throws IOException */ - public static String getInterpreterNamespace(Properties properties, ZeppelinConfiguration zConf) throws IOException { + public static String getInterpreterNamespace(Properties properties, Properties zProperties) throws IOException { if(properties.containsKey("zeppelin.k8s.interpreter.namespace")){ return properties.getProperty("zeppelin.k8s.interpreter.namespace"); } @@ -118,7 +116,7 @@ public class K8sUtils { if (isRunningOnKubernetes()) { return getCurrentK8sNamespace(); } else { - return zConf.getK8sNamepsace(); + return zProperties.getProperty("zeppelin.k8s.namespace", "default"); } } diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java index 651f4a9e6f..5ec671d9f7 100644 --- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sStandardInterpreterLauncherTest.java @@ -43,7 +43,9 @@ class K8sStandardInterpreterLauncherTest { void testK8sLauncher() throws IOException { // given ZeppelinConfiguration zConf = ZeppelinConfiguration.load(); - K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null); + Properties zProperties = new Properties(); + zProperties.putAll(zConf.getCompleteConfiguration()); + K8sStandardInterpreterLauncher 
launcher = new K8sStandardInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("ENV_1", "VALUE_1"); properties.setProperty("property_1", "value_1"); @@ -72,7 +74,9 @@ class K8sStandardInterpreterLauncherTest { void testK8sLauncherWithSparkAndUserImpersonate() throws IOException { // given ZeppelinConfiguration zConf = ZeppelinConfiguration.load(); - K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null); + Properties zProperties = new Properties(); + zProperties.putAll(zConf.getCompleteConfiguration()); + K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("ENV_1", "VALUE_1"); properties.setProperty("property_1", "value_1"); @@ -106,7 +110,9 @@ class K8sStandardInterpreterLauncherTest { void testK8sLauncherWithSparkAndWithoutUserImpersonate() throws IOException { // given ZeppelinConfiguration zConf = ZeppelinConfiguration.load(); - K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zConf, null); + Properties zProperties = new Properties(); + zProperties.putAll(zConf.getCompleteConfiguration()); + K8sStandardInterpreterLauncher launcher = new K8sStandardInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("ENV_1", "VALUE_1"); properties.setProperty("property_1", "value_1"); diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java index 33ca438d00..d832069aef 100644 --- a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnInterpreterLauncher.java @@ -17,7 
+17,6 @@ package org.apache.zeppelin.interpreter.launcher; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; @@ -26,6 +25,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import java.util.Properties; /** * Launcher for running interpreter in yarn container. @@ -34,8 +34,8 @@ public class YarnInterpreterLauncher extends InterpreterLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(YarnInterpreterLauncher.class); - public YarnInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { - super(zConf, recoveryStorage); + public YarnInterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage) { + super(zProperties, recoveryStorage); } @Override @@ -48,7 +48,7 @@ public class YarnInterpreterLauncher extends InterpreterLauncher { buildEnvFromProperties(context), getConnectTimeout(context), getConnectPoolSize(context), - zConf); + zProperties); } protected Map<String, String> buildEnvFromProperties(InterpreterLaunchContext context) { diff --git a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java index f47eaf68d7..492cf96530 100644 --- a/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java +++ b/zeppelin-plugins/launcher/yarn/src/main/java/org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.java @@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import 
org.apache.hadoop.yarn.util.Records; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,7 +77,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess { private String host; private int port = -1; - private final ZeppelinConfiguration zConf; + private final Properties zProperties; private final InterpreterLaunchContext launchContext; private final Properties properties; private final Map<String, String> envs; @@ -103,12 +102,12 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess { Map<String, String> envs, int connectTimeout, int connectionPoolSize, - ZeppelinConfiguration zConf) { + Properties zProperties) { super(connectTimeout, connectionPoolSize, launchContext.getIntpEventServerHost(), launchContext.getIntpEventServerPort()); - this.zConf = zConf; + this.zProperties = zProperties; this.launchContext = launchContext; this.properties = properties; this.envs = envs; @@ -326,12 +325,12 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess { vargs.add("-p"); vargs.add(launchContext.getIntpEventServerPort() + ""); vargs.add("-r"); - vargs.add(zConf.getInterpreterPortRange() + ""); + vargs.add(zProperties.getProperty("zeppelin.interpreter.rpc.portRange", ":") + ""); vargs.add("-i"); vargs.add(launchContext.getInterpreterGroupId()); vargs.add("-l"); vargs.add(ApplicationConstants.Environment.PWD.$() + "/zeppelin/" + - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO.getStringValue() + zProperties.getProperty("zeppelin.interpreter.localRepo", "local-repo") + "/" + launchContext.getInterpreterSettingName()); vargs.add("-g"); vargs.add(launchContext.getInterpreterSettingName()); @@ -492,7 +491,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess { try (ZipOutputStream interpreterZipStream = new ZipOutputStream(new 
FileOutputStream(interpreterArchive))) { interpreterZipStream.setLevel(0); - String zeppelinHomeEnv = zConf.getZeppelinHome(); + String zeppelinHomeEnv = zProperties.getProperty("zeppelin.home", "./"); if (org.apache.commons.lang3.StringUtils.isBlank(zeppelinHomeEnv)) { throw new IOException("ZEPPELIN_HOME is not specified"); } @@ -506,7 +505,7 @@ public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess { File interpreterDir = new File(zeppelinHome, "interpreter/" + launchContext.getInterpreterSettingGroup()); addFileToZipStream(interpreterZipStream, interpreterDir, "interpreter"); - File localRepoDir = new File(zConf.getInterpreterLocalRepoPath() + File.separator + File localRepoDir = new File(zProperties.getProperty("zeppelin.interpreter.localRepo", "local-repo") + File.separator + launchContext.getInterpreterSettingName()); if (localRepoDir.exists() && localRepoDir.isDirectory()) { LOGGER.debug("Adding localRepoDir {} to interpreter zip: ", localRepoDir.getAbsolutePath()); diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java b/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java index a845bf869a..d1f330b993 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/service/InterpreterService.java @@ -71,7 +71,13 @@ public class InterpreterService { String interpreterBaseDir = zConf.getInterpreterDir(); String localRepoPath = zConf.getInterpreterLocalRepoPath(); - final DependencyResolver dependencyResolver = new DependencyResolver(localRepoPath, zConf); + final DependencyResolver dependencyResolver = new DependencyResolver( + localRepoPath, + zConf.getZeppelinProxyUrl(), + zConf.getZeppelinProxyUser(), + zConf.getZeppelinProxyPassword(), + zConf.getString( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO)); // TODO(jl): Make a rule between an interpreter name and an 
installation directory List<String> possibleInterpreterDirectories = new ArrayList<>(); diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java index a73c2563d4..d7a89f30ab 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/service/InterpreterServiceTest.java @@ -102,7 +102,7 @@ class InterpreterServiceTest { Path specificInterpreterPath = Files.createDirectory(Paths.get(interpreterDir.toString(), interpreterName)); DependencyResolver dependencyResolver = - new DependencyResolver(localRepoDir.toString(), ZeppelinConfiguration.load()); + new DependencyResolver(localRepoDir.toString(), null, null, null, null); doNothing().when(mockInterpreterSettingManager).refreshInterpreterTemplates(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java similarity index 100% rename from zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java rename to zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinLocationStrategy.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinLocationStrategy.java similarity index 100% rename from zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinLocationStrategy.java rename to zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinLocationStrategy.java diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 2adaa55a42..f26bc54f0e 100644 --- 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -197,7 +197,7 @@ public class InterpreterSetting { public Builder setConf(ZeppelinConfiguration zConf) { interpreterSetting.zConf = zConf; - interpreterSetting.option.setConf(zConf); + interpreterSetting.option.setUsernameForceLowerCase(zConf.isUsernameForceLowerCase()); return this; } @@ -664,7 +664,7 @@ public class InterpreterSetting { public InterpreterSetting setConf(ZeppelinConfiguration zConf) { this.zConf = zConf; - this.option.setConf(zConf); + this.option.setUsernameForceLowerCase(zConf.isUsernameForceLowerCase()); return this; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java index 1f25479aa9..e295938220 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSettingManager.java @@ -178,7 +178,10 @@ public class InterpreterSettingManager implements NoteEventListener { this.interpreterDirPath = Paths.get(zConf.getInterpreterDir()); LOGGER.debug("InterpreterRootPath: {}", interpreterDirPath); this.dependencyResolver = - new DependencyResolver(zConf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO), zConf); + new DependencyResolver(zConf.getString(ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO), + zConf.getZeppelinProxyUrl(), zConf.getZeppelinProxyUser(), + zConf.getZeppelinProxyPassword(), + zConf.getString(ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO)); this.interpreterRepositories = new ArrayList<>(); for (RemoteRepository repo : dependencyResolver.getRepos()) { this.interpreterRepositories.add(Repository.fromRemoteRepository(repo)); diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java index 4c9a3ae3cd..3038e01452 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/install/InstallInterpreter.java @@ -163,7 +163,8 @@ public class InstallInterpreter { proxy = new Proxy(proxyUrl.getProtocol(), proxyUrl.getHost(), proxyUrl.getPort(), auth); } DependencyResolver depResolver = - new DependencyResolver(localRepoDir, proxy, zConf); + new DependencyResolver(localRepoDir, proxy, + zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_DEP_MVNREPO)); File installDir = new File(interpreterBaseDir, name); if (installDir.exists()) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index d131c816e0..1200a64d3f 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -36,7 +36,6 @@ import java.util.stream.StreamSupport; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; @@ -54,8 +53,8 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { private static final String DEFAULT_MASTER = "local[*]"; Optional<String> sparkMaster = Optional.empty(); - public SparkInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) 
{ - super(zConf, recoveryStorage); + public SparkInterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage) { + super(zProperties, recoveryStorage); } @Override @@ -111,18 +110,21 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { // Because `zeppelin.interpreter.forceShutdown` is initialized in RemoteInterpreterServer // before SparkInterpreter is created. context.getProperties().put("zeppelin.interpreter.forceShutdown", "false"); - } else if (zConf.isOnlyYarnCluster()){ + } else if (Boolean.parseBoolean( + zProperties.getProperty("zeppelin.spark.only_yarn_cluster", "false"))){ throw new IOException("Only yarn-cluster mode is allowed, please set " + - ZeppelinConfiguration.ConfVars.ZEPPELIN_SPARK_ONLY_YARN_CLUSTER.getVarName() + + "zeppelin.spark.only_yarn_cluster" + " to false if you want to use other modes."); } if (isYarnMode(context) && getDeployMode(context).equals("cluster")) { if (sparkProperties.containsKey("spark.files")) { + String confDir = zProperties.getProperty("zeppelin.conf.dir", "conf"); sparkProperties.put("spark.files", sparkProperties.getProperty("spark.files") + "," + - zConf.getConfDir() + "/log4j_yarn_cluster.properties"); + confDir + "/log4j_yarn_cluster.properties"); } else { - sparkProperties.put("spark.files", zConf.getConfDir() + "/log4j_yarn_cluster.properties"); + String confDir = zProperties.getProperty("zeppelin.conf.dir", "conf"); + sparkProperties.put("spark.files", confDir + "/log4j_yarn_cluster.properties"); } sparkProperties.put("spark.yarn.maxAppAttempts", "1"); } @@ -144,7 +146,8 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { try { List<String> additionalJars = new ArrayList<>(); Path localRepoPath = - Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId()); + Paths.get(zProperties.getProperty("zeppelin.interpreter.localRepo", "local-repo"), + context.getInterpreterSettingId()); if (Files.exists(localRepoPath) && 
Files.isDirectory(localRepoPath)) { try (DirectoryStream<Path> localRepoStream = Files.newDirectoryStream(localRepoPath, Files::isRegularFile)) { List<String> localRepoJars = StreamSupport.stream(localRepoStream.spliterator(), @@ -154,7 +157,8 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { } } - Path scalaFolder = Paths.get(zConf.getZeppelinHome(), "/interpreter/spark/scala-" + scalaVersion); + String zeppelinHome = zProperties.getProperty("zeppelin.home", ".."); + Path scalaFolder = Paths.get(zeppelinHome, "/interpreter/spark/scala-" + scalaVersion); if (!scalaFolder.toFile().exists()) { throw new IOException("spark scala folder " + scalaFolder.toFile() + " doesn't exist"); } @@ -165,7 +169,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { additionalJars.addAll(scalaJars); } // add zeppelin-interpreter-shaded - Path interpreterFolder = Paths.get(zConf.getZeppelinHome(), "/interpreter"); + Path interpreterFolder = Paths.get(zeppelinHome, "/interpreter"); try (DirectoryStream<Path> interpreterStream = Files.newDirectoryStream(interpreterFolder, Files::isRegularFile)) { List<String> interpreterJars = StreamSupport.stream(interpreterStream.spliterator(), false) @@ -194,7 +198,8 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { } StringJoiner sparkConfSJ = new StringJoiner("|"); - if (context.getOption().isUserImpersonate() && zConf.getZeppelinImpersonateSparkProxyUser()) { + if (context.getOption().isUserImpersonate() && Boolean.parseBoolean( + zProperties.getProperty("zeppelin.impersonate.spark.proxy.user", "true"))) { sparkConfSJ.add("--proxy-user"); sparkConfSJ.add(context.getUserName()); sparkProperties.remove("spark.yarn.keytab"); @@ -221,9 +226,9 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { } String keytab = properties.getProperty("spark.yarn.keytab", - zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_KEYTAB)); + 
zProperties.getProperty("zeppelin.server.kerberos.keytab")); String principal = properties.getProperty("spark.yarn.principal", - zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_SERVER_KERBEROS_PRINCIPAL)); + zProperties.getProperty("zeppelin.server.kerberos.principal")); if (!StringUtils.isBlank(keytab) && !StringUtils.isBlank(principal)) { env.put("ZEPPELIN_SERVER_KERBEROS_KEYTAB", keytab); @@ -386,7 +391,7 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { " for non-local mode, if you specify it in zeppelin-env.sh, please move that into " + " interpreter setting"); } - String zeppelinHome = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME); + String zeppelinHome = zProperties.getProperty("zeppelin.home", ".."); sparkRBasePath = new File(zeppelinHome, "interpreter" + File.separator + "spark" + File.separator + "R"); } else { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index 77e6e7bddc..2a80f98163 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -20,7 +20,6 @@ package org.apache.zeppelin.interpreter.launcher; import org.apache.commons.exec.environment.EnvironmentUtils; import org.apache.commons.lang3.StringUtils; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterRunner; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; @@ -33,6 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.util.Map; +import java.util.Properties; /** * Interpreter Launcher which use shell script to launch the 
interpreter process. @@ -41,8 +41,8 @@ public class StandardInterpreterLauncher extends InterpreterLauncher { private static final Logger LOGGER = LoggerFactory.getLogger(StandardInterpreterLauncher.class); - public StandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recoveryStorage) { - super(zConf, recoveryStorage); + public StandardInterpreterLauncher(Properties zProperties, RecoveryStorage recoveryStorage) { + super(zProperties, recoveryStorage); } @Override @@ -68,14 +68,20 @@ public class StandardInterpreterLauncher extends InterpreterLauncher { false); } else { // create new remote process - String localRepoPath = zConf.getInterpreterLocalRepoPath() + File.separator + String interpreterLocalRepoPath = zProperties.getProperty("zeppelin.interpreter.localRepo", + "local-repo"); + String localRepoPath = interpreterLocalRepoPath + File.separator + context.getInterpreterSettingId(); + String interpreterDir = zProperties.getProperty("zeppelin.interpreter.dir", "interpreter"); + String interpreterPortRange = zProperties.getProperty("zeppelin.interpreter.rpc.portRange", ":"); + String remoteRunnerPath = zProperties.getProperty("zeppelin.interpreter.remoterunner", + System.getProperty("zeppelin.home") + "/bin/interpreter.sh"); return new ExecRemoteInterpreterProcess( - context.getIntpEventServerPort(), context.getIntpEventServerHost(), zConf.getInterpreterPortRange(), - zConf.getInterpreterDir() + "/" + groupName, localRepoPath, + context.getIntpEventServerPort(), context.getIntpEventServerHost(), interpreterPortRange, + interpreterDir + "/" + groupName, localRepoPath, buildEnvFromProperties(context), connectTimeout, connectionPoolSize, name, context.getInterpreterGroupId(), option.isUserImpersonate(), - runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath()); + runner != null ?
runner.getPath() : remoteRunnerPath); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java index 26cecb256e..40448f9a92 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java @@ -44,6 +44,7 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { private static final Logger LOGGER = LoggerFactory.getLogger(FileSystemRecoveryStorage.class); + private final ZeppelinConfiguration zConf; private FileSystemStorage fs; private Path recoveryDir; private InterpreterSettingManager interpreterSettingManager; @@ -51,7 +52,8 @@ public class FileSystemRecoveryStorage extends RecoveryStorage { public FileSystemRecoveryStorage(ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) throws IOException { - super(zConf); + super(); + this.zConf = zConf; this.interpreterSettingManager = interpreterSettingManager; String recoveryDirProperty = zConf.getString(ZeppelinConfiguration.ConfVars.ZEPPELIN_RECOVERY_DIR); this.fs = new FileSystemStorage(zConf, recoveryDirProperty); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorage.java index 078722e2a6..de088306f8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/LocalRecoveryStorage.java @@ -38,17 +38,20 @@ public class LocalRecoveryStorage extends RecoveryStorage { private static final Logger LOGGER = LoggerFactory.getLogger(LocalRecoveryStorage.class); + private final 
ZeppelinConfiguration zConf; private InterpreterSettingManager interpreterSettingManager; private File recoveryDir; public LocalRecoveryStorage(ZeppelinConfiguration zConf) { - super(zConf); + super(); + this.zConf = zConf; } public LocalRecoveryStorage(ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) throws IOException { - super(zConf); + super(); + this.zConf = zConf; this.recoveryDir = new File(zConf.getRecoveryDir()); LOGGER.info("Using folder {} to store recovery data", recoveryDir); if (!this.recoveryDir.exists()) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java index ec6a3b058d..46af727b79 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java @@ -35,7 +35,7 @@ public class NullRecoveryStorage extends RecoveryStorage { public NullRecoveryStorage(ZeppelinConfiguration zConf, InterpreterSettingManager interpreterSettingManager) throws IOException { - super(zConf); + super(); } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 859e4e2ddc..6bd2e20232 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -77,7 +77,6 @@ public class RemoteInterpreter extends Interpreter { this.sessionId = sessionId; this.className = className; this.setUserName(userName); - this.setZeppelinConfiguration(zConf); } public boolean isOpened() { diff --git 
a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java index b229d0f0f5..a81b762527 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/plugin/PluginManager.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import jakarta.inject.Inject; @@ -109,13 +110,24 @@ public class PluginManager { String launcherClassName = "org.apache.zeppelin.interpreter.launcher." + launcherPlugin; LOGGER.info("Loading Interpreter Launcher Plugin: {}", launcherClassName); + Properties zProperties = new Properties(); + zProperties.putAll(zConf.getCompleteConfiguration()); + zProperties.setProperty("zeppelin.home", zConf.getZeppelinHome()); + zProperties.setProperty("zeppelin.conf.dir", zConf.getConfDir()); + zProperties.setProperty("zeppelin.interpreter.dir", + zConf.getInterpreterDir()); + zProperties.setProperty("zeppelin.interpreter.localRepo", + zConf.getInterpreterLocalRepoPath()); + zProperties.setProperty("zeppelin.interpreter.remoterunner", + zConf.getInterpreterRemoteRunnerPath()); + if (builtinLauncherClassNames.contains(launcherClassName) || Boolean.parseBoolean(System.getProperty("zeppelin.isTest", "false"))) { try { return (InterpreterLauncher) (Class.forName(launcherClassName)) - .getConstructor(ZeppelinConfiguration.class, RecoveryStorage.class) - .newInstance(zConf, recoveryStorage); + .getConstructor(Properties.class, RecoveryStorage.class) + .newInstance(zProperties, recoveryStorage); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException | InvocationTargetException e) { throw new IOException("Fail to instantiate InterpreterLauncher from classpath directly:" @@ -127,8 +139,8 @@ public class PluginManager { InterpreterLauncher 
launcher = null; try { launcher = (InterpreterLauncher) (Class.forName(launcherClassName, true, pluginClassLoader)) - .getConstructor(ZeppelinConfiguration.class, RecoveryStorage.class) - .newInstance(zConf, recoveryStorage); + .getConstructor(Properties.class, RecoveryStorage.class) + .newInstance(zProperties, recoveryStorage); } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException | InvocationTargetException e) { throw new IOException("Fail to instantiate Launcher " + launcherPlugin + diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java index f7df6715d5..7d4051687b 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/InterpreterSettingTest.java @@ -577,7 +577,7 @@ class InterpreterSettingTest extends AbstractInterpreterTest{ .setGroup("group") .setOption(interpreterOption) .setIntepreterSettingManager(interpreterSettingManager) - .setDependencyResolver(new DependencyResolver("/tmp", zConf)) + .setDependencyResolver(new DependencyResolver("/tmp", null, null, null, null)) .setConf(zConf) .create(); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index 52ac5a09b5..a3dbd011e2 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -55,6 +55,7 @@ class SparkInterpreterLauncherTest { private String sparkHome; private String zeppelinHome; private ZeppelinConfiguration zConf; + private Properties zProperties; 
@BeforeEach public void setUp() { @@ -63,12 +64,22 @@ class SparkInterpreterLauncherTest { zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("..").getAbsolutePath()); zeppelinHome = zConf.getZeppelinHome(); + zProperties = new Properties(); + zProperties.putAll(zConf.getCompleteConfiguration()); + zProperties.setProperty("zeppelin.home", zConf.getZeppelinHome()); + zProperties.setProperty("zeppelin.conf.dir", zConf.getConfDir()); + zProperties.setProperty("zeppelin.interpreter.dir", + zConf.getInterpreterDir()); + zProperties.setProperty("zeppelin.interpreter.localRepo", + zConf.getInterpreterLocalRepoPath()); + zProperties.setProperty("zeppelin.interpreter.remoterunner", + zConf.getInterpreterRemoteRunnerPath()); LOGGER.info("ZEPPELIN_HOME: " + zeppelinHome); } @Test void testConnectTimeOut() throws IOException { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty( @@ -91,7 +102,7 @@ class SparkInterpreterLauncherTest { @Test void testLocalMode() throws IOException { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("ENV_1", ""); @@ -121,7 +132,7 @@ class SparkInterpreterLauncherTest { @Test void testYarnClientMode_1() throws IOException { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); @@ -154,7 +165,7 @@ class 
SparkInterpreterLauncherTest { @Test void testYarnClientMode_2() throws IOException { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); @@ -189,7 +200,7 @@ class SparkInterpreterLauncherTest { @Test void testYarnClusterMode_1() throws IOException { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); @@ -231,7 +242,7 @@ class SparkInterpreterLauncherTest { @Test void testYarnClusterMode_2() throws IOException { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); @@ -282,7 +293,7 @@ class SparkInterpreterLauncherTest { @Test void testYarnClusterMode_3() throws IOException { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("SPARK_HOME", sparkHome); properties.setProperty("property_1", "value_1"); @@ -335,7 +346,7 @@ class SparkInterpreterLauncherTest { @Test void testDetectSparkScalaVersionDirectStreamCapture() throws Exception { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); // Use reflection to access private 
method Method detectSparkScalaVersionMethod = SparkInterpreterLauncher.class.getDeclaredMethod( @@ -354,7 +365,7 @@ class SparkInterpreterLauncherTest { @Test void testDetectSparkScalaVersionByReplClassWithNonExistentDirectory() throws Exception { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); // Use reflection to access private method Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod( @@ -379,7 +390,7 @@ class SparkInterpreterLauncherTest { @Test void testDetectSparkScalaVersionByReplClassWithFileInsteadOfDirectory() throws Exception { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); // Use reflection to access private method Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod( @@ -414,7 +425,7 @@ class SparkInterpreterLauncherTest { @Test void testDetectSparkScalaVersionByReplClassWithValidDirectory() throws Exception { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); // Use reflection to access private method Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod( @@ -443,7 +454,7 @@ class SparkInterpreterLauncherTest { @Test void testDetectSparkScalaVersionByReplClassWithEmptyDirectory() throws Exception { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); // Use reflection to access private method Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod( @@ -471,7 +482,7 @@ class SparkInterpreterLauncherTest { @Test void testDetectSparkScalaVersionByReplClassWithMultipleJars() throws Exception { - SparkInterpreterLauncher 
launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); // Use reflection to access private method Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod( @@ -507,7 +518,7 @@ class SparkInterpreterLauncherTest { @Test void testDetectSparkScalaVersionByReplClassWithScala213() throws Exception { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); // Use reflection to access private method Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod( @@ -536,7 +547,7 @@ class SparkInterpreterLauncherTest { @Test void testDetectSparkScalaVersionByReplClassWithUnsupportedScalaVersion() throws Exception { - SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zProperties, null); // Use reflection to access private method Method detectMethod = SparkInterpreterLauncher.class.getDeclaredMethod( diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java index a9db3ee05a..a95c90af2a 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java @@ -31,16 +31,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class StandardInterpreterLauncherTest { - private ZeppelinConfiguration zConf; + private Properties zProperties; @BeforeEach public void setUp() { - zConf = ZeppelinConfiguration.load(); + ZeppelinConfiguration zConf = ZeppelinConfiguration.load(); + zProperties = new Properties(); + 
zProperties.putAll(zConf.getCompleteConfiguration()); + zProperties.setProperty("zeppelin.home", zConf.getZeppelinHome()); + zProperties.setProperty("zeppelin.conf.dir", zConf.getConfDir()); + zProperties.setProperty("zeppelin.interpreter.dir", + zConf.getInterpreterDir()); + zProperties.setProperty("zeppelin.interpreter.localRepo", + zConf.getInterpreterLocalRepoPath()); + zProperties.setProperty("zeppelin.interpreter.remoterunner", + zConf.getInterpreterRemoteRunnerPath()); } @Test void testLauncher() throws IOException { - StandardInterpreterLauncher launcher = new StandardInterpreterLauncher(zConf, null); + StandardInterpreterLauncher launcher = new StandardInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty("ENV_1", "VALUE_1"); properties.setProperty("property_1", "value_1"); @@ -55,7 +65,6 @@ class StandardInterpreterLauncherTest { assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getLongValue(), interpreterProcess.getConnectTimeout()); - assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 2); assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1")); assertTrue(interpreterProcess.getEnv().containsKey("INTERPRETER_GROUP_ID")); @@ -65,7 +74,7 @@ class StandardInterpreterLauncherTest { @Test void testConnectTimeOut() throws IOException { - StandardInterpreterLauncher launcher = new StandardInterpreterLauncher(zConf, null); + StandardInterpreterLauncher launcher = new StandardInterpreterLauncher(zProperties, null); Properties properties = new Properties(); properties.setProperty( ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000"); @@ -79,7 +88,6 @@ class StandardInterpreterLauncherTest { assertEquals(".//interpreter/groupName", 
interpreterProcess.getInterpreterDir()); assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); assertEquals(10000, interpreterProcess.getConnectTimeout()); - assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertTrue(interpreterProcess.getEnv().size() >= 1); assertTrue(interpreterProcess.getEnv().containsKey("INTERPRETER_GROUP_ID")); assertEquals(true, interpreterProcess.isUserImpersonated());
