http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
index dbef441..23648b8 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
@@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.Proxy;
 import java.net.URL;
+import java.util.EnumSet;
 
 // Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier
 /**
@@ -34,9 +35,11 @@ import java.net.URL;
  */
 public class LauncherAMCallbackNotifier {
     private static final String OOZIE_LAUNCHER_CALLBACK = 
"oozie.launcher.callback.";
+    private static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000;
+    private static final EnumSet<FinalApplicationStatus> 
FAILED_APPLICATION_STATES = EnumSet.of(FinalApplicationStatus.KILLED, 
FinalApplicationStatus.FAILED);
+
     public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = 
OOZIE_LAUNCHER_CALLBACK + "retry.attempts";
     public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = 
OOZIE_LAUNCHER_CALLBACK + "retry.interval";
-    static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000;
     public static final String OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS = 
OOZIE_LAUNCHER_CALLBACK + "max.attempts";
     public static final String OOZIE_LAUNCHER_CALLBACK_TIMEOUT = 
OOZIE_LAUNCHER_CALLBACK + "timeout";
     public static final String OOZIE_LAUNCHER_CALLBACK_URL = 
OOZIE_LAUNCHER_CALLBACK + "url";
@@ -51,6 +54,7 @@ public class LauncherAMCallbackNotifier {
     protected URL urlToNotify; //URL to notify read from the config
     protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for 
notification
 
+
     /**
      * Parse the URL that needs to be notified of the end of the job, along
      * with the number of retries in case of failure, the amount of time to
@@ -136,7 +140,7 @@ public class LauncherAMCallbackNotifier {
      *
      * @throws InterruptedException
      */
-    public void notifyURL(FinalApplicationStatus finalStatus) throws 
InterruptedException {
+    public void notifyURL(FinalApplicationStatus finalStatus, boolean 
backgroundAction) throws InterruptedException {
         // Do we need job-end notification?
         if (userUrl == null) {
             System.out.println("Callback notification URL not set, skipping.");
@@ -145,7 +149,12 @@ public class LauncherAMCallbackNotifier {
 
         //Do string replacements for final status
         if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) {
-            userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, 
finalStatus.toString());
+            // only send back "RUNNING" if the submission was successful
+            if (backgroundAction && 
!FAILED_APPLICATION_STATES.contains(finalStatus)) {
+                userUrl = 
userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, "RUNNING");
+            } else {
+                userUrl = 
userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, 
finalStatus.toString());
+            }
         }
 
         // Create the URL, ensure sanity

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index fef6523..6955416 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -201,12 +201,12 @@ public abstract class LauncherMain {
      * @param conf Configuration/Properties object to dump to STDOUT
      * @throws IOException thrown if an IO error ocurred.
      */
-    @SuppressWarnings("unchecked")
-    protected static void logMasking(String header, Collection<String> 
maskSet, Iterable conf) throws IOException {
+
+    protected static void logMasking(String header, Collection<String> 
maskSet, Iterable<Map.Entry<String,String>> conf) throws IOException {
         StringWriter writer = new StringWriter();
         writer.write(header + "\n");
         writer.write("--------------------\n");
-        for (Map.Entry entry : (Iterable<Map.Entry>) conf) {
+        for (Map.Entry<String, String> entry : conf) {
             String name = (String) entry.getKey();
             String value = (String) entry.getValue();
             for (String mask : maskSet) {
@@ -247,7 +247,7 @@ public abstract class LauncherMain {
      * @throws OozieActionConfiguratorException
      */
     protected static void runConfigClass(JobConf actionConf) throws 
OozieActionConfiguratorException {
-        String configClass = 
System.getProperty(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS);
+        String configClass = 
actionConf.get(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS);
         if (configClass != null) {
             try {
                 Class<?> klass = Class.forName(configClass);

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
index 96d59b9..ee5044b 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
@@ -123,7 +123,6 @@ public class MapReduceMain extends LauncherMain {
         return runJob;
     }
 
-    @SuppressWarnings("unchecked")
     protected JobClient createJobClient(JobConf jobConf) throws IOException {
         return new JobClient(jobConf);
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java 
b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
index 3d1110b..38509b4 100644
--- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
+++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
@@ -121,7 +121,7 @@ public class PigMain extends LauncherMain {
         pigProperties.store(os, "");
         os.close();
 
-        logMasking("pig.properties:", Arrays.asList("password"), 
pigProperties.entrySet());
+        logMasking("pig.properties:", Arrays.asList("password"), 
(Iterable<Map.Entry<String, String>>)(Iterable<?>) pigProperties.entrySet());
 
         List<String> arguments = new ArrayList<String>();
         String script = actionConf.get(PigActionExecutor.PIG_SCRIPT);

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
 
b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
index 09d3da3..16064e7 100644
--- 
a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
+++ 
b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/TestPigActionExecutor.java
@@ -145,7 +145,7 @@ public class TestPigActionExecutor extends 
ActionExecutorTestCase {
         return new Context(wf, action);
     }
 
-    private RunningJob submitAction(Context context) throws Exception {
+    private String submitAction(Context context) throws Exception {
         PigActionExecutor ae = new PigActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -160,34 +160,14 @@ public class TestPigActionExecutor extends 
ActionExecutorTestCase {
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
 
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf =
-                new XConfiguration(new 
StringReader(XmlUtils.prettyPrint(e.getChild("configuration")).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node"));
-        conf.set("mapreduce.framework.name", "yarn");
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return jobId;
     }
 
     private void _testSubmit(String actionXml, boolean checkForSuccess) throws 
Exception {
 
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
-
-        sleep(2000);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         PigActionExecutor ae = new PigActionExecutor();
         ae.check(context, context.getAction());
@@ -223,9 +203,8 @@ public class TestPigActionExecutor extends 
ActionExecutorTestCase {
         // Set the action xml with the option for retrieving stats to true
         String actionXml = setPigActionXml(PIG_SCRIPT, true);
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         Configuration conf = new XConfiguration();
         conf.set("user.name", getTestUser());
@@ -276,9 +255,8 @@ public class TestPigActionExecutor extends 
ActionExecutorTestCase {
         // Set the action xml with the option for retrieving stats to false
         String actionXml = setPigActionXml(PIG_SCRIPT, false);
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         PigActionExecutor ae = new PigActionExecutor();
         WorkflowAction wfAction = context.getAction();
@@ -306,9 +284,8 @@ public class TestPigActionExecutor extends 
ActionExecutorTestCase {
         // Set the action xml with the option for retrieving stats to true
         String actionXml = setPigActionXml(PIG_SCRIPT, true);
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         PigActionExecutor ae = new PigActionExecutor();
         WorkflowAction wfAction = context.getAction();
@@ -328,9 +305,8 @@ public class TestPigActionExecutor extends 
ActionExecutorTestCase {
         // Set the action xml with the option for retrieving stats to false
         String actionXml = setPigActionXml(PIG_SCRIPT, false);
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        evaluateLauncherJob(launcherJob);
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         Configuration conf = new XConfiguration();
         conf.set("user.name", getTestUser());
@@ -347,16 +323,6 @@ public class TestPigActionExecutor extends 
ActionExecutorTestCase {
         assertNotNull(wfAction.getExternalChildIDs());
     }
 
-    private void evaluateLauncherJob(final RunningJob launcherJob) throws 
Exception{
-        waitFor(180 * 1000, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        sleep(2000);
-    }
-
     protected XConfiguration setPigConfig(boolean writeStats) {
         XConfiguration conf = new XConfiguration();
         conf.set("oozie.pig.log.level", "INFO");

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
index 458baaa..9d8d4aa 100644
--- 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
+++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestPyspark.java
@@ -106,13 +106,8 @@ public class TestPyspark extends ActionExecutorTestCase {
             WorkflowAction.Status wfStatus)
             throws Exception {
         Context context = createContext(getActionXml(sparkOpts), wf);
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(200 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         SparkActionExecutor ae = new SparkActionExecutor();
         ae.check(context, context.getAction());
         assertEquals(externalStatus, context.getAction().getExternalStatus());
@@ -120,7 +115,7 @@ public class TestPyspark extends ActionExecutorTestCase {
         assertEquals(wfStatus, context.getAction().getStatus());
     }
 
-    protected RunningJob submitAction(Context context) throws Exception {
+    protected String submitAction(Context context) throws Exception {
         SparkActionExecutor ae = new SparkActionExecutor();
         WorkflowAction action = context.getAction();
         ae.prepareActionDir(getFileSystem(), context);
@@ -131,12 +126,8 @@ public class TestPyspark extends ActionExecutorTestCase {
         assertNotNull(jobId);
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        jobConf.set("mapred.job.tracker", jobTracker);
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), 
jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+
+        return jobId;
     }
 
     protected Context createContext(String actionXml, WorkflowJobBean wf) 
throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
index 8c77be0..d97f1f0 100644
--- 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
+++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
@@ -175,13 +176,8 @@ public class TestSparkActionExecutor extends 
ActionExecutorTestCase {
         scriptWriter.close();
 
         Context context = createContext(getActionXml());
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(200 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherID = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherID);
 
         SparkActionExecutor ae = new SparkActionExecutor();
         ae.check(context, context.getAction());
@@ -212,7 +208,7 @@ public class TestSparkActionExecutor extends 
ActionExecutorTestCase {
         return new Context(wf, action);
     }
 
-    protected RunningJob submitAction(Context context) throws Exception {
+    protected String submitAction(Context context) throws Exception {
         SparkActionExecutor ae = new SparkActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -227,14 +223,7 @@ public class TestSparkActionExecutor extends 
ActionExecutorTestCase {
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
 
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        jobConf.set("mapred.job.tracker", jobTracker);
-
-        JobClient jobClient =
-                
Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), 
jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return jobId;
     }
 
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
 
b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
index 6474092..abf5915 100644
--- 
a/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
+++ 
b/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
@@ -179,14 +179,8 @@ public class TestSqoopActionExecutor extends 
ActionExecutorTestCase {
         createDB();
 
         Context context = createContext(getActionXml());
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
         assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
@@ -227,14 +221,8 @@ public class TestSqoopActionExecutor extends 
ActionExecutorTestCase {
         createDB();
 
         Context context = createContext(getActionXmlEval());
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
         assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
@@ -263,14 +251,8 @@ public class TestSqoopActionExecutor extends 
ActionExecutorTestCase {
         createDB();
 
         Context context = createContext(getActionXmlFreeFromQuery());
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
         assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
@@ -315,7 +297,7 @@ public class TestSqoopActionExecutor extends 
ActionExecutorTestCase {
     }
 
 
-    private RunningJob submitAction(Context context) throws Exception {
+    private String submitAction(Context context) throws Exception {
         SqoopActionExecutor ae = new SqoopActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -329,24 +311,7 @@ public class TestSqoopActionExecutor extends 
ActionExecutorTestCase {
         assertNotNull(jobId);
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
-        Element e = XmlUtils.parseXml(action.getConf());
-        Namespace ns = Namespace.getNamespace("uri:oozie:sqoop-action:0.1");
-        XConfiguration conf = new XConfiguration(
-                new 
StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node", ns));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("mapreduce.framework.name", "yarn");
-        conf.set("group.name", getTestGroup());
-
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return jobId;
     }
 
     private Context createContext(String actionXml) throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
 
b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index d4095da..53330ce 100644
--- 
a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ 
b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -18,45 +18,17 @@
 
 package org.apache.oozie.action.hadoop;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.streaming.StreamJob;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.command.wf.StartXCommand;
-import org.apache.oozie.command.wf.SubmitXCommand;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
-import 
org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.service.WorkflowAppService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.ClassUtils;
-import org.jdom.Element;
-
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.Writer;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.StringReader;
+import java.io.Writer;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.List;
@@ -67,14 +39,44 @@ import java.util.jar.JarOutputStream;
 import java.util.regex.Pattern;
 import java.util.zip.ZipEntry;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.streaming.StreamJob;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.command.wf.StartXCommand;
+import org.apache.oozie.command.wf.SubmitXCommand;
+import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
+import 
org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.ClassUtils;
+import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.PropertiesUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
 
 public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
 
+    private static final String PIPES = "pipes";
+    private static final String MAP_REDUCE = "map-reduce";
+
     @Override
     protected void setSystemProps() throws Exception {
         super.setSystemProps();
@@ -212,10 +214,10 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
          assertEquals("global-output-dir", actionConf.get("outputDir"));
     }
 
-    @SuppressWarnings("unchecked")
     public void testSetupMethods() throws Exception {
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
-        assertEquals(Arrays.asList(StreamingMain.class), 
ae.getLauncherClasses());
+        List<Class<?>> classes = Arrays.<Class<?>>asList(StreamingMain.class);
+        assertEquals(classes, ae.getLauncherClasses());
 
         Element actionXml = XmlUtils.parseXml("<map-reduce>" + "<job-tracker>" 
+ getJobTrackerUri() + "</job-tracker>"
                 + "<name-node>" + getNameNodeUri() + "</name-node>" + 
"<configuration>"
@@ -226,7 +228,6 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         XConfiguration protoConf = new XConfiguration();
         protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
 
-
         WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action");
         WorkflowActionBean action = (WorkflowActionBean) 
wf.getActions().get(0);
         action.setType(ae.getType());
@@ -386,7 +387,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         return new Context(wf, action);
     }
 
-    protected RunningJob submitAction(Context context) throws Exception {
+    protected String submitAction(Context context) throws Exception {
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -408,29 +409,21 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         conf.set("fs.default.name", e.getChildTextTrim("name-node"));
         conf.set("user.name", context.getProtoActionConf().get("user.name"));
         conf.set("group.name", getTestGroup());
-
         conf.set("mapreduce.framework.name", "yarn");
+
         JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
         XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+
+        ae.submitLauncher(getFileSystem(), context, context.getAction());
+        return context.getAction().getExternalId();
     }
 
     private String _testSubmit(String name, String actionXml) throws Exception 
{
 
         Context context = createContext(name, actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 2000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
         assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
@@ -441,7 +434,6 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
 
         JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
-        String group = conf.get("group.name");
         JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
         final RunningJob mrJob = 
jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs()));
 
@@ -453,7 +445,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -471,17 +463,27 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         return mrJob.getID().toString();
     }
 
+    private void _testSubmitError(String actionXml, String errorMessage) 
throws Exception {
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
+        MapReduceActionExecutor ae = new MapReduceActionExecutor();
+        ae.check(context, context.getAction());
+
+        assertEquals(JavaActionExecutor.FAILED_KILLED, 
context.getAction().getExternalStatus());
+
+        ae.end(context, context.getAction());
+        assertEquals(WorkflowAction.Status.ERROR, 
context.getAction().getStatus());
+        assertTrue(context.getAction().getErrorMessage().contains(errorMessage));
+    }
+
     private void _testSubmitWithCredentials(String name, String actionXml) 
throws Exception {
 
-        Context context = createContextWithCredentials("map-reduce", 
actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        Context context = createContextWithCredentials(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
         assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
@@ -492,7 +494,6 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
 
         JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
-        String group = conf.get("group.name");
         JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
         final RunningJob mrJob = 
jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs()));
 
@@ -504,7 +505,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -555,7 +556,37 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         String actionXml = "<map-reduce>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
                 + getMapReduceConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false) + "</map-reduce>";
-        _testSubmit("map-reduce", actionXml);
+        _testSubmit(MAP_REDUCE, actionXml);
+    }
+
+    public void testMapReduceActionError() throws Exception {
+        FileSystem fs = getFileSystem();
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output1");
+
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, 
"data.txt")));
+        ow.write("dummy\n");
+        ow.write("dummy\n");
+        ow.close();
+
+        String actionXml = "<map-reduce>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<configuration>" +
+                "<property><name>mapred.mapper.class</name><value>" + 
MapperReducerForTest.class.getName() +
+                "</value></property>" +
+                "<property><name>mapred.reducer.class</name><value>" + 
MapperReducerForTest.class.getName() +
+                "</value></property>" +
+                "<property><name>mapred.input.dir</name><value>" + inputDir + 
"</value></property>" +
+                "<property><name>mapred.output.dir</name><value>" + outputDir 
+ "</value></property>" +
+                "</configuration>" +
+                "</map-reduce>";
+
+        _testSubmitError(actionXml, "already exists");
     }
 
     public void testMapReduceWithConfigClass() throws Exception {
@@ -569,7 +600,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         w.write("dummy\n");
         w.close();
 
-        Path jobXml = new Path(getFsTestCaseDir(), "job.xml");
+        Path jobXml = new Path(getFsTestCaseDir(), "action.xml");
         XConfiguration conf = getMapReduceConfig(inputDir.toString(), 
outputDir.toString());
         conf.set(MapperReducerForTest.JOB_XML_OUTPUT_LOCATION, 
jobXml.toUri().toString());
         conf.set("B", "b");
@@ -578,9 +609,10 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + conf.toXmlString(false)
                 + "<config-class>" + 
OozieActionConfiguratorForTest.class.getName() + "</config-class>" + 
"</map-reduce>";
 
-        _testSubmit("map-reduce", actionXml);
+        _testSubmit(MAP_REDUCE, actionXml);
         Configuration conf2 = new Configuration(false);
         conf2.addResource(fs.open(jobXml));
+
         assertEquals("a", conf2.get("A"));
         assertEquals("c", conf2.get("B"));
     }
@@ -601,16 +633,9 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + getMapReduceConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false)
                 + 
"<config-class>org.apache.oozie.does.not.exist</config-class>" + 
"</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(120 * 2000, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob));
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         final Map<String, String> actionData = 
LauncherMapperHelper.getActionData(fs, context.getActionDir(),
                 context.getProtoActionConf());
@@ -638,16 +663,9 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + conf.toXmlString(false)
                 + "<config-class>" + 
OozieActionConfiguratorForTest.class.getName() + "</config-class>" + 
"</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(120 * 2000, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        assertFalse(LauncherMapperHelper.isMainSuccessful(launcherJob));
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         final Map<String, String> actionData = 
LauncherMapperHelper.getActionData(fs, context.getActionDir(),
                 context.getProtoActionConf());
@@ -671,7 +689,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + getNameNodeUri() + "</name-node>"
                 + getMapReduceCredentialsConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false)
                 + "</map-reduce>";
-        _testSubmitWithCredentials("map-reduce", actionXml);
+        _testSubmitWithCredentials(MAP_REDUCE, actionXml);
     }
 
     protected Path createAndUploadUberJar() throws Exception {
@@ -734,7 +752,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         String actionXml = "<map-reduce>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
                 + getMapReduceUberJarConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false) + "</map-reduce>";
-        String jobID = _testSubmit("map-reduce", actionXml);
+        String jobID = _testSubmit(MAP_REDUCE, actionXml);
 
         boolean containsLib1Jar = false;
         String lib1JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar";
@@ -914,7 +932,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                     + "#wordcount-simple" + "</program>" + "      </pipes>"
                     + getPipesConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false) + "<file>"
                     + programPath + "</file>" + "</map-reduce>";
-            _testSubmit("pipes", actionXml);
+            _testSubmit(PIPES, actionXml);
         }
         else {
             System.out.println(
@@ -948,15 +966,9 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + 
getOozieActionExternalStatsWriteProperty(inputDir.toString(), 
outputDir.toString(), "true")
                         .toXmlString(false) + "</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
         JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
@@ -981,7 +993,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -1026,15 +1038,10 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + 
getOozieActionExternalStatsWriteProperty(inputDir.toString(), 
outputDir.toString(), "false")
                         .toXmlString(false) + "</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 2000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
         assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
@@ -1057,7 +1064,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -1098,15 +1105,10 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 + 
getOozieActionExternalStatsWriteProperty(inputDir.toString(), 
outputDir.toString(), "false")
                 .toXmlString(false) + "</map-reduce>";
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 2000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
         assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
@@ -1129,7 +1131,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         actionXml = "<map-reduce>"
@@ -1185,35 +1187,24 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
                 .append(mrConfig.toXmlString(false)).append("</map-reduce>");
         String actionXml = sb.toString();
 
-        Context context = createContext("map-reduce", actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(120 * 2000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
+        Context context = createContext(MAP_REDUCE, actionXml);
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
-        assertTrue(launcherJob.isSuccessful());
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 context.getProtoActionConf());
         assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
-        // Assert launcher job name has been set
-        System.out.println("Launcher job name: " + launcherJob.getJobName());
-        assertTrue(launcherJob.getJobName().equals(launcherJobName));
 
         MapReduceActionExecutor ae = new MapReduceActionExecutor();
         ae.check(context, context.getAction());
         assertTrue(launcherId.equals(context.getAction().getExternalId()));
 
-        JobConf conf = ae.createBaseHadoopConf(context,
-                XmlUtils.parseXml(actionXml));
+        JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
 
         JobClient jobClient = Services.get().get(HadoopAccessorService.class)
                 .createJobClient(user, conf);
-        final RunningJob mrJob = jobClient.getJob(JobID.forName(context
-                .getAction().getExternalChildIDs()));
+        final RunningJob mrJob = 
jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs()));
 
         waitFor(120 * 1000, new Predicate() {
             public boolean evaluate() throws Exception {
@@ -1223,7 +1214,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         assertTrue(mrJob.isSuccessful());
         ae.check(context, context.getAction());
 
-        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertEquals(JavaActionExecutor.SUCCEEDED, 
context.getAction().getExternalStatus());
         assertNull(context.getAction().getData());
 
         ae.end(context, context.getAction());
@@ -1304,7 +1295,7 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
 
         Element eActionXml = XmlUtils.parseXml(actionXml);
 
-        Context context = createContext("map-reduce", actionXml);
+        Context context = createContext(MAP_REDUCE, actionXml);
 
         Path appPath = getAppPath();
 

Reply via email to