addressing some of Rohini's feedback
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/09c53894 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/09c53894 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/09c53894 Branch: refs/heads/oya Commit: 09c538941073ef12d1c3b01e48091aef4911e54f Parents: bec5536 Author: Gezapeti Cseh <[email protected]> Authored: Fri May 5 12:24:17 2017 +0200 Committer: Gezapeti Cseh <[email protected]> Committed: Fri May 5 12:24:17 2017 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 29 +- .../oozie/action/hadoop/LauncherHelper.java | 321 +++++++++++++++++++ .../action/hadoop/LauncherMapperHelper.java | 321 ------------------- .../action/hadoop/MapReduceActionExecutor.java | 15 +- .../oozie/command/wf/SubmitMRXCommand.java | 2 - core/src/main/resources/oozie-log4j.properties | 4 +- .../action/hadoop/TestFSPrepareActions.java | 10 +- .../action/hadoop/TestHCatPrepareActions.java | 2 +- .../action/hadoop/TestJavaActionExecutor.java | 43 +-- .../oozie/action/hadoop/TestLauncher.java | 136 ++++---- .../action/hadoop/TestPrepareActionsDriver.java | 4 +- .../action/hadoop/TestShellActionExecutor.java | 10 +- .../command/wf/TestActionCheckXCommand.java | 19 +- .../command/wf/TestActionStartXCommand.java | 13 +- .../oozie/service/TestRecoveryService.java | 7 +- .../action/hadoop/TestHiveActionExecutor.java | 17 +- .../action/hadoop/TestHive2ActionExecutor.java | 19 +- .../action/hadoop/TestPigActionExecutor.java | 8 +- .../action/hadoop/TestSqoopActionExecutor.java | 23 +- .../hadoop/TestMapReduceActionExecutor.java | 29 +- 20 files changed, 476 insertions(+), 556 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 4dc4abd..2256586 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -51,7 +51,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.v2.util.MRApps; @@ -72,7 +71,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.oozie.WorkflowActionBean; @@ -88,7 +86,6 @@ import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; import org.apache.oozie.service.ShareLibService; import org.apache.oozie.service.URIHandlerService; -import org.apache.oozie.service.UserGroupInformationService; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.util.ClasspathUtils; import org.apache.oozie.util.ELEvaluationException; @@ -805,7 +802,7 @@ public class JavaActionExecutor extends ActionExecutor { launcherTime = context.getWorkflow().getCreatedTime().getTime(); } String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action); - LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime); + LauncherHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime); } else { LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties", @@ -837,22 +834,22 @@ public class JavaActionExecutor extends ActionExecutor { prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim(); } } - LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, + LauncherHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf, prepareXML); // Set the launcher Main Class - LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); - LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf); - LauncherMapperHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen); - LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize); - LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax); + LauncherHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml)); + LauncherHelper.setupLauncherURIHandlerConf(launcherJobConf); + LauncherHelper.setupMaxOutputData(launcherJobConf, maxActionOutputLen); + LauncherHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize); + LauncherHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax); List<Element> list = actionXml.getChildren("arg", ns); String[] args = new String[list.size()]; for (int i = 0; i < list.size(); i++) { args[i] = list.get(i).getTextTrim(); } - LauncherMapperHelper.setupMainArguments(launcherJobConf, args); + LauncherHelper.setupMainArguments(launcherJobConf, args); // Make mapred.child.java.opts and mapreduce.map.java.opts equal, but give values from the latter priority; also append // <java-opt> and <java-opts> and give those highest priority @@ -959,7 +956,7 @@ public class JavaActionExecutor extends ActionExecutor { Configuration launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); String consoleUrl; - String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context + String launcherId = LauncherHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context .getRecoveryId()); boolean alreadyRunning = launcherId != null; @@ -1098,7 +1095,7 @@ public class JavaActionExecutor extends ActionExecutor { vargs.add("-Dlog4j.configuration=container-log4j.properties"); vargs.add("-Dlog4j.debug=true"); vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); - vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 1024 * 1024); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0); vargs.add("-Dhadoop.root.logger=INFO,CLA"); vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser()); @@ -1381,7 +1378,7 @@ public class JavaActionExecutor extends ActionExecutor { if (appStatus != null || fallback) { Path actionDir = context.getActionDir(); // load sequence file into object - Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf); + Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf); if (fallback) { String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS); if (finalStatus != null) { @@ -1411,7 +1408,7 @@ public class JavaActionExecutor extends ActionExecutor { LOG.info(XLog.STD, "action completed, external ID [{0}]", action.getExternalId()); context.setExecutionData(appStatus.toString(), null); if (appStatus == FinalApplicationStatus.SUCCEEDED) { - if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) { + if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) { context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS))); LOG.info(XLog.STD, "action produced output"); @@ -1419,7 +1416,7 @@ public class JavaActionExecutor extends ActionExecutor { else { context.setExecutionData(SUCCEEDED, null); } - if (LauncherMapperHelper.hasStatsData(actionData)) { + if (LauncherHelper.hasStatsData(actionData)) { context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS)); LOG.info(XLog.STD, "action produced stats"); } http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java new file mode 100644 index 0000000..5ac1a16 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java @@ -0,0 +1,321 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.math.BigInteger; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.HadoopAccessorException; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.URIHandlerService; +import org.apache.oozie.service.UserGroupInformationService; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.PropertiesUtils; + +// TODO: we're no longer using Launcher Mapper -- give this class a better name +public class LauncherHelper { + + public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; + + public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) + throws HadoopAccessorException, IOException { + String jobId = null; + Path recoveryFile = new Path(actionDir, recoveryId); + FileSystem fs = Services.get().get(HadoopAccessorService.class) + .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); + + if (fs.exists(recoveryFile)) { + InputStream is = fs.open(recoveryFile); + BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + jobId = reader.readLine(); + reader.close(); + } + return jobId; + + } + + public static void setupMainClass(Configuration launcherConf, String javaMainClass) { + // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via + // <configuration> property + if (javaMainClass != null && !javaMainClass.equals("")) { + launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); + } + } + + public static void setupLauncherURIHandlerConf(Configuration launcherConf) { + for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) { + launcherConf.set(entry.getKey(), entry.getValue()); + } + } + + public static void setupMainArguments(Configuration launcherConf, String[] args) { + launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); + for (int i = 0; i < args.length; i++) { + launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); + } + } + + public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { + launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); + } + + /** + * Set the maximum value of stats data + * + * @param launcherConf the oozie launcher configuration + * @param maxStatsData the maximum allowed size of stats data + */ + public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ + launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); + } + + /** + * Set the maximum number of globbed files/dirs + * + * @param launcherConf the oozie launcher configuration + * @param fsGlobMax the maximum number of files/dirs for FS operation + */ + public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){ + launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax); + } + + public static void setupLauncherInfo(Configuration launcherConf, String jobId, String actionId, Path actionDir, + String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { + + launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); + launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); + launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString()); + launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId); + launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML); + + actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); + actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); + + if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { + List<String> purgedEntries = new ArrayList<String>(); + Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files"); + for (String entry : entries) { + if (entry.contains("#")) { + purgedEntries.add(entry); + } + } + actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()])); + launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); + } + } + + public static void setupYarnRestartHandling(Configuration launcherJobConf, Configuration actionConf, String launcherTag, + long launcherTime) + throws NoSuchAlgorithmException { + launcherJobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, launcherTime); + // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length) + String tag = getTag(launcherTag); + // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself. + // mapreduce.job.tags should only go to child job launch by launcher. + actionConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, tag); + } + + public static String getTag(String launcherTag) throws NoSuchAlgorithmException { + MessageDigest digest = MessageDigest.getInstance("MD5"); + digest.update(launcherTag.getBytes(), 0, launcherTag.length()); + String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16); + return md5; + } + + public static boolean isMainDone(RunningJob runningJob) throws IOException { + return runningJob.isComplete(); + } + + public static boolean isMainSuccessful(RunningJob runningJob) throws IOException { + boolean succeeded = runningJob.isSuccessful(); + if (succeeded) { + Counters counters = runningJob.getCounters(); + if (counters != null) { + Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); + if (group != null) { + succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0; + } + } + } + return succeeded; + } + + /** + * Determine whether action has external child jobs or not + * @param actionData + * @return true/false + * @throws IOException + */ + public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException { + return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS); + } + + /** + * Determine whether action has output data or not + * @param actionData + * @return true/false + * @throws IOException + */ + public static boolean hasOutputData(Map<String, String> actionData) throws IOException { + return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS); + } + + /** + * Determine whether action has external stats or not + * @param actionData + * @return true/false + * @throws IOException + */ + public static boolean hasStatsData(Map<String, String> actionData) throws IOException{ + return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS); + } + + /** + * Determine whether action has new id (id swap) or not + * @param actionData + * @return true/false + * @throws IOException + */ + public static boolean hasIdSwap(Map<String, String> actionData) throws IOException { + return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID); + } + + /** + * Get the sequence file path storing all action data + * @param actionDir + * @return + */ + public static Path getActionDataSequenceFilePath(Path actionDir) { + return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE); + } + + /** + * Utility function to load the contents of action data sequence file into + * memory object + * + * @param fs Action Filesystem + * @param actionDir Path + * @param conf Configuration + * @return Map action data + * @throws IOException + * @throws InterruptedException + */ + public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf) + throws IOException, InterruptedException { + UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class); + UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME)); + + return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() { + @Override + public Map<String, String> run() throws IOException { + Map<String, String> ret = new HashMap<String, String>(); + Path seqFilePath = getActionDataSequenceFilePath(actionDir); + if (fs.exists(seqFilePath)) { + SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf); + Text key = new Text(), value = new Text(); + while (seqFile.next(key, value)) { + ret.put(key.toString(), value.toString()); + } + seqFile.close(); + } + else { // maintain backward-compatibility. to be deprecated + org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir); + InputStream is; + BufferedReader reader = null; + Properties props; + if (files != null && files.length > 0) { + for (int x = 0; x < files.length; x++) { + Path file = files[x].getPath(); + if (file.equals(new Path(actionDir, "externalChildIds.properties"))) { + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS, + IOUtils.getReaderAsString(reader, -1)); + } + else if (file.equals(new Path(actionDir, "newId.properties"))) { + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + props = PropertiesUtils.readProperties(reader, -1); + ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id")); + } + else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) { + int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, + 2 * 1024); + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils + .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData))); + } + else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) { + int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, + Integer.MAX_VALUE); + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils + .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData))); + } + else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) { + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1)); + } + } + } + } + return ret; + } + }); + } + + public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) { + String tag; + if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) { + tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName(); + } else if (parentId != null) { + tag = parentId + "@" + wfAction.getName(); + } else { + tag = wfAction.getId(); + } + return tag; + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java deleted file mode 100644 index 8bd476e..0000000 --- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java +++ /dev/null @@ -1,321 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.action.hadoop; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.math.BigInteger; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.HadoopAccessorException; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.URIHandlerService; -import org.apache.oozie.service.UserGroupInformationService; -import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.PropertiesUtils; - -// TODO: we're no longer using Launcher Mapper -- give this class a better name -public class LauncherMapperHelper { - - public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; - - public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) - throws HadoopAccessorException, IOException { - String jobId = null; - Path recoveryFile = new Path(actionDir, recoveryId); - FileSystem fs = Services.get().get(HadoopAccessorService.class) - .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); - - if (fs.exists(recoveryFile)) { - InputStream is = fs.open(recoveryFile); - BufferedReader reader = new BufferedReader(new InputStreamReader(is)); - jobId = reader.readLine(); - reader.close(); - } - return jobId; - - } - - public static void setupMainClass(Configuration launcherConf, String javaMainClass) { - // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via - // <configuration> property - if (javaMainClass != null && !javaMainClass.equals("")) { - launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); - } - } - - public static void setupLauncherURIHandlerConf(Configuration launcherConf) { - for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) { - launcherConf.set(entry.getKey(), entry.getValue()); - } - } - - public static void setupMainArguments(Configuration launcherConf, String[] args) { - launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); - for (int i = 0; i < args.length; i++) { - launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); - } - } - - public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { - launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); - } - - /** - * Set the maximum value of stats data - * - * @param launcherConf the oozie launcher configuration - * @param maxStatsData the maximum allowed size of stats data - */ - public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ - launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); - } - - /** - * Set the maximum number of globbed files/dirs - * - * @param launcherConf the oozie launcher configuration - * @param fsGlobMax the maximum number of files/dirs for FS operation - */ - public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){ - launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax); - } - - public static void setupLauncherInfo(Configuration launcherConf, String jobId, String actionId, Path actionDir, - String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { - - launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); - launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); - launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString()); - launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId); - launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML); - - actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); - actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); - - if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { - List<String> purgedEntries = new ArrayList<String>(); - Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files"); - for (String entry : entries) { - if (entry.contains("#")) { - purgedEntries.add(entry); - } - } - actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()])); - launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); - } - } - - public static void setupYarnRestartHandling(Configuration launcherJobConf, Configuration actionConf, String launcherTag, - long launcherTime) - throws NoSuchAlgorithmException { - launcherJobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, launcherTime); - // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length) - String tag = getTag(launcherTag); - // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself. - // mapreduce.job.tags should only go to child job launch by launcher. - actionConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, tag); - } - - public static String getTag(String launcherTag) throws NoSuchAlgorithmException { - MessageDigest digest = MessageDigest.getInstance("MD5"); - digest.update(launcherTag.getBytes(), 0, launcherTag.length()); - String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16); - return md5; - } - - public static boolean isMainDone(RunningJob runningJob) throws IOException { - return runningJob.isComplete(); - } - - public static boolean isMainSuccessful(RunningJob runningJob) throws IOException { - boolean succeeded = runningJob.isSuccessful(); - if (succeeded) { - Counters counters = runningJob.getCounters(); - if (counters != null) { - Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); - if (group != null) { - succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0; - } - } - } - return succeeded; - } - - /** - * Determine whether action has external child jobs or not - * @param actionData - * @return true/false - * @throws IOException - */ - public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException { - return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS); - } - - /** - * Determine whether action has output data or not - * @param actionData - * @return true/false - * @throws IOException - */ - public static boolean hasOutputData(Map<String, String> actionData) throws IOException { - return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS); - } - - /** - * Determine whether action has external stats or not - * @param actionData - * @return true/false - * @throws IOException - */ - public static boolean hasStatsData(Map<String, String> actionData) throws IOException{ - return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS); - } - - /** - * Determine whether action has new id (id swap) or not - * @param actionData - * @return true/false - * @throws IOException - */ - public static boolean hasIdSwap(Map<String, String> actionData) throws IOException { - return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID); - } - - /** - * Get the sequence file path storing all action data - * @param actionDir - * @return - */ - public static Path getActionDataSequenceFilePath(Path actionDir) { - return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE); - } - - /** - * Utility function to load the contents of action data sequence file into - * memory object - * - * @param fs Action Filesystem - * @param actionDir Path - * @param conf Configuration - * @return Map action data - * @throws IOException - * @throws InterruptedException - */ - public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf) - throws IOException, InterruptedException { - UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class); - UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME)); - - return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() { - @Override - public Map<String, String> run() throws IOException { - Map<String, String> ret = new HashMap<String, String>(); - Path seqFilePath = getActionDataSequenceFilePath(actionDir); - if (fs.exists(seqFilePath)) { - SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf); - Text key = new Text(), value = new Text(); - while (seqFile.next(key, value)) { - ret.put(key.toString(), value.toString()); - } - seqFile.close(); - } - else { // maintain backward-compatibility. to be deprecated - org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir); - InputStream is; - BufferedReader reader = null; - Properties props; - if (files != null && files.length > 0) { - for (int x = 0; x < files.length; x++) { - Path file = files[x].getPath(); - if (file.equals(new Path(actionDir, "externalChildIds.properties"))) { - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS, - IOUtils.getReaderAsString(reader, -1)); - } - else if (file.equals(new Path(actionDir, "newId.properties"))) { - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - props = PropertiesUtils.readProperties(reader, -1); - ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id")); - } - else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) { - int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, - 2 * 1024); - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils - .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData))); - } - else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) { - int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, - Integer.MAX_VALUE); - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils - .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData))); - } - else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) { - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1)); - } - } - } - } - return ret; - } - }); - } - - public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) { - String tag; - if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) { - tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName(); - } else if (parentId != null) { - tag = parentId + "@" + wfAction.getName(); - } else { - tag = wfAction.getId(); - } - return tag; - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java index a6f69eb..c15e362 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java @@ -316,9 +316,9 @@ public class MapReduceActionExecutor extends JavaActionExecutor { protected void injectCallback(Context context, Configuration conf) { // add callback for the MapReduce job String callback = context.getCallbackUrl("$jobStatus"); - String oriiginalCallbackURL = conf.get(JOB_END_NOTIFICATION_URL); - if (oriiginalCallbackURL != null) { - LOG.warn("Overriding the action job end notification URI. Original value: {0}", oriiginalCallbackURL); + String originalCallbackURL = conf.get(JOB_END_NOTIFICATION_URL); + if (originalCallbackURL != null) { + LOG.warn("Overriding the action job end notification URI. Original value: {0}", originalCallbackURL); } conf.set(JOB_END_NOTIFICATION_URL, callback); @@ -340,7 +340,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor { Element actionXml = XmlUtils.parseXml(action.getConf()); jobConf = createBaseHadoopConf(context, actionXml); Path actionDir = context.getActionDir(); - actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf); + actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf); } catch (Exception e) { LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e); throw convertException(e); @@ -367,9 +367,8 @@ public class MapReduceActionExecutor extends JavaActionExecutor { jobCompleted = runningJob.isComplete(); } catch (Exception e) { - LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e); LOG.warn("Unable to check the state of a running MapReduce job -" - + " please check the health of the Job History Server!"); + + " please check the health of the Job History Server!", e); exception = true; throw convertException(e); } finally { @@ -402,12 +401,12 @@ public class MapReduceActionExecutor extends JavaActionExecutor { public void kill(final Context context, final WorkflowAction action) throws ActionExecutorException { // Kill the LauncherAM which submits the MR job super.kill(context, action); - + // TODO if action.getExternalChildIDs() is not empty, then kill based on that // We have to check whether the MapReduce execution has started or not. If it has started, then we have to get // the YARN ApplicationID based on the tag and kill it as well YarnClient yarnClient = null; try { - String tag = LauncherMapperHelper.getTag(ActionExecutor.getActionYarnTag(new Configuration(), + String tag = LauncherHelper.getTag(ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action)); GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); gar.setScope(ApplicationsRequestScope.ALL); http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java index 26deda8..fdb9b1b 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java @@ -45,8 +45,6 @@ public class SubmitMRXCommand extends SubmitHttpXCommand { SKIPPED_CONFS.add(XOozieClient.NN); DEPRECATE_MAP.put(XOozieClient.NN, XOozieClient.NN_2); - DEPRECATE_MAP.put(XOozieClient.RM, "yarn.resourcemanager.address"); - DEPRECATE_MAP.put(XOozieClient.NN, "fs.defaultFS"); DEPRECATE_MAP.put(WorkflowAppService.HADOOP_USER, "mapreduce.job.user.name"); } http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/main/resources/oozie-log4j.properties ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-log4j.properties b/core/src/main/resources/oozie-log4j.properties index c86b301..165f1df 100644 --- a/core/src/main/resources/oozie-log4j.properties +++ b/core/src/main/resources/oozie-log4j.properties @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java index 386fef3..a08c16a 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestFSPrepareActions.java @@ -59,7 +59,7 @@ public class TestFSPrepareActions extends XFsTestCase { String prepareXML = "<prepare>" + "<delete path='" + newDir + "'/>" + "</prepare>"; JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); assertFalse(fs.exists(newDir)); } @@ -85,7 +85,7 @@ public class TestFSPrepareActions extends XFsTestCase { String prepareXML = "<prepare>" + "<delete path='" + newDir + "/201[0-1]/*" + "'/>" + "</prepare>"; JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); assertFalse(fs.exists(new Path(newDir + "/2010/10"))); assertFalse(fs.exists(new Path(newDir + "/2011/10"))); @@ -107,7 +107,7 @@ public class TestFSPrepareActions extends XFsTestCase { String prepareXML = "<prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>"; JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); assertTrue(fs.exists(newDir)); } @@ -126,7 +126,7 @@ public class TestFSPrepareActions extends XFsTestCase { try { JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); fail("Expected to catch an exception but did not encounter any"); } catch (LauncherException le) { @@ -153,7 +153,7 @@ public class TestFSPrepareActions extends XFsTestCase { String prepareXML = "<prepare>" + "<mkdir path='" + noSchemePath + "'/>" + "</prepare>"; JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); assertTrue(fs.exists(new Path(noSchemePath))); http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java index d66d9c9..4fe9452 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestHCatPrepareActions.java @@ -65,7 +65,7 @@ public class TestHCatPrepareActions extends XHCatTestCase { + "</prepare>"; JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); FileSystem fs = getFileSystem(); assertFalse(fs.exists(new Path(part1))); http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index d6a44a0..3d423b9 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -45,7 +45,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; @@ -476,7 +475,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Context context = createContext(actionXml, null); final String runningJob = submitAction(context); waitUntilYarnAppDoneAndAssertSuccess(runningJob); - //FIXME assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob)); + //FIXME assertFalse(LauncherHelper.isMainSuccessful(runningJob)); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); assertTrue(ae.isCompleted(context.getAction().getExternalStatus())); @@ -500,7 +499,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Context context = createContext(actionXml, null); final String runningJob = submitAction(context); waitUntilYarnAppDoneAndAssertSuccess(runningJob); - //FIXME assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob)); + //FIXME assertFalse(LauncherHelper.isMainSuccessful(runningJob)); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); assertTrue(ae.isCompleted(context.getAction().getExternalStatus())); @@ -522,7 +521,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { Context context = createContext(actionXml, null); final String runningJob = submitAction(context); waitUntilYarnAppDoneAndAssertSuccess(runningJob); - //FIXME assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob)); + //FIXME assertFalse(LauncherHelper.isMainSuccessful(runningJob)); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); assertTrue(ae.isCompleted(context.getAction().getExternalStatus())); @@ -564,7 +563,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { public boolean evaluate() throws Exception { JavaActionExecutor ae = new JavaActionExecutor(); Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - return LauncherMapperHelper.getRecoveryId(conf, context.getActionDir(), context.getRecoveryId()) != null; + return LauncherHelper.getRecoveryId(conf, context.getActionDir(), context.getRecoveryId()) != null; } }); @@ -1638,41 +1637,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { assertNotSame(conf.get(JavaActionExecutor.ACL_VIEW_JOB), actionConf.get(JavaActionExecutor.ACL_VIEW_JOB)); assertNotSame(conf.get(JavaActionExecutor.ACL_MODIFY_JOB), actionConf.get(JavaActionExecutor.ACL_MODIFY_JOB)); } -/* - public void testACLModifyJob() throws Exception { - // CASE 1: If user has provided modify-acl value - // then it should NOT be overridden by group name - String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node> <configuration>" + - "<property><name>mapreduce.job.acl-modify-job</name><value>MODIFIER</value></property>" + - "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + - "</java>"; - - Context context = createContext(actionXml, "USERS"); - String job = submitAction(context); - FileSystem fs = context.getAppFileSystem(); - Configuration jobXmlConf = new XConfiguration(fs.open(new Path(job.getJobFile()))); - String userModifyAcl = jobXmlConf.get(JavaActionExecutor.ACL_MODIFY_JOB); // 'MODIFIER' - String userGroup = context.getWorkflow().getAcl(); // 'USERS' - assertFalse(userGroup.equals(userModifyAcl)); - - // CASE 2: If user has not provided modify-acl value - // then it equals group name - actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node> <configuration>" + - "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + - "</java>"; - context = createContext(actionXml, "USERS"); - job = submitAction(context); - fs = context.getAppFileSystem(); - jobXmlConf = new XConfiguration(fs.open(new Path(job.getJobFile()))); - - userModifyAcl = jobXmlConf.get(JavaActionExecutor.ACL_MODIFY_JOB); - userGroup = context.getWorkflow().getAcl(); - assertTrue(userGroup.equals(userModifyAcl)); - } -*/ public void testParseJobXmlAndConfiguration() throws Exception { String str = "<java>" + "<job-xml>job1.xml</job-xml>" http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java index 7920e1c..1088fd1 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncher.java @@ -83,12 +83,12 @@ public class TestLauncher extends XFsTestCase { jobConf.set("fs.default.name", getNameNodeUri()); - LauncherMapperHelper.setupMainClass(jobConf, LauncherMainTester.class.getName()); - LauncherMapperHelper.setupMainArguments(jobConf, arg); + LauncherHelper.setupMainClass(jobConf, LauncherMainTester.class.getName()); + LauncherHelper.setupMainArguments(jobConf, arg); Configuration actionConf = new XConfiguration(); - LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, ""); - LauncherMapperHelper.setupYarnRestartHandling(jobConf, jobConf, "1@a", System.currentTimeMillis()); + LauncherHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, ""); + LauncherHelper.setupYarnRestartHandling(jobConf, jobConf, "1@a", System.currentTimeMillis()); assertEquals("1", actionConf.get("oozie.job.id")); assertEquals("1@a", actionConf.get("oozie.action.id")); @@ -127,13 +127,13 @@ public class TestLauncher extends XFsTestCase { Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, actionDir, conf); - assertFalse(fs.exists(LauncherMapperHelper.getActionDataSequenceFilePath(actionDir))); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); - assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob)); - assertFalse(LauncherMapperHelper.hasOutputData(actionData)); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); + Map<String, String> actionData = LauncherHelper.getActionData(fs, actionDir, conf); + assertFalse(fs.exists(LauncherHelper.getActionDataSequenceFilePath(actionDir))); + assertTrue(LauncherHelper.isMainDone(runningJob)); + assertTrue(LauncherHelper.isMainSuccessful(runningJob)); + assertFalse(LauncherHelper.hasOutputData(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.isMainDone(runningJob)); } public void ___testExit0() throws Exception { @@ -150,13 +150,13 @@ public class TestLauncher extends XFsTestCase { Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, actionDir, conf); - assertFalse(fs.exists(LauncherMapperHelper.getActionDataSequenceFilePath(actionDir))); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); - assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob)); - assertFalse(LauncherMapperHelper.hasOutputData(actionData)); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); + Map<String, String> actionData = LauncherHelper.getActionData(fs, actionDir, conf); + assertFalse(fs.exists(LauncherHelper.getActionDataSequenceFilePath(actionDir))); + assertTrue(LauncherHelper.isMainDone(runningJob)); + assertTrue(LauncherHelper.isMainSuccessful(runningJob)); + assertFalse(LauncherHelper.hasOutputData(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.isMainDone(runningJob)); } public void ___testExit1() throws Exception { @@ -173,13 +173,13 @@ public class TestLauncher extends XFsTestCase { Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, actionDir, conf); - assertTrue(fs.exists(LauncherMapperHelper.getActionDataSequenceFilePath(actionDir))); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); - assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob)); - assertFalse(LauncherMapperHelper.hasOutputData(actionData)); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); + Map<String, String> actionData = LauncherHelper.getActionData(fs, actionDir, conf); + assertTrue(fs.exists(LauncherHelper.getActionDataSequenceFilePath(actionDir))); + assertTrue(LauncherHelper.isMainDone(runningJob)); + assertFalse(LauncherHelper.isMainSuccessful(runningJob)); + assertFalse(LauncherHelper.hasOutputData(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.isMainDone(runningJob)); assertTrue(actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)); } @@ -197,13 +197,13 @@ public class TestLauncher extends XFsTestCase { Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, actionDir, conf); - assertTrue(fs.exists(LauncherMapperHelper.getActionDataSequenceFilePath(actionDir))); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); - assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob)); - assertFalse(LauncherMapperHelper.hasOutputData(actionData)); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); + Map<String, String> actionData = LauncherHelper.getActionData(fs, actionDir, conf); + assertTrue(fs.exists(LauncherHelper.getActionDataSequenceFilePath(actionDir))); + assertTrue(LauncherHelper.isMainDone(runningJob)); + assertFalse(LauncherHelper.isMainSuccessful(runningJob)); + assertFalse(LauncherHelper.hasOutputData(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.isMainDone(runningJob)); } public void __testThrowable() throws Exception { @@ -220,13 +220,13 @@ public class TestLauncher extends XFsTestCase { Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, actionDir, conf); - assertTrue(fs.exists(LauncherMapperHelper.getActionDataSequenceFilePath(actionDir))); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); - assertFalse(LauncherMapperHelper.isMainSuccessful(runningJob)); - assertFalse(LauncherMapperHelper.hasOutputData(actionData)); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); + Map<String, String> actionData = LauncherHelper.getActionData(fs, actionDir, conf); + assertTrue(fs.exists(LauncherHelper.getActionDataSequenceFilePath(actionDir))); + assertTrue(LauncherHelper.isMainDone(runningJob)); + assertFalse(LauncherHelper.isMainSuccessful(runningJob)); + assertFalse(LauncherHelper.hasOutputData(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.isMainDone(runningJob)); } public void __testOutput() throws Exception { @@ -243,13 +243,13 @@ public class TestLauncher extends XFsTestCase { Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, actionDir, conf); - assertTrue(fs.exists(LauncherMapperHelper.getActionDataSequenceFilePath(actionDir))); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); - assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob)); - assertTrue(LauncherMapperHelper.hasOutputData(actionData)); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); + Map<String, String> actionData = LauncherHelper.getActionData(fs, actionDir, conf); + assertTrue(fs.exists(LauncherHelper.getActionDataSequenceFilePath(actionDir))); + assertTrue(LauncherHelper.isMainDone(runningJob)); + assertTrue(LauncherHelper.isMainSuccessful(runningJob)); + assertTrue(LauncherHelper.hasOutputData(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.isMainDone(runningJob)); } public void __testNewId() throws Exception { @@ -266,13 +266,13 @@ public class TestLauncher extends XFsTestCase { Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, actionDir, conf); - assertTrue(fs.exists(LauncherMapperHelper.getActionDataSequenceFilePath(actionDir))); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); - assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob)); - assertFalse(LauncherMapperHelper.hasOutputData(actionData)); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); + Map<String, String> actionData = LauncherHelper.getActionData(fs, actionDir, conf); + assertTrue(fs.exists(LauncherHelper.getActionDataSequenceFilePath(actionDir))); + assertTrue(LauncherHelper.isMainDone(runningJob)); + assertTrue(LauncherHelper.isMainSuccessful(runningJob)); + assertFalse(LauncherHelper.hasOutputData(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.isMainDone(runningJob)); } public void __testSecurityManager() throws Exception { @@ -289,13 +289,13 @@ public class TestLauncher extends XFsTestCase { Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, actionDir, conf); - assertFalse(fs.exists(LauncherMapperHelper.getActionDataSequenceFilePath(actionDir))); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); - assertTrue(LauncherMapperHelper.isMainSuccessful(runningJob)); - assertFalse(LauncherMapperHelper.hasOutputData(actionData)); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); - assertTrue(LauncherMapperHelper.isMainDone(runningJob)); + Map<String, String> actionData = LauncherHelper.getActionData(fs, actionDir, conf); + assertFalse(fs.exists(LauncherHelper.getActionDataSequenceFilePath(actionDir))); + assertTrue(LauncherHelper.isMainDone(runningJob)); + assertTrue(LauncherHelper.isMainSuccessful(runningJob)); + assertFalse(LauncherHelper.hasOutputData(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.isMainDone(runningJob)); } // Test to ensure that the property value "oozie.action.prepare.xml" in the configuration of the job is an empty @@ -311,7 +311,7 @@ public class TestLauncher extends XFsTestCase { Configuration actionConf = new XConfiguration(); String prepareBlock = ""; - LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, prepareBlock); + LauncherHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, prepareBlock); assertTrue(jobConf.get("oozie.action.prepare.xml").equals("")); } @@ -330,28 +330,28 @@ public class TestLauncher extends XFsTestCase { Configuration actionConf = new XConfiguration(); String prepareBlock = "<prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>"; - LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, prepareBlock); + LauncherHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, prepareBlock); assertTrue(jobConf.get("oozie.action.prepare.xml").equals(prepareBlock)); } public void testSetupMainClass() throws Exception { Configuration conf = new Configuration(false); - LauncherMapperHelper.setupMainClass(conf, ""); + LauncherHelper.setupMainClass(conf, ""); assertNull(conf.get("oozie.launcher.action.main.class")); conf = new Configuration(false); - LauncherMapperHelper.setupMainClass(conf, "org.blah.myclass1"); + LauncherHelper.setupMainClass(conf, "org.blah.myclass1"); assertEquals(conf.get("oozie.launcher.action.main.class"), "org.blah.myclass1"); conf = new Configuration(false); conf.set("oozie.launcher.action.main.class", "org.blah.myclass2"); - LauncherMapperHelper.setupMainClass(conf, ""); + LauncherHelper.setupMainClass(conf, ""); assertEquals(conf.get("oozie.launcher.action.main.class"), "org.blah.myclass2"); // the passed argument (myclass1) should have priority conf = new Configuration(false); conf.set("oozie.launcher.action.main.class", "org.blah.myclass2"); - LauncherMapperHelper.setupMainClass(conf, "org.blah.myclass1"); + LauncherHelper.setupMainClass(conf, "org.blah.myclass1"); assertEquals(conf.get("oozie.launcher.action.main.class"), "org.blah.myclass1"); } @@ -367,14 +367,14 @@ public class TestLauncher extends XFsTestCase { Configuration actionConf = new XConfiguration(); actionConf.set("mapreduce.job.cache.files", "a.jar,aa.jar#aa.jar"); - LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, ""); + LauncherHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, ""); assertFalse(jobConf.getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)); assertEquals("a.jar,aa.jar#aa.jar", actionConf.get("mapreduce.job.cache.files")); Services.get().getConf().setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); actionConf = new XConfiguration(); actionConf.set("mapreduce.job.cache.files", "a.jar,aa.jar#aa.jar"); - LauncherMapperHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, ""); + LauncherHelper.setupLauncherInfo(jobConf, "1", "1@a", actionDir, "1@a-0", actionConf, ""); assertTrue(jobConf.getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)); assertEquals("aa.jar#aa.jar", actionConf.get("mapreduce.job.cache.files")); } http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java index e940d39..89aeab6 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java @@ -55,7 +55,7 @@ public class TestPrepareActionsDriver extends XFsTestCase { } JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); assertTrue(fs.exists(actionDir)); } @@ -75,7 +75,7 @@ public class TestPrepareActionsDriver extends XFsTestCase { try { prepareXML = "prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>"; JobConf conf = createJobConf(); - LauncherMapperHelper.setupLauncherURIHandlerConf(conf); + LauncherHelper.setupLauncherURIHandlerConf(conf); PrepareActionsDriver.doOperations(prepareXML, conf); fail("Expected to catch an exception but did not encounter any"); } catch (Exception ex) { http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java index 931307e..bda7222 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java @@ -26,16 +26,10 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.util.Shell; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.ActionService; -import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.util.PropertiesUtils; @@ -312,9 +306,9 @@ public class TestShellActionExecutor extends ActionExecutorTestCase { Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); ShellActionExecutor ae = new ShellActionExecutor(); ae.check(context, context.getAction()); http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java index 4069a95..a5128a8 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java @@ -27,14 +27,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; -import org.apache.oozie.action.hadoop.LauncherMapperHelper; +import org.apache.oozie.action.hadoop.LauncherHelper; import org.apache.oozie.action.hadoop.MapReduceActionExecutor; import org.apache.oozie.action.hadoop.MapperReducerForTest; import org.apache.oozie.client.WorkflowAction; @@ -266,9 +265,9 @@ public class TestActionCheckXCommand extends XDataTestCase { waitUntilYarnAppDoneAndAssertSuccess(launcherId); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); new ActionCheckXCommand(action.getId()).call(); action = jpaService.execute(wfActionGetCmd); @@ -427,9 +426,9 @@ public class TestActionCheckXCommand extends XDataTestCase { } }); assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); new ActionCheckXCommand(actionId).call(); WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd); @@ -494,9 +493,9 @@ public class TestActionCheckXCommand extends XDataTestCase { } }); assertTrue(launcherJob.isSuccessful()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); new ActionCheckXCommand(action1.getId()).call(); WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd); @@ -561,9 +560,9 @@ public class TestActionCheckXCommand extends XDataTestCase { }); assertTrue(launcherJob2.isSuccessful()); - actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); new ActionCheckXCommand(actionId).call(); WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd); http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java index 6c3051b..80c5d54 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java @@ -28,11 +28,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.action.hadoop.LauncherMapperHelper; +import org.apache.oozie.action.hadoop.LauncherHelper; import org.apache.oozie.action.hadoop.MapReduceActionExecutor; import org.apache.oozie.action.hadoop.MapperReducerForTest; import org.apache.oozie.client.OozieClient; @@ -48,7 +46,6 @@ import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQ import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; -import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.InstrumentationService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.LiteWorkflowStoreService; @@ -165,9 +162,9 @@ public class TestActionStartXCommand extends XDataTestCase { String launcherId = action.getExternalId(); waitUntilYarnAppDoneAndAssertSuccess(launcherId); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); } public void testActionStartToCheckRetry() throws Exception { @@ -233,9 +230,9 @@ public class TestActionStartXCommand extends XDataTestCase { String launcherId = action.getExternalId(); waitUntilYarnAppDoneAndAssertSuccess(launcherId); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java index c19d62a..ce04c6d 100644 --- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java +++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java @@ -21,7 +21,6 @@ package org.apache.oozie.service; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorActionBean; @@ -31,7 +30,7 @@ import org.apache.oozie.DagEngine; import org.apache.oozie.ForTestingActionExecutor; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.action.hadoop.LauncherMapperHelper; +import org.apache.oozie.action.hadoop.LauncherHelper; import org.apache.oozie.action.hadoop.MapReduceActionExecutor; import org.apache.oozie.action.hadoop.MapperReducerForTest; import org.apache.oozie.client.CoordinatorAction; @@ -251,9 +250,9 @@ public class TestRecoveryService extends XDataTestCase { waitUntilYarnAppDoneAndAssertSuccess(launcherId); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); + assertTrue(LauncherHelper.hasIdSwap(actionData)); } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/09c53894/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java index ed9bba3..71ee641 100644 --- a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java +++ b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java @@ -22,7 +22,6 @@ import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream; import java.io.OutputStreamWriter; -import java.io.StringReader; import java.io.Writer; import java.text.MessageFormat; import java.util.Arrays; @@ -32,22 +31,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.util.ClassUtils; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; -import org.jdom.Element; import org.jdom.Namespace; public class TestHiveActionExecutor extends ActionExecutorTestCase { @@ -167,9 +158,9 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); HiveActionExecutor ae = new HiveActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); @@ -190,9 +181,9 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); HiveActionExecutor ae = new HiveActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId()));
