Repository: zeppelin Updated Branches: refs/heads/master d1bc07298 -> ca7c9cd81
[ZEPPELIN-3490] Yarn cluster mode doesn't work with dependencies set by interpreter tab ### What is this PR for? Pass the dependencies set in the interpreter tab to the Spark job by adding them to the `--jars` option in yarn cluster mode ### What type of PR is it? [Bug Fix] ### Todos * [x] - Add `--jars` option to yarn cluster mode ### What is the Jira issue? * https://issues.apache.org/jira/browse/ZEPPELIN-3490 ### How should this be tested? 1. Add a dependency in the interpreter tab 2. Set the Spark Interpreter to 'yarn-cluster' mode 3. Test it ### Screenshots (if appropriate) ### Questions: * Do the license files need an update? No * Are there breaking changes for older versions? No * Does this need documentation? No Author: Jongyoul Lee <[email protected]> Closes #3015 from jongyoul/ZEPPELIN-3490 and squashes the following commits: b3eb6cbb0 [Jongyoul Lee] Added a test in case where localRepoPath doesn't have any jar inside it 836657417 [Jongyoul Lee] Fixed code rebased wrong d8aad5acc [Jongyoul Lee] Add `--jars` option to `SparkInterpreterLauncher` class Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ca7c9cd8 Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ca7c9cd8 Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ca7c9cd8 Branch: refs/heads/master Commit: ca7c9cd8156d3647cfbd20291f37e2153c2f2326 Parents: d1bc072 Author: Jongyoul Lee <[email protected]> Authored: Wed Jul 4 15:17:29 2018 +0900 Committer: Jongyoul Lee <[email protected]> Committed: Fri Jul 6 15:16:25 2018 +0900 ---------------------------------------------------------------------- .../launcher/SparkInterpreterLauncher.java | 24 +++++++++++ .../launcher/SparkInterpreterLauncherTest.java | 44 ++++++++++++++++++++ 2 files changed, 68 insertions(+) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca7c9cd8/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java index ab95e0b..f1a8012 100644 --- a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java @@ -17,6 +17,11 @@ package org.apache.zeppelin.interpreter.launcher; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.stream.StreamSupport; import org.apache.commons.lang3.StringUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; @@ -76,6 +81,25 @@ public class SparkInterpreterLauncher extends StandardInterpreterLauncher { !useProxyUserEnv.equals("false"))) { sparkConfBuilder.append(" --proxy-user " + context.getUserName()); } + Path localRepoPath = + Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId()); + if (isYarnMode() + && getDeployMode().equals("cluster") + && Files.exists(localRepoPath) + && Files.isDirectory(localRepoPath)) { + try { + StreamSupport.stream( + Files.newDirectoryStream(localRepoPath, entry -> Files.isRegularFile(entry)) + .spliterator(), + false) + .map(jar -> jar.toAbsolutePath().toString()) + .reduce((x, y) -> x.concat(",").concat(y)) + .ifPresent(extraJars -> sparkConfBuilder.append(" --jars ").append(extraJars)); + } catch (IOException e) { + LOGGER.error("Cannot make a list of additional jars 
from localRepo: {}", localRepoPath, e); + } + + } env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString()); http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca7c9cd8/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java ---------------------------------------------------------------------- diff --git a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index c2abd60..119a67e 100644 --- a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -17,6 +17,10 @@ package org.apache.zeppelin.interpreter.launcher; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; @@ -177,6 +181,45 @@ public class SparkInterpreterLauncherTest { InterpreterOption option = new InterpreterOption(); option.setUserImpersonate(true); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); + Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId()); + FileUtils.deleteDirectory(localRepoPath.toFile()); + Files.createDirectories(localRepoPath); + Files.createFile(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar")); + + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof 
RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("spark", interpreterProcess.getInterpreterSettingName()); + assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); + assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 3); + assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); + assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); + assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1 --jars " + Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar").toString(), interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar")); + FileUtils.deleteDirectory(localRepoPath.toFile()); + } + + @Test + public void testYarnClusterMode_3() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty("SPARK_HOME", "/user/spark"); + properties.setProperty("property_1", "value_1"); + properties.setProperty("master", "yarn"); + properties.setProperty("spark.submit.deployMode", "cluster"); + properties.setProperty("spark.files", "file_1"); + properties.setProperty("spark.jars", "jar_1"); + + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, 
option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); + Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId()); + FileUtils.deleteDirectory(localRepoPath.toFile()); + Files.createDirectories(localRepoPath); + InterpreterClient client = launcher.launch(context); assertTrue( client instanceof RemoteInterpreterManagedProcess); RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; @@ -188,5 +231,6 @@ public class SparkInterpreterLauncherTest { assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME")); assertEquals("true", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER")); assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties --conf spark.files='file_1' --conf spark.jars='jar_1' --conf spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --proxy-user user1", interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF")); + FileUtils.deleteDirectory(localRepoPath.toFile()); } }
