Repository: oozie Updated Branches: refs/heads/master 6b61d878e -> 5a598039a
OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5a598039 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5a598039 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5a598039 Branch: refs/heads/master Commit: 5a598039a8aeabcf46d0f1d13542c94ff9b8475f Parents: 6b61d87 Author: Robert Kanter <[email protected]> Authored: Mon Aug 31 16:32:47 2015 -0700 Committer: Robert Kanter <[email protected]> Committed: Mon Aug 31 16:32:47 2015 -0700 ---------------------------------------------------------------------- .../service/SparkConfigurationService.java | 13 +- core/src/main/resources/oozie-default.xml | 10 ++ .../service/TestSparkConfigurationService.java | 12 ++ .../site/twiki/DG_SparkActionExtension.twiki | 17 +-- release-log.txt | 1 + sharelib/spark/pom.xml | 142 +++++++++++++++++-- .../SparkMain.java | 132 ++++++++++++++++- .../action/hadoop/TestSparkActionExecutor.java | 18 ++- 8 files changed, 310 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java index 1b7cf4a..b29ab8d 100644 --- a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java +++ b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java @@ -36,10 +36,14 @@ public class SparkConfigurationService implements Service { private static XLog LOG = XLog.getLog(SparkConfigurationService.class); - public static final String SPARK_CONFIGURATION = "oozie.service.SparkConfigurationService.spark.configurations"; + 
public static final String CONF_PREFIX = Service.CONF_PREFIX + "SparkConfigurationService."; + public static final String SPARK_CONFIGURATIONS = CONF_PREFIX + "spark.configurations"; + public static final String SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR + = CONF_PREFIX + "spark.configurations.ignore.spark.yarn.jar"; private Map<String, Map<String, String>> sparkConfigs; private static final String SPARK_CONFIG_FILE = "spark-defaults.conf"; + private static final String SPARK_YARN_JAR_PROP = "spark.yarn.jar"; @Override public void init(Services services) throws ServiceException { @@ -59,8 +63,9 @@ public class SparkConfigurationService implements Service { private void loadSparkConfigs() throws ServiceException { sparkConfigs = new HashMap<String, Map<String, String>>(); File configDir = new File(ConfigurationService.getConfigurationDirectory()); - String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATION); + String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATIONS); if (confDefs != null) { + boolean ignoreSparkYarnJar = ConfigurationService.getBoolean(SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR); for (String confDef : confDefs) { if (confDef.trim().length() > 0) { String[] parts = confDef.split("="); @@ -80,6 +85,10 @@ public class SparkConfigurationService implements Service { fr = new FileReader(file); props.load(fr); fr.close(); + if (ignoreSparkYarnJar) { + // Ignore spark.yarn.jar because it may interfere with the Spark Sharelib jars + props.remove(SPARK_YARN_JAR_PROP); + } sparkConfigs.put(hostPort, propsToMap(props)); LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath()); } catch (IOException ioe) { http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index fa7754a..8a0bc3b 100644 --- 
a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2567,6 +2567,16 @@ </property> <property> + <name>oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar</name> + <value>true</value> + <description> + If true, Oozie will ignore the "spark.yarn.jar" property from any Spark configurations specified in + oozie.service.SparkConfigurationService.spark.configurations. If false, Oozie will not ignore it. It is recommended + to leave this as true because it can interfere with the jars in the Spark sharelib. + </description> + </property> + + <property> <name>oozie.email.attachment.enabled</name> <value>true</value> <description> http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java index b2c499d..9d82fdc 100644 --- a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java +++ b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java @@ -68,6 +68,7 @@ public class TestSparkConfigurationService extends XTestCase { Properties sparkConf1Props = new Properties(); sparkConf1Props.setProperty("a", "A"); sparkConf1Props.setProperty("b", "B"); + sparkConf1Props.setProperty("spark.yarn.jar", "foo"); // should be ignored by default FileOutputStream fos = null; try { fos = new FileOutputStream(sparkConf1); @@ -109,6 +110,17 @@ public class TestSparkConfigurationService extends XTestCase { assertEquals(2, sparkConfigs.size()); assertEquals("Y", sparkConfigs.get("y")); assertEquals("Z", sparkConfigs.get("z")); + scs.destroy(); + // Setting this to false should make it not ignore spark.yarn.jar + 
ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar", + false); + scs.init(Services.get()); + sparkConfigs = scs.getSparkConfig("rm1"); + assertEquals(3, sparkConfigs.size()); + assertEquals("A", sparkConfigs.get("a")); + assertEquals("B", sparkConfigs.get("b")); + assertEquals("foo", sparkConfigs.get("spark.yarn.jar")); + ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar", true); scs.destroy(); ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations", http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/docs/src/site/twiki/DG_SparkActionExtension.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki index 32ebe12..ee11229 100644 --- a/docs/src/site/twiki/DG_SparkActionExtension.twiki +++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki @@ -82,7 +82,14 @@ properties that are passed to the Spark job. The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-master, or local. -The =mode= element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster. +The =mode= element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster. This is typically +not required because you can specify it as part of =master= (i.e. master=yarn, mode=client is equivalent to master=yarn-client). +A local =master= always runs in client mode. + +Depending on the =master= (and =mode=) entered, the Spark job will run differently as follows: + * local mode: everything runs here in the Launcher Job. + * yarn-client mode: the driver runs here in the Launcher Job and the executor in Yarn. + * yarn-cluster mode: the driver and executor run in Yarn. 
The =name= element indicates the name of the spark application. @@ -142,14 +149,8 @@ to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tr ---+++ Spark on YARN -To make the Spark action run on YARN, you need to follow these steps: - -1. Make the spark-assembly jar available to your Spark action - -2. Specify "yarn-client" or "yarn-cluster" in the =master= element - To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties -either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations= +either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations= in oozie-site.xml. 1. spark.yarn.historyServer.address=http://SPH-HOST:18088 http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index ce62dc0..a7c180b 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter) OOZIE-2322 Oozie Web UI doesn't work with Kerberos in Internet Explorer 10 or 11 and curl (rkanter) OOZIE-2343 Shell Action should take Oozie Action config and setup HADOOP_CONF_DIR (rkanter) OOZIE-2245 Service to periodically check database schema (rkanter) http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/sharelib/spark/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml index 6f7e74a..0878ae6 100644 --- a/sharelib/spark/pom.xml +++ b/sharelib/spark/pom.xml @@ -68,41 +68,76 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + 
<artifactId>spark-graphx_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-hive_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + <exclusions> <exclusion> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-web-proxy</artifactId> </exclusion> <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>jul-to-slf4j</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId> </exclusion> <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-resourcemanager</artifactId> </exclusion> </exclusions> </dependency> - <dependency> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-core</artifactId> - <classifier>tests</classifier> - <scope>test</scope> + <groupId>org.apache.spark</groupId> + <artifactId>spark-mllib_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> </dependency> - <dependency> <groupId>org.apache.spark</groupId> - <artifactId>spark-graphx_2.10</artifactId> + <artifactId>spark-repl_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-sql_2.10</artifactId> <version>${spark.version}</version> <scope>compile</scope> </dependency> - <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>${spark.version}</version> <scope>compile</scope> </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-flume_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + 
<groupId>org.apache.spark</groupId> + <artifactId>spark-streaming-kafka_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-bagel_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + </dependency> <dependency> <groupId>org.apache.oozie</groupId> @@ -117,8 +152,16 @@ </dependency> <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> </dependency> <dependency> @@ -205,5 +248,76 @@ </plugins> </build> + <profiles> + <profile> + <id>hadoop-2</id> + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-yarn_2.10</artifactId> + <version>${spark.version}</version> + <scope>compile</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-web-proxy</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-shuffle</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + <exclusion> + 
<groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-app</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aws</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + </profiles> </project> http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java index b18a0b9..5624951 100644 --- a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java +++ b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java @@ -22,7 +22,9 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.spark.deploy.SparkSubmit; +import java.io.File; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; public class SparkMain extends LauncherMain { @@ -31,8 +33,14 @@ public class SparkMain extends LauncherMain { private static final String JOB_NAME_OPTION = "--name"; private static final String CLASS_NAME_OPTION = "--class"; private static final String VERBOSE_OPTION = "--verbose"; - private static final String DELIM = " "; + private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath="; + private static 
final String DRIVER_CLASSPATH = "spark.driver.extraClassPath="; + private static final String DIST_FILES = "spark.yarn.dist.files="; + private static final String JARS_OPTION = "--jars"; + private static final String DELIM = "\\s+"; + private String sparkJars = null; + private String sparkClasspath = null; public static void main(String[] args) throws Exception { run(SparkMain.class, args); @@ -47,13 +55,18 @@ public class SparkMain extends LauncherMain { List<String> sparkArgs = new ArrayList<String>(); sparkArgs.add(MASTER_OPTION); - sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_MASTER)); + String master = actionConf.get(SparkActionExecutor.SPARK_MASTER); + sparkArgs.add(master); String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE); if (sparkDeployMode != null) { sparkArgs.add(MODE_OPTION); sparkArgs.add(sparkDeployMode); } + boolean yarnClusterMode = master.equals("yarn-cluster") + || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("cluster")); + boolean yarnClientMode = master.equals("yarn-client") + || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("client")); sparkArgs.add(JOB_NAME_OPTION); sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME)); @@ -64,19 +77,87 @@ public class SparkMain extends LauncherMain { sparkArgs.add(className); } + String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR); + + // In local mode, everything runs here in the Launcher Job. + // In yarn-client mode, the driver runs here in the Launcher Job and the executor in Yarn. + // In yarn-cluster mode, the driver and executor run in Yarn. + // Due to this, configuring Spark's classpath is not straightforward (see below) + + // Parse the spark opts. We need to make sure to pass the necessary jars (Hadoop, Spark, user) to Spark. 
The way we do + // this depends on what mode is being used: + // local/yarn-cluster/yarn-client: Passed as comma-separated list via --jars argument + // yarn-cluster/yarn-client: Passed as ':'-separated list via spark.executor.extraClassPath and spark.driver.extraClassPath + // yarn-client: Passed as comma-separated list via spark.yarn.dist.files + // + // --jars will cause the jars to be uploaded to HDFS and localized. To prevent the Sharelib and user jars from being + // unnecessarily reuploaded to HDFS, we use the HDFS paths for these. The hadoop jars are needed as well, but we'll have + // to use local paths for these because they're not in the Sharelib. + // + // spark.executor.extraClassPath and spark.driver.extraClassPath are blindly used as classpaths, so we need to put only + // localized jars. --jars will cause the jars to be localized to the working directory, so we can simply specify the jar + // names for the classpaths, as they'll be found in the working directory. + // + // This part looks more complicated than it is because we need to append the jars if the user already set things for + // these options + determineSparkJarsAndClasspath(actionConf, jarPath); + boolean addedExecutorClasspath = false; + boolean addedDriverClasspath = false; + boolean addedDistFiles = false; + boolean addedJars = false; String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS); if (StringUtils.isNotEmpty(sparkOpts)) { String[] sparkOptions = sparkOpts.split(DELIM); - for (String opt : sparkOptions) { + for (int i = 0; i < sparkOptions.length; i++) { + String opt = sparkOptions[i]; + if (sparkJars != null) { + if (opt.equals(JARS_OPTION)) { + sparkArgs.add(opt); + i++; + opt = sparkOptions[i]; + opt = opt + "," + sparkJars; + addedJars = true; + } else if (yarnClientMode && opt.startsWith(DIST_FILES)) { + opt = opt + "," + sparkJars; + addedDistFiles = true; + } + } + if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) { + if 
(opt.startsWith(EXECUTOR_CLASSPATH)) { + opt = opt + File.pathSeparator + sparkClasspath; + addedExecutorClasspath = true; + } + if (opt.startsWith(DRIVER_CLASSPATH)) { + opt = opt + File.pathSeparator + sparkClasspath; + addedDriverClasspath = true; + } + } sparkArgs.add(opt); } } + if (!addedJars && sparkJars != null) { + sparkArgs.add("--jars"); + sparkArgs.add(sparkJars); + } + if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) { + if (!addedExecutorClasspath) { + sparkArgs.add("--conf"); + sparkArgs.add(EXECUTOR_CLASSPATH + sparkClasspath); + } + if (!addedDriverClasspath) { + sparkArgs.add("--conf"); + sparkArgs.add(DRIVER_CLASSPATH + sparkClasspath); + } + } + if (yarnClientMode && !addedDistFiles && sparkJars != null) { + sparkArgs.add("--conf"); + sparkArgs.add(DIST_FILES + sparkJars); + } if (!sparkArgs.contains(VERBOSE_OPTION)) { sparkArgs.add(VERBOSE_OPTION); } - String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR); sparkArgs.add(jarPath); for (String arg : args) { @@ -103,4 +184,47 @@ public class SparkMain extends LauncherMain { System.out.flush(); SparkSubmit.main(args); } + + private void determineSparkJarsAndClasspath(Configuration actionConf, String jarPath) { + // distCache gets all of the Sharelib and user jars (from the Distributed Cache) + // classpath gets all of the jars from the classpath, which includes the localized jars from the distCache + // sparkJars becomes all of the full paths to the Sharelib and user jars from HDFS and the deduped local paths + // sparkClasspath becomes all of the jar names in sparkJars (without paths) + // We also remove the Spark job jar and job.jar + String[] distCache = new String[]{}; + String dCache = actionConf.get("mapreduce.job.classpath.files"); + if (dCache != null) { + distCache = dCache.split(","); + } + String[] classpath = System.getProperty("java.class.path").split(File.pathSeparator); + StringBuilder cp = new StringBuilder(); + StringBuilder jars = new StringBuilder(); + 
HashSet<String> distCacheJars = new HashSet<String>(distCache.length); + for (String path : distCache) { + // Skip the job jar because it's already included elsewhere and Spark doesn't like duplicating it here + if (!path.equals(jarPath)) { + String name = path.substring(path.lastIndexOf("/") + 1); + distCacheJars.add(name); + cp.append(name).append(File.pathSeparator); + jars.append(path).append(","); + } + } + for (String path : classpath) { + if (!path.startsWith("job.jar") && path.endsWith(".jar")) { + String name = path.substring(path.lastIndexOf("/") + 1); + if (!distCacheJars.contains(name)) { + jars.append(path).append(","); + } + cp.append(name).append(File.pathSeparator); + } + } + if (cp.length() > 0) { + cp.setLength(cp.length() - 1); + sparkClasspath = cp.toString(); + } + if (jars.length() > 0) { + jars.setLength(jars.length() - 1); + sparkJars = jars.toString(); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java index f271abc..dcd2360 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java @@ -67,8 +67,11 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { } public void testSetupMethods() throws Exception { - _testSetupMethods("local[*]", new HashMap<String, String>()); - _testSetupMethods("yarn", new HashMap<String, String>()); + _testSetupMethods("local[*]", new HashMap<String, String>(), "client"); + _testSetupMethods("yarn", new HashMap<String, String>(), "cluster"); + _testSetupMethods("yarn", new HashMap<String, 
String>(), "client"); + _testSetupMethods("yarn-cluster", new HashMap<String, String>(), null); + _testSetupMethods("yarn-client", new HashMap<String, String>(), null); } public void testSetupMethodsWithSparkConfiguration() throws Exception { @@ -91,15 +94,16 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { getJobTrackerUri() + "=" + sparkConfDir.getAbsolutePath()); scs.init(Services.get()); - _testSetupMethods("local[*]", new HashMap<String, String>()); + _testSetupMethods("local[*]", new HashMap<String, String>(), "client"); Map<String, String> extraSparkOpts = new HashMap<String, String>(2); extraSparkOpts.put("a", "A"); extraSparkOpts.put("b", "B"); - _testSetupMethods("yarn", extraSparkOpts); + _testSetupMethods("yarn-cluster", extraSparkOpts, null); + _testSetupMethods("yarn-client", extraSparkOpts, null); } @SuppressWarnings("unchecked") - private void _testSetupMethods(String master, Map<String, String> extraSparkOpts) throws Exception { + private void _testSetupMethods(String master, Map<String, String> extraSparkOpts, String mode) throws Exception { SparkActionExecutor ae = new SparkActionExecutor(); assertEquals(Arrays.asList(SparkMain.class), ae.getLauncherClasses()); @@ -107,7 +111,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + "<master>" + master + "</master>" + - "<mode>client</mode>" + + (mode != null ? 
"<mode>" + mode + "</mode>" : "") + "<name>Some Name</name>" + "<class>org.apache.oozie.foo</class>" + "<jar>" + getNameNodeUri() + "/foo.jar</jar>" + @@ -126,7 +130,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { Configuration conf = ae.createBaseHadoopConf(context, actionXml); ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir()); assertEquals(master, conf.get("oozie.spark.master")); - assertEquals("client", conf.get("oozie.spark.mode")); + assertEquals(mode, conf.get("oozie.spark.mode")); assertEquals("Some Name", conf.get("oozie.spark.name")); assertEquals("org.apache.oozie.foo", conf.get("oozie.spark.class")); assertEquals(getNameNodeUri() + "/foo.jar", conf.get("oozie.spark.jar"));
