Repository: oozie Updated Branches: refs/heads/master 4add492c0 -> f0bceb0bc
OOZIE-2170 Oozie should automatically set configs to make Spark jobs show up in the Spark History Server (rkanter)

Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f0bceb0b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f0bceb0b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f0bceb0b

Branch: refs/heads/master
Commit: f0bceb0bc94e38cc4bf3818400855818b74c2655
Parents: 4add492
Author: Robert Kanter <[email protected]>
Authored: Fri Apr 10 14:03:20 2015 -0700
Committer: Robert Kanter <[email protected]>
Committed: Fri Apr 10 14:03:20 2015 -0700

----------------------------------------------------------------------
 .../action/hadoop/SparkActionExecutor.java      |  19 ++-
 .../service/SparkConfigurationService.java      | 148 ++++++++++++++++++
 core/src/main/resources/oozie-default.xml       |  16 ++-
 .../service/TestSparkConfigurationService.java  | 131 +++++++++++++++++++
 .../site/twiki/DG_SparkActionExtension.twiki    |  24 +++-
 examples/src/main/apps/spark/workflow.xml       |   5 +-
 release-log.txt                                 |   1 +
 .../action/hadoop/TestSparkActionExecutor.java  |  87 +++++++++++-
 8 files changed, 425 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 732d5f0..219a116 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -24,11 +24,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.SparkConfigurationService;
 import org.jdom.Element;
 import org.jdom.Namespace;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class SparkActionExecutor extends JavaActionExecutor {
     public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
@@ -70,9 +73,23 @@ public class SparkActionExecutor extends JavaActionExecutor {
         String jarLocation = actionXml.getChildTextTrim("jar", ns);
         actionConf.set(SPARK_JAR, jarLocation);
 
+        // For YARN masters, emit the spark-defaults.conf properties registered for this ResourceManager first;
+        // <spark-opts> is appended afterwards, so its --conf values come later and take priority.
+        // NOTE(review): values are not quoted here; confirm a value containing spaces survives downstream parsing.
+        StringBuilder sparkOptsSb = new StringBuilder();
+        if (master.startsWith("yarn")) {
+            String resourceManager = actionConf.get(HADOOP_JOB_TRACKER);
+            Map<String, String> sparkConfig = Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager);
+            for (Map.Entry<String, String> entry : sparkConfig.entrySet()) {
+                sparkOptsSb.append("--conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
+            }
+        }
         String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns);
         if (sparkOpts != null) {
-            actionConf.set(SPARK_OPTS, sparkOpts);
+            sparkOptsSb.append(sparkOpts);
+        }
+        if (sparkOptsSb.length() > 0) {
+            actionConf.set(SPARK_OPTS, sparkOptsSb.toString().trim());
         }
 
         return actionConf;

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
new file mode 100644
index 0000000..1b7cf4a
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Loads per-ResourceManager Spark properties from {@code spark-defaults.conf} files so that Spark actions
+ * running on YARN can automatically pick up cluster-specific settings.
+ * <p>
+ * The mapping is read from {@link #SPARK_CONFIGURATION} as a comma separated list of
+ * {@code AUTHORITY=SPARK_CONF_DIR} entries; a relative directory is resolved against the Oozie
+ * configuration directory. Authorities are matched case-insensitively and {@code *} is the fallback.
+ */
+public class SparkConfigurationService implements Service {
+
+    private static final XLog LOG = XLog.getLog(SparkConfigurationService.class);
+
+    public static final String SPARK_CONFIGURATION = "oozie.service.SparkConfigurationService.spark.configurations";
+
+    private Map<String, Map<String, String>> sparkConfigs;
+    private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";
+
+    @Override
+    public void init(Services services) throws ServiceException {
+        loadSparkConfigs();
+    }
+
+    @Override
+    public void destroy() {
+        sparkConfigs.clear();
+    }
+
+    @Override
+    public Class<? extends Service> getInterface() {
+        return SparkConfigurationService.class;
+    }
+
+    /**
+     * (Re)builds the authority -> properties map from {@link #SPARK_CONFIGURATION}. Malformed entries,
+     * missing directories or files, and unreadable files are logged and skipped.
+     */
+    private void loadSparkConfigs() throws ServiceException {
+        sparkConfigs = new HashMap<String, Map<String, String>>();
+        File configDir = new File(ConfigurationService.getConfigurationDirectory());
+        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATION);
+        if (confDefs != null) {
+            for (String confDef : confDefs) {
+                if (confDef.trim().length() > 0) {
+                    String[] parts = confDef.split("=");
+                    if (parts.length == 2) {
+                        // Normalize the authority exactly like getSparkConfig() normalizes its argument;
+                        // otherwise a mixed-case HOST:PORT entry could never be matched.
+                        String hostPort = parts[0].trim().toLowerCase();
+                        String confDir = parts[1].trim();
+                        File dir = new File(confDir);
+                        if (!dir.isAbsolute()) {
+                            dir = new File(configDir, confDir);
+                        }
+                        if (dir.exists()) {
+                            File file = new File(dir, SPARK_CONFIG_FILE);
+                            if (file.exists()) {
+                                Properties props = new Properties();
+                                FileReader fr = null;
+                                try {
+                                    fr = new FileReader(file);
+                                    props.load(fr);
+                                    fr.close();
+                                    sparkConfigs.put(hostPort, propsToMap(props));
+                                    LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
+                                } catch (IOException ioe) {
+                                    LOG.warn("Spark Configuration could not be loaded for {0}: {1}",
+                                            hostPort, ioe.getMessage(), ioe);
+                                } finally {
+                                    IOUtils.closeSafely(fr);
+                                }
+                            } else {
+                                LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
+                                        hostPort, file.getAbsolutePath());
+                            }
+                        } else {
+                            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
+                                    hostPort, dir.getAbsolutePath());
+                        }
+                    } else {
+                        LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef);
+                    }
+                }
+            }
+        } else {
+            LOG.info("Spark Configuration(s) not specified");
+        }
+    }
+
+    private Map<String, String> propsToMap(Properties props) {
+        Map<String, String> map = new HashMap<String, String>(props.size());
+        for (String key : props.stringPropertyNames()) {
+            map.put(key, props.getProperty(key));
+        }
+        return map;
+    }
+
+    /**
+     * Returns the Spark properties for a ResourceManager; the map may be the cached instance, so do not modify it.
+     *
+     * @param resourceManagerHostPort the ResourceManager HOST:PORT, matched case-insensitively; may be null
+     * @return the matching properties, else the {@code *} properties, else an empty map (never null)
+     */
+    public Map<String, String> getSparkConfig(String resourceManagerHostPort) {
+        resourceManagerHostPort = (resourceManagerHostPort != null) ? resourceManagerHostPort.toLowerCase() : null;
+        Map<String, String> config = sparkConfigs.get(resourceManagerHostPort);
+        if (config == null) {
+            config = sparkConfigs.get("*");
+            if (config == null) {
+                config = new HashMap<String, String>();
+            }
+        }
+        return config;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 3936fca..ffff7c3 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -130,7 +130,8 @@
             org.apache.oozie.service.GroupsService,
             org.apache.oozie.service.ProxyUserService,
             org.apache.oozie.service.XLogStreamingService,
-            org.apache.oozie.service.JvmPauseMonitorService
+            org.apache.oozie.service.JvmPauseMonitorService,
+            org.apache.oozie.service.SparkConfigurationService
         </value>
         <description>
             All services to be created and managed by Oozie Services singleton.
@@ -2505,4 +2506,17 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.SparkConfigurationService.spark.configurations</name>
+        <value>*=spark-conf</value>
+        <description>
+            Comma separated AUTHORITY=SPARK_CONF_DIR, where AUTHORITY is the HOST:PORT of
+            the ResourceManager of a YARN cluster (matched case-insensitively). The wildcard '*'
+            configuration is used when there is no match for an authority. The SPARK_CONF_DIR contains
+            the relevant spark-defaults.conf properties file. If the path is relative, it is looked up
+            within the Oozie configuration directory; the path can also be absolute. This is only used
+            when the Spark master is set to either "yarn-client" or "yarn-cluster".
+        </description>
+    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
new file mode 100644
index 0000000..b2c499d
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.IOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+
+public class TestSparkConfigurationService extends XTestCase {
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        Services services = new Services();
+        services.init();
+    }
+
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    public void testSparkConfigsEmpty() throws Exception {
+        SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
+        scs.destroy();
+        ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations", "");
+        scs.init(Services.get());
+        Map<String, String> sparkConfigs = scs.getSparkConfig("foo");
+        assertEquals(0, sparkConfigs.size());
+    }
+
+    public void testSparkConfigs() throws Exception {
+        File sparkConf1Dir = new File(getTestCaseConfDir(), "spark-conf-1");
+        File sparkConf3Dir = new File(getTestCaseConfDir(), "spark-conf-3");
+        File sparkConf4Dir = new File(getTestCaseConfDir(), "spark-conf-4");
+        sparkConf1Dir.mkdirs();
+        sparkConf3Dir.mkdirs();
+        sparkConf4Dir.mkdirs();
+        File sparkConf1 = new File(sparkConf1Dir, "spark-defaults.conf");
+        Properties sparkConf1Props = new Properties();
+        sparkConf1Props.setProperty("a", "A");
+        sparkConf1Props.setProperty("b", "B");
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(sparkConf1);
+            sparkConf1Props.store(fos, "");
+        } finally {
+            IOUtils.closeSafely(fos);
+        }
+        File sparkConf4 = new File(sparkConf4Dir, "spark-defaults.conf");
+        Properties sparkConf4Props = new Properties();
+        sparkConf4Props.setProperty("y", "Y");
+        sparkConf4Props.setProperty("z", "Z");
+        fos = null;
+        try {
+            fos = new FileOutputStream(sparkConf4);
+            sparkConf4Props.store(fos, "");
+        } finally {
+            IOUtils.closeSafely(fos);
+        }
+
+        SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
+        scs.destroy();
+        ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
+                "rm1=" + sparkConf1Dir.getAbsolutePath() +   // absolute path
+                ",rm2" +                                     // invalid entry
+                ",rm3=" + sparkConf3Dir.getAbsolutePath() +  // missing file
+                ",rm4=" + sparkConf4Dir.getName());          // relative path
+        scs.init(Services.get());
+        Map<String, String> sparkConfigs = scs.getSparkConfig("foo");  // unknown authority and no '*' entry
+        assertEquals(0, sparkConfigs.size());
+        sparkConfigs = scs.getSparkConfig("rm1");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("A", sparkConfigs.get("a"));
+        assertEquals("B", sparkConfigs.get("b"));
+        sparkConfigs = scs.getSparkConfig("rm2");  // the invalid entry was skipped
+        assertEquals(0, sparkConfigs.size());
+        sparkConfigs = scs.getSparkConfig("rm3");  // directory exists but holds no spark-defaults.conf
+        assertEquals(0, sparkConfigs.size());
+        sparkConfigs = scs.getSparkConfig("rm4");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("Y", sparkConfigs.get("y"));
+        assertEquals("Z", sparkConfigs.get("z"));
+
+        scs.destroy();
+        ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
+                "rm1=" + sparkConf1Dir.getAbsolutePath() +   // defined
+                ",*=" + sparkConf4Dir.getAbsolutePath());    // wildcard
+        scs.init(Services.get());
+        sparkConfigs = scs.getSparkConfig("rm1");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("A", sparkConfigs.get("a"));
+        assertEquals("B", sparkConfigs.get("b"));
+        sparkConfigs = scs.getSparkConfig("rm2");  // not configured: falls back to the '*' entry
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("Y", sparkConfigs.get("y"));
+        assertEquals("Z", sparkConfigs.get("z"));
+        sparkConfigs = scs.getSparkConfig("foo");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("Y", sparkConfigs.get("y"));
+        assertEquals("Z", sparkConfigs.get("z"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index 8e5cd9b..32ebe12 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -79,7 +79,8 @@ specify multiple =job.xml= files.
 
 The =configuration= element, if present, contains configuration properties that are passed to the Spark job.
 
-The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn, or local.
+The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-client,
+or local.
 
 The =mode= element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster.
 
@@ -89,7 +90,9 @@ The =class= element if present, indicates the spark's application main class.
 
 The =jar= element indicates a comma separated list of jars or python files.
 
-The =spark-opts= element if present, contains a list of spark options that can be passed to spark driver.
+The =spark-opts= element if present, contains a list of spark options that can be passed to spark driver. Spark configuration
+options can be passed by specifying '--conf key=value' here, or from =oozie.service.SparkConfigurationService.spark.configurations=
+in oozie-site.xml. If a property is set in both places, the =spark-opts= value takes priority.
 
 The =arg= element if present, contains arguments that can be passed to spark application.
 
@@ -137,6 +140,23 @@ Spark action logs are redirected to the Oozie Launcher map-reduce job task STDOU
 From Oozie web-console, from the Spark action pop up using the 'Console URL' link, it is possible
 to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tracker web-console.
 
+---+++ Spark on YARN
+
+To make the Spark action run on YARN, you need to follow these steps:
+
+1. Make the spark-assembly jar available to your Spark action
+
+2. Specify "yarn-client" or "yarn-cluster" in the =master= element
+
+To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties
+either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations=:
+
+1. spark.yarn.historyServer.address=http://SPH-HOST:18088
+
+2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
+
+3. spark.eventLog.enabled=true
+
 ---++ Appendix, Spark XML-Schema
 
 ---+++ AE.A Appendix A, Spark XML-Schema

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/examples/src/main/apps/spark/workflow.xml
----------------------------------------------------------------------
diff --git a/examples/src/main/apps/spark/workflow.xml b/examples/src/main/apps/spark/workflow.xml
index ecb9eaa..1b1a3e8 100644
--- a/examples/src/main/apps/spark/workflow.xml
+++ b/examples/src/main/apps/spark/workflow.xml
@@ -22,12 +22,15 @@
         <spark xmlns="uri:oozie:spark-action:0.1">
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
+            <prepare>
+                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/spark"/>
+            </prepare>
             <master>${master}</master>
             <name>Spark-FileCopy</name>
             <class>org.apache.oozie.example.SparkFileCopy</class>
             <jar>${nameNode}/user/${wf:user()}/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>
             <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data/text/data.txt</arg>
-            <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output</arg>
+            <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/spark</arg>
         </spark>
         <ok to="end" />
         <error to="fail" />

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 642cf31..bfa9fdd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2170 Oozie should automatically set configs to make Spark jobs show up in the Spark History Server (rkanter)
 OOZIE-2140 Audit Log should be shown in Oozie UI (puru)
 OOZIE-2139 Coord update doesn't work for job which is submitted by bundle (puru)
 OOZIE-1726 Oozie does not support _HOST when configuring kerberos security (venkatnrangan via bzhang)

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
index a15e76b..f271abc 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.action.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
@@ -27,12 +28,17 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.SparkConfigurationService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
 
+import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
 import java.io.File;
 import java.io.FileInputStream;
@@ -42,10 +48,17 @@ import java.io.Writer;
 
 import java.text.MessageFormat;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class TestSparkActionExecutor extends ActionExecutorTestCase {
     private static final String SPARK_FILENAME = "file.txt";
     private static final String OUTPUT = "output";
+    private static final Pattern SPARK_OPTS_PATTERN = Pattern.compile("([^= ]+)=([^= ]+)");  // each key=value token
 
     @Override
     protected void setSystemProps() throws Exception {
@@ -53,10 +66,82 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
         setSystemProperty("oozie.service.ActionService.executor.classes", SparkActionExecutor.class.getName());
     }
 
-    @SuppressWarnings("unchecked")
     public void testSetupMethods() throws Exception {
+        _testSetupMethods("local[*]", new HashMap<String, String>());
+        _testSetupMethods("yarn", new HashMap<String, String>());
+    }
+
+    public void testSetupMethodsWithSparkConfiguration() throws Exception {
+        File sparkConfDir = new File(getTestCaseConfDir(), "spark-conf");
+        sparkConfDir.mkdirs();
+        File sparkConf = new File(sparkConfDir, "spark-defaults.conf");
+        Properties sparkConfProps = new Properties();
+        sparkConfProps.setProperty("a", "A");
+        sparkConfProps.setProperty("b", "B");
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(sparkConf);
+            sparkConfProps.store(fos, "");
+        } finally {
+            IOUtils.closeSafely(fos);
+        }
+        SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
+        scs.destroy();
+        ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
+                getJobTrackerUri() + "=" + sparkConfDir.getAbsolutePath());  // same authority as the action's job-tracker
+        scs.init(Services.get());
+
+        _testSetupMethods("local[*]", new HashMap<String, String>());  // non-YARN master: no cluster defaults added
+        Map<String, String> extraSparkOpts = new HashMap<String, String>(2);
+        extraSparkOpts.put("a", "A");
+        extraSparkOpts.put("b", "B");
+        _testSetupMethods("yarn", extraSparkOpts);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void _testSetupMethods(String master, Map<String, String> extraSparkOpts) throws Exception {
         SparkActionExecutor ae = new SparkActionExecutor();
         assertEquals(Arrays.asList(SparkMain.class), ae.getLauncherClasses());
+
+        Element actionXml = XmlUtils.parseXml("<spark>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<master>" + master + "</master>" +
+                "<mode>client</mode>" +
+                "<name>Some Name</name>" +
+                "<class>org.apache.oozie.foo</class>" +
+                "<jar>" + getNameNodeUri() + "/foo.jar</jar>" +
+                "<spark-opts>--conf foo=bar</spark-opts>" +
+                "</spark>");
+
+        XConfiguration protoConf = new XConfiguration();
+        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+
+        WorkflowJobBean wf = createBaseWorkflow(protoConf, "spark-action");
+        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
+        action.setType(ae.getType());
+
+        Context context = new Context(wf, action);
+
+        Configuration conf = ae.createBaseHadoopConf(context, actionXml);
+        ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+        assertEquals(master, conf.get("oozie.spark.master"));
+        assertEquals("client", conf.get("oozie.spark.mode"));
+        assertEquals("Some Name", conf.get("oozie.spark.name"));
+        assertEquals("org.apache.oozie.foo", conf.get("oozie.spark.class"));
+        assertEquals(getNameNodeUri() + "/foo.jar", conf.get("oozie.spark.jar"));
+        Map<String, String> sparkOpts = new HashMap<String, String>();
+        sparkOpts.put("foo", "bar");
+        sparkOpts.putAll(extraSparkOpts);
+        Matcher m = SPARK_OPTS_PATTERN.matcher(conf.get("oozie.spark.spark-opts"));
+        int count = 0;
+        while (m.find()) {
+            count++;
+            String key = m.group(1);
+            String val = m.group(2);
+            assertEquals(sparkOpts.get(key), val);
+        }
+        assertEquals(sparkOpts.size(), count);
     }
 
     private String getActionXml() {
