http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java index dbef441..23648b8 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java @@ -26,6 +26,7 @@ import java.net.InetSocketAddress; import java.net.MalformedURLException; import java.net.Proxy; import java.net.URL; +import java.util.EnumSet; // Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier /** @@ -34,9 +35,11 @@ import java.net.URL; */ public class LauncherAMCallbackNotifier { private static final String OOZIE_LAUNCHER_CALLBACK = "oozie.launcher.callback."; + private static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000; + private static final EnumSet<FinalApplicationStatus> FAILED_APPLICATION_STATES = EnumSet.of(FinalApplicationStatus.KILLED, FinalApplicationStatus.FAILED); + public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "retry.attempts"; public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = OOZIE_LAUNCHER_CALLBACK + "retry.interval"; - static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000; public static final String OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "max.attempts"; public static final String OOZIE_LAUNCHER_CALLBACK_TIMEOUT = OOZIE_LAUNCHER_CALLBACK + "timeout"; public static final String OOZIE_LAUNCHER_CALLBACK_URL = OOZIE_LAUNCHER_CALLBACK + "url"; @@ -51,6 +54,7 @@ public class LauncherAMCallbackNotifier { protected URL urlToNotify; //URL to notify read from the config protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification + /** * Parse the URL that needs to be notified of the end of the job, along * with the number of retries in case of failure, the amount of time to @@ -136,7 +140,7 @@ public class LauncherAMCallbackNotifier { * * @throws InterruptedException */ - public void notifyURL(FinalApplicationStatus finalStatus) throws InterruptedException { + public void notifyURL(FinalApplicationStatus finalStatus, boolean backgroundAction) throws InterruptedException { // Do we need job-end notification? if (userUrl == null) { System.out.println("Callback notification URL not set, skipping."); @@ -145,7 +149,12 @@ public class LauncherAMCallbackNotifier { //Do string replacements for final status if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) { - userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, finalStatus.toString()); + // only send back "RUNNING" if the submission was successful + if (backgroundAction && !FAILED_APPLICATION_STATES.contains(finalStatus)) { + userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, "RUNNING"); + } else { + userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, finalStatus.toString()); + } } // Create the URL, ensure sanity
http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java index fef6523..6955416 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java @@ -201,12 +201,12 @@ public abstract class LauncherMain { * @param conf Configuration/Properties object to dump to STDOUT * @throws IOException thrown if an IO error ocurred. */ - @SuppressWarnings("unchecked") - protected static void logMasking(String header, Collection<String> maskSet, Iterable conf) throws IOException { + + protected static void logMasking(String header, Collection<String> maskSet, Iterable<Map.Entry<String,String>> conf) throws IOException { StringWriter writer = new StringWriter(); writer.write(header + "\n"); writer.write("--------------------\n"); - for (Map.Entry entry : (Iterable<Map.Entry>) conf) { + for (Map.Entry<String, String> entry : conf) { String name = (String) entry.getKey(); String value = (String) entry.getValue(); for (String mask : maskSet) { @@ -247,7 +247,7 @@ public abstract class LauncherMain { * @throws OozieActionConfiguratorException */ protected static void runConfigClass(JobConf actionConf) throws OozieActionConfiguratorException { - String configClass = System.getProperty(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS); + String configClass = actionConf.get(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS); if (configClass != null) { try { Class<?> klass = Class.forName(configClass); http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java index 96d59b9..ee5044b 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java @@ -123,7 +123,6 @@ public class MapReduceMain extends LauncherMain { return runJob; } - @SuppressWarnings("unchecked") protected JobClient createJobClient(JobConf jobConf) throws IOException { return new JobClient(jobConf); } http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java ---------------------------------------------------------------------- diff --git a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java index 3d1110b..38509b4 100644 --- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java +++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java @@ -121,7 +121,7 @@ public class PigMain extends LauncherMain { pigProperties.store(os, ""); os.close(); - logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet()); + logMasking("pig.properties:", Arrays.asList("password"), (Iterable<Map.Entry<String, String>>)(Iterable<?>) pigProperties.entrySet()); List<String> arguments = new ArrayList<String>(); String script = actionConf.get(PigActionExecutor.PIG_SCRIPT); http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java index 09d3da3..16064e7 100644 --- a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java +++ b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java @@ -145,7 +145,7 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { return new Context(wf, action); } - private RunningJob submitAction(Context context) throws Exception { + private String submitAction(Context context) throws Exception { PigActionExecutor ae = new PigActionExecutor(); WorkflowAction action = context.getAction(); @@ -160,34 +160,14 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { assertNotNull(jobTracker); assertNotNull(consoleUrl); - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = - new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration")).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker")); - conf.set("fs.default.name", e.getChildTextTrim("name-node")); - conf.set("mapreduce.framework.name", "yarn"); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + return jobId; } private void _testSubmit(String actionXml, boolean checkForSuccess) throws Exception { Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); - - sleep(2000); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); PigActionExecutor ae = new PigActionExecutor(); ae.check(context, context.getAction()); @@ -223,9 +203,8 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { // Set the action xml with the option for retrieving stats to true String actionXml = setPigActionXml(PIG_SCRIPT, true); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); @@ -276,9 +255,8 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { // Set the action xml with the option for retrieving stats to false String actionXml = setPigActionXml(PIG_SCRIPT, false); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); PigActionExecutor ae = new PigActionExecutor(); WorkflowAction wfAction = context.getAction(); @@ -306,9 +284,8 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { // Set the action xml with the option for retrieving stats to true String actionXml = setPigActionXml(PIG_SCRIPT, true); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); PigActionExecutor ae = new PigActionExecutor(); WorkflowAction wfAction = context.getAction(); @@ -328,9 +305,8 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { // Set the action xml with the option for retrieving stats to false String actionXml = setPigActionXml(PIG_SCRIPT, false); Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - evaluateLauncherJob(launcherJob); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); @@ -347,16 +323,6 @@ public class TestPigActionExecutor extends ActionExecutorTestCase { assertNotNull(wfAction.getExternalChildIDs()); } - private void evaluateLauncherJob(final RunningJob launcherJob) throws Exception{ - waitFor(180 * 1000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - sleep(2000); - } - protected XConfiguration setPigConfig(boolean writeStats) { XConfiguration conf = new XConfiguration(); conf.set("oozie.pig.log.level", "INFO"); http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java index 458baaa..9d8d4aa 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java @@ -106,13 +106,8 @@ public class TestPyspark extends ActionExecutorTestCase { WorkflowAction.Status wfStatus) throws Exception { Context context = createContext(getActionXml(sparkOpts), wf); - final RunningJob launcherJob = submitAction(context); - waitFor(200 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); SparkActionExecutor ae = new SparkActionExecutor(); ae.check(context, context.getAction()); assertEquals(externalStatus, context.getAction().getExternalStatus()); @@ -120,7 +115,7 @@ public class TestPyspark extends ActionExecutorTestCase { assertEquals(wfStatus, context.getAction().getStatus()); } - protected RunningJob submitAction(Context context) throws Exception { + protected String submitAction(Context context) throws Exception { SparkActionExecutor ae = new SparkActionExecutor(); WorkflowAction action = context.getAction(); ae.prepareActionDir(getFileSystem(), context); @@ -131,12 +126,8 @@ public class TestPyspark extends ActionExecutorTestCase { assertNotNull(jobId); assertNotNull(jobTracker); assertNotNull(consoleUrl); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - jobConf.set("mapred.job.tracker", jobTracker); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + + return jobId; } protected Context createContext(String actionXml, WorkflowJobBean wf) throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java index 8c77be0..d97f1f0 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java @@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; @@ -175,13 +176,8 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { scriptWriter.close(); Context context = createContext(getActionXml()); - final RunningJob launcherJob = submitAction(context); - waitFor(200 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherID = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherID); SparkActionExecutor ae = new SparkActionExecutor(); ae.check(context, context.getAction()); @@ -212,7 +208,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { return new Context(wf, action); } - protected RunningJob submitAction(Context context) throws Exception { + protected String submitAction(Context context) throws Exception { SparkActionExecutor ae = new SparkActionExecutor(); WorkflowAction action = context.getAction(); @@ -227,14 +223,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase { assertNotNull(jobTracker); assertNotNull(consoleUrl); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - jobConf.set("mapred.job.tracker", jobTracker); - - JobClient jobClient = - Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + return jobId; } http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java index 6474092..abf5915 100644 --- a/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java +++ b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java @@ -179,14 +179,8 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { createDB(); Context context = createContext(getActionXml()); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); @@ -227,14 +221,8 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { createDB(); Context context = createContext(getActionXmlEval()); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); @@ -263,14 +251,8 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { createDB(); Context context = createContext(getActionXmlFreeFromQuery()); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); @@ -315,7 +297,7 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { } - private RunningJob submitAction(Context context) throws Exception { + private String submitAction(Context context) throws Exception { SqoopActionExecutor ae = new SqoopActionExecutor(); WorkflowAction action = context.getAction(); @@ -329,24 +311,7 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase { assertNotNull(jobId); assertNotNull(jobTracker); assertNotNull(consoleUrl); - Element e = XmlUtils.parseXml(action.getConf()); - Namespace ns = Namespace.getNamespace("uri:oozie:sqoop-action:0.1"); - XConfiguration conf = new XConfiguration( - new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns)); - conf.set("fs.default.name", e.getChildTextTrim("name-node", ns)); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("mapreduce.framework.name", "yarn"); - conf.set("group.name", getTestGroup()); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + return jobId; } private Context createContext(String actionXml) throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java index d4095da..53330ce 100644 --- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java +++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java @@ -18,45 +18,17 @@ package org.apache.oozie.action.hadoop; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.streaming.StreamJob; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.command.wf.StartXCommand; -import org.apache.oozie.command.wf.SubmitXCommand; -import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; -import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; -import org.apache.oozie.service.WorkflowAppService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; -import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.ClassUtils; -import org.jdom.Element; - import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.FileWriter; import java.io.IOException; -import java.io.OutputStream; import java.io.InputStream; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.Writer; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.StringReader; +import java.io.Writer; import java.net.URI; import java.util.Arrays; import java.util.List; @@ -67,14 +39,44 @@ import java.util.jar.JarOutputStream; import java.util.regex.Pattern; import java.util.zip.ZipEntry; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.streaming.StreamJob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutorException; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.command.wf.StartXCommand; +import org.apache.oozie.command.wf.SubmitXCommand; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.ClassUtils; +import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.PropertiesUtils; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XmlUtils; +import org.jdom.Element; public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + private static final String PIPES = "pipes"; + private static final String MAP_REDUCE = "map-reduce"; + @Override protected void setSystemProps() throws Exception { super.setSystemProps(); @@ -212,10 +214,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertEquals("global-output-dir", actionConf.get("outputDir")); } - @SuppressWarnings("unchecked") public void testSetupMethods() throws Exception { MapReduceActionExecutor ae = new MapReduceActionExecutor(); - assertEquals(Arrays.asList(StreamingMain.class), ae.getLauncherClasses()); + List<Class<?>> classes = Arrays.<Class<?>>asList(StreamingMain.class); + assertEquals(classes, ae.getLauncherClasses()); Element actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + "<configuration>" @@ -226,7 +228,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { XConfiguration protoConf = new XConfiguration(); protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); - WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action"); WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); action.setType(ae.getType()); @@ -386,7 +387,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { return new Context(wf, action); } - protected RunningJob submitAction(Context context) throws Exception { + protected String submitAction(Context context) throws Exception { MapReduceActionExecutor ae = new MapReduceActionExecutor(); WorkflowAction action = context.getAction(); @@ -408,29 +409,21 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { conf.set("fs.default.name", e.getChildTextTrim("name-node")); conf.set("user.name", context.getProtoActionConf().get("user.name")); conf.set("group.name", getTestGroup()); - conf.set("mapreduce.framework.name", "yarn"); + JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + + ae.submitLauncher(getFileSystem(), context, context.getAction()); + return context.getAction().getExternalId(); } private String _testSubmit(String name, String actionXml) throws Exception { Context context = createContext(name, actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 2000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); @@ -441,7 +434,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); String user = conf.get("user.name"); - String group = conf.get("group.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); @@ -453,7 +445,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -471,17 +463,27 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { return mrJob.getID().toString(); } + private void _testSubmitError(String actionXml, String errorMessage) throws Exception { + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + + MapReduceActionExecutor ae = new MapReduceActionExecutor(); + ae.check(context, context.getAction()); + + assertEquals(JavaActionExecutor.FAILED_KILLED, context.getAction().getExternalStatus()); + + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); + assertTrue(context.getAction().getErrorMessage().contains("already exists")); + } + private void _testSubmitWithCredentials(String name, String actionXml) throws Exception { - Context context = createContextWithCredentials("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + Context context = createContextWithCredentials(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); @@ -492,7 +494,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); String user = conf.get("user.name"); - String group = conf.get("group.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); @@ -504,7 +505,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -555,7 +556,37 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + getMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; - _testSubmit("map-reduce", actionXml); + _testSubmit(MAP_REDUCE, actionXml); + } + + public void testMapReduceActionError() throws Exception { + FileSystem fs = getFileSystem(); + + Path inputDir = new Path(getFsTestCaseDir(), "input"); + Path outputDir = new Path(getFsTestCaseDir(), "output1"); + + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt"))); + ow.write("dummy\n"); + ow.write("dummy\n"); + ow.close(); + + String actionXml = "<map-reduce>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<configuration>" + + "<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() + + "</value></property>" + + "<property><name>mapred.reducer.class</name><value>" + MapperReducerForTest.class.getName() + + "</value></property>" + + "<property><name>mapred.input.dir</name><value>" + inputDir + "</value></property>" + + "<property><name>mapred.output.dir</name><value>" + outputDir + "</value></property>" + + "</configuration>" + + "</map-reduce>"; + + _testSubmitError(actionXml, "already exists"); } public void testMapReduceWithConfigClass() throws Exception { @@ -569,7 +600,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { w.write("dummy\n"); w.close(); - Path jobXml = new Path(getFsTestCaseDir(), "job.xml"); + Path jobXml = new Path(getFsTestCaseDir(), "action.xml"); XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString()); conf.set(MapperReducerForTest.JOB_XML_OUTPUT_LOCATION, jobXml.toUri().toString()); conf.set("B", "b"); @@ -578,9 +609,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + conf.toXmlString(false) + "<config-class>" + OozieActionConfiguratorForTest.class.getName() + "</config-class>" + "</map-reduce>"; - _testSubmit("map-reduce", actionXml); + _testSubmit(MAP_REDUCE, actionXml); Configuration conf2 = new Configuration(false); conf2.addResource(fs.open(jobXml)); + assertEquals("a", conf2.get("A")); assertEquals("c", conf2.get("B")); } @@ -601,16 +633,9 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getMapReduceConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "<config-class>org.apache.oozie.does.not.exist</config-class>" + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - waitFor(120 * 2000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob)); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); final Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, context.getActionDir(), context.getProtoActionConf()); @@ -638,16 +663,9 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + conf.toXmlString(false) + "<config-class>" + OozieActionConfiguratorForTest.class.getName() + "</config-class>" + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - waitFor(120 * 2000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob)); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); final Map<String, String> actionData = LauncherMapperHelper.getActionData(fs, context.getActionDir(), context.getProtoActionConf()); @@ -671,7 +689,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getNameNodeUri() + "</name-node>" + getMapReduceCredentialsConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; - _testSubmitWithCredentials("map-reduce", actionXml); + _testSubmitWithCredentials(MAP_REDUCE, actionXml); } protected Path createAndUploadUberJar() throws Exception { @@ -734,7 +752,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; - String jobID = _testSubmit("map-reduce", actionXml); + String jobID = _testSubmit(MAP_REDUCE, actionXml); boolean containsLib1Jar = false; String lib1JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar"; @@ -914,7 +932,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + "#wordcount-simple" + "</program>" + " </pipes>" + getPipesConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "<file>" + programPath + "</file>" + "</map-reduce>"; - _testSubmit("pipes", actionXml); + _testSubmit(PIPES, actionXml); } else { System.out.println( @@ -948,15 +966,9 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "true") .toXmlString(false) + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); MapReduceActionExecutor ae = new MapReduceActionExecutor(); JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); @@ -981,7 +993,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -1026,15 +1038,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "false") .toXmlString(false) + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 2000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); @@ -1057,7 +1064,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -1098,15 +1105,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + getOozieActionExternalStatsWriteProperty(inputDir.toString(), outputDir.toString(), "false") .toXmlString(false) + "</map-reduce>"; - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 2000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); @@ -1129,7 +1131,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); actionXml = "<map-reduce>" @@ -1185,35 +1187,24 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { .append(mrConfig.toXmlString(false)).append("</map-reduce>"); String actionXml = sb.toString(); - Context context = createContext("map-reduce", actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(120 * 2000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); + Context context = createContext(MAP_REDUCE, actionXml); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); - assertTrue(launcherJob.isSuccessful()); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), context.getProtoActionConf()); assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); - // Assert launcher job name has been set - System.out.println("Launcher job name: " + launcherJob.getJobName()); - assertTrue(launcherJob.getJobName().equals(launcherJobName)); MapReduceActionExecutor ae = new MapReduceActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - JobConf conf = ae.createBaseHadoopConf(context, - XmlUtils.parseXml(actionXml)); + JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); String user = conf.get("user.name"); JobClient jobClient = Services.get().get(HadoopAccessorService.class) .createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context - .getAction().getExternalChildIDs())); + final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); waitFor(120 * 1000, new Predicate() { public boolean evaluate() throws Exception { @@ -1223,7 +1214,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { assertTrue(mrJob.isSuccessful()); ae.check(context, context.getAction()); - assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); assertNull(context.getAction().getData()); ae.end(context, context.getAction()); @@ -1304,7 +1295,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { Element eActionXml = XmlUtils.parseXml(actionXml); - Context context = createContext("map-reduce", actionXml); + Context context = createContext(MAP_REDUCE, actionXml); Path appPath = getAppPath();
