Repository: zeppelin
Updated Branches:
  refs/heads/master d1bc07298 -> ca7c9cd81


[ZEPPELIN-3490] Yarn cluster mode doesn't work with dependencies set by 
interpreter tab

### What is this PR for?
Pass the dependencies set in the interpreter tab to the Spark job by adding 
them to the `--jars` option in yarn cluster mode.

### What type of PR is it?
[Bug Fix]

### Todos
* [x] - Add `--jars` option to yarn cluster mode

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3490

### How should this be tested?
1. Add a dependency in the interpreter tab
2. Configure the Spark interpreter to run in 'yarn-cluster' mode
3. Test it

### Screenshots (if appropriate)

### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jongyoul Lee <[email protected]>

Closes #3015 from jongyoul/ZEPPELIN-3490 and squashes the following commits:

b3eb6cbb0 [Jongyoul Lee] Added a test in case where localRepoPath doesn't have 
any jar inside it
836657417 [Jongyoul Lee] Fixed code rebased wrong
d8aad5acc [Jongyoul Lee] Add `--jars` option to `SparkInterpreterLauncher` class


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/ca7c9cd8
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/ca7c9cd8
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/ca7c9cd8

Branch: refs/heads/master
Commit: ca7c9cd8156d3647cfbd20291f37e2153c2f2326
Parents: d1bc072
Author: Jongyoul Lee <[email protected]>
Authored: Wed Jul 4 15:17:29 2018 +0900
Committer: Jongyoul Lee <[email protected]>
Committed: Fri Jul 6 15:16:25 2018 +0900

----------------------------------------------------------------------
 .../launcher/SparkInterpreterLauncher.java      | 24 +++++++++++
 .../launcher/SparkInterpreterLauncherTest.java  | 44 ++++++++++++++++++++
 2 files changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca7c9cd8/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
 
b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
index ab95e0b..f1a8012 100644
--- 
a/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
+++ 
b/zeppelin-plugins/launcher/spark/src/main/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncher.java
@@ -17,6 +17,11 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.stream.StreamSupport;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
@@ -76,6 +81,25 @@ public class SparkInterpreterLauncher extends 
StandardInterpreterLauncher {
         !useProxyUserEnv.equals("false"))) {
       sparkConfBuilder.append(" --proxy-user " + context.getUserName());
     }
+    Path localRepoPath =
+        Paths.get(zConf.getInterpreterLocalRepoPath(), 
context.getInterpreterSettingId());
+    if (isYarnMode()
+        && getDeployMode().equals("cluster")
+        && Files.exists(localRepoPath)
+        && Files.isDirectory(localRepoPath)) {
+      try {
+        StreamSupport.stream(
+                Files.newDirectoryStream(localRepoPath, entry -> 
Files.isRegularFile(entry))
+                    .spliterator(),
+                false)
+            .map(jar -> jar.toAbsolutePath().toString())
+            .reduce((x, y) -> x.concat(",").concat(y))
+            .ifPresent(extraJars -> sparkConfBuilder.append(" --jars 
").append(extraJars));
+      } catch (IOException e) {
+        LOGGER.error("Cannot make a list of additional jars from localRepo: 
{}", localRepoPath, e);
+      }
+
+    }
 
     env.put("ZEPPELIN_SPARK_CONF", sparkConfBuilder.toString());
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/ca7c9cd8/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
 
b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
index c2abd60..119a67e 100644
--- 
a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
+++ 
b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java
@@ -17,6 +17,10 @@
 
 package org.apache.zeppelin.interpreter.launcher;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.conf.ZeppelinConfiguration;
 import org.apache.zeppelin.interpreter.InterpreterOption;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
@@ -177,6 +181,45 @@ public class SparkInterpreterLauncherTest {
     InterpreterOption option = new InterpreterOption();
     option.setUserImpersonate(true);
     InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
+    Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), 
context.getInterpreterSettingId());
+    FileUtils.deleteDirectory(localRepoPath.toFile());
+    Files.createDirectories(localRepoPath);
+    Files.createFile(Paths.get(localRepoPath.toAbsolutePath().toString(), 
"test.jar"));
+
+    InterpreterClient client = launcher.launch(context);
+    assertTrue( client instanceof RemoteInterpreterManagedProcess);
+    RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
+    assertEquals("spark", interpreterProcess.getInterpreterSettingName());
+    
assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark"));
+    
assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
+    assertEquals(zConf.getInterpreterRemoteRunnerPath(), 
interpreterProcess.getInterpreterRunner());
+    assertTrue(interpreterProcess.getEnv().size() >= 3);
+    assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
+    assertEquals("true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
+    assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties 
--conf spark.files='file_1' --conf spark.jars='jar_1' --conf 
spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf 
spark.yarn.submit.waitAppCompletion=false --proxy-user user1 --jars " + 
Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar").toString(), 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    Files.deleteIfExists(Paths.get(localRepoPath.toAbsolutePath().toString(), 
"test.jar"));
+    FileUtils.deleteDirectory(localRepoPath.toFile());
+  }
+
+  @Test
+  public void testYarnClusterMode_3() throws IOException {
+    ZeppelinConfiguration zConf = new ZeppelinConfiguration();
+    SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, 
null);
+    Properties properties = new Properties();
+    properties.setProperty("SPARK_HOME", "/user/spark");
+    properties.setProperty("property_1", "value_1");
+    properties.setProperty("master", "yarn");
+    properties.setProperty("spark.submit.deployMode", "cluster");
+    properties.setProperty("spark.files", "file_1");
+    properties.setProperty("spark.jars", "jar_1");
+
+    InterpreterOption option = new InterpreterOption();
+    option.setUserImpersonate(true);
+    InterpreterLaunchContext context = new 
InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", 
"groupId", "spark", "spark", 0, "host");
+    Path localRepoPath = Paths.get(zConf.getInterpreterLocalRepoPath(), 
context.getInterpreterSettingId());
+    FileUtils.deleteDirectory(localRepoPath.toFile());
+    Files.createDirectories(localRepoPath);
+
     InterpreterClient client = launcher.launch(context);
     assertTrue( client instanceof RemoteInterpreterManagedProcess);
     RemoteInterpreterManagedProcess interpreterProcess = 
(RemoteInterpreterManagedProcess) client;
@@ -188,5 +231,6 @@ public class SparkInterpreterLauncherTest {
     assertEquals("/user/spark", interpreterProcess.getEnv().get("SPARK_HOME"));
     assertEquals("true", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
     assertEquals(" --master yarn --files .//conf/log4j_yarn_cluster.properties 
--conf spark.files='file_1' --conf spark.jars='jar_1' --conf 
spark.submit.deployMode='cluster' --conf spark.yarn.isPython=true --conf 
spark.yarn.submit.waitAppCompletion=false --proxy-user user1", 
interpreterProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
+    FileUtils.deleteDirectory(localRepoPath.toFile());
   }
 }

Reply via email to