OOZIE-2687 Create XML schema for launcher configurations (asasvari)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8b247f28 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8b247f28 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8b247f28 Branch: refs/heads/master Commit: 8b247f28f0496af3f217ec4becb3f5f931f08511 Parents: 69c5091 Author: Attila Sasvari <asasv...@cloudera.com> Authored: Mon Sep 18 12:58:01 2017 +0200 Committer: Attila Sasvari <asasv...@cloudera.com> Committed: Mon Sep 18 12:58:01 2017 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 325 ++++++++---- .../action/hadoop/SqoopActionExecutor.java | 2 +- .../oozie/action/ssh/SshActionExecutor.java | 10 +- .../org/apache/oozie/jms/MessageReceiver.java | 2 +- .../oozie/service/LiteWorkflowStoreService.java | 1 - .../org/apache/oozie/service/SchemaService.java | 9 +- .../org/apache/oozie/util/WritableUtils.java | 7 + .../workflow/lite/LiteWorkflowAppParser.java | 30 +- .../oozie/workflow/lite/LiteWorkflowLib.java | 2 - core/src/main/resources/oozie-default.xml | 43 +- .../action/hadoop/ActionExecutorTestCase.java | 39 +- .../oozie/action/hadoop/LauncherMainTester.java | 9 + .../action/hadoop/TestJavaActionExecutor.java | 212 +++++++- .../apache/oozie/service/TestSchemaService.java | 497 ++++++++++++++++--- .../oozie/util/TestMetricsInstrumentation.java | 4 +- .../lite/TestLiteWorkflowAppParser.java | 51 ++ pom.xml | 2 +- release-log.txt | 1 + .../apache/oozie/action/hadoop/LauncherAM.java | 8 + .../apache/oozie/action/hadoop/UDFTester.java | 2 +- 20 files changed, 1042 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 49fd4b8..9d1afb5 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -22,6 +22,28 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.io.Closeables; import com.google.common.primitives.Ints; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; @@ -41,6 +63,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -98,8 +121,6 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -107,12 +128,18 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Objects; +import java.util.Properties; import java.util.Properties; import java.util.Set; +import java.util.Set; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Closeables; public class JavaActionExecutor extends ActionExecutor { - public static final String RUNNING = "RUNNING"; public static final String SUCCEEDED = "SUCCEEDED"; public static final String KILLED = "KILLED"; @@ -122,6 +149,11 @@ public class JavaActionExecutor extends ActionExecutor { public static final String HADOOP_NAME_NODE = "fs.default.name"; public static final String OOZIE_COMMON_LIBDIR = "oozie"; + public static final String DEFAULT_LAUNCHER_VCORES = "oozie.launcher.default.vcores"; + public static final String DEFAULT_LAUNCHER_MEMORY_MB = "oozie.launcher.default.memory.mb"; + public static final String DEFAULT_LAUNCHER_PRIORITY = "oozie.launcher.default.priority"; + public static final String DEFAULT_LAUNCHER_QUEUE = "oozie.launcher.default.queue"; + public static final String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; @@ -260,6 +292,12 @@ public class JavaActionExecutor extends ActionExecutor { conf.set(HADOOP_NAME_NODE, nameNode); conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); + // FIXME - think about this! + Element e = actionXml.getChild("config-class", ns); + if (e != null) { + conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim()); + } + return conf; } @@ -299,6 +337,11 @@ public class JavaActionExecutor extends ActionExecutor { throw convertException(ex); } XConfiguration.copy(launcherConf, conf); + // Inject config-class for launcher to use for action + Element e = actionXml.getChild("config-class", ns); + if (e != null) { + conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim()); + } checkForDisallowedProps(launcherConf, "launcher configuration"); return conf; } @@ -876,13 +919,20 @@ public class JavaActionExecutor extends ActionExecutor { if (launcherJobConf.get(HADOOP_MAP_JAVA_OPTS) != null) { opts.append(" ").append(launcherJobConf.get(HADOOP_MAP_JAVA_OPTS)); } + List<Element> javaopts = actionXml.getChildren("java-opt", ns); - for (Element opt: javaopts) { - opts.append(" ").append(opt.getTextTrim()); + + // Either one or more <java-opt> element or one <java-opts> can be present since oozie-workflow-0.4 + if (!javaopts.isEmpty()) { + for (Element opt : javaopts) { + opts.append(" ").append(opt.getTextTrim()); + } } - Element opt = actionXml.getChild("java-opts", ns); - if (opt != null) { - opts.append(" ").append(opt.getTextTrim()); + else { + Element opt = actionXml.getChild("java-opts", ns); + if (opt != null) { + opts.append(" ").append(opt.getTextTrim()); + } } launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim()); launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim()); @@ -942,6 +992,7 @@ public class JavaActionExecutor extends ActionExecutor { } Element actionXml = XmlUtils.parseXml(action.getConf()); + LOG.debug("ActionXML: {0}", action.getConf()); // action job configuration Configuration actionConf = loadHadoopDefaultResources(context, actionXml); @@ -950,6 +1001,14 @@ public class JavaActionExecutor extends ActionExecutor { LOG.debug("Setting LibFilesArchives "); setLibFilesArchives(context, actionXml, appPathRoot, actionConf); + String jobName = actionConf.get(HADOOP_JOB_NAME); + if (jobName == null || jobName.isEmpty()) { + jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", + getType(), context.getWorkflow().getAppName(), + action.getName(), context.getWorkflow().getId()); + actionConf.set(HADOOP_JOB_NAME, jobName); + } + injectActionCallback(context, actionConf); if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { @@ -1052,7 +1111,7 @@ public class JavaActionExecutor extends ActionExecutor { ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId(); ApplicationSubmissionContext appContext = createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(), - credentials); + credentials, actionXml); yarnClient.submitApplication(appContext); launcherId = appId.toString(); @@ -1138,22 +1197,19 @@ public class JavaActionExecutor extends ActionExecutor { } private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf, - String user, Context context, Configuration actionConf, String actionName, - Credentials credentials) + String user, Context context, Configuration actionConf, String actionName, + Credentials credentials, Element actionXml) throws IOException, HadoopAccessorException, URISyntaxException { ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); - String appName = getAppName(context); - + setResources(launcherJobConf, appContext); + setPriority(launcherJobConf, appContext); + setQueue(launcherJobConf, appContext); appContext.setApplicationId(appId); - appContext.setApplicationName(appName); + setApplicationName(context, actionName, appContext); appContext.setApplicationType("Oozie Launcher"); - Priority pri = Records.newRecord(Priority.class); - int priority = 0; // TODO: OYA: Add a constant or a config - pri.setPriority(priority); - appContext.setPriority(pri); - appContext.setQueue("default"); // TODO: will be possible to set in <launcher> + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); // Set the resources to localize @@ -1170,40 +1226,13 @@ public class JavaActionExecutor extends ActionExecutor { localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR); amContainer.setLocalResources(localResources); - // Set the environment variables - Map<String, String> env = new HashMap<String, String>(); - // This adds the Hadoop jars to the classpath in the Launcher JVM - ClasspathUtils.setupClasspath(env, launcherJobConf); - - if (needToAddMapReduceToClassPath()) { - ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); - } - - addActionSpecificEnvVars(env); - amContainer.setEnvironment(Collections.unmodifiableMap(env)); - - // Set the command - List<String> vargs = new ArrayList<String>(6); - vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString()) - + "/bin/java"); - - vargs.add("-Dlog4j.configuration=container-log4j.properties"); - vargs.add("-Dlog4j.debug=true"); - vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); - vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0); - vargs.add("-Dhadoop.root.logger=INFO,CLA"); - vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); - vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser()); - - Path amTmpDir = new Path(Apps.crossPlatformify(ApplicationConstants.Environment.PWD.toString()), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); - vargs.add("-Djava.io.tmpdir=" + amTmpDir); + setEnvironmentVariables(launcherJobConf, amContainer); + List<String> vargs = createCommand(context); + setJavaOpts(launcherJobConf, actionXml, vargs); vargs.add(LauncherAM.class.getCanonicalName()); - vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + - Path.SEPARATOR + ApplicationConstants.STDOUT); - vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + - Path.SEPARATOR + ApplicationConstants.STDERR); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR); StringBuilder mergedCommand = new StringBuilder(); for (CharSequence str : vargs) { mergedCommand.append(str).append(" "); @@ -1221,51 +1250,181 @@ public class JavaActionExecutor extends ActionExecutor { amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); } - // Set Resources - // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores) - Resource resource = Resource.newInstance(2048, 1); - appContext.setResource(resource); appContext.setCancelTokensWhenComplete(true); return appContext; } - Map<String, CredentialsProperties> setCredentialPropertyToActionConf(final Context context, - final WorkflowAction action, - final Configuration actionConf) throws Exception { - if (context == null || action == null) { - LOG.warn("context or action is null"); - return null; + private List<String> createCommand(Context context) { + List<String> vargs = new ArrayList<String>(6); + vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString()) + + "/bin/java"); + + vargs.add("-Dlog4j.configuration=container-log4j.properties"); + vargs.add("-Dlog4j.debug=true"); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 1024 * 1024); + vargs.add("-Dhadoop.root.logger=INFO,CLA"); + vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); + vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser()); + return vargs; + } + + private void setJavaOpts(Configuration launcherJobConf, Element actionXml, List<String> vargs) { + // Note: for backward compatibility reasons, we have to support the <java-opts> tag inside the <java> action + // If both java/java-opt(s) and launcher/java-opts are defined, we pick java/java-opts + // We also display a warning to let users know that they should migrate their workflow + StringBuilder javaOpts = new StringBuilder(); + boolean oldJavaOpts = handleJavaOpts(actionXml, javaOpts); + if (oldJavaOpts) { + vargs.add(javaOpts.toString()); + } + + final String oozieLauncherJavaOpts = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_JAVAOPTS_PROPERTY); + if (oozieLauncherJavaOpts != null) { + if (oldJavaOpts) { + LOG.warn("<java-opts> was defined inside the <launcher> tag -- ignored"); + } else { + vargs.add(oozieLauncherJavaOpts); + } + } + } + + private boolean handleJavaOpts(Element actionXml, StringBuilder javaOpts) { + Namespace ns = actionXml.getNamespace(); + boolean oldJavaOpts = false; + @SuppressWarnings("unchecked") + List<Element> javaopts = actionXml.getChildren("java-opt", ns); + for (Element opt: javaopts) { + javaOpts.append(" ").append(opt.getTextTrim()); + oldJavaOpts = true; + } + Element opt = actionXml.getChild("java-opts", ns); + if (opt != null) { + javaOpts.append(" ").append(opt.getTextTrim()); + oldJavaOpts = true; + } + + if (oldJavaOpts) { + LOG.warn("Note: <java-opts> inside the action is used in the workflow. Please move <java-opts> tag under" + + " the <launcher> element. See the documentation for details"); + } + return oldJavaOpts; + } + + private void setApplicationName(Context context, String actionName, ApplicationSubmissionContext appContext) { + String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), + context.getWorkflow().getAppName(), actionName, + context.getWorkflow().getId()); + appContext.setApplicationName(jobName); + } + + private void setEnvironmentVariables(Configuration launcherJobConf, ContainerLaunchContext amContainer) throws IOException { + Map<String, String> env = new HashMap<>(); + + final String oozieLauncherEnvProperty = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_ENV_PROPERTY); + if (oozieLauncherEnvProperty != null) { + Map<String, String> environmentVars = extractEnvVarsFromOozieLauncherProps(oozieLauncherEnvProperty); + env.putAll(environmentVars); } - if (Boolean.TRUE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) && !UserGroupInformation.isSecurityEnabled()) { - LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); - return null; + // This adds the Hadoop jars to the classpath in the Launcher JVM + ClasspathUtils.setupClasspath(env, launcherJobConf); + + if (needToAddMapReduceToClassPath()) { + ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); + } + + addActionSpecificEnvVars(env); + amContainer.setEnvironment(ImmutableMap.copyOf(env)); + } + + private void setQueue(Configuration launcherJobConf, ApplicationSubmissionContext appContext) { + String queue; + if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY) != null) { + queue = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY); + } else { + queue = Preconditions.checkNotNull(ConfigurationService.get(DEFAULT_LAUNCHER_QUEUE), "Default queue is undefined"); + } + appContext.setQueue(queue); + } + + private void setPriority(Configuration launcherJobConf, ApplicationSubmissionContext appContext) { + int priority; + if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_PRIORITY_PROPERTY) != null) { + priority = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_PRIORITY_PROPERTY, -1); + } else { + int defaultPriority = ConfigurationService.getInt(DEFAULT_LAUNCHER_PRIORITY); + priority = defaultPriority; + } + Priority pri = Records.newRecord(Priority.class); + pri.setPriority(priority); + appContext.setPriority(pri); + } + + private void setResources(Configuration launcherJobConf, ApplicationSubmissionContext appContext) { + int memory; + if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY) != null) { + memory = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY, -1); + Preconditions.checkArgument(memory > 0, "Launcher memory is 0 or negative"); + } else { + int defaultMemory = ConfigurationService.getInt(DEFAULT_LAUNCHER_MEMORY_MB, -1); + Preconditions.checkArgument(defaultMemory > 0, "Default launcher memory is 0 or negative"); + memory = defaultMemory; } - final XConfiguration wfJobConf = getWorkflowConf(context); - if (!Boolean.FALSE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) && - wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP)) && - !UserGroupInformation.isSecurityEnabled()) { - LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); - return null; + int vcores; + if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY) != null) { + vcores = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY, -1); + Preconditions.checkArgument(vcores > 0, "Launcher vcores is 0 or negative"); + } else { + int defaultVcores = ConfigurationService.getInt(DEFAULT_LAUNCHER_VCORES); + Preconditions.checkArgument(defaultVcores > 0, "Default launcher vcores is 0 or negative"); + vcores = defaultVcores; } + Resource resource = Resource.newInstance(memory, vcores); + appContext.setResource(resource); + } - final Map<String, CredentialsProperties> credPropertiesMap = getActionCredentialsProperties(context, action); - if (credPropertiesMap.isEmpty()) { - LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); - return credPropertiesMap; + private Map<String, String> extractEnvVarsFromOozieLauncherProps(String oozieLauncherEnvProperty) { + Map<String, String> envMap = new LinkedHashMap<>(); + for (String envVar : StringUtils.split(oozieLauncherEnvProperty, File.pathSeparatorChar)) { + String[] env = StringUtils.split(envVar, '='); + Preconditions.checkArgument(env.length == 2, "Invalid launcher setting for environment variables: \"%s\". " + + "<env> should contain a list of ENV_VAR_NAME=VALUE separated by the '%s' character. " + + "Example on Unix: A=foo1:B=foo2", oozieLauncherEnvProperty, File.pathSeparator); + envMap.put(env[0], env[1]); } + return envMap; + } - for (final Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { - if (entry.getValue() != null) { - final CredentialsProperties prop = entry.getValue(); - LOG.debug("Credential Properties set for action : " + action.getId()); - for (final Entry<String, String> propEntry : prop.getProperties().entrySet()) { - final String key = propEntry.getKey(); - final String value = propEntry.getValue(); - actionConf.set(key, value); - LOG.debug("property : '" + key + "', value : '" + value + "'"); + protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, + WorkflowAction action, Configuration actionConf) throws Exception { + HashMap<String, CredentialsProperties> credPropertiesMap = null; + if (context != null && action != null) { + if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) { + XConfiguration wfJobConf = getWorkflowConf(context); + if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) || + !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) { + credPropertiesMap = getActionCredentialsProperties(context, action); + if (!credPropertiesMap.isEmpty()) { + for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { + if (entry.getValue() != null) { + CredentialsProperties prop = entry.getValue(); + LOG.debug("Credential Properties set for action : " + action.getId()); + for (Entry<String, String> propEntry : prop.getProperties().entrySet()) { + String key = propEntry.getKey(); + String value = propEntry.getValue(); + actionConf.set(key, value); + LOG.debug("property : '" + key + "', value : '" + value + "'"); + } + } + } + } else { + LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); + } + } else { + LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java index 8fdc50c..a0dfd31 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java @@ -77,8 +77,8 @@ public class SqoopActionExecutor extends JavaActionExecutor { if (e != null) { String strConf = XmlUtils.prettyPrint(e).toString(); XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); - checkForDisallowedProps(inlineConf, "inline configuration"); XConfiguration.copy(inlineConf, actionConf); + checkForDisallowedProps(inlineConf, "inline configuration"); } } catch (IOException ex) { throw convertException(ex); http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java index 7e33485..5890b8c 100644 --- a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java @@ -329,8 +329,8 @@ public class SshActionExecutor extends ActionExecutor { * * @param command Command to execute as String. * @return exit status of the execution. - * @throws IOException if process exits with status nonzero. - * @throws InterruptedException if process does not run properly. + * @throws IOException if processSettings exits with status nonzero. + * @throws InterruptedException if processSettings does not run properly. */ public int executeCommand(String command) throws IOException, InterruptedException { Runtime runtime = Runtime.getRuntime(); @@ -396,7 +396,7 @@ public class SshActionExecutor extends ActionExecutor { * @param action action object. * @param recoveryId action id + run number to enable recovery in rerun * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments - * @return process id of the running command. + * @return processSettings id of the running command. * @throws IOException thrown if failed to run the command. * @throws InterruptedException thrown if any interruption happens. */ @@ -468,7 +468,7 @@ public class SshActionExecutor extends ActionExecutor { } /** - * Get the return value of a process. + * Get the return value of a processSettings. * * @param command command to be executed. * @return zero if execution is successful and any non zero value for failure. @@ -679,7 +679,7 @@ public class SshActionExecutor extends ActionExecutor { * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required. * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the * store content may exceed this length. - * @return the exit value of the process. + * @return the exit value of the processSettings. * @throws IOException */ private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength) http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java b/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java index 47bfd2b..87d0c5e 100644 --- a/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java +++ b/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java @@ -49,7 +49,7 @@ public class MessageReceiver implements MessageListener { } /** - * Get the MessageHandler that will process the message + * Get the MessageHandler that will processSettings the message * * @return message handler */ http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java index ffc29af..97a75ff 100644 --- a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java +++ b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java @@ -18,7 +18,6 @@ package org.apache.oozie.service; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; import org.apache.oozie.action.control.EndActionExecutor; import org.apache.oozie.action.control.ForkActionExecutor; http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/service/SchemaService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/SchemaService.java b/core/src/main/java/org/apache/oozie/service/SchemaService.java index 137e2c0..9d2a521 100644 --- a/core/src/main/java/org/apache/oozie/service/SchemaService.java +++ b/core/src/main/java/org/apache/oozie/service/SchemaService.java @@ -29,15 +29,17 @@ import javax.xml.transform.stream.StreamSource; import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ErrorCode; import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.schema.ResourceResolver; import org.xml.sax.SAXException; + /** * Service that loads Oozie workflow definition schema and registered extension * schemas. */ + public class SchemaService implements Service { public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchemaService."; @@ -95,9 +97,12 @@ public class SchemaService implements Service { } List<StreamSource> sources = new ArrayList<StreamSource>(); for (String schemaName : schemaNames) { - sources.add(new StreamSource(IOUtils.getResourceAsStream(schemaName, -1))); + StreamSource s = new StreamSource(IOUtils.getResourceAsStream(schemaName, -1)); + s.setSystemId(schemaName); + sources.add(s); } SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); + factory.setResourceResolver(new ResourceResolver()); return factory.newSchema(sources.toArray(new StreamSource[sources.size()])); } http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/util/WritableUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/WritableUtils.java b/core/src/main/java/org/apache/oozie/util/WritableUtils.java index aa027e3..5a4cb24 100644 --- a/core/src/main/java/org/apache/oozie/util/WritableUtils.java +++ b/core/src/main/java/org/apache/oozie/util/WritableUtils.java @@ -151,6 +151,13 @@ public class WritableUtils { } } + /** + * Write string list. + * + * @param dataOutput the data output + * @param list the list + * @throws IOException Signals that an I/O exception has occurred. + */ public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException { dataOutput.writeInt(list.size()); for (String str : list) { http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java index a74e5c7..aa0e06b 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java @@ -68,6 +68,7 @@ import org.xml.sax.SAXException; */ public class LiteWorkflowAppParser { + private static final String LAUNCHER_E = "launcher"; private static final String DECISION_E = "decision"; private static final String ACTION_E = "action"; private static final String END_E = "end"; @@ -214,6 +215,7 @@ public class LiteWorkflowAppParser { private LiteWorkflowApp parse(String strDef, Element root, Configuration configDefault, Configuration jobConf) throws WorkflowException { Namespace ns = root.getNamespace(); + LiteWorkflowApp def = null; GlobalSectionData gData = jobConf.get(OOZIE_GLOBAL) == null ? null : getGlobalFromString(jobConf.get(OOZIE_GLOBAL)); @@ -264,10 +266,10 @@ public class LiteWorkflowAppParser { } eActionConf = elem; if (SUBWORKFLOW_E.equals(elem.getName())) { - handleDefaultsAndGlobal(gData, null, elem); + handleDefaultsAndGlobal(gData, null, elem, ns); } else { - handleDefaultsAndGlobal(gData, configDefault, elem); + handleDefaultsAndGlobal(gData, configDefault, elem, ns); } } } @@ -300,9 +302,11 @@ public class LiteWorkflowAppParser { } else if (eNode.getName().equals(GLOBAL)) { if(jobConf.get(OOZIE_GLOBAL) != null) { gData = getGlobalFromString(jobConf.get(OOZIE_GLOBAL)); - handleDefaultsAndGlobal(gData, null, eNode); + handleDefaultsAndGlobal(gData, null, eNode, ns); } + gData = parseGlobalSection(ns, eNode); + } else if (eNode.getName().equals(PARAMETERS)) { // No operation is required } else { @@ -438,7 +442,7 @@ public class LiteWorkflowAppParser { } } - Configuration globalConf = null; + Configuration globalConf = new XConfiguration(); Element globalConfigurationElement = global.getChild(CONFIGURATION, ns); if (globalConfigurationElement != null) { try { @@ -447,12 +451,18 @@ public class LiteWorkflowAppParser { throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf"); } } + + Element globalLauncherElement = global.getChild(LAUNCHER_E, ns); + if (globalLauncherElement != null) { + LauncherConfigHandler launcherConfigHandler = new LauncherConfigHandler(globalConf, globalLauncherElement, ns); + launcherConfigHandler.processSettings(); + } gData = new GlobalSectionData(globalJobTracker, globalNameNode, globalJobXmls, globalConf); } return gData; } - private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement) + private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement, Namespace ns) throws WorkflowException { ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(actionElement.getName()); @@ -497,7 +507,7 @@ public class LiteWorkflowAppParser { // If this is the global section or ActionExecutor.supportsConfigurationJobXML() returns true, we parse the action's // <configuration> and <job-xml> fields. We also merge this with those from the <global> section, if given. If none are // defined, empty values are placed. Exceptions are thrown if there's an error parsing, but not if they're not given. - if ( GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) { + if (GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) { @SuppressWarnings("unchecked") List<Element> actionJobXmls = actionElement.getChildren(JOB_XML, actionNs); if (gData != null && gData.jobXmls != null) { @@ -524,12 +534,20 @@ public class LiteWorkflowAppParser { if (gData != null && gData.conf != null) { XConfiguration.copy(gData.conf, actionConf); } + + Element launcherConfiguration = actionElement.getChild(LAUNCHER_E, actionNs); + if (launcherConfiguration != null) { + LauncherConfigHandler launcherConfigHandler = new LauncherConfigHandler(actionConf, launcherConfiguration, actionNs); + launcherConfigHandler.processSettings(); + } + Element actionConfiguration = actionElement.getChild(CONFIGURATION, actionNs); if (actionConfiguration != null) { //copy and override XConfiguration.copy(new XConfiguration(new StringReader(XmlUtils.prettyPrint(actionConfiguration).toString())), actionConf); } + int position = actionElement.indexOf(actionConfiguration); actionElement.removeContent(actionConfiguration); //replace with enhanced one Element eConfXml = XmlUtils.parseXml(actionConf.toXmlString(false)); http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java index 23df086..2e09889 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java @@ -31,8 +31,6 @@ import org.apache.hadoop.conf.Configuration; import javax.xml.validation.Schema; import java.io.StringReader; -import java.util.Date; -import java.util.Map; //TODO javadoc public abstract class LiteWorkflowLib implements WorkflowLib { http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 0d174b5..2389b99 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1607,17 +1607,18 @@ will be the requeue interval for the actions which are waiting for a long time w <property> <name>oozie.service.SchemaService.wf.schemas</name> <value> + oozie-common-1.0.xsd, oozie-workflow-0.1.xsd,oozie-workflow-0.2.xsd,oozie-workflow-0.2.5.xsd,oozie-workflow-0.3.xsd,oozie-workflow-0.4.xsd, - oozie-workflow-0.4.5.xsd,oozie-workflow-0.5.xsd, - shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd, + oozie-workflow-0.4.5.xsd,oozie-workflow-0.5.xsd,oozie-workflow-1.0.xsd, + shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd,shell-action-1.0.xsd, email-action-0.1.xsd,email-action-0.2.xsd, - hive-action-0.2.xsd,hive-action-0.3.xsd,hive-action-0.4.xsd,hive-action-0.5.xsd,hive-action-0.6.xsd, - sqoop-action-0.2.xsd,sqoop-action-0.3.xsd,sqoop-action-0.4.xsd, + hive-action-0.2.xsd,hive-action-0.3.xsd,hive-action-0.4.xsd,hive-action-0.5.xsd,hive-action-0.6.xsd,hive-action-1.0.xsd, + sqoop-action-0.2.xsd,sqoop-action-0.3.xsd,sqoop-action-0.4.xsd,sqoop-action-1.0.xsd, ssh-action-0.1.xsd,ssh-action-0.2.xsd, distcp-action-0.1.xsd,distcp-action-0.2.xsd, oozie-sla-0.1.xsd,oozie-sla-0.2.xsd, - hive2-action-0.1.xsd, hive2-action-0.2.xsd, - spark-action-0.1.xsd,spark-action-0.2.xsd + hive2-action-0.1.xsd,hive2-action-0.2.xsd,hive2-action-1.0.xsd, + spark-action-0.1.xsd,spark-action-0.2.xsd,spark-action-1.0.xsd </value> <description> List of schemas for workflows (separated by commas). @@ -3074,5 +3075,35 @@ will be the requeue interval for the actions which are waiting for a long time w Most users should not have to change this. </description> </property> + <property> + <name>oozie.launcher.default.vcores</name> + <value>1</value> + <description> + The default number of vcores that are allocated for the Launcher AMs + </description> + </property> + + <property> + <name>oozie.launcher.default.memory.mb</name> + <value>2048</value> + <description> + The default amount of memory in MBs that is allocated for the Launcher AMs + </description> + </property> + <property> + <name>oozie.launcher.default.priority</name> + <value>0</value> + <description> + The default YARN priority of the Launcher AM + </description> + </property> + + <property> + <name>oozie.launcher.default.queue</name> + <value>default</value> + <description> + The default YARN queue where the Launcher AM is placed + </description> + </property> </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java index d74160a..dd90536 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java @@ -35,9 +35,7 @@ import org.apache.oozie.service.Services; import org.apache.oozie.service.UUIDService; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.WorkflowStoreService; -import org.apache.oozie.test.XFsTestCase; import org.apache.oozie.test.XHCatTestCase; -import org.apache.oozie.test.XTestCase; import org.apache.oozie.util.ELEvaluator; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; @@ -244,7 +242,7 @@ public abstract class ActionExecutorTestCase extends XHCatTestCase { protected WorkflowJobBean createBaseWorkflow(XConfiguration protoConf, String actionName) throws Exception { Path appUri = new Path(getAppPath(), "workflow.xml"); - String content = "<workflow-app xmlns='uri:oozie:workflow:0.1' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>"; + String content = "<workflow-app xmlns='uri:oozie:workflow:1.0' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>"; content += "<start to='end' />"; content += "<end name='end' /></workflow-app>"; writeToFile(content, getAppPath(), "workflow.xml"); @@ -300,6 +298,41 @@ public abstract class ActionExecutorTestCase extends XHCatTestCase { return workflow; } + protected WorkflowJobBean createBaseWorkflowWithLauncherConfig(XConfiguration protoConf, String actionName) throws Exception { + Path appUri = new Path(getAppPath(), "workflow.xml"); + + String content = "<workflow-app xmlns='uri:oozie:workflow:1.0' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>"; + content += "<global>" + + "<launcher>" + + " <vcores>2</vcores>" + + " <memory.mb>1024</memory.mb>" + + " <queue>default</queue>" + + " <priority>1</priority>" + + " <java-opts>-verbose:class</java-opts>" + + "</launcher>" + + "</global>"; + + content += "<start to='end' />"; + content += "<end name='end' /></workflow-app>"; + writeToFile(content, getAppPath(), "workflow.xml"); + + WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", + new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, + "end")) + .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class)); + XConfiguration wfConf = new XConfiguration(); + wfConf.set(OozieClient.USER_NAME, getTestUser()); + wfConf.set(OozieClient.APP_PATH, appUri.toString()); + + WorkflowJobBean workflow = createWorkflow(app, wfConf, protoConf); + + WorkflowActionBean action = new WorkflowActionBean(); + action.setName(actionName); + action.setId(Services.get().get(UUIDService.class).generateChildId(workflow.getId(), actionName)); + workflow.getActions().add(action); + return workflow; + } + private WorkflowJobBean createWorkflow(WorkflowApp app, Configuration conf, XConfiguration protoConf) throws Exception { WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java index 6cee7a8..43c71b0 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java @@ -31,6 +31,15 @@ public class LauncherMainTester { System.out.println("Hello World!"); } + String testJavaOpts = System.getProperty("testJavaOpts"); + if (testJavaOpts != null && Boolean.parseBoolean(testJavaOpts)) { + throw new RuntimeException("Failing on purpose"); + } + String env = System.getenv("LAUNCHER_ENVIRON"); + if (env != null && env.equals("foo1")) { + throw new RuntimeException("Failing on purpose"); + } + if (args.length == 1) { if (args[0].equals("throwable")) { throw new Throwable("throwing throwable"); http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index d1d78fd..02e60c0 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -29,14 +29,7 @@ import java.io.Writer; import java.net.URI; import java.security.PrivilegedExceptionAction; import java.text.SimpleDateFormat; -import java.util.Arrays; -import java.util.Collections; -import java.util.Date; -import java.util.EnumSet; -import java.util.HashSet; -import java.util.Map; -import java.util.Properties; -import java.util.Set; +import java.util.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.SleepJob; @@ -51,6 +44,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -86,7 +80,9 @@ import org.junit.Test; public class TestJavaActionExecutor extends ActionExecutorTestCase { - public static final String YARN_RESOURCEMANAGER_ADDRESS = "yarn.resourcemanager.address"; + private static final String YARN_RESOURCEMANAGER_ADDRESS = "yarn.resourcemanager.address"; + private static final String MAPRED_CHILD_JAVA_OPTS = "mapred.child.java.opts"; + private static final String MAPREDUCE_MAP_JAVA_OPTS = "mapreduce.map.java.opts"; @Override protected void beforeSetUp() throws Exception { @@ -273,8 +269,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { ae.setupLauncherConf(conf, actionXml, getFsTestCaseDir(), context); assertEquals("MAIN-CLASS", actionConf.get("oozie.action.java.main", "null")); assertEquals("org.apache.oozie.action.hadoop.JavaMain", ae.getLauncherMain(conf, actionXml)); - assertTrue(conf.get("mapred.child.java.opts").contains("JAVA-OPTS")); - assertTrue(conf.get("mapreduce.map.java.opts").contains("JAVA-OPTS")); + assertTrue(conf.get(MAPRED_CHILD_JAVA_OPTS).contains("JAVA-OPTS")); + assertTrue(conf.get(MAPREDUCE_MAP_JAVA_OPTS).contains("JAVA-OPTS")); assertEquals(Arrays.asList("A1", "A2"), Arrays.asList(LauncherAMUtils.getMainArguments(conf))); actionXml = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + @@ -1114,7 +1110,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { WorkflowApp app = new LiteWorkflowApp("testApp", wfxml, new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "start")). addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class)); - Configuration conf = Services.get().get(HadoopAccessorService.class). + Configuration conf = getHadoopAccessorService(). createConfiguration(new URI(getNameNodeUri()).getAuthority()); conf.set(OozieClient.APP_PATH, getNameNodeUri() + "/testPath"); conf.set(OozieClient.LOG_TOKEN, "testToken"); @@ -1226,8 +1222,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Configuration conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); - assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); + assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); + assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -1251,8 +1247,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); - assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); + assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); + assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -1278,8 +1274,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); - assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); + assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); + assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -1305,8 +1301,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); - assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); + assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); + assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -1333,8 +1329,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); - assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); + assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); + assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); } public void testActionLibsPath() throws Exception { @@ -2262,6 +2258,176 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { assertEquals("/user/map%20dev/test-case/shell/script/shell%201.sh", actPath); } + public void testSubmitOKWithVcoresAndMemory() throws Exception { + String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + " <property><name>oozie.launcher.vcores</name><value>1</value></property>" + + " <property><name>oozie.launcher.memory.mb</name><value>1024</value></property>" + + "</configuration>" + + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + + "</java>"; + Context context = createContext(actionXml, null); + submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); + ActionExecutor ae = new JavaActionExecutor(); + ae.check(context, context.getAction()); + assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertNull(context.getAction().getData()); + + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); + } + + public void testSubmitOKWithLauncherJavaOpts() throws Exception { + String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + " <property><name>oozie.launcher.javaopts</name><value>-DtestJavaOpts=true</value></property>" + + "</configuration>" + + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + + "</java>"; + Context context = createContext(actionXml, null); + submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); + ActionExecutor ae = new JavaActionExecutor(); + ae.check(context, context.getAction()); + assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); + assertNull(context.getAction().getData()); + + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); + } + + public void testSubmitFailsWithNegativeVcores() throws Exception { + String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + " <property><name>oozie.launcher.vcores</name><value>-1</value></property>" + + "</configuration>" + + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + + "</java>"; + Context context = createContext(actionXml, null); + + boolean exception = false; + try { + submitAction(context); + } catch (ActionExecutorException e) { + exception = true; + assertEquals("Illegal exception was thrown", IllegalArgumentException.class, e.getCause().getClass()); + } + + assertTrue("Exception was not caught", exception); + } + + public void testSubmitFailsWithNegativeMemory() throws Exception { + String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + " <property><name>oozie.launcher.memory.mb</name><value>-1</value></property>" + + "</configuration>" + + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + + "</java>"; + Context context = createContext(actionXml, null); + + boolean exception = false; + try { + submitAction(context); + } catch (ActionExecutorException e) { + exception = true; + assertEquals("Illegal exception was thrown", IllegalArgumentException.class, e.getCause().getClass()); + } + + assertTrue("Exception was not caught", exception); + } + + public void testSubmitOKWithLauncherEnvVars() throws Exception { + String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + " <property><name>oozie.launcher.env</name><value>A=foo1" + File.pathSeparator + "B=foo2</value></property>" + + "</configuration>" + + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + + "</java>"; + Context context = createContext(actionXml, null); + submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); + ActionExecutor ae = new JavaActionExecutor(); + ae.check(context, context.getAction()); + assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertNull(context.getAction().getData()); + + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); + } + + public void testEnvVarsPropagatedFromLauncherConfig() throws Exception { + String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + " <property><name>oozie.launcher.env</name><value>LAUNCHER_ENVIRON=foo1" + File.pathSeparator + "B=foo2</value></property>" + + "</configuration>" + + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + + "</java>"; + Context context = createContext(actionXml, null); + submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); + ActionExecutor ae = new JavaActionExecutor(); + ae.check(context, context.getAction()); + assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); + assertNull(context.getAction().getData()); + + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); + } + + public void testSubmitFailsWithInvalidLauncherEnvVars() throws Exception { + String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + " <property><name>oozie.launcher.env</name><value>Afoo1" + File.pathSeparator + "B=foo2</value></property>" + + "</configuration>" + + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + + "</java>"; + Context context = createContext(actionXml, null); + try { + submitAction(context); + fail(); + } + catch (ActionExecutorException e) { + assertTrue(e.getMessage().contains("Invalid launcher setting for environment variables")); + } + } + + public void testSubmitWithLauncherQueue() throws Exception { + String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + " <property><name>oozie.launcher.queue</name><value>test</value></property>" + + "</configuration>" + + "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + + "</java>"; + Context context = createContext(actionXml, null); + submitAction(context); + final ApplicationId appId = ConverterUtils.toApplicationId(context.getAction().getExternalId()); + Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri()); + String queue = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getQueue(); + assertEquals("test", queue); + } + + private HadoopAccessorService getHadoopAccessorService() { + return Services.get().get(HadoopAccessorService.class); + } + + public void testChildKill() throws Exception { final JobConf clusterConf = createJobConf(); FileSystem fileSystem = FileSystem.get(clusterConf); @@ -2319,7 +2485,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { // kill the action - based on the job tag, the SleepJob is expected to be killed too ae.kill(context, context.getAction()); - HadoopAccessorService hadoopAccessorService = Services.get().get(HadoopAccessorService.class); + HadoopAccessorService hadoopAccessorService = getHadoopAccessorService(); Configuration config = hadoopAccessorService.createConfiguration(getJobTrackerUri()); YarnClient yarnClient = hadoopAccessorService.createYarnClient(getTestUser(), config);