OOZIE-2594 Make MapReduce action work, small refactors, remove RunningJob from 
test cases, test fixes. Follow up: OOZIE-2686

Change-Id: I797963d65bc248c81c9e9d0b2a48a68dd2bab5cf


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ca7e56fd
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ca7e56fd
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ca7e56fd

Branch: refs/heads/oya
Commit: ca7e56fdccbca80ce2f9b87812c15305ca9d09d0
Parents: 2fddebb
Author: Peter Bacsko <pbac...@cloudera.com>
Authored: Mon Sep 12 11:29:14 2016 +0200
Committer: Peter Bacsko <pbac...@cloudera.com>
Committed: Mon Sep 26 14:11:28 2016 +0200

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 117 ++++----
 .../action/hadoop/MapReduceActionExecutor.java  |  79 +++++-
 .../apache/oozie/servlet/CallbackServlet.java   |   4 +-
 .../action/hadoop/TestDistCpActionExecutor.java |  29 +-
 .../action/hadoop/TestJavaActionExecutor.java   |  12 +-
 .../hadoop/TestLauncherAMCallbackNotifier.java  | 148 ++++++----
 .../action/hadoop/TestMapReduceActionError.java | 173 ------------
 .../oozie/action/hadoop/TestOozieJobInfo.java   |  19 +-
 .../action/hadoop/TestShellActionExecutor.java  |  46 +---
 .../command/wf/TestActionCheckXCommand.java     |   7 +-
 .../command/wf/TestActionStartXCommand.java     |  26 +-
 .../java/org/apache/oozie/test/XFsTestCase.java |  27 ++
 .../java/org/apache/oozie/test/XTestCase.java   |  10 +-
 .../action/hadoop/TestHiveActionExecutor.java   |  39 +--
 .../action/hadoop/TestHive2ActionExecutor.java  |  40 +--
 .../apache/oozie/action/hadoop/LauncherAM.java  | 274 +++++++++++--------
 .../hadoop/LauncherAMCallbackNotifier.java      |  15 +-
 .../oozie/action/hadoop/LauncherMain.java       |   8 +-
 .../oozie/action/hadoop/MapReduceMain.java      |   1 -
 .../org/apache/oozie/action/hadoop/PigMain.java |   2 +-
 .../action/hadoop/TestPigActionExecutor.java    |  58 +---
 .../apache/oozie/action/hadoop/TestPyspark.java |  19 +-
 .../action/hadoop/TestSparkActionExecutor.java  |  21 +-
 .../action/hadoop/TestSqoopActionExecutor.java  |  51 +---
 .../hadoop/TestMapReduceActionExecutor.java     | 267 +++++++++---------
 25 files changed, 645 insertions(+), 847 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 6a28406..8637f64 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -18,6 +18,28 @@
 
 package org.apache.oozie.action.hadoop;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.ConnectException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -31,6 +53,7 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.token.Token;
@@ -48,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.oozie.WorkflowActionBean;
@@ -78,38 +102,21 @@ import org.jdom.Element;
 import org.jdom.JDOMException;
 import org.jdom.Namespace;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.ConnectException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.collect.ImmutableList;
 
 
 public class JavaActionExecutor extends ActionExecutor {
 
-    protected static final String HADOOP_USER = "user.name";
+    public static final String RUNNING = "RUNNING";
+    public static final String SUCCEEDED = "SUCCEEDED";
+    public static final String KILLED = "KILLED";
+    public static final String FAILED = "FAILED";
+    public static final String FAILED_KILLED = "FAILED/KILLED";
     public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
     public static final String HADOOP_NAME_NODE = "fs.default.name";
-    private static final String HADOOP_JOB_NAME = "mapred.job.name";
     public static final String OOZIE_COMMON_LIBDIR = "oozie";
-    private static final Set<String> DISALLOWED_PROPERTIES = new 
HashSet<String>();
-    public final static String MAX_EXTERNAL_STATS_SIZE = 
"oozie.external.stats.max.size";
+
+    public static final String MAX_EXTERNAL_STATS_SIZE = 
"oozie.external.stats.max.size";
     public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
     public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
     public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = 
"yarn.timeline-service.enabled";
@@ -120,24 +127,27 @@ public class JavaActionExecutor extends ActionExecutor {
     public static final String HADOOP_REDUCE_JAVA_OPTS = 
"mapreduce.reduce.java.opts";
     public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
     public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
+    public static final String HADOOP_JOB_CLASSLOADER = 
"mapreduce.job.classloader";
+    public static final String HADOOP_USER_CLASSPATH_FIRST = 
"mapreduce.user.classpath.first";
+    public static final String OOZIE_CREDENTIALS_SKIP = 
"oozie.credentials.skip";
     public static final String YARN_AM_RESOURCE_MB = 
"yarn.app.mapreduce.am.resource.mb";
     public static final String YARN_AM_COMMAND_OPTS = 
"yarn.app.mapreduce.am.command-opts";
     public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
-    private static final String JAVA_MAIN_CLASS_NAME = 
"org.apache.oozie.action.hadoop.JavaMain";
     public static final int YARN_MEMORY_MB_MIN = 512;
+
+    private static final String JAVA_MAIN_CLASS_NAME = 
"org.apache.oozie.action.hadoop.JavaMain";
+    private static final String HADOOP_JOB_NAME = "mapred.job.name";
+    private static final Set<String> DISALLOWED_PROPERTIES = new 
HashSet<String>();
+
     private static int maxActionOutputLen;
     private static int maxExternalStatsSize;
     private static int maxFSGlobMax;
-    private static final String SUCCEEDED = "SUCCEEDED";
-    private static final String KILLED = "KILLED";
-    private static final String FAILED = "FAILED";
-    private static final String FAILED_KILLED = "FAILED/KILLED";
+
+    protected static final String HADOOP_USER = "user.name";
+
     protected XLog LOG = XLog.getLog(getClass());
     private static final Pattern heapPattern = 
Pattern.compile("-Xmx(([0-9]+)[mMgG])");
     private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
-    public static final String HADOOP_JOB_CLASSLOADER = 
"mapreduce.job.classloader";
-    public static final String HADOOP_USER_CLASSPATH_FIRST = 
"mapreduce.user.classpath.first";
-    public static final String OOZIE_CREDENTIALS_SKIP = 
"oozie.credentials.skip";
 
     static {
         DISALLOWED_PROPERTIES.add(HADOOP_USER);
@@ -237,6 +247,13 @@ public class JavaActionExecutor extends ActionExecutor {
         conf.set(HADOOP_YARN_RM, jobTracker);
         conf.set(HADOOP_NAME_NODE, nameNode);
         conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
+
+        // FIXME - think about this!
+        Element e = actionXml.getChild("config-class", ns);
+        if (e != null) {
+            conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, 
e.getTextTrim());
+        }
+
         return conf;
     }
 
@@ -308,6 +325,7 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
+    // FIXME: is this needed?
     private HashMap<String, List<String>> populateEnvMap(String input) {
         HashMap<String, List<String>> envMaps = new HashMap<String, 
List<String>>();
         String[] envEntries = input.split(",");
@@ -918,7 +936,7 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    private void injectCallback(Context context, Configuration conf) {
+    protected void injectCallback(Context context, Configuration conf) {
         String callback = 
context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN);
         conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, 
callback);
     }
@@ -1109,6 +1127,7 @@ public class JavaActionExecutor extends ActionExecutor {
     private ApplicationSubmissionContext 
createAppSubmissionContext(ApplicationId appId, JobConf launcherJobConf, String 
user,
                                                                     Context 
context, Configuration actionConf)
             throws IOException, HadoopAccessorException, URISyntaxException {
+
         // Create launch context for app master
         ApplicationSubmissionContext appContext = 
Records.newRecord(ApplicationSubmissionContext.class);
 
@@ -1149,9 +1168,10 @@ public class JavaActionExecutor extends ActionExecutor {
         Map<String, String> env = new HashMap<String, String>();
         // This adds the Hadoop jars to the classpath in the Launcher JVM
         ClasspathUtils.setupClasspath(env, launcherJobConf);
-        if (false) {        // TODO: OYA: config to add MR jars?  Probably 
also needed for MR Action
-            ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
-        }
+
+        // FIXME: move this to specific places where it's actually needed - 
keeping it here for now
+        ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
+
         amContainer.setEnvironment(env);
 
         // Set the command
@@ -1160,18 +1180,28 @@ public class JavaActionExecutor extends ActionExecutor {
                 + "/bin/java");
         // TODO: OYA: remove attach debugger to AM; useful for debugging
 //                    
vargs.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005");
-        MRApps.addLog4jSystemProperties("INFO", 1024 * 1024, 0, vargs);
-        vargs.add(LauncherAM.class.getName());
+
+        // FIXME: decide what to do with this method call - signature keeps 
changing
+        // MRApps.addLog4jSystemProperties("INFO", 1024 * 1024, 0, vargs, 
null);
+
+        vargs.add("-Dlog4j.configuration=container-log4j.properties");
+        vargs.add("-Dlog4j.debug=true");
+        vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + 
ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+        vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 
1024 * 1024);
+        vargs.add("-Dhadoop.root.logger=INFO,CLA");
+        vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);
+        vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser());
+        vargs.add("org.apache.oozie.action.hadoop.LauncherAM");  // note: 
using string temporarily so we don't have to depend on sharelib-oozie
         vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
                 Path.SEPARATOR + ApplicationConstants.STDOUT);
         vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
                 Path.SEPARATOR + ApplicationConstants.STDERR);
-        List<String> vargsFinal = new ArrayList<String>(6);
         StringBuilder mergedCommand = new StringBuilder();
         for (CharSequence str : vargs) {
             mergedCommand.append(str).append(" ");
         }
-        vargsFinal.add(mergedCommand.toString());
+
+        List<String> vargsFinal = ImmutableList.of(mergedCommand.toString());
         LOG.debug("Command to launch container for ApplicationMaster is : "
                 + mergedCommand);
         amContainer.setCommands(vargsFinal);
@@ -1405,11 +1435,6 @@ public class JavaActionExecutor extends ActionExecutor {
         return 
Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
     }
 
-    protected RunningJob getRunningJob(Context context, WorkflowAction action, 
JobClient jobClient) throws Exception{
-        RunningJob runningJob = 
jobClient.getJob(JobID.forName(action.getExternalId()));
-        return runningJob;
-    }
-
     /**
      * Useful for overriding in actions that do subsequent job runs
      * such as the MapReduce Action, where the launcher job is not the

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 4553351..019c4d9 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -21,7 +21,9 @@ package org.apache.oozie.action.hadoop;
 import java.io.IOException;
 import java.io.StringReader;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -312,13 +314,80 @@ public class MapReduceActionExecutor extends 
JavaActionExecutor {
     }
 
     @Override
-    protected RunningJob getRunningJob(Context context, WorkflowAction action, 
JobClient jobClient) throws Exception{
+    protected void injectCallback(Context context, Configuration conf) {
+        // add callback for the MapReduce job
+        String callback = context.getCallbackUrl("$jobStatus");
+        if (conf.get("job.end.notification.url") != null) {
+            LOG.warn("Overriding the action job end notification URI");
+        }
+        conf.set("job.end.notification.url", callback);
+
+        super.injectCallback(context, conf);
+    }
 
-        RunningJob runningJob;
-        String jobId = getActualExternalId(action);
+    @Override
+    public void check(Context context, WorkflowAction action) throws 
ActionExecutorException {
+        Map<String, String> actionData = Collections.emptyMap();
+        JobConf jobConf = null;
 
-        runningJob = jobClient.getJob(JobID.forName(jobId));
+        try {
+            FileSystem actionFs = context.getAppFileSystem();
+            Element actionXml = XmlUtils.parseXml(action.getConf());
+            jobConf = createBaseHadoopConf(context, actionXml);
+            Path actionDir = context.getActionDir();
+            actionData = LauncherMapperHelper.getActionData(actionFs, 
actionDir, jobConf);
+        } catch (Exception e) {
+            LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e);
+            throw convertException(e);
+        }
 
-        return runningJob;
+        final String newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
+
+        // check the Hadoop job if newID is defined (which should be the case 
here) - otherwise perform the normal check()
+        if (newId != null) {
+            boolean jobCompleted;
+            JobClient jobClient = null;
+            boolean exception = false;
+
+            try {
+                jobClient = createJobClient(context, jobConf);
+                RunningJob runningJob = jobClient.getJob(JobID.forName(newId));
+
+                if (runningJob == null) {
+                    context.setExternalStatus(FAILED);
+                    throw new 
ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
+                            "Unknown hadoop job [{0}] associated with action 
[{1}].  Failing this action!", newId,
+                            action.getId());
+                }
+
+                jobCompleted = runningJob.isComplete();
+            } catch (Exception e) {
+                LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), 
e);
+                exception = true;
+                throw convertException(e);
+            } finally {
+                if (jobClient != null) {
+                    try {
+                        jobClient.close();
+                    } catch (Exception e) {
+                        if (exception) {
+                            LOG.error("JobClient error (not re-throwing due to 
a previous error): ", e);
+                        } else {
+                            throw convertException(e);
+                        }
+                    }
+                }
+            }
+
+            // run original check() if the MR action is completed or there are 
errors - otherwise mark it as RUNNING
+            if (jobCompleted || (!jobCompleted && 
actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
+                super.check(context, action);
+            } else {
+                context.setExternalStatus(RUNNING);
+                context.setExternalChildIDs(newId);
+            }
+        } else {
+            super.check(context, action);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java b/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java
index 6123021..86ba467 100644
--- a/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java
+++ b/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java
@@ -40,10 +40,12 @@ import org.apache.oozie.util.PropertiesUtils;
 import org.apache.oozie.util.XLog;
 
 public class CallbackServlet extends JsonRestServlet {
+    private static final long serialVersionUID = 6439106936153152786L;
+
     private static final String INSTRUMENTATION_NAME = "callback";
 
     private static final ResourceInfo RESOURCE_INFO =
-            new ResourceInfo("", Arrays.asList("POST", "GET"), 
Collections.EMPTY_LIST);
+            new ResourceInfo("", Arrays.asList("POST", "GET"), 
Collections.<ParameterInfo>emptyList());
 
     public final static String CONF_MAX_DATA_LEN = 
"oozie.servlet.CallbackServlet.max.data.len";
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
index e8a140f..c1f0e6f 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
@@ -25,18 +25,10 @@ import java.io.OutputStream;
 import java.util.Arrays;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.ActionExecutorTestCase.Context;
 import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
-import org.apache.oozie.test.XTestCase.Predicate;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 
@@ -64,13 +56,8 @@ public class TestDistCpActionExecutor extends 
ActionExecutorTestCase{
                 "<arg>" + outputPath + "</arg>" +
                 "</distcp>";
         Context context = createContext(actionXml);
-        final RunningJob runningJob = submitAction(context);
-        waitFor(60 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return runningJob.isComplete();
-            }
-        });
-        assertTrue(runningJob.isSuccessful());
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         waitFor(60 * 1000, new Predicate() {
             public boolean evaluate() throws Exception {
@@ -139,7 +126,7 @@ public class TestDistCpActionExecutor extends 
ActionExecutorTestCase{
         return new Context(wf, action);
     }
 
-    protected RunningJob submitAction(Context context) throws Exception {
+    protected String submitAction(Context context) throws Exception {
         DistcpActionExecutor ae = new DistcpActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -154,14 +141,8 @@ public class TestDistCpActionExecutor extends 
ActionExecutorTestCase{
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
 
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        jobConf.set("mapred.job.tracker", jobTracker);
-
-        JobClient jobClient =
-            
Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), 
jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        ae.submitLauncher(getFileSystem(), context, context.getAction());
+        return context.getAction().getExternalId();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 123eba5..8adc606 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -21,7 +21,6 @@ package org.apache.oozie.action.hadoop;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
@@ -36,7 +35,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
-import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -44,14 +42,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
@@ -60,7 +53,6 @@ import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
@@ -582,9 +574,7 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
         assertEquals(launcherId, runningJob2);
         assertEquals(launcherId, context.getAction().getExternalId());
 
-        waitUntilYarnAppCompletes(runningJob2);
-        //FIXME?????
-        waitUntilYarnAppState(launcherId, YarnApplicationState.FINISHED);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         ActionExecutor ae = new JavaActionExecutor();
         ae.check(context, context.getAction());
         assertEquals("SUCCEEDED", context.getAction().getExternalStatus());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
index d0b4d5b..9ba04da 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
@@ -33,6 +33,22 @@ import java.util.Map;
 
 // A lot of this adapted from 
org.apache.hadoop.mapreduce.v2.app.TestJobEndNotifier and 
org.apache.hadoop.mapred.TestJobEndNotifier
 public class TestLauncherAMCallbackNotifier extends XTestCase {
+    private EmbeddedServletContainer container;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        QueryServlet.lastQueryString = null;
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (container != null) {
+            container.stop();
+        }
+
+        super.tearDown();
+    }
 
     public void testConfiguration() throws Exception {
         Configuration conf = new Configuration(false);
@@ -91,8 +107,9 @@ public class TestLauncherAMCallbackNotifier extends 
XTestCase {
         
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, 
"5000");
 
         LauncherAMCallbackNotifier cnSpy = Mockito.spy(new 
LauncherAMCallbackNotifier(conf));
+
         long start = System.currentTimeMillis();
-        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED);
+        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false);
         long end = System.currentTimeMillis();
         Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
         Assert.assertTrue("Should have taken more than 5 seconds but it only 
took " + (end - start), end - start >= 5000);
@@ -103,68 +120,93 @@ public class TestLauncherAMCallbackNotifier extends 
XTestCase {
 
         cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
         start = System.currentTimeMillis();
-        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED);
+        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false);
         end = System.currentTimeMillis();
         Mockito.verify(cnSpy, Mockito.times(3)).notifyURLOnce();
         Assert.assertTrue("Should have taken more than 9 seconds but it only 
took " + (end - start), end - start >= 9000);
     }
 
     public void testNotifyTimeout() throws Exception {
-        EmbeddedServletContainer container = null;
-        try {
-            container = new EmbeddedServletContainer("blah");
-            Map<String, String> params = new HashMap<String, String>();
-            params.put(HangServlet.SLEEP_TIME_MS, "1000000");
-            container.addServletEndpoint("/hang/*", HangServlet.class, params);
-            container.start();
-
-            Configuration conf = new Configuration(false);
-            
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 
"0");
-            
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
-            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, 
container.getServletURL("/hang/*"));
-            
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, 
"5000");
-
-            LauncherAMCallbackNotifier cnSpy = Mockito.spy(new 
LauncherAMCallbackNotifier(conf));
-            long start = System.currentTimeMillis();
-            cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED);
-            long end = System.currentTimeMillis();
-            Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
-            Assert.assertTrue("Should have taken more than 5 seconds but it 
only took " + (end - start), end - start >= 5000);
-        } finally {
-            if (container != null) {
-                container.stop();
-            }
-        }
+        Map<String, String> params = new HashMap<String, String>();
+        params.put(HangServlet.SLEEP_TIME_MS, "1000000");
+        Configuration conf = setupEmbeddedContainer(HangServlet.class, 
"/hang/*", "/hang/*", params);
+
+        LauncherAMCallbackNotifier cnSpy = Mockito.spy(new 
LauncherAMCallbackNotifier(conf));
+        long start = System.currentTimeMillis();
+        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false);
+        long end = System.currentTimeMillis();
+        Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
+        Assert.assertTrue("Should have taken more than 5 seconds but it only 
took " + (end - start), end - start >= 5000);
     }
 
     public void testNotify() throws Exception {
-        EmbeddedServletContainer container = null;
-        try {
-            container = new EmbeddedServletContainer("blah");
-            container.addServletEndpoint("/count/*", QueryServlet.class);
-            container.start();
-
-            Configuration conf = new Configuration(false);
-            
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 
"0");
-            
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
-            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, 
container.getServletURL("/count/?status=$jobStatus"));
-            
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, 
"5000");
-
-            LauncherAMCallbackNotifier cn = new 
LauncherAMCallbackNotifier(conf);
-            QueryServlet.lastQueryString = null;
-            assertNull(QueryServlet.lastQueryString);
-            cn.notifyURL(FinalApplicationStatus.SUCCEEDED);
-            waitFor(5000, new Predicate() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    return 
"status=SUCCEEDED".equals(QueryServlet.lastQueryString);
-                }
-            });
-            assertEquals("status=SUCCEEDED", QueryServlet.lastQueryString);
-        } finally {
-            if (container != null) {
-                container.stop();
+        Configuration conf = setupEmbeddedContainer(QueryServlet.class, 
"/count/*", "/count/?status=$jobStatus", null);
+
+        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+        assertNull(QueryServlet.lastQueryString);
+        cn.notifyURL(FinalApplicationStatus.SUCCEEDED, false);
+        
waitForCallbackAndCheckResult(FinalApplicationStatus.SUCCEEDED.toString());
+    }
+
+    public void testNotifyBackgroundActionWhenSubmitSucceeds() throws 
Exception {
+        Configuration conf = setupEmbeddedContainer(QueryServlet.class, 
"/count/*", "/count/?status=$jobStatus", null);
+
+        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+        assertNull(QueryServlet.lastQueryString);
+        cn.notifyURL(FinalApplicationStatus.SUCCEEDED, true);
+        waitForCallbackAndCheckResult("RUNNING");
+    }
+
+    public void testNotifyBackgroundActionWhenSubmitFailsWithKilled() throws 
Exception {
+        Configuration conf = setupEmbeddedContainer(QueryServlet.class, 
"/count/*", "/count/?status=$jobStatus", null);
+
+        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+        assertNull(QueryServlet.lastQueryString);
+        cn.notifyURL(FinalApplicationStatus.KILLED, true);
+        
waitForCallbackAndCheckResult(FinalApplicationStatus.KILLED.toString());
+    }
+
+    public void testNotifyBackgroundActionWhenSubmitFailsWithFailed() throws 
Exception {
+        Configuration conf = setupEmbeddedContainer(QueryServlet.class, 
"/count/*", "/count/?status=$jobStatus", null);
+
+        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+        assertNull(QueryServlet.lastQueryString);
+        cn.notifyURL(FinalApplicationStatus.FAILED, true);
+        
waitForCallbackAndCheckResult(FinalApplicationStatus.FAILED.toString());
+    }
+
+    private Configuration setupEmbeddedContainer(Class<?> servletClass, String 
servletEndPoint, String servletUrl, Map<String, String> params) throws 
Exception {
+        container = new EmbeddedServletContainer("test");
+        if (servletEndPoint != null) {
+            if (params != null) {
+                container.addServletEndpoint(servletEndPoint, servletClass, 
params);
+            } else {
+                container.addServletEndpoint(servletEndPoint, servletClass);
             }
         }
+        container.start();
+
+        Configuration conf = new Configuration(false);
+        
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 
"0");
+        
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, 
container.getServletURL(servletUrl));
+        
conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, 
"5000");
+
+        return conf;
+    }
+
+    private void waitForCallbackAndCheckResult(final String expectedResult) {
+        waitFor(5000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return ("status=" + 
expectedResult).equals(QueryServlet.lastQueryString);
+            }
+        });
+
+        assertEquals("status="  + expectedResult, 
QueryServlet.lastQueryString);
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
deleted file mode 100644
index 4cda615..0000000
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.action.hadoop;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.WorkflowAppService;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.IOUtils;
-import org.jdom.Element;
-
-import java.io.File;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.StringReader;
-import java.io.Writer;
-import java.io.OutputStreamWriter;
-
-public class TestMapReduceActionError extends ActionExecutorTestCase {
-
-    @Override
-    protected void setSystemProps() throws Exception {
-        super.setSystemProps();
-        setSystemProperty("oozie.service.ActionService.executor.classes", 
MapReduceActionExecutor.class.getName());
-    }
-
-    private Context createContext(String actionXml) throws Exception {
-        JavaActionExecutor ae = new JavaActionExecutor();
-
-        Path appJarPath = new Path("lib/test.jar");
-        File jarFile = IOUtils.createJar(new File(getTestCaseDir()), 
"test.jar", MapperReducerForTest.class);
-        InputStream is = new FileInputStream(jarFile);
-        OutputStream os = getFileSystem().create(new Path(getAppPath(), 
"lib/test.jar"));
-        IOUtils.copyStream(is, os);
-
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, 
appJarPath.toString());
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action");
-        WorkflowActionBean action = (WorkflowActionBean) 
wf.getActions().get(0);
-        action.setType(ae.getType());
-        action.setConf(actionXml);
-
-        return new Context(wf, action);
-    }
-
-    private RunningJob submitAction(Context context) throws Exception {
-        MapReduceActionExecutor ae = new MapReduceActionExecutor();
-
-        WorkflowAction action = context.getAction();
-
-        ae.prepareActionDir(getFileSystem(), context);
-        ae.submitLauncher(getFileSystem(), context, action);
-
-        String jobId = action.getExternalId();
-        String jobTracker = action.getTrackerUri();
-        String consoleUrl = action.getConsoleUrl();
-        assertNotNull(jobId);
-        assertNotNull(jobTracker);
-        assertNotNull(consoleUrl);
-
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf =
-                new XConfiguration(new 
StringReader(XmlUtils.prettyPrint(e.getChild("configuration")).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node"));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-
-        conf.set("mapreduce.framework.name", "yarn");
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
-    }
-
-    private void _testSubmit(String actionXml) throws Exception {
-
-        Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(60 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-
-        MapReduceActionExecutor ae = new MapReduceActionExecutor();
-        ae.check(context, context.getAction());
-
-        JobConf conf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
-        String user = conf.get("user.name");
-        String group = conf.get("group.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
-        final RunningJob mrJob = 
jobClient.getJob(JobID.forName(context.getAction().getExternalId()));
-
-        waitFor(60 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return mrJob.isComplete();
-            }
-        });
-        ae.check(context, context.getAction());
-
-        assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
-
-        ae.end(context, context.getAction());
-        assertEquals(WorkflowAction.Status.ERROR, 
context.getAction().getStatus());
-        assertTrue(context.getAction().getErrorMessage().contains("already 
exists"));
-    }
-
-    public void testMapReduce() throws Exception {
-        FileSystem fs = getFileSystem();
-
-        Path inputDir = new Path(getFsTestCaseDir(), "input");
-        Path outputDir = new Path(getFsTestCaseDir(), "output1");
-
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, 
"data.txt")));
-        ow.write("dummy\n");
-        ow.write("dummy\n");
-        ow.close();
-
-        String actionXml = "<map-reduce>" +
-                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
-                "<name-node>" + getNameNodeUri() + "</name-node>" +
-                "<configuration>" +
-                "<property><name>mapred.mapper.class</name><value>" + 
MapperReducerForTest.class.getName() +
-                "</value></property>" +
-                "<property><name>mapred.reducer.class</name><value>" + 
MapperReducerForTest.class.getName() +
-                "</value></property>" +
-                "<property><name>mapred.input.dir</name><value>" + inputDir + 
"</value></property>" +
-                "<property><name>mapred.output.dir</name><value>" + outputDir 
+ "</value></property>" +
-                "</configuration>" +
-                "</map-reduce>";
-        _testSubmit(actionXml);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
index 5bc7d00..9efacdd 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
@@ -33,19 +33,13 @@ import java.util.regex.Matcher;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
-import org.apache.oozie.action.hadoop.MapperReducerForTest;
-import org.apache.oozie.action.hadoop.OozieJobInfo;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
@@ -53,19 +47,16 @@ import org.apache.oozie.command.bundle.BundleStartXCommand;
 import org.apache.oozie.command.bundle.BundleSubmitXCommand;
 import org.apache.oozie.command.wf.ActionXCommand;
 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
-import org.apache.oozie.command.wf.JobXCommand;
 import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
+import 
org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import 
org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor;
-import 
org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
-import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
-import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.UUIDService.ApplicationType;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.IOUtils;
@@ -165,12 +156,9 @@ public class TestOozieJobInfo extends XDataTestCase {
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
         JobConf conf = actionExecutor.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionList.get(1).getConf()));
         String user = conf.get("user.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
-        String launcherId = actionList.get(1).getExternalId();
 
-        final RunningJob launcherJob = 
jobClient.getJob(JobID.forName(launcherId));
-        FileSystem fs = context.getAppFileSystem();
-        Configuration jobXmlConf = new XConfiguration(fs.open(new 
Path(launcherJob.getJobFile())));
+        FileSystem fs = getFileSystem();
+        Configuration jobXmlConf = new 
XConfiguration(fs.open(getPathToWorkflowResource(user, wfbean, services, 
context, LauncherAM.LAUNCHER_JOB_CONF_XML)));
         String jobInfo = jobXmlConf.get(OozieJobInfo.JOB_INFO_KEY);
 
         // BUNDLE_ID;BUNDLE_NAME;COORDINATOR_NAME;COORDINATOR_NOMINAL_TIME;
@@ -186,7 +174,6 @@ public class TestOozieJobInfo extends XDataTestCase {
         assertTrue(jobInfo.contains(",testing=test,"));
         assertTrue(jobInfo.contains(",coord.nominal.time="));
         assertTrue(jobInfo.contains("launcher=true"));
-
     }
 
     protected void setCoordConf(Configuration jobConf) throws IOException {

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
index 9468fad..3354b3a 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
@@ -229,14 +229,8 @@ public class TestShellActionExecutor extends 
ActionExecutorTestCase {
 
         Context context = createContext(actionXml);
         // Submit the action
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(180 * 1000, new Predicate() { // Wait for the external job to
-                    // finish
-                    public boolean evaluate() throws Exception {
-                        return launcherJob.isComplete();
-                    }
-                });
-
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         ShellActionExecutor ae = new ShellActionExecutor();
         WorkflowAction action = context.getAction();
         ae.check(context, action);
@@ -258,19 +252,10 @@ public class TestShellActionExecutor extends 
ActionExecutorTestCase {
     private WorkflowAction _testSubmit(String actionXml, boolean 
checkForSuccess, String capture_output) throws Exception {
 
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);// Submit the
+        final String launcherId = submitAction(context);// Submit the
         // action
-        String launcherId = context.getAction().getExternalId(); // Get LM id
-        waitFor(180 * 1000, new Predicate() { // Wait for the external job to
-                    // finish
-                    public boolean evaluate() throws Exception {
-                        return launcherJob.isComplete();
-                    }
-                });
-        // Thread.sleep(2000);
-        assertTrue(launcherJob.isSuccessful());
-
-        sleep(2000);// Wait more to make sure no ID swap happens
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
         Configuration conf = new XConfiguration();
         conf.set("user.name", getTestUser());
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
@@ -334,14 +319,13 @@ public class TestShellActionExecutor extends 
ActionExecutorTestCase {
      * @return The RunningJob of the Launcher Mapper
      * @throws Exception
      */
