Revert "OOZIE-2687 Create XML schema for launcher configurations (asasvari)"
This reverts commit 8b247f28f0496af3f217ec4becb3f5f931f08511. Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8aca098c Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8aca098c Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8aca098c Branch: refs/heads/master Commit: 8aca098c2a7dc8227b22c6ffd64e38ff6e1489a1 Parents: 8b247f2 Author: Attila Sasvari <[email protected]> Authored: Mon Sep 18 13:31:58 2017 +0200 Committer: Attila Sasvari <[email protected]> Committed: Mon Sep 18 13:31:58 2017 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 325 ++++-------- .../action/hadoop/SqoopActionExecutor.java | 2 +- .../oozie/action/ssh/SshActionExecutor.java | 10 +- .../org/apache/oozie/jms/MessageReceiver.java | 2 +- .../oozie/service/LiteWorkflowStoreService.java | 1 + .../org/apache/oozie/service/SchemaService.java | 9 +- .../org/apache/oozie/util/WritableUtils.java | 7 - .../workflow/lite/LiteWorkflowAppParser.java | 30 +- .../oozie/workflow/lite/LiteWorkflowLib.java | 2 + core/src/main/resources/oozie-default.xml | 43 +- .../action/hadoop/ActionExecutorTestCase.java | 39 +- .../oozie/action/hadoop/LauncherMainTester.java | 9 - .../action/hadoop/TestJavaActionExecutor.java | 212 +------- .../apache/oozie/service/TestSchemaService.java | 497 +++---------------- .../oozie/util/TestMetricsInstrumentation.java | 4 +- .../lite/TestLiteWorkflowAppParser.java | 51 -- pom.xml | 2 +- release-log.txt | 1 - .../apache/oozie/action/hadoop/LauncherAM.java | 8 - .../apache/oozie/action/hadoop/UDFTester.java | 2 +- 20 files changed, 214 insertions(+), 1042 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 9d1afb5..49fd4b8 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -22,28 +22,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.io.Closeables; import com.google.common.primitives.Ints; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.StringReader; -import java.net.ConnectException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; - import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; @@ -63,7 +41,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -121,6 +98,8 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -128,18 +107,12 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.Objects; -import java.util.Properties; import java.util.Properties; import java.util.Set; -import java.util.Set; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.io.Closeables; public class JavaActionExecutor extends ActionExecutor { + public static final String RUNNING = "RUNNING"; public static final String SUCCEEDED = "SUCCEEDED"; public static final String KILLED = "KILLED"; @@ -149,11 +122,6 @@ public class JavaActionExecutor extends ActionExecutor { public static final String HADOOP_NAME_NODE = "fs.default.name"; public static final String OOZIE_COMMON_LIBDIR = "oozie"; - public static final String DEFAULT_LAUNCHER_VCORES = "oozie.launcher.default.vcores"; - public static final String DEFAULT_LAUNCHER_MEMORY_MB = "oozie.launcher.default.memory.mb"; - public static final String DEFAULT_LAUNCHER_PRIORITY = "oozie.launcher.default.priority"; - public static final String DEFAULT_LAUNCHER_QUEUE = "oozie.launcher.default.queue"; - public static final String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; @@ -292,12 +260,6 @@ public class JavaActionExecutor extends ActionExecutor { conf.set(HADOOP_NAME_NODE, nameNode); conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); - // FIXME - think about this! - Element e = actionXml.getChild("config-class", ns); - if (e != null) { - conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim()); - } - return conf; } @@ -337,11 +299,6 @@ public class JavaActionExecutor extends ActionExecutor { throw convertException(ex); } XConfiguration.copy(launcherConf, conf); - // Inject config-class for launcher to use for action - Element e = actionXml.getChild("config-class", ns); - if (e != null) { - conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim()); - } checkForDisallowedProps(launcherConf, "launcher configuration"); return conf; } @@ -919,20 +876,13 @@ public class JavaActionExecutor extends ActionExecutor { if (launcherJobConf.get(HADOOP_MAP_JAVA_OPTS) != null) { opts.append(" ").append(launcherJobConf.get(HADOOP_MAP_JAVA_OPTS)); } - List<Element> javaopts = actionXml.getChildren("java-opt", ns); - - // Either one or more <java-opt> element or one <java-opts> can be present since oozie-workflow-0.4 - if (!javaopts.isEmpty()) { - for (Element opt : javaopts) { - opts.append(" ").append(opt.getTextTrim()); - } + for (Element opt: javaopts) { + opts.append(" ").append(opt.getTextTrim()); } - else { - Element opt = actionXml.getChild("java-opts", ns); - if (opt != null) { - opts.append(" ").append(opt.getTextTrim()); - } + Element opt = actionXml.getChild("java-opts", ns); + if (opt != null) { + opts.append(" ").append(opt.getTextTrim()); } launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim()); launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim()); @@ -992,7 +942,6 @@ public class JavaActionExecutor extends ActionExecutor { } Element actionXml = XmlUtils.parseXml(action.getConf()); - LOG.debug("ActionXML: {0}", action.getConf()); // action job configuration Configuration actionConf = loadHadoopDefaultResources(context, actionXml); @@ -1001,14 +950,6 @@ public class JavaActionExecutor extends ActionExecutor { LOG.debug("Setting LibFilesArchives "); setLibFilesArchives(context, actionXml, appPathRoot, actionConf); - String jobName = actionConf.get(HADOOP_JOB_NAME); - if (jobName == null || jobName.isEmpty()) { - jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", - getType(), context.getWorkflow().getAppName(), - action.getName(), context.getWorkflow().getId()); - actionConf.set(HADOOP_JOB_NAME, jobName); - } - injectActionCallback(context, actionConf); if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) { @@ -1111,7 +1052,7 @@ public class JavaActionExecutor extends ActionExecutor { ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId(); ApplicationSubmissionContext appContext = createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(), - credentials, actionXml); + credentials); yarnClient.submitApplication(appContext); launcherId = appId.toString(); @@ -1197,19 +1138,22 @@ public class JavaActionExecutor extends ActionExecutor { } private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf, - String user, Context context, Configuration actionConf, String actionName, - Credentials credentials, Element actionXml) + String user, Context context, Configuration actionConf, String actionName, + Credentials credentials) throws IOException, HadoopAccessorException, URISyntaxException { ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); - setResources(launcherJobConf, appContext); - setPriority(launcherJobConf, appContext); - setQueue(launcherJobConf, appContext); + String appName = getAppName(context); + appContext.setApplicationId(appId); - setApplicationName(context, actionName, appContext); + appContext.setApplicationName(appName); appContext.setApplicationType("Oozie Launcher"); - + Priority pri = Records.newRecord(Priority.class); + int priority = 0; // TODO: OYA: Add a constant or a config + pri.setPriority(priority); + appContext.setPriority(pri); + appContext.setQueue("default"); // TODO: will be possible to set in <launcher> ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); // Set the resources to localize @@ -1226,36 +1170,19 @@ public class JavaActionExecutor extends ActionExecutor { localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR); amContainer.setLocalResources(localResources); - setEnvironmentVariables(launcherJobConf, amContainer); - - List<String> vargs = createCommand(context); - setJavaOpts(launcherJobConf, actionXml, vargs); - vargs.add(LauncherAM.class.getCanonicalName()); - vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT); - vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR); - StringBuilder mergedCommand = new StringBuilder(); - for (CharSequence str : vargs) { - mergedCommand.append(str).append(" "); - } - - List<String> vargsFinal = ImmutableList.of(mergedCommand.toString()); - LOG.debug("Command to launch container for ApplicationMaster is: {0}", mergedCommand); - amContainer.setCommands(vargsFinal); - appContext.setAMContainerSpec(amContainer); + // Set the environment variables + Map<String, String> env = new HashMap<String, String>(); + // This adds the Hadoop jars to the classpath in the Launcher JVM + ClasspathUtils.setupClasspath(env, launcherJobConf); - // Set tokens - if (credentials != null) { - DataOutputBuffer dob = new DataOutputBuffer(); - credentials.writeTokenStorageToStream(dob); - amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + if (needToAddMapReduceToClassPath()) { + ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); } - appContext.setCancelTokensWhenComplete(true); - - return appContext; - } + addActionSpecificEnvVars(env); + amContainer.setEnvironment(Collections.unmodifiableMap(env)); - private List<String> createCommand(Context context) { + // Set the command List<String> vargs = new ArrayList<String>(6); vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString()) + "/bin/java"); @@ -1263,168 +1190,82 @@ public class JavaActionExecutor extends ActionExecutor { vargs.add("-Dlog4j.configuration=container-log4j.properties"); vargs.add("-Dlog4j.debug=true"); vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); - vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 1024 * 1024); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0); vargs.add("-Dhadoop.root.logger=INFO,CLA"); vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser()); - return vargs; - } - - private void setJavaOpts(Configuration launcherJobConf, Element actionXml, List<String> vargs) { - // Note: for backward compatibility reasons, we have to support the <java-opts> tag inside the <java> action - // If both java/java-opt(s) and launcher/java-opts are defined, we pick java/java-opts - // We also display a warning to let users know that they should migrate their workflow - StringBuilder javaOpts = new StringBuilder(); - boolean oldJavaOpts = handleJavaOpts(actionXml, javaOpts); - if (oldJavaOpts) { - vargs.add(javaOpts.toString()); - } - - final String oozieLauncherJavaOpts = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_JAVAOPTS_PROPERTY); - if (oozieLauncherJavaOpts != null) { - if (oldJavaOpts) { - LOG.warn("<java-opts> was defined inside the <launcher> tag -- ignored"); - } else { - vargs.add(oozieLauncherJavaOpts); - } - } - } - - private boolean handleJavaOpts(Element actionXml, StringBuilder javaOpts) { - Namespace ns = actionXml.getNamespace(); - boolean oldJavaOpts = false; - @SuppressWarnings("unchecked") - List<Element> javaopts = actionXml.getChildren("java-opt", ns); - for (Element opt: javaopts) { - javaOpts.append(" ").append(opt.getTextTrim()); - oldJavaOpts = true; - } - Element opt = actionXml.getChild("java-opts", ns); - if (opt != null) { - javaOpts.append(" ").append(opt.getTextTrim()); - oldJavaOpts = true; - } - - if (oldJavaOpts) { - LOG.warn("Note: <java-opts> inside the action is used in the workflow. Please move <java-opts> tag under" - + " the <launcher> element. See the documentation for details"); - } - return oldJavaOpts; - } - private void setApplicationName(Context context, String actionName, ApplicationSubmissionContext appContext) { - String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), - context.getWorkflow().getAppName(), actionName, - context.getWorkflow().getId()); - appContext.setApplicationName(jobName); - } + Path amTmpDir = new Path(Apps.crossPlatformify(ApplicationConstants.Environment.PWD.toString()), + YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR); + vargs.add("-Djava.io.tmpdir=" + amTmpDir); - private void setEnvironmentVariables(Configuration launcherJobConf, ContainerLaunchContext amContainer) throws IOException { - Map<String, String> env = new HashMap<>(); - - final String oozieLauncherEnvProperty = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_ENV_PROPERTY); - if (oozieLauncherEnvProperty != null) { - Map<String, String> environmentVars = extractEnvVarsFromOozieLauncherProps(oozieLauncherEnvProperty); - env.putAll(environmentVars); + vargs.add(LauncherAM.class.getCanonicalName()); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDOUT); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDERR); + StringBuilder mergedCommand = new StringBuilder(); + for (CharSequence str : vargs) { + mergedCommand.append(str).append(" "); } - // This adds the Hadoop jars to the classpath in the Launcher JVM - ClasspathUtils.setupClasspath(env, launcherJobConf); + List<String> vargsFinal = ImmutableList.of(mergedCommand.toString()); + LOG.debug("Command to launch container for ApplicationMaster is: {0}", mergedCommand); + amContainer.setCommands(vargsFinal); + appContext.setAMContainerSpec(amContainer); - if (needToAddMapReduceToClassPath()) { - ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); + // Set tokens + if (credentials != null) { + DataOutputBuffer dob = new DataOutputBuffer(); + credentials.writeTokenStorageToStream(dob); + amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); } - addActionSpecificEnvVars(env); - amContainer.setEnvironment(ImmutableMap.copyOf(env)); - } + // Set Resources + // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores) + Resource resource = Resource.newInstance(2048, 1); + appContext.setResource(resource); + appContext.setCancelTokensWhenComplete(true); - private void setQueue(Configuration launcherJobConf, ApplicationSubmissionContext appContext) { - String queue; - if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY) != null) { - queue = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY); - } else { - queue = Preconditions.checkNotNull(ConfigurationService.get(DEFAULT_LAUNCHER_QUEUE), "Default queue is undefined"); - } - appContext.setQueue(queue); + return appContext; } - private void setPriority(Configuration launcherJobConf, ApplicationSubmissionContext appContext) { - int priority; - if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_PRIORITY_PROPERTY) != null) { - priority = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_PRIORITY_PROPERTY, -1); - } else { - int defaultPriority = ConfigurationService.getInt(DEFAULT_LAUNCHER_PRIORITY); - priority = defaultPriority; + Map<String, CredentialsProperties> setCredentialPropertyToActionConf(final Context context, + final WorkflowAction action, + final Configuration actionConf) throws Exception { + if (context == null || action == null) { + LOG.warn("context or action is null"); + return null; } - Priority pri = Records.newRecord(Priority.class); - pri.setPriority(priority); - appContext.setPriority(pri); - } - private void setResources(Configuration launcherJobConf, ApplicationSubmissionContext appContext) { - int memory; - if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY) != null) { - memory = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY, -1); - Preconditions.checkArgument(memory > 0, "Launcher memory is 0 or negative"); - } else { - int defaultMemory = ConfigurationService.getInt(DEFAULT_LAUNCHER_MEMORY_MB, -1); - Preconditions.checkArgument(defaultMemory > 0, "Default launcher memory is 0 or negative"); - memory = defaultMemory; + if (Boolean.TRUE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) && !UserGroupInformation.isSecurityEnabled()) { + LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); + return null; } - int vcores; - if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY) != null) { - vcores = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY, -1); - Preconditions.checkArgument(vcores > 0, "Launcher vcores is 0 or negative"); - } else { - int defaultVcores = ConfigurationService.getInt(DEFAULT_LAUNCHER_VCORES); - Preconditions.checkArgument(defaultVcores > 0, "Default launcher vcores is 0 or negative"); - vcores = defaultVcores; + final XConfiguration wfJobConf = getWorkflowConf(context); + if (!Boolean.FALSE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) && + wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP)) && + !UserGroupInformation.isSecurityEnabled()) { + LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); + return null; } - Resource resource = Resource.newInstance(memory, vcores); - appContext.setResource(resource); - } - private Map<String, String> extractEnvVarsFromOozieLauncherProps(String oozieLauncherEnvProperty) { - Map<String, String> envMap = new LinkedHashMap<>(); - for (String envVar : StringUtils.split(oozieLauncherEnvProperty, File.pathSeparatorChar)) { - String[] env = StringUtils.split(envVar, '='); - Preconditions.checkArgument(env.length == 2, "Invalid launcher setting for environment variables: \"%s\". " + - "<env> should contain a list of ENV_VAR_NAME=VALUE separated by the '%s' character. " + - "Example on Unix: A=foo1:B=foo2", oozieLauncherEnvProperty, File.pathSeparator); - envMap.put(env[0], env[1]); + final Map<String, CredentialsProperties> credPropertiesMap = getActionCredentialsProperties(context, action); + if (credPropertiesMap.isEmpty()) { + LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); + return credPropertiesMap; } - return envMap; - } - protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, - WorkflowAction action, Configuration actionConf) throws Exception { - HashMap<String, CredentialsProperties> credPropertiesMap = null; - if (context != null && action != null) { - if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) { - XConfiguration wfJobConf = getWorkflowConf(context); - if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) || - !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) { - credPropertiesMap = getActionCredentialsProperties(context, action); - if (!credPropertiesMap.isEmpty()) { - for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { - if (entry.getValue() != null) { - CredentialsProperties prop = entry.getValue(); - LOG.debug("Credential Properties set for action : " + action.getId()); - for (Entry<String, String> propEntry : prop.getProperties().entrySet()) { - String key = propEntry.getKey(); - String value = propEntry.getValue(); - actionConf.set(key, value); - LOG.debug("property : '" + key + "', value : '" + value + "'"); - } - } - } - } else { - LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); - } - } else { - LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); + for (final Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { + if (entry.getValue() != null) { + final CredentialsProperties prop = entry.getValue(); + LOG.debug("Credential Properties set for action : " + action.getId()); + for (final Entry<String, String> propEntry : prop.getProperties().entrySet()) { + final String key = propEntry.getKey(); + final String value = propEntry.getValue(); + actionConf.set(key, value); + LOG.debug("property : '" + key + "', value : '" + value + "'"); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java index a0dfd31..8fdc50c 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java @@ -77,8 +77,8 @@ public class SqoopActionExecutor extends JavaActionExecutor { if (e != null) { String strConf = XmlUtils.prettyPrint(e).toString(); XConfiguration inlineConf = new XConfiguration(new StringReader(strConf)); - XConfiguration.copy(inlineConf, actionConf); checkForDisallowedProps(inlineConf, "inline configuration"); + XConfiguration.copy(inlineConf, actionConf); } } catch (IOException ex) { throw convertException(ex); http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java index 5890b8c..7e33485 100644 --- a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java @@ -329,8 +329,8 @@ public class SshActionExecutor extends ActionExecutor { * * @param command Command to execute as String. * @return exit status of the execution. - * @throws IOException if processSettings exits with status nonzero. - * @throws InterruptedException if processSettings does not run properly. + * @throws IOException if process exits with status nonzero. + * @throws InterruptedException if process does not run properly. */ public int executeCommand(String command) throws IOException, InterruptedException { Runtime runtime = Runtime.getRuntime(); @@ -396,7 +396,7 @@ public class SshActionExecutor extends ActionExecutor { * @param action action object. * @param recoveryId action id + run number to enable recovery in rerun * @param preserveArgs tell the ssh scripts to preserve or flatten the arguments - * @return processSettings id of the running command. + * @return process id of the running command. * @throws IOException thrown if failed to run the command. * @throws InterruptedException thrown if any interruption happens. */ @@ -468,7 +468,7 @@ public class SshActionExecutor extends ActionExecutor { } /** - * Get the return value of a processSettings. + * Get the return value of a process. * * @param command command to be executed. * @return zero if execution is successful and any non zero value for failure. @@ -679,7 +679,7 @@ public class SshActionExecutor extends ActionExecutor { * @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required. * @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the * store content may exceed this length. - * @return the exit value of the processSettings. + * @return the exit value of the process. * @throws IOException */ private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength) http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java b/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java index 87d0c5e..47bfd2b 100644 --- a/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java +++ b/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java @@ -49,7 +49,7 @@ public class MessageReceiver implements MessageListener { } /** - * Get the MessageHandler that will processSettings the message + * Get the MessageHandler that will process the message * * @return message handler */ http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java index 97a75ff..ffc29af 100644 --- a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java +++ b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java @@ -18,6 +18,7 @@ package org.apache.oozie.service; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.util.StringUtils; import org.apache.oozie.action.control.EndActionExecutor; import org.apache.oozie.action.control.ForkActionExecutor; http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/java/org/apache/oozie/service/SchemaService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/SchemaService.java b/core/src/main/java/org/apache/oozie/service/SchemaService.java index 9d2a521..137e2c0 100644 --- a/core/src/main/java/org/apache/oozie/service/SchemaService.java +++ b/core/src/main/java/org/apache/oozie/service/SchemaService.java @@ -29,17 +29,15 @@ import javax.xml.transform.stream.StreamSource; import javax.xml.validation.Schema; import javax.xml.validation.SchemaFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ErrorCode; import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.schema.ResourceResolver; import org.xml.sax.SAXException; - /** * Service that loads Oozie workflow definition schema and registered extension * schemas. */ - public class SchemaService implements Service { public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchemaService."; @@ -97,12 +95,9 @@ public class SchemaService implements Service { } List<StreamSource> sources = new ArrayList<StreamSource>(); for (String schemaName : schemaNames) { - StreamSource s = new StreamSource(IOUtils.getResourceAsStream(schemaName, -1)); - s.setSystemId(schemaName); - sources.add(s); + sources.add(new StreamSource(IOUtils.getResourceAsStream(schemaName, -1))); } SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); - factory.setResourceResolver(new ResourceResolver()); return factory.newSchema(sources.toArray(new StreamSource[sources.size()])); } http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/java/org/apache/oozie/util/WritableUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/WritableUtils.java b/core/src/main/java/org/apache/oozie/util/WritableUtils.java index 5a4cb24..aa027e3 100644 --- a/core/src/main/java/org/apache/oozie/util/WritableUtils.java +++ b/core/src/main/java/org/apache/oozie/util/WritableUtils.java @@ -151,13 +151,6 @@ public class WritableUtils { } } - /** - * Write string list. - * - * @param dataOutput the data output - * @param list the list - * @throws IOException Signals that an I/O exception has occurred. - */ public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException { dataOutput.writeInt(list.size()); for (String str : list) { http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java index aa0e06b..a74e5c7 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java @@ -68,7 +68,6 @@ import org.xml.sax.SAXException; */ public class LiteWorkflowAppParser { - private static final String LAUNCHER_E = "launcher"; private static final String DECISION_E = "decision"; private static final String ACTION_E = "action"; private static final String END_E = "end"; @@ -215,7 +214,6 @@ public class LiteWorkflowAppParser { private LiteWorkflowApp parse(String strDef, Element root, Configuration configDefault, Configuration jobConf) throws WorkflowException { Namespace ns = root.getNamespace(); - LiteWorkflowApp def = null; GlobalSectionData gData = jobConf.get(OOZIE_GLOBAL) == null ? null : getGlobalFromString(jobConf.get(OOZIE_GLOBAL)); @@ -266,10 +264,10 @@ public class LiteWorkflowAppParser { } eActionConf = elem; if (SUBWORKFLOW_E.equals(elem.getName())) { - handleDefaultsAndGlobal(gData, null, elem, ns); + handleDefaultsAndGlobal(gData, null, elem); } else { - handleDefaultsAndGlobal(gData, configDefault, elem, ns); + handleDefaultsAndGlobal(gData, configDefault, elem); } } } @@ -302,11 +300,9 @@ public class LiteWorkflowAppParser { } else if (eNode.getName().equals(GLOBAL)) { if(jobConf.get(OOZIE_GLOBAL) != null) { gData = getGlobalFromString(jobConf.get(OOZIE_GLOBAL)); - handleDefaultsAndGlobal(gData, null, eNode, ns); + handleDefaultsAndGlobal(gData, null, eNode); } - gData = parseGlobalSection(ns, eNode); - } else if (eNode.getName().equals(PARAMETERS)) { // No operation is required } else { @@ -442,7 +438,7 @@ public class LiteWorkflowAppParser { } } - Configuration globalConf = new XConfiguration(); + Configuration globalConf = null; Element globalConfigurationElement = global.getChild(CONFIGURATION, ns); if (globalConfigurationElement != null) { try { @@ -451,18 +447,12 @@ public class LiteWorkflowAppParser { throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf"); } } - - Element globalLauncherElement = global.getChild(LAUNCHER_E, ns); - if (globalLauncherElement != null) { - LauncherConfigHandler launcherConfigHandler = new LauncherConfigHandler(globalConf, globalLauncherElement, ns); - launcherConfigHandler.processSettings(); - } gData = new GlobalSectionData(globalJobTracker, globalNameNode, globalJobXmls, globalConf); } return gData; } - private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement, Namespace ns) + private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement) throws WorkflowException { ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(actionElement.getName()); @@ -507,7 +497,7 @@ public class LiteWorkflowAppParser { // If this is the global section or ActionExecutor.supportsConfigurationJobXML() returns true, we parse the action's // <configuration> and <job-xml> fields. We also merge this with those from the <global> section, if given. If none are // defined, empty values are placed. Exceptions are thrown if there's an error parsing, but not if they're not given. - if (GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) { + if ( GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) { @SuppressWarnings("unchecked") List<Element> actionJobXmls = actionElement.getChildren(JOB_XML, actionNs); if (gData != null && gData.jobXmls != null) { @@ -534,20 +524,12 @@ public class LiteWorkflowAppParser { if (gData != null && gData.conf != null) { XConfiguration.copy(gData.conf, actionConf); } - - Element launcherConfiguration = actionElement.getChild(LAUNCHER_E, actionNs); - if (launcherConfiguration != null) { - LauncherConfigHandler launcherConfigHandler = new LauncherConfigHandler(actionConf, launcherConfiguration, actionNs); - launcherConfigHandler.processSettings(); - } - Element actionConfiguration = actionElement.getChild(CONFIGURATION, actionNs); if (actionConfiguration != null) { //copy and override XConfiguration.copy(new XConfiguration(new StringReader(XmlUtils.prettyPrint(actionConfiguration).toString())), actionConf); } - int position = actionElement.indexOf(actionConfiguration); actionElement.removeContent(actionConfiguration); //replace with enhanced one Element eConfXml = XmlUtils.parseXml(actionConf.toXmlString(false)); http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java index 2e09889..23df086 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java @@ -31,6 +31,8 @@ import org.apache.hadoop.conf.Configuration; import javax.xml.validation.Schema; import java.io.StringReader; +import java.util.Date; +import java.util.Map; //TODO javadoc public abstract class LiteWorkflowLib implements WorkflowLib { http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 2389b99..0d174b5 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1607,18 +1607,17 @@ will be the requeue interval for the actions which are waiting for a long time w <property> <name>oozie.service.SchemaService.wf.schemas</name> <value> - oozie-common-1.0.xsd, oozie-workflow-0.1.xsd,oozie-workflow-0.2.xsd,oozie-workflow-0.2.5.xsd,oozie-workflow-0.3.xsd,oozie-workflow-0.4.xsd, - oozie-workflow-0.4.5.xsd,oozie-workflow-0.5.xsd,oozie-workflow-1.0.xsd, - shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd,shell-action-1.0.xsd, + oozie-workflow-0.4.5.xsd,oozie-workflow-0.5.xsd, + shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd, email-action-0.1.xsd,email-action-0.2.xsd, - hive-action-0.2.xsd,hive-action-0.3.xsd,hive-action-0.4.xsd,hive-action-0.5.xsd,hive-action-0.6.xsd,hive-action-1.0.xsd, - sqoop-action-0.2.xsd,sqoop-action-0.3.xsd,sqoop-action-0.4.xsd,sqoop-action-1.0.xsd, + hive-action-0.2.xsd,hive-action-0.3.xsd,hive-action-0.4.xsd,hive-action-0.5.xsd,hive-action-0.6.xsd, + sqoop-action-0.2.xsd,sqoop-action-0.3.xsd,sqoop-action-0.4.xsd, ssh-action-0.1.xsd,ssh-action-0.2.xsd, distcp-action-0.1.xsd,distcp-action-0.2.xsd, oozie-sla-0.1.xsd,oozie-sla-0.2.xsd, - hive2-action-0.1.xsd,hive2-action-0.2.xsd,hive2-action-1.0.xsd, - spark-action-0.1.xsd,spark-action-0.2.xsd,spark-action-1.0.xsd + hive2-action-0.1.xsd, hive2-action-0.2.xsd, + spark-action-0.1.xsd,spark-action-0.2.xsd </value> <description> List of schemas for workflows (separated by commas). @@ -3075,35 +3074,5 @@ will be the requeue interval for the actions which are waiting for a long time w Most users should not have to change this. </description> </property> - <property> - <name>oozie.launcher.default.vcores</name> - <value>1</value> - <description> - The default number of vcores that are allocated for the Launcher AMs - </description> - </property> - - <property> - <name>oozie.launcher.default.memory.mb</name> - <value>2048</value> - <description> - The default amount of memory in MBs that is allocated for the Launcher AMs - </description> - </property> - <property> - <name>oozie.launcher.default.priority</name> - <value>0</value> - <description> - The default YARN priority of the Launcher AM - </description> - </property> - - <property> - <name>oozie.launcher.default.queue</name> - <value>default</value> - <description> - The default YARN queue where the Launcher AM is placed - </description> - </property> </configuration> http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java index dd90536..d74160a 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java @@ -35,7 +35,9 @@ import org.apache.oozie.service.Services; import org.apache.oozie.service.UUIDService; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.WorkflowStoreService; +import org.apache.oozie.test.XFsTestCase; import org.apache.oozie.test.XHCatTestCase; +import org.apache.oozie.test.XTestCase; import org.apache.oozie.util.ELEvaluator; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; @@ -242,7 +244,7 @@ public abstract class ActionExecutorTestCase extends XHCatTestCase { protected WorkflowJobBean createBaseWorkflow(XConfiguration protoConf, String actionName) throws Exception { Path appUri = new Path(getAppPath(), "workflow.xml"); - String content = "<workflow-app xmlns='uri:oozie:workflow:1.0' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>"; + String content = "<workflow-app xmlns='uri:oozie:workflow:0.1' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>"; content += "<start to='end' />"; content += "<end name='end' /></workflow-app>"; writeToFile(content, getAppPath(), "workflow.xml"); @@ -298,41 +300,6 @@ public abstract class ActionExecutorTestCase extends XHCatTestCase { return workflow; } - protected WorkflowJobBean createBaseWorkflowWithLauncherConfig(XConfiguration protoConf, String actionName) throws Exception { - Path appUri = new Path(getAppPath(), "workflow.xml"); - - String content = "<workflow-app xmlns='uri:oozie:workflow:1.0' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>"; - content += "<global>" - + "<launcher>" - + " <vcores>2</vcores>" - + " <memory.mb>1024</memory.mb>" - + " <queue>default</queue>" - + " <priority>1</priority>" - + " <java-opts>-verbose:class</java-opts>" - + "</launcher>" - + "</global>"; - - content += "<start to='end' />"; - content += "<end name='end' /></workflow-app>"; - writeToFile(content, getAppPath(), "workflow.xml"); - - WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>", - new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, - "end")) - .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class)); - XConfiguration wfConf = new XConfiguration(); - wfConf.set(OozieClient.USER_NAME, getTestUser()); - wfConf.set(OozieClient.APP_PATH, appUri.toString()); - - WorkflowJobBean workflow = createWorkflow(app, wfConf, protoConf); - - WorkflowActionBean action = new WorkflowActionBean(); - action.setName(actionName); - action.setId(Services.get().get(UUIDService.class).generateChildId(workflow.getId(), actionName)); - workflow.getActions().add(action); - return workflow; - } - private WorkflowJobBean createWorkflow(WorkflowApp app, Configuration conf, XConfiguration protoConf) throws Exception { WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB(); http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java index 43c71b0..6cee7a8 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java @@ -31,15 +31,6 @@ public class LauncherMainTester { System.out.println("Hello World!"); } - String testJavaOpts = System.getProperty("testJavaOpts"); - if (testJavaOpts != null && Boolean.parseBoolean(testJavaOpts)) { - throw new RuntimeException("Failing on purpose"); - } - String env = System.getenv("LAUNCHER_ENVIRON"); - if (env != null && env.equals("foo1")) { - throw new RuntimeException("Failing on purpose"); - } - if (args.length == 1) { if (args[0].equals("throwable")) { throw new Throwable("throwing throwable"); http://git-wip-us.apache.org/repos/asf/oozie/blob/8aca098c/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 02e60c0..d1d78fd 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -29,7 +29,14 @@ import java.io.Writer; import java.net.URI; import java.security.PrivilegedExceptionAction; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.examples.SleepJob; @@ -44,7 +51,6 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; @@ -80,9 +86,7 @@ import org.junit.Test; public class TestJavaActionExecutor extends ActionExecutorTestCase { - private static final String YARN_RESOURCEMANAGER_ADDRESS = "yarn.resourcemanager.address"; - private static final String MAPRED_CHILD_JAVA_OPTS = "mapred.child.java.opts"; - private static final String MAPREDUCE_MAP_JAVA_OPTS = "mapreduce.map.java.opts"; + public static final String YARN_RESOURCEMANAGER_ADDRESS = "yarn.resourcemanager.address"; @Override protected void beforeSetUp() throws Exception { @@ -269,8 +273,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { ae.setupLauncherConf(conf, actionXml, getFsTestCaseDir(), context); assertEquals("MAIN-CLASS", actionConf.get("oozie.action.java.main", "null")); assertEquals("org.apache.oozie.action.hadoop.JavaMain", ae.getLauncherMain(conf, actionXml)); - assertTrue(conf.get(MAPRED_CHILD_JAVA_OPTS).contains("JAVA-OPTS")); - assertTrue(conf.get(MAPREDUCE_MAP_JAVA_OPTS).contains("JAVA-OPTS")); + assertTrue(conf.get("mapred.child.java.opts").contains("JAVA-OPTS")); + assertTrue(conf.get("mapreduce.map.java.opts").contains("JAVA-OPTS")); assertEquals(Arrays.asList("A1", "A2"), Arrays.asList(LauncherAMUtils.getMainArguments(conf))); actionXml = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + @@ -1110,7 +1114,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { WorkflowApp app = new LiteWorkflowApp("testApp", wfxml, new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "start")). addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class)); - Configuration conf = getHadoopAccessorService(). + Configuration conf = Services.get().get(HadoopAccessorService.class). createConfiguration(new URI(getNameNodeUri()).getAuthority()); conf.set(OozieClient.APP_PATH, getNameNodeUri() + "/testPath"); conf.set(OozieClient.LOG_TOKEN, "testToken"); @@ -1222,8 +1226,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Configuration conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); - assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); + assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); + assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -1247,8 +1251,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); - assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); + assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); + assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -1274,8 +1278,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); - assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); + assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); + assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -1301,8 +1305,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); - assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); + assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); + assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" @@ -1329,8 +1333,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf); - assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS)); - assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS)); + assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts")); + assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts")); } public void testActionLibsPath() throws Exception { @@ -2258,176 +2262,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { assertEquals("/user/map%20dev/test-case/shell/script/shell%201.sh", actPath); } - public void testSubmitOKWithVcoresAndMemory() throws Exception { - String actionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - " <property><name>oozie.launcher.vcores</name><value>1</value></property>" + - " <property><name>oozie.launcher.memory.mb</name><value>1024</value></property>" + - "</configuration>" + - "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + - "</java>"; - Context context = createContext(actionXml, null); - submitAction(context); - waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); - ActionExecutor ae = new JavaActionExecutor(); - ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); - assertNull(context.getAction().getData()); - - ae.end(context, context.getAction()); - assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); - } - - public void testSubmitOKWithLauncherJavaOpts() throws Exception { - String actionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - " <property><name>oozie.launcher.javaopts</name><value>-DtestJavaOpts=true</value></property>" + - "</configuration>" + - "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + - "</java>"; - Context context = createContext(actionXml, null); - submitAction(context); - waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); - ActionExecutor ae = new JavaActionExecutor(); - ae.check(context, context.getAction()); - assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); - assertNull(context.getAction().getData()); - - ae.end(context, context.getAction()); - assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); - } - - public void testSubmitFailsWithNegativeVcores() throws Exception { - String actionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - " <property><name>oozie.launcher.vcores</name><value>-1</value></property>" + - "</configuration>" + - "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + - "</java>"; - Context context = createContext(actionXml, null); - - boolean exception = false; - try { - submitAction(context); - } catch (ActionExecutorException e) { - exception = true; - assertEquals("Illegal exception was thrown", IllegalArgumentException.class, e.getCause().getClass()); - } - - assertTrue("Exception was not caught", exception); - } - - public void testSubmitFailsWithNegativeMemory() throws Exception { - String actionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - " <property><name>oozie.launcher.memory.mb</name><value>-1</value></property>" + - "</configuration>" + - "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + - "</java>"; - Context context = createContext(actionXml, null); - - boolean exception = false; - try { - submitAction(context); - } catch (ActionExecutorException e) { - exception = true; - assertEquals("Illegal exception was thrown", IllegalArgumentException.class, e.getCause().getClass()); - } - - assertTrue("Exception was not caught", exception); - } - - public void testSubmitOKWithLauncherEnvVars() throws Exception { - String actionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - " <property><name>oozie.launcher.env</name><value>A=foo1" + File.pathSeparator + "B=foo2</value></property>" + - "</configuration>" + - "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + - "</java>"; - Context context = createContext(actionXml, null); - submitAction(context); - waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); - ActionExecutor ae = new JavaActionExecutor(); - ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); - assertNull(context.getAction().getData()); - - ae.end(context, context.getAction()); - assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); - } - - public void testEnvVarsPropagatedFromLauncherConfig() throws Exception { - String actionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - " <property><name>oozie.launcher.env</name><value>LAUNCHER_ENVIRON=foo1" + File.pathSeparator + "B=foo2</value></property>" + - "</configuration>" + - "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + - "</java>"; - Context context = createContext(actionXml, null); - submitAction(context); - waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId()); - ActionExecutor ae = new JavaActionExecutor(); - ae.check(context, context.getAction()); - assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); - assertNull(context.getAction().getData()); - - ae.end(context, context.getAction()); - assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); - } - - public void testSubmitFailsWithInvalidLauncherEnvVars() throws Exception { - String actionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - " <property><name>oozie.launcher.env</name><value>Afoo1" + File.pathSeparator + "B=foo2</value></property>" + - "</configuration>" + - "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + - "</java>"; - Context context = createContext(actionXml, null); - try { - submitAction(context); - fail(); - } - catch (ActionExecutorException e) { - assertTrue(e.getMessage().contains("Invalid launcher setting for environment variables")); - } - } - - public void testSubmitWithLauncherQueue() throws Exception { - String actionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - " <property><name>oozie.launcher.queue</name><value>test</value></property>" + - "</configuration>" + - "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" + - "</java>"; - Context context = createContext(actionXml, null); - submitAction(context); - final ApplicationId appId = ConverterUtils.toApplicationId(context.getAction().getExternalId()); - Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri()); - String queue = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getQueue(); - assertEquals("test", queue); - } - - private HadoopAccessorService getHadoopAccessorService() { - return Services.get().get(HadoopAccessorService.class); - } - - public void testChildKill() throws Exception { final JobConf clusterConf = createJobConf(); FileSystem fileSystem = FileSystem.get(clusterConf); @@ -2485,7 +2319,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { // kill the action - based on the job tag, the SleepJob is expected to be killed too ae.kill(context, context.getAction()); - HadoopAccessorService hadoopAccessorService = getHadoopAccessorService(); + HadoopAccessorService hadoopAccessorService = Services.get().get(HadoopAccessorService.class); Configuration config = hadoopAccessorService.createConfiguration(getJobTrackerUri()); YarnClient yarnClient = hadoopAccessorService.createYarnClient(getTestUser(), config);