-    private RunningJob submitAction(Context context) throws Exception {
+    private String submitAction(Context context) throws Exception {
         ShellActionExecutor ae = new ShellActionExecutor();
 
         WorkflowAction action = context.getAction();
 
         ae.prepareActionDir(getFileSystem(), context);
-        ae.submitLauncher(getFileSystem(), context, action); // Submit the
-        // Launcher Mapper
+        ae.submitLauncher(getFileSystem(), context, action); // Submit the 
action
 
         String jobId = action.getExternalId();
         String jobTracker = action.getTrackerUri();
@@ -351,20 +335,6 @@ public class TestShellActionExecutor extends 
ActionExecutorTestCase {
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
 
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf = new XConfiguration();
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node"));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return jobId;
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
index 0e1d0fd..d1f458b 100644
--- 
a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
@@ -29,8 +29,6 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
-import org.apache.oozie.ForTestingActionExecutor;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
@@ -46,7 +44,6 @@ import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.service.ActionCheckerService;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -266,9 +263,7 @@ public class TestActionCheckXCommand extends XDataTestCase {
 
         String launcherId = action.getExternalId();
 
-        waitUntilYarnAppCompletes(launcherId);
-        YarnApplicationState appState = getYarnApplicationState(launcherId);
-        assertEquals("YarnApplicationState", YarnApplicationState.FINISHED, 
appState);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java 
b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
index b7489e9..28e4755 100644
--- 
a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
@@ -30,8 +30,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.hadoop.LauncherMapperHelper;
@@ -163,19 +161,10 @@ public class TestActionStartXCommand extends 
XDataTestCase {
         ActionExecutorContext context = new 
ActionXCommand.ActionExecutorContext(job, action, false, false);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
         JobConf conf = actionExecutor.createBaseHadoopConf(context, 
XmlUtils.parseXml(action.getConf()));
-        String user = conf.get("user.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
 
         String launcherId = action.getExternalId();
 
-        final RunningJob launcherJob = 
jobClient.getJob(JobID.forName(launcherId));
-
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
         assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
@@ -240,21 +229,10 @@ public class TestActionStartXCommand extends 
XDataTestCase {
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
         JobConf conf = actionExecutor.createBaseHadoopConf(context, 
XmlUtils.parseXml(action.getConf()));
         String user = conf.get("user.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
 
         String launcherId = action.getExternalId();
 
-        // retrieve launcher job
-        final RunningJob launcherJob = 
jobClient.getJob(JobID.forName(launcherId));
-
-        // time out after 120 seconds unless launcher job succeeds
-        waitFor(240 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        // check if launcher job succeeds
-        assertTrue(launcherJob.isSuccessful());
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
         assertTrue(LauncherMapperHelper.hasIdSwap(actionData));

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java 
b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
index 1d399e4..20529e8 100644
--- a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java
@@ -28,11 +28,16 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XLog;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
 import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 /**
  * Base JUnit <code>TestCase</code> subclass used by all Oozie testcases that 
need Hadoop FS access. <p/> As part of its
@@ -175,4 +180,26 @@ public abstract class XFsTestCase extends XTestCase {
         return has.createJobClient(getTestUser(), conf);
     }
 
+    /**
+     * Returns a Path object to a filesystem resource which belongs to a 
specific workflow on HDFS
+     * Example: 
/user/test/oozie-abcd/0000003-160913132555310-oozie-abcd-W/hadoop--map-reduce/launcher.xml
+     *
+     * @param userName current username
+     * @param job workflow job whose ID forms part of the path
+     * @param services Oozie Services class
+     * @param context Executor context
+     * @param fileName the filename
+     * @return the Path object which represents a file on HDFS
+     * @throws Exception
+     */
+    protected Path getPathToWorkflowResource(String userName, WorkflowJob job, 
Services services, ActionExecutorContext context, String fileName) throws 
Exception {
+        return new Path(
+                "/user" +
+                "/" + userName +
+                "/" + services.getSystemId() +
+                "/" + job.getId() +
+                "/" + context.getActionDir().getName(),
+                fileName
+                );
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java 
b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index 7d8c48f..1299fa3 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -1192,11 +1192,12 @@ public abstract class XTestCase extends TestCase {
         return services;
     }
 
-    protected void waitUntilYarnAppState(String externalId, final 
YarnApplicationState... acceptedStates)
+    protected YarnApplicationState waitUntilYarnAppState(String externalId, 
final YarnApplicationState... acceptedStates)
             throws HadoopAccessorException, IOException, YarnException {
         final ApplicationId appId = ConverterUtils.toApplicationId(externalId);
         final Set<YarnApplicationState> states = 
Sets.immutableEnumSet(Lists.newArrayList(acceptedStates));
         final MutableBoolean endStateOK = new MutableBoolean(false);
+        final MutableObject<YarnApplicationState> finalState = new 
MutableObject<YarnApplicationState>();
 
         JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri());
         // This is needed here because we need a mutable final YarnClient
@@ -1207,6 +1208,7 @@ public abstract class XTestCase extends TestCase {
                 @Override
                 public boolean evaluate() throws Exception {
                      YarnApplicationState state = 
yarnClientMO.getValue().getApplicationReport(appId).getYarnApplicationState();
+                     finalState.setValue(state);
 
                      if (states.contains(state)) {
                          endStateOK.setValue(true);
@@ -1223,10 +1225,12 @@ public abstract class XTestCase extends TestCase {
         }
 
         assertTrue(endStateOK.isTrue());
+        return finalState.getValue();
     }
 
-    protected void waitUntilYarnAppCompletes(String externalId) throws 
HadoopAccessorException, IOException, YarnException {
-        waitUntilYarnAppState(externalId, YarnApplicationState.FAILED, 
YarnApplicationState.KILLED, YarnApplicationState.FINISHED);
+    protected void waitUntilYarnAppDoneAndAssertSuccess(String externalId) 
throws HadoopAccessorException, IOException, YarnException {
+        YarnApplicationState state = waitUntilYarnAppState(externalId, 
YarnApplicationState.FAILED, YarnApplicationState.KILLED, 
YarnApplicationState.FINISHED);
+        assertEquals("YARN App state", YarnApplicationState.FINISHED, state);
     }
 
     protected YarnApplicationState getYarnApplicationState(String externalId) 
throws HadoopAccessorException, IOException, YarnException {

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
 
b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
index b966d4b..ec53ba9 100644
--- 
a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
+++ 
b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
@@ -164,14 +164,8 @@ public class TestHiveActionExecutor extends 
ActionExecutorTestCase {
             dataWriter.close();
             Context context = createContext(getActionScriptXml());
             Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.2");
-            final RunningJob launcherJob = submitAction(context, ns);
-            String launcherId = context.getAction().getExternalId();
-            waitFor(200 * 1000, new Predicate() {
-                public boolean evaluate() throws Exception {
-                    return launcherJob.isComplete();
-                }
-            });
-            assertTrue(launcherJob.isSuccessful());
+            final String launcherId = submitAction(context, ns);
+            waitUntilYarnAppDoneAndAssertSuccess(launcherId);
             Configuration conf = new XConfiguration();
             conf.set("user.name", getTestUser());
             Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
@@ -198,14 +192,8 @@ public class TestHiveActionExecutor extends 
ActionExecutorTestCase {
         {
             Context context = createContext(getActionQueryXml(hiveScript));
             Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.6");
-            final RunningJob launcherJob = submitAction(context, ns);
-            String launcherId = context.getAction().getExternalId();
-            waitFor(200 * 1000, new Predicate() {
-                public boolean evaluate() throws Exception {
-                    return launcherJob.isComplete();
-                }
-            });
-            assertTrue(launcherJob.isSuccessful());
+            final String launcherId = submitAction(context, ns);
+            waitUntilYarnAppDoneAndAssertSuccess(launcherId);
             Configuration conf = new XConfiguration();
             conf.set("user.name", getTestUser());
             Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
@@ -231,7 +219,7 @@ public class TestHiveActionExecutor extends 
ActionExecutorTestCase {
         }
     }
 
-    private RunningJob submitAction(Context context, Namespace ns) throws 
Exception {
+    private String submitAction(Context context, Namespace ns) throws 
Exception {
         HiveActionExecutor ae = new HiveActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -245,22 +233,9 @@ public class TestHiveActionExecutor extends 
ActionExecutorTestCase {
         assertNotNull(jobId);
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf =
-                new XConfiguration(new 
StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node", ns));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
 
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+
+        return jobId;
     }
 
     private String copyJar(String targetFile, Class<?> anyContainedClass)

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
 
b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
index 5963e42..5e71f12 100644
--- 
a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
+++ 
b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
@@ -205,16 +205,9 @@ public class TestHive2ActionExecutor extends 
ActionExecutorTestCase {
             dataWriter.write(SAMPLE_DATA_TEXT);
             dataWriter.close();
             Context context = createContext(getQueryActionXml(query));
-            final RunningJob launcherJob = submitAction(context,
+            final String launcherId = submitAction(context,
                 Namespace.getNamespace("uri:oozie:hive2-action:0.2"));
-            String launcherId = context.getAction().getExternalId();
-            waitFor(200 * 1000, new Predicate() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    return launcherJob.isComplete();
-                }
-            });
-            assertTrue(launcherJob.isSuccessful());
+            waitUntilYarnAppDoneAndAssertSuccess(launcherId);
             Configuration conf = new XConfiguration();
             conf.set("user.name", getTestUser());
             Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
@@ -244,16 +237,9 @@ public class TestHive2ActionExecutor extends 
ActionExecutorTestCase {
             dataWriter.write(SAMPLE_DATA_TEXT);
             dataWriter.close();
             Context context = createContext(getScriptActionXml());
-            final RunningJob launcherJob = submitAction(context,
+            final String launcherId = submitAction(context,
                 Namespace.getNamespace("uri:oozie:hive2-action:0.1"));
-            String launcherId = context.getAction().getExternalId();
-            waitFor(200 * 1000, new Predicate() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    return launcherJob.isComplete();
-                }
-            });
-            assertTrue(launcherJob.isSuccessful());
+            waitUntilYarnAppDoneAndAssertSuccess(launcherId);
             Configuration conf = new XConfiguration();
             conf.set("user.name", getTestUser());
             Map<String, String> actionData = 
LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
@@ -275,7 +261,7 @@ public class TestHive2ActionExecutor extends 
ActionExecutorTestCase {
         }
     }
 
-    private RunningJob submitAction(Context context, Namespace ns) throws 
Exception {
+    private String submitAction(Context context, Namespace ns) throws 
Exception {
         Hive2ActionExecutor ae = new Hive2ActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -289,21 +275,7 @@ public class TestHive2ActionExecutor extends 
ActionExecutorTestCase {
         assertNotNull(jobId);
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf =
-                new XConfiguration(new 
StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node", ns));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return jobId;
     }
 
     private Context createContext(String actionXml) throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
index a1998e2..43ce520 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -17,22 +17,6 @@
  */
 package org.apache.oozie.action.hadoop;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
-import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.codehaus.jackson.map.Module.SetupContext;
-import org.xml.sax.SAXException;
-
-import javax.xml.parsers.ParserConfigurationException;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
@@ -43,13 +27,29 @@ import java.io.StringWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.security.Permission;
-import java.util.Date;
+import java.security.PrivilegedAction;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.base.Preconditions;
 
 public class LauncherAM {
 
@@ -125,8 +125,14 @@ public class LauncherAM {
 
     // TODO: OYA: rethink all print messages and formatting
     public static void main(String[] AMargs) throws Exception {
-        ErrorHolder eHolder = new ErrorHolder();
+        final ErrorHolder eHolder = new ErrorHolder();
         FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
+        String submitterUser = System.getProperty("submitter.user", "").trim();
+        Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user 
is undefined");
+        System.out.println("Submitter user is: " + submitterUser);
+        UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(submitterUser);
+        boolean backgroundAction = false;
+
         try {
             try {
                 launcherJobConf = readLauncherConf();
@@ -143,7 +149,7 @@ public class LauncherAM {
 
             try {
                 System.out.println("\nStarting the execution of prepare 
actions");
-                executePrepare();
+                executePrepare(ugi);
                 System.out.println("Completed the execution of prepare actions 
successfully");
             } catch (Exception ex) {
                 eHolder.setErrorMessage("Prepare execution in the Launcher AM 
has failed");
@@ -151,7 +157,7 @@ public class LauncherAM {
                 throw ex;
             }
 
-            String[] mainArgs = getMainArguments(launcherJobConf);
+            final String[] mainArgs = getMainArguments(launcherJobConf);
 
             // TODO: OYA: should we allow turning this off?
             // TODO: OYA: what should default be?
@@ -161,7 +167,8 @@ public class LauncherAM {
 
             setupMainConfiguration();
 
-            finalStatus = runActionMain(mainArgs, eHolder);
+            finalStatus = runActionMain(mainArgs, eHolder, ugi);
+
             if (finalStatus == FinalApplicationStatus.SUCCEEDED) {
                 handleActionData();
                 if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
@@ -180,6 +187,7 @@ public class LauncherAM {
                     System.out.println(actionData.get(ACTION_DATA_NEW_ID));
                     System.out.println("=======================");
                     System.out.println();
+                    backgroundAction = true;
                 }
             }
         } catch (Exception e) {
@@ -193,13 +201,13 @@ public class LauncherAM {
                 if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
                     failLauncher(eHolder);
                 }
-                uploadActionDataToHDFS();
+                uploadActionDataToHDFS(ugi);
             } finally {
                 try {
                     unregisterWithRM(finalStatus, eHolder.getErrorMessage());
                 } finally {
                     LauncherAMCallbackNotifier cn = new 
LauncherAMCallbackNotifier(launcherJobConf);
-                    cn.notifyURL(finalStatus);
+                    cn.notifyURL(finalStatus, backgroundAction);
                 }
             }
         }
@@ -240,16 +248,31 @@ public class LauncherAM {
     }
 
     // Method to execute the prepare actions
-    private static void executePrepare() throws IOException, 
LauncherException, ParserConfigurationException, SAXException {
-        String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML);
-        if (prepareXML != null) {
-            if (prepareXML.length() != 0) {
-                Configuration actionConf = new Configuration(launcherJobConf);
-                actionConf.addResource(ACTION_CONF_XML);
-                PrepareActionsDriver.doOperations(prepareXML, actionConf);
-            } else {
-                System.out.println("There are no prepare actions to execute.");
+    private static void executePrepare(UserGroupInformation ugi) throws 
Exception {
+        Exception e = ugi.doAs(new PrivilegedAction<Exception>() {
+            @Override
+            public Exception run() {
+                try {
+                    String prepareXML = 
launcherJobConf.get(ACTION_PREPARE_XML);
+                    if (prepareXML != null) {
+                        if (prepareXML.length() != 0) {
+                            Configuration actionConf = new 
Configuration(launcherJobConf);
+                            actionConf.addResource(ACTION_CONF_XML);
+                            PrepareActionsDriver.doOperations(prepareXML, 
actionConf);
+                        } else {
+                            System.out.println("There are no prepare actions 
to execute.");
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    return e;
+                }
+                return null;
             }
+        });
+
+        if (e != null) {
+            throw e;
         }
     }
 
@@ -282,65 +305,74 @@ public class LauncherAM {
 //        }
     }
 
-    private static FinalApplicationStatus runActionMain(String[] mainArgs, 
ErrorHolder eHolder) {
-        FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
-        LauncherSecurityManager secMan = new LauncherSecurityManager();
-        try {
-            Class<?> klass = 
launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
-            System.out.println("Launcher class: " + klass.toString());
-            System.out.flush();
-            Method mainMethod = klass.getMethod("main", String[].class);
-            // Enable LauncherSecurityManager to catch System.exit calls
-            secMan.set();
-            mainMethod.invoke(null, (Object) mainArgs);
-
-            System.out.println();
-            System.out.println("<<< Invocation of Main class completed <<<");
-            System.out.println();
-            finalStatus = FinalApplicationStatus.SUCCEEDED;
-        } catch (InvocationTargetException ex) {
-            ex.printStackTrace(System.out);
-            // Get what actually caused the exception
-            Throwable cause = ex.getCause();
-            // If we got a JavaMainException from JavaMain, then we need to 
unwrap it
-            if (JavaMainException.class.isInstance(cause)) {
-                cause = cause.getCause();
-            }
-            if (LauncherMainException.class.isInstance(cause)) {
-                String mainClass = 
launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
-                eHolder.setErrorMessage("Main Class [" + mainClass + "], exit 
code [" +
-                        ((LauncherMainException) ex.getCause()).getErrorCode() 
+ "]");
-            } else if (SecurityException.class.isInstance(cause)) {
-                if (secMan.getExitInvoked()) {
-                    System.out.println("Intercepting System.exit(" + 
secMan.getExitCode()
-                            + ")");
-                    System.err.println("Intercepting System.exit(" + 
secMan.getExitCode()
-                            + ")");
-                    // if 0 main() method finished successfully
-                    // ignoring
-                    eHolder.setErrorCode(secMan.getExitCode());
-                    if (eHolder.getErrorCode() != 0) {
+    private static FinalApplicationStatus runActionMain(final String[] 
mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) {
+        final AtomicReference<FinalApplicationStatus> finalStatus = new 
AtomicReference<FinalApplicationStatus>(FinalApplicationStatus.FAILED);
+
+        ugi.doAs(new PrivilegedAction<Void>() {
+            @Override
+            public Void run() {
+                LauncherSecurityManager secMan = new LauncherSecurityManager();
+                try {
+                    Class<?> klass = 
launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
+                    System.out.println("Launcher class: " + klass.toString());
+                    System.out.flush();
+                    Method mainMethod = klass.getMethod("main", 
String[].class);
+                    // Enable LauncherSecurityManager to catch System.exit 
calls
+                    secMan.set();
+                    mainMethod.invoke(null, (Object) mainArgs);
+
+                    System.out.println();
+                    System.out.println("<<< Invocation of Main class completed 
<<<");
+                    System.out.println();
+                    finalStatus.set(FinalApplicationStatus.SUCCEEDED);
+                } catch (InvocationTargetException ex) {
+                    ex.printStackTrace(System.out);
+                    // Get what actually caused the exception
+                    Throwable cause = ex.getCause();
+                    // If we got a JavaMainException from JavaMain, then we 
need to unwrap it
+                    if (JavaMainException.class.isInstance(cause)) {
+                        cause = cause.getCause();
+                    }
+                    if (LauncherMainException.class.isInstance(cause)) {
                         String mainClass = 
launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
-                        eHolder.setErrorMessage("Main Class [" + mainClass + 
"], exit code [" + eHolder.getErrorCode() + "]");
+                        eHolder.setErrorMessage("Main Class [" + mainClass + 
"], exit code [" +
+                                ((LauncherMainException) 
ex.getCause()).getErrorCode() + "]");
+                    } else if (SecurityException.class.isInstance(cause)) {
+                        if (secMan.getExitInvoked()) {
+                            System.out.println("Intercepting System.exit(" + 
secMan.getExitCode()
+                                    + ")");
+                            System.err.println("Intercepting System.exit(" + 
secMan.getExitCode()
+                                    + ")");
+                            // if 0 main() method finished successfully
+                            // ignoring
+                            eHolder.setErrorCode(secMan.getExitCode());
+                            if (eHolder.getErrorCode() != 0) {
+                                String mainClass = 
launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+                                eHolder.setErrorMessage("Main Class [" + 
mainClass + "], exit code [" + eHolder.getErrorCode() + "]");
+                            } else {
+                                
finalStatus.set(FinalApplicationStatus.SUCCEEDED);
+                            }
+                        }
                     } else {
-                        finalStatus = FinalApplicationStatus.SUCCEEDED;
+                        eHolder.setErrorMessage(cause.getMessage());
+                        eHolder.setErrorCause(cause);
                     }
+                } catch (Throwable t) {
+                    t.printStackTrace();
+                    eHolder.setErrorMessage(t.getMessage());
+                    eHolder.setErrorCause(t);
+                } finally {
+                    System.out.flush();
+                    System.err.flush();
+                    // Disable LauncherSecurityManager
+                    secMan.unset();
                 }
-            } else {
-                eHolder.setErrorMessage(cause.getMessage());
-                eHolder.setErrorCause(cause);
+
+                return null;
             }
-        } catch (Throwable t) {
-            t.printStackTrace(System.out);
-            eHolder.setErrorMessage(t.getMessage());
-            eHolder.setErrorCause(t);
-        } finally {
-            System.out.flush();
-            System.err.flush();
-            // Disable LauncherSecurityManager
-            secMan.unset();
-        }
-        return finalStatus;
+        });
+
+        return finalStatus.get();
     }
 
     private static void handleActionData() throws IOException {
@@ -410,40 +442,52 @@ public class LauncherAM {
         return sb.toString();
     }
 
-    private static void uploadActionDataToHDFS() throws IOException {
-        Path finalPath = new Path(actionDir, ACTION_DATA_SEQUENCE_FILE);
-        // unused ??
-        FileSystem fs = FileSystem.get(finalPath.toUri(), launcherJobConf);
-        // upload into sequence file
-        System.out.println("Oozie Launcher, uploading action data to HDFS 
sequence file: "
-                + new Path(actionDir, ACTION_DATA_SEQUENCE_FILE).toUri());
+    private static void uploadActionDataToHDFS(UserGroupInformation ugi) 
throws IOException {
+        IOException ioe = ugi.doAs(new PrivilegedAction<IOException>() {
+            @Override
+            public IOException run() {
+                Path finalPath = new Path(actionDir, 
ACTION_DATA_SEQUENCE_FILE);
+                // upload into sequence file
+                System.out.println("Oozie Launcher, uploading action data to 
HDFS sequence file: "
+                        + new Path(actionDir, 
ACTION_DATA_SEQUENCE_FILE).toUri());
 
-        SequenceFile.Writer wr = null;
-        try {
-            wr = SequenceFile.createWriter(launcherJobConf,
-                    SequenceFile.Writer.file(finalPath),
-                    SequenceFile.Writer.keyClass(Text.class),
-                    SequenceFile.Writer.valueClass(Text.class));
-            if (wr != null) {
-                Set<String> keys = actionData.keySet();
-                for (String propsKey : keys) {
-                    wr.append(new Text(propsKey), new 
Text(actionData.get(propsKey)));
+                SequenceFile.Writer wr = null;
+                try {
+                    wr = SequenceFile.createWriter(launcherJobConf,
+                            SequenceFile.Writer.file(finalPath),
+                            SequenceFile.Writer.keyClass(Text.class),
+                            SequenceFile.Writer.valueClass(Text.class));
+                    if (wr != null) {
+                        Set<String> keys = actionData.keySet();
+                        for (String propsKey : keys) {
+                            wr.append(new Text(propsKey), new 
Text(actionData.get(propsKey)));
+                        }
+                    } else {
+                        throw new IOException("SequenceFile.Writer is null for 
" + finalPath);
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    return e;
+                } finally {
+                    if (wr != null) {
+                        try {
+                            wr.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                            return e;
+                        }
+                    }
                 }
+
+                return null;
             }
-            else {
-                throw new IOException("SequenceFile.Writer is null for " + 
finalPath);
-            }
-        }
-        catch(IOException e) {
-            e.printStackTrace();
-            throw e;
-        }
-        finally {
-            if (wr != null) {
-                wr.close();
-            }
+        });
+
+        if (ioe != null) {
+            throw ioe;
         }
     }
+
     private static void failLauncher(int errorCode, String message, Throwable 
ex) {
         ErrorHolder eHolder = new ErrorHolder();
         eHolder.setErrorCode(errorCode);

Reply via email to