Repository: oozie
Updated Branches:
  refs/heads/master 7f59c8769 -> d5c4f3b7b


OOZIE-3298 [MapReduce action] External ID is not filled properly and failing MR 
job is treated as SUCCEEDED (andras.piros via pbacsko, asasvari, gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5c4f3b7
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5c4f3b7
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5c4f3b7

Branch: refs/heads/master
Commit: d5c4f3b7bf3dd83b986a375016e7805a3c079086
Parents: 7f59c87
Author: Andras Piros <andras.pi...@cloudera.com>
Authored: Tue Sep 11 16:19:13 2018 +0200
Committer: Andras Piros <andras.pi...@cloudera.com>
Committed: Tue Sep 11 16:19:13 2018 +0200

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java |  69 +++--
 .../action/hadoop/MapReduceActionExecutor.java  | 289 ++++++++++++++++++-
 .../action/hadoop/ActionExecutorTestCase.java   | 170 +++++++++++
 .../action/hadoop/TestJavaActionExecutor.java   |   5 -
 .../hadoop/TestYarnApplicationIdComparator.java |  71 +++++
 .../hadoop/TestYarnApplicationIdFinder.java     | 242 ++++++++++++++++
 .../oozie/TestSubWorkflowActionExecutor.java    | 136 +--------
 release-log.txt                                 |   1 +
 .../oozie/action/hadoop/LauncherMain.java       |  67 +++--
 .../hadoop/TestMapReduceActionExecutor.java     | 171 ++++++++++-
 10 files changed, 1014 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 05fac39..0385c77 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -977,7 +977,8 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    public void submitLauncher(FileSystem actionFs, final Context context, 
WorkflowAction action) throws ActionExecutorException {
+    public void submitLauncher(final FileSystem actionFs, final Context 
context, final WorkflowAction action)
+            throws ActionExecutorException {
         YarnClient yarnClient = null;
         try {
             Path appPathRoot = new Path(context.getWorkflow().getAppPath());
@@ -993,15 +994,13 @@ public class JavaActionExecutor extends ActionExecutor {
             // action job configuration
             Configuration actionConf = loadHadoopDefaultResources(context, 
actionXml);
             setupActionConf(actionConf, context, actionXml, appPathRoot);
-            addAppNameContext(action, context);
+            addAppNameContext(context, action);
             LOG.debug("Setting LibFilesArchives ");
             setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
 
             String jobName = actionConf.get(HADOOP_JOB_NAME);
             if (jobName == null || jobName.isEmpty()) {
-                jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
-                        getType(), context.getWorkflow().getAppName(),
-                        action.getName(), context.getWorkflow().getId());
+                jobName = getYarnApplicationName(context, action, 
"oozie:action");
                 actionConf.set(HADOOP_JOB_NAME, jobName);
             }
 
@@ -1067,8 +1066,7 @@ public class JavaActionExecutor extends ActionExecutor {
                 YarnClientApplication newApp = yarnClient.createApplication();
                 ApplicationId appId = 
newApp.getNewApplicationResponse().getApplicationId();
                 ApplicationSubmissionContext appContext =
-                        createAppSubmissionContext(appId, launcherConf, 
context, actionConf, action.getName(),
-                                credentials, actionXml);
+                        createAppSubmissionContext(appId, launcherConf, 
context, actionConf, action, credentials, actionXml);
                 yarnClient.submitApplication(appContext);
 
                 launcherId = appId.toString();
@@ -1090,6 +1088,15 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
+    /**
+     * Builds the name of a YARN application belonging to this action, in the form
+     * {@code <prefix>:T=<action type>:W=<workflow app name>:A=<action name>:ID=<workflow id>}.
+     * @param context the execution context, supplies the workflow's application name and ID
+     * @param action the workflow action, supplies the action name
+     * @param prefix the leading part of the name, e.g. {@code oozie:launcher} or {@code oozie:action}
+     * @return the formatted YARN application name
+     */
+    private String getYarnApplicationName(final Context context, final WorkflowAction action, final String prefix) {
+        final String actionType = getType();
+        final String workflowAppName = context.getWorkflow().getAppName();
+        final String actionName = action.getName();
+        final String workflowId = context.getWorkflow().getId();
+
+        return XLog.format("{0}:T={1}:W={2}:A={3}:ID={4}", prefix, actionType, workflowAppName, actionName, workflowId);
+    }
+
     private void removeHBaseSettingFromOozieDefaultResource(final 
Configuration jobConf) {
         final String[] propertySources = 
jobConf.getPropertySources(HbaseCredentials.HBASE_USE_DYNAMIC_JARS);
         if (propertySources != null && propertySources.length > 0 &&
@@ -1100,12 +1107,8 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    protected void addAppNameContext(WorkflowAction action, Context context) {
-        String oozieActionName = 
String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
-                getType(),
-                context.getWorkflow().getAppName(),
-                action.getName(),
-                context.getWorkflow().getId());
+    private void addAppNameContext(final Context context, final WorkflowAction 
action) {
+        final String oozieActionName = getYarnApplicationName(context, action, 
"oozie:launcher");
         context.setVar(OOZIE_ACTION_NAME, oozieActionName);
     }
 
@@ -1128,7 +1131,7 @@ public class JavaActionExecutor extends ActionExecutor {
                                                                     final 
Configuration launcherJobConf,
                                                                     final 
Context actionContext,
                                                                     final 
Configuration actionConf,
-                                                                    final 
String actionName,
+                                                                    final 
WorkflowAction action,
                                                                     final 
Credentials credentials,
                                                                     final 
Element actionXml)
             throws IOException, HadoopAccessorException, URISyntaxException {
@@ -1139,7 +1142,7 @@ public class JavaActionExecutor extends ActionExecutor {
         setPriority(launcherJobConf, appContext);
         setQueue(launcherJobConf, appContext);
         appContext.setApplicationId(appId);
-        setApplicationName(actionContext, actionName, appContext);
+        setApplicationName(actionContext, action, appContext);
         appContext.setApplicationType("Oozie Launcher");
         setMaxAttempts(launcherJobConf, appContext);
 
@@ -1286,10 +1289,10 @@ public class JavaActionExecutor extends ActionExecutor {
         return oldJavaOpts;
     }
 
-    private void setApplicationName(Context context, String actionName, 
ApplicationSubmissionContext appContext) {
-        String jobName = 
XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
-                context.getWorkflow().getAppName(), actionName,
-                context.getWorkflow().getId());
+    private void setApplicationName(final Context context,
+                                    final WorkflowAction action,
+                                    final ApplicationSubmissionContext 
appContext) {
+        final String jobName = getYarnApplicationName(context, action, 
"oozie:launcher");
         appContext.setApplicationName(jobName);
     }
 
@@ -1642,15 +1645,18 @@ public class JavaActionExecutor extends ActionExecutor {
             yarnClient = createYarnClient(context, jobConf);
             FinalApplicationStatus appStatus = null;
             try {
-                ApplicationReport appReport =
-                        
yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId()));
-                YarnApplicationState appState = 
appReport.getYarnApplicationState();
+                final String effectiveApplicationId = 
findYarnApplicationId(context, action);
+                final ApplicationId applicationId = 
ConverterUtils.toApplicationId(effectiveApplicationId);
+                final ApplicationReport appReport = 
yarnClient.getApplicationReport(applicationId);
+                final YarnApplicationState appState = 
appReport.getYarnApplicationState();
                 if (appState == YarnApplicationState.FAILED || appState == 
YarnApplicationState.FINISHED
                         || appState == YarnApplicationState.KILLED) {
                     appStatus = appReport.getFinalApplicationStatus();
                 }
-
-            } catch (Exception ye) {
+            } catch (final ActionExecutorException aae) {
+                LOG.warn("Foreseen Exception occurred while action execution; 
rethrowing ", aae);
+                throw aae;
+            } catch (final Exception ye) {
                 LOG.warn("Exception occurred while checking Launcher AM 
status; will try checking action data file instead ", ye);
                 // Fallback to action data file if we can't find the Launcher 
AM (maybe it got purged)
                 fallback = true;
@@ -1750,6 +1756,19 @@ public class JavaActionExecutor extends ActionExecutor {
     }
 
     /**
+     * Finds the YARN application ID whose application report is consulted when the action is checked.
+     * <p>
+     * For every {@link JavaActionExecutor} that is not {@link MapReduceActionExecutor}, the effective YARN application ID of the
+     * action is the one where {@link LauncherAM} is run, hence this default implementation simply returns
+     * {@link WorkflowAction#getExternalId()}.
+     * @param context the execution context; not used by this default implementation
+     * @param action the workflow action
+     * @return a {@code String} that depicts the application ID of the launcher ApplicationMaster of this action
+     * @throws ActionExecutorException never thrown here; declared so that overriding executors can report a failed lookup
+     */
+    protected String findYarnApplicationId(final Context context, final 
WorkflowAction action)
+            throws ActionExecutorException {
+        return action.getExternalId();
+    }
+
+    /**
      * Get the output data of an action. Subclasses should override this method
      * to get action specific output data.
      * @param actionFs the FileSystem object
@@ -1861,7 +1880,7 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    private String getActionYarnTag(Context context, WorkflowAction action) {
+    protected String getActionYarnTag(Context context, WorkflowAction action) {
         return LauncherHelper.getActionYarnTag(context.getProtoActionConf(), 
context.getWorkflow().getParentId(), action);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
 
b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 83a23f5..a4dd13b 100644
--- 
a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -20,11 +20,22 @@ package org.apache.oozie.action.hadoop;
 
 import java.io.IOException;
 import java.io.StringReader;
+import java.net.URISyntaxException;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Predicate;
 
 import com.google.common.base.Charsets;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Ordering;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -34,15 +45,22 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
+import org.jdom.JDOMException;
 import org.jdom.Namespace;
 
+import static 
org.apache.oozie.action.hadoop.LauncherMain.CHILD_MAPREDUCE_JOB_TAGS;
+
 public class MapReduceActionExecutor extends JavaActionExecutor {
 
     public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = 
"oozie.action.external.stats.write";
@@ -51,6 +69,7 @@ public class MapReduceActionExecutor extends 
JavaActionExecutor {
     private static final String STREAMING_MAIN_CLASS_NAME = 
"org.apache.oozie.action.hadoop.StreamingMain";
     public static final String JOB_END_NOTIFICATION_URL = 
"job.end.notification.url";
     private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name";
+    static final String YARN_APPLICATION_TYPE_MAPREDUCE = "MAPREDUCE";
     private XLog log = XLog.getLog(getClass());
 
     public MapReduceActionExecutor() {
@@ -341,33 +360,35 @@ public class MapReduceActionExecutor extends 
JavaActionExecutor {
         Map<String, String> actionData;
         Configuration jobConf;
 
+        // Need to emit jobConf and actionData for later usage
         try {
-            FileSystem actionFs = context.getAppFileSystem();
-            Element actionXml = XmlUtils.parseXml(action.getConf());
+            final FileSystem actionFs = context.getAppFileSystem();
+            final Element actionXml = XmlUtils.parseXml(action.getConf());
             jobConf = createBaseHadoopConf(context, actionXml);
-            Path actionDir = context.getActionDir();
+            final Path actionDir = context.getActionDir();
             actionData = LauncherHelper.getActionData(actionFs, actionDir, 
jobConf);
-        } catch (Exception e) {
+        } catch (final Exception e) {
             LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e);
             throw convertException(e);
         }
 
-        final String newId = 
actionData.get(LauncherAMUtils.ACTION_DATA_NEW_ID);
+        final String newJobId = findNewHadoopJobId(context, action);
 
         // check the Hadoop job if newID is defined (which should be the case 
here) - otherwise perform the normal check()
-        if (newId != null) {
+        if (newJobId != null) {
             boolean jobCompleted;
             JobClient jobClient = null;
             boolean exception = false;
 
             try {
                 jobClient = createJobClient(context, new JobConf(jobConf));
-                RunningJob runningJob = jobClient.getJob(JobID.forName(newId));
+                final JobID jobid = JobID.forName(newJobId);
+                final RunningJob runningJob = jobClient.getJob(jobid);
 
                 if (runningJob == null) {
                     context.setExternalStatus(FAILED);
                     throw new 
ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
-                            "Unknown hadoop job [{0}] associated with action 
[{1}].  Failing this action!", newId,
+                            "Unknown hadoop job [{0}] associated with action 
[{1}].  Failing this action!", newJobId,
                             action.getId());
                 }
 
@@ -396,7 +417,7 @@ public class MapReduceActionExecutor extends 
JavaActionExecutor {
                 super.check(context, action);
             } else {
                 context.setExternalStatus(RUNNING);
-                String externalAppId = 
TypeConverter.toYarn(JobID.forName(newId)).getAppId().toString();
+                final String externalAppId = 
TypeConverter.toYarn(JobID.forName(newJobId)).getAppId().toString();
                 context.setExternalChildIDs(externalAppId);
             }
         } else {
@@ -409,4 +430,254 @@ public class MapReduceActionExecutor extends 
JavaActionExecutor {
         injectCallback(context, actionConf);
     }
 
+    /**
+     * Looks up the ID of the Hadoop job recorded in the action data of this action.
+     * @param context the execution context
+     * @param action the workflow action
+     * @return the Hadoop job ID, or {@code null} when the action data holds no such entry
+     * @throws ActionExecutorException when the configuration could not be created or the action data could not be read
+     */
+    private String findNewHadoopJobId(final Context context, final WorkflowAction action) throws ActionExecutorException {
+        final String newJobId;
+        try {
+            final HadoopJobIdFinder jobIdFinder = new HadoopJobIdFinder(createJobConfFromActionConf(context, action), context);
+            newJobId = jobIdFinder.find();
+        } catch (final HadoopAccessorException | IOException | JDOMException | URISyntaxException | InterruptedException |
+                NoSuchAlgorithmException e) {
+            LOG.warn("Exception while trying to find new Hadoop job id(). Message[{0}]", e.getMessage(), e);
+            throw convertException(e);
+        }
+
+        return newJobId;
+    }
+
+    /**
+     * Creates the base Hadoop configuration from the action's XML and stores the tag derived from the action's YARN tag
+     * under {@link LauncherMain#CHILD_MAPREDUCE_JOB_TAGS}.
+     * @param context the execution context
+     * @param action the workflow action whose configuration XML is parsed
+     * @return the configuration with the child MapReduce job tag set
+     * @throws JDOMException when the action's configuration XML could not be parsed
+     * @throws NoSuchAlgorithmException when the tag could not be computed
+     */
+    private Configuration createJobConfFromActionConf(final Context context, final WorkflowAction action)
+            throws JDOMException, NoSuchAlgorithmException {
+        final Configuration jobConf = createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
+        final String childJobTag = LauncherHelper.getTag(getActionYarnTag(context, action));
+        jobConf.set(CHILD_MAPREDUCE_JOB_TAGS, childJobTag);
+
+        return jobConf;
+    }
+
+    /**
+     * Finds the effective YARN application ID of a {@link MapReduceActionExecutor} action by delegating to
+     * {@link YarnApplicationIdFinder}.
+     * @param context the execution context
+     * @param action the workflow action; it is cast to {@link WorkflowActionBean}
+     * @return the YARN application ID as a {@code String}
+     * @throws ActionExecutorException when the YARN application ID could not be found
+     */
+    @Override
+    protected String findYarnApplicationId(final Context context, final WorkflowAction action) throws ActionExecutorException {
+        try {
+            final Configuration jobConf = createJobConfFromActionConf(context, action);
+            final YarnApplicationIdFinder applicationIdFinder = new YarnApplicationIdFinder(
+                    new HadoopJobIdFinder(jobConf, context),
+                    new YarnApplicationReportReader(jobConf),
+                    (WorkflowActionBean) action);
+
+            return applicationIdFinder.find();
+        } catch (final IOException | HadoopAccessorException | JDOMException | InterruptedException | URISyntaxException |
+                NoSuchAlgorithmException e) {
+            LOG.warn("Exception while finding YARN application id. Message[{0}]", e.getMessage(), e);
+            throw convertException(e);
+        }
+    }
+
+    /**
+     * Finds a Hadoop job ID based on {@code action-data.seq} file stored on HDFS by {@link MapReduceMain}.
+     */
+    @VisibleForTesting
+    static class HadoopJobIdFinder {
+        private final Configuration jobConf;
+        private final Context executorContext;
+
+        HadoopJobIdFinder(final Configuration jobConf, final Context executorContext) {
+            this.jobConf = jobConf;
+            this.executorContext = executorContext;
+        }
+
+        /**
+         * Reads the action data from the action directory of the executor context and picks the new Hadoop job ID from it.
+         * @return the value stored under {@link LauncherAMUtils#ACTION_DATA_NEW_ID}, or {@code null} if there is none
+         */
+        String find() throws HadoopAccessorException, IOException, URISyntaxException, InterruptedException {
+            return LauncherHelper
+                    .getActionData(executorContext.getAppFileSystem(), executorContext.getActionDir(), jobConf)
+                    .get(LauncherAMUtils.ACTION_DATA_NEW_ID);
+        }
+    }
+
+    /**
+     * Find YARN application ID in three stages:
+     * <ul>
+     *     <li>based on {@code action-data.seq} written by {@link MapReduceMain}, if already present. If present and is not the
+     *     Oozie Launcher's application ID ({@link WorkflowAction#getExternalId()}), gets used. Else, fall back to following:</li>
+     *     <li>if not found, look up the appropriate YARN child ID</li>
+     *     <li>if an appropriate YARN application ID is not found, go with Oozie Launcher's application ID
+     *     ({@link WorkflowAction#getExternalId()})</li>
+     * </ul>
+     */
+    @VisibleForTesting
+    static class YarnApplicationIdFinder {
+        private static final XLog LOG = XLog.getLog(YarnApplicationIdFinder.class);
+
+        private final HadoopJobIdFinder hadoopJobIdFinder;
+        private final YarnApplicationReportReader reader;
+        private final WorkflowActionBean workflowActionBean;
+
+        YarnApplicationIdFinder(final HadoopJobIdFinder hadoopJobIdFinder,
+                                final YarnApplicationReportReader reader,
+                                final WorkflowActionBean workflowActionBean) {
+            this.hadoopJobIdFinder = hadoopJobIdFinder;
+            this.reader = reader;
+            this.workflowActionBean = workflowActionBean;
+        }
+
+        /**
+         * Determines the effective YARN application ID of the action.
+         * @return the application ID derived from the new Hadoop job ID when that differs from the action's external ID,
+         * else the result of {@link #fallbackToYarnChildOrExternalId()}
+         */
+        String find() throws IOException, HadoopAccessorException, URISyntaxException, InterruptedException {
+            final String newJobId = hadoopJobIdFinder.find();
+            // Either condition alone disqualifies the candidate. Combined with "&&", a non-empty value that is not a Hadoop
+            // job ID skipped the fallback and made JobID.forName() below throw an IllegalArgumentException.
+            if (Strings.isNullOrEmpty(newJobId) || !isHadoopJobId(newJobId)) {
+                LOG.trace("Is not a Hadoop Job Id, falling back.");
+                return fallbackToYarnChildOrExternalId();
+            }
+
+            final String externalId = workflowActionBean.getExternalId();
+            // Never null: it is the result of toString()
+            final String newApplicationId = TypeConverter.toYarn(JobID.forName(newJobId)).getAppId().toString();
+
+            // Compare from the non-null side, the external ID may not be set yet
+            if (newApplicationId.equals(externalId)) {
+                LOG.trace("New YARN application ID {0} is empty or is the same as {1}, falling back.",
+                        newApplicationId, externalId);
+                return fallbackToYarnChildOrExternalId();
+            }
+
+            LOG.trace("New YARN application ID {0} is different, using it.", newApplicationId);
+            return newApplicationId;
+        }
+
+        /**
+         * When a Hadoop job ID could not be found, fall back finding the YARN child application ID, or the workflow action's
+         * {@code externalId}:
+         * <ul>
+         *     <li>look for YARN children of the actual {@code WorkflowActionBean}</li>
+         *     <li>filter for type {@code MAPREDUCE}. Note that those will be the YARN application children of the original
+         *     {@code Oozie Launcher} type. What remains with the biggest application ID is the one we call YARN child ID.
+         *     NOTE(review): this method does not filter on the applications' state; if only applications outside terminal
+         *     states are wanted, that has to be ensured by {@link YarnApplicationReportReader} - confirm</li>
+         *     <li>if not found, go with {@link WorkflowActionBean#externalId}</li>
+         *     <li>if found and there is no {@link WorkflowActionBean#externalId}, go with the YARN child ID</li>
+         *     <li>if the found one is not newer than the one already stored, go with {@link WorkflowActionBean#externalId}</li>
+         *     <li>else, go with the YARN child ID</li>
+         * </ul>
+         * @return the YARN child application's ID, or the workflow action's external ID
+         */
+        private String fallbackToYarnChildOrExternalId() {
+            final String externalId = workflowActionBean.getExternalId();
+
+            // Work on a copy: the list handed out by the reader may be unmodifiable and is not ours to change
+            final List<ApplicationReport> childYarnApplications = new ArrayList<>(reader.read());
+            childYarnApplications.removeIf(new Predicate<ApplicationReport>() {
+                @Override
+                public boolean test(final ApplicationReport applicationReport) {
+                    // Constant first, so a report without an application type is dropped instead of raising an NPE
+                    return !YARN_APPLICATION_TYPE_MAPREDUCE.equals(applicationReport.getApplicationType());
+                }
+            });
+
+            if (childYarnApplications.isEmpty()) {
+                LOG.trace("No child YARN applications present, returning {0} instead", externalId);
+                return externalId;
+            }
+
+            final String yarnChildId = getLastYarnId(childYarnApplications);
+
+            if (Strings.isNullOrEmpty(yarnChildId)) {
+                LOG.trace("yarnChildId is empty, returning {0} instead", externalId);
+                return externalId;
+            }
+
+            if (Strings.isNullOrEmpty(externalId)) {
+                LOG.trace("workflowActionBean.externalId is empty, returning {0} instead", yarnChildId);
+                return yarnChildId;
+            }
+
+            if (new YarnApplicationIdComparator().compare(yarnChildId, externalId) > 0) {
+                LOG.trace("yarnChildId is newer, returning {0}", yarnChildId);
+                return yarnChildId;
+            }
+
+            LOG.trace("yarnChildId is not newer, returning {0}", externalId);
+            return externalId;
+        }
+
+        /**
+         * Get the biggest YARN application ID given {@link YarnApplicationIdComparator}.
+         * @param yarnApplications the YARN application reports, must be neither {@code null} nor empty
+         * @return the biggest {@code ApplicationReport#getApplicationId().toString()}
+         */
+        @VisibleForTesting
+        protected String getLastYarnId(final List<ApplicationReport> yarnApplications) {
+            Preconditions.checkNotNull(yarnApplications, "YARN application list should be filled");
+            Preconditions.checkArgument(!yarnApplications.isEmpty(), "no YARN applications in the list");
+
+            final Iterable<String> unorderedApplicationIds =
+                    Iterables.transform(yarnApplications, new Function<ApplicationReport, String>() {
+                        @Override
+                        public String apply(final ApplicationReport input) {
+                            Preconditions.checkNotNull(input, "YARN application should be filled");
+                            return input.getApplicationId().toString();
+                        }
+                    });
+
+            return Ordering.from(new YarnApplicationIdComparator()).max(unorderedApplicationIds);
+        }
+
+        /**
+         * Tells whether the candidate can be parsed by {@link JobID#forName(String)}.
+         * @param jobIdCandidate the {@code String} to check
+         * @return {@code true} if parsing yields a job ID, {@code false} if it yields {@code null} or is rejected
+         */
+        private boolean isHadoopJobId(final String jobIdCandidate) {
+            try {
+                return JobID.forName(jobIdCandidate) != null;
+            } catch (final IllegalArgumentException e) {
+                LOG.warn("Job ID candidate is not a Hadoop Job ID.", e);
+                return false;
+            }
+        }
+    }
+
+    /**
+     * Orders YARN application IDs of the form {@code application_<cluster timestamp>_<sequence number>}:
+     * <ul>
+     *     <li>with different cluster timestamps, the ID with the bigger timestamp is the greater one</li>
+     *     <li>with the same cluster timestamp, the ID with the higher sequence number is the greater one</li>
+     *     <li>with the same cluster timestamp and the same sequence number, both IDs are equal</li>
+     * </ul>
+     */
+    @VisibleForTesting
+    @SuppressFBWarnings(value = "SE_COMPARATOR_SHOULD_BE_SERIALIZABLE", justification = "instances will never be serialized")
+    static class YarnApplicationIdComparator implements Comparator<String> {
+        private static final String PREFIX = "application_";
+        private static final String SEPARATOR = "_";
+
+        @Override
+        public int compare(final String left, final String right) {
+            // The cluster timestamp decides first; the sequence number is only the tie-breaker for equal timestamps
+            final int byClusterTimestamp = Long.compare(getMiddleLongPart(left), getMiddleLongPart(right));
+
+            return byClusterTimestamp != 0
+                    ? byClusterTimestamp
+                    : Integer.compare(getLastIntegerPart(left), getLastIntegerPart(right));
+        }
+
+        /** Parses what stands between the {@code application_} prefix and the last separator: the cluster timestamp. */
+        private long getMiddleLongPart(final String applicationId) {
+            final int timestampStart = applicationId.indexOf(PREFIX) + PREFIX.length();
+            final int timestampEnd = applicationId.lastIndexOf(SEPARATOR);
+
+            return Long.parseLong(applicationId.substring(timestampStart, timestampEnd));
+        }
+
+        /** Parses what follows the last separator: the sequence number. */
+        private int getLastIntegerPart(final String applicationId) {
+            final int sequenceStart = applicationId.lastIndexOf(SEPARATOR) + SEPARATOR.length();
+
+            return Integer.parseInt(applicationId.substring(sequenceStart));
+        }
+    }
+
+    /**
+     * Encapsulates call to the static method
+     * {@link LauncherMain#getChildYarnApplications(Configuration, ApplicationsRequestScope, long)} for better testability:
+     * being an instance, it can be replaced in tests, unlike the static call it wraps.
+     */
+    @VisibleForTesting
+    static class YarnApplicationReportReader {
+        // Configuration handed over to LauncherMain on every read()
+        private final Configuration jobConf;
+
+        YarnApplicationReportReader(final Configuration jobConf) {
+            this.jobConf = jobConf;
+        }
+
+        /**
+         * Reads the child YARN applications using the stored configuration, the scope {@link ApplicationsRequestScope#OWN},
+         * and {@code 0L} as the third argument.
+         * NOTE(review): presumably {@code 0L} is a start time meaning "no lower bound" - confirm against {@link LauncherMain}.
+         * @return the application reports as returned by {@link LauncherMain}
+         */
+        List<ApplicationReport> read() {
+            return LauncherMain.getChildYarnApplications(jobConf, 
ApplicationsRequestScope.OWN, 0L);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
index f39bba2..05511e4 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
@@ -21,15 +21,25 @@ package org.apache.oozie.action.hadoop;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.oozie.DagELFunctions;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.action.oozie.JavaSleepAction;
 import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.wf.KillXCommand;
 import org.apache.oozie.service.CallbackService;
 import org.apache.oozie.service.ELService;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
@@ -47,18 +57,27 @@ import org.apache.oozie.workflow.lite.EndNodeDef;
 import org.apache.oozie.workflow.lite.LiteWorkflowApp;
 import org.apache.oozie.workflow.lite.StartNodeDef;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.Writer;
+import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
 public abstract class ActionExecutorTestCase extends XHCatTestCase {
+    protected static final int JOB_TIMEOUT = 100_000;
 
     @Override
     protected void setUp() throws Exception {
@@ -325,4 +344,155 @@ public abstract class ActionExecutorTestCase extends 
XHCatTestCase {
         writer.close();
     }
 
+    /**
+     * Writes the given application XML, followed by a line separator, to the local file denoted by {@code appPath}.
+     *
+     * @param appXml  the content to write
+     * @param appPath string form of the URI of the target file; it is converted with {@link URI#create(String)}
+     *                and handed to {@link File#File(URI)}
+     * @throws IOException if the file cannot be opened or the content cannot be written
+     */
+    protected void writeToFile(final String appXml, final String appPath) throws IOException {
+        final File wf = new File(URI.create(appPath));
+        // try-with-resources closes the writer on every path; the former catch-and-rethrow added nothing
+        try (final PrintWriter out = new PrintWriter(new FileWriter(wf))) {
+            out.println(appXml);
+            // PrintWriter never throws on write failures, so surface them explicitly
+            if (out.checkError()) {
+                throw new IOException("Could not write to file: " + wf);
+            }
+        }
+    }
+
+    /**
+     * Submits the workflow application located at {@code workflowUri} through {@code wfClient} and starts it.
+     *
+     * @param workflowUri value used for {@code OozieClient.APP_PATH}
+     * @param wfClient    client used to create the configuration, submit and start the job
+     * @return the job ID returned by the submission
+     * @throws OozieClientException if the client fails to submit or start the job
+     */
+    protected String submitWorkflow(final String workflowUri, final OozieClient wfClient) throws OozieClientException {
+        final Properties jobProperties = wfClient.createConfiguration();
+        jobProperties.setProperty(OozieClient.APP_PATH, workflowUri);
+        jobProperties.setProperty(OozieClient.USER_NAME, getTestUser());
+        jobProperties.setProperty("appName", "var-app-name");
+
+        final String submittedJobId = wfClient.submit(jobProperties);
+        wfClient.start(submittedJobId);
+        return submittedJobId;
+    }
+
+    /**
+     * Reads the child MapReduce job ID from the {@code LauncherMainTester.JOB_ID_FILE_NAME} file under the test case's
+     * {@code input} directory and converts it to a YARN application ID.
+     *
+     * @param conf configuration used to obtain the {@link FileSystem}
+     * @return the application ID derived from the first line of the ID file
+     * @throws IOException if the ID file is missing, is not a regular file, or cannot be read
+     */
+    protected ApplicationId getChildMRJobApplicationId(final Configuration conf) throws IOException {
+        final Path inputDir = new Path(getFsTestCaseDir(), "input");
+        final Path wfIDFile = new Path(inputDir, LauncherMainTester.JOB_ID_FILE_NAME);
+        final FileSystem fs = FileSystem.get(conf);
+
+        // wait until we have the running child MR job's ID from HDFS
+        waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile));
+        if (!fs.exists(wfIDFile) || !fs.isFile(wfIDFile)) {
+            throw new IOException("Workflow ID file does not exist: " + wfIDFile.toString());
+        }
+
+        final String jobID;
+        try (final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(wfIDFile)))) {
+            jobID = reader.readLine();
+        }
+
+        // readLine() yields null for an empty file; fail with the descriptive message instead of an NPE below
+        assertTrue("Application ID should've been found. No external Child ID was found in " + wfIDFile.toString(),
+                jobID != null);
+
+        // format check only, the parsed value is not needed
+        JobID.forName(jobID);
+        // swap the leading "job" only, so that a "job" substring elsewhere in the ID stays untouched
+        final String appID = jobID.replaceFirst("^job", "application");
+
+        return ConverterUtils.toApplicationId(appID);
+    }
+
+    /**
+     * Predicate that holds once the given file exists on the given file system and has a non-zero length.
+     */
+    private static class ApplicationIdExistsPredicate implements Predicate {
+        private final FileSystem fileSystem;
+        private final Path idFile;
+
+        ApplicationIdExistsPredicate(final FileSystem fs, final Path wfIDFile) {
+            this.fileSystem = fs;
+            this.idFile = wfIDFile;
+        }
+
+        @Override
+        public boolean evaluate() throws Exception {
+            if (!fileSystem.exists(idFile)) {
+                return false;
+            }
+            final long length = fileSystem.getFileStatus(idFile).getLen();
+            return length > 0;
+        }
+    }
+
+    /**
+     * Status predicate configured with {@code WorkflowJob.Status.RUNNING} and {@code WorkflowAction.Status.RUNNING}
+     * as the expected statuses.
+     */
+    protected static class WorkflowActionRunningPredicate extends 
WorkflowActionStatusPredicate {
+        WorkflowActionRunningPredicate(final OozieClient wfClient, final 
String jobId) {
+            super(wfClient, jobId, WorkflowJob.Status.RUNNING, 
WorkflowAction.Status.RUNNING);
+        }
+    }
+
+    /**
+     * Status predicate configured with {@code WorkflowJob.Status.KILLED} and {@code WorkflowAction.Status.KILLED}
+     * as the expected statuses.
+     */
+    protected static class WorkflowActionKilledPredicate extends 
WorkflowActionStatusPredicate {
+        WorkflowActionKilledPredicate(final OozieClient wfClient, final String 
jobId) {
+            super(wfClient, jobId, WorkflowJob.Status.KILLED, 
WorkflowAction.Status.KILLED);
+        }
+    }
+
+    /**
+     * Predicate that holds when the workflow job has the expected job status and its action at index {@code 1} has
+     * the expected action status.
+     */
+    private static abstract class WorkflowActionStatusPredicate implements Predicate {
+        private final OozieClient wfClient;
+        private final String jobId;
+        private final WorkflowJob.Status expectedWorkflowJobStatus;
+        private final WorkflowAction.Status expectedWorkflowActionStatus;
+
+        WorkflowActionStatusPredicate(final OozieClient wfClient,
+                                      final String jobId,
+                                      final WorkflowJob.Status expectedWorkflowJobStatus,
+                                      final WorkflowAction.Status expectedWorkflowActionStatus) {
+            this.wfClient = wfClient;
+            this.jobId = jobId;
+            this.expectedWorkflowJobStatus = expectedWorkflowJobStatus;
+            this.expectedWorkflowActionStatus = expectedWorkflowActionStatus;
+        }
+
+        @Override
+        public boolean evaluate() throws Exception {
+            // query the job once so that the job and action statuses come from the same snapshot
+            final WorkflowJob workflowJob = wfClient.getJobInfo(jobId);
+            if (!expectedWorkflowJobStatus.equals(workflowJob.getStatus())) {
+                return false;
+            }
+
+            // the action at index 1 may not be listed yet; report "not yet" rather than throwing
+            final List<WorkflowAction> actions = workflowJob.getActions();
+            if (actions == null || actions.size() < 2) {
+                return false;
+            }
+
+            return expectedWorkflowActionStatus.equals(actions.get(1).getStatus());
+        }
+    }
+
+    /**
+     * Creates a {@link KillXCommand} for the given job ID and calls it.
+     *
+     * @param jobId ID passed to the kill command
+     * @throws CommandException if the command call fails
+     */
+    protected void killWorkflow(final String jobId) throws CommandException {
+        new KillXCommand(jobId).call();
+    }
+
+    /**
+     * Calls {@code waitFor} with {@code JOB_TIMEOUT} and a {@code WorkflowActionRunningPredicate} built from the
+     * given client and job ID.
+     */
+    protected void waitForWorkflowToStart(final OozieClient wfClient, final 
String jobId) {
+        waitFor(JOB_TIMEOUT, new 
WorkflowActionRunningPredicate(wfClient,jobId));
+    }
+
+    /**
+     * Calls {@code waitFor} with {@code JOB_TIMEOUT} and a {@code WorkflowActionKilledPredicate} built from the
+     * given client and job ID.
+     */
+    protected void waitForWorkflowToKill(final OozieClient wfClient, final 
String jobId) {
+        waitFor(JOB_TIMEOUT, new 
WorkflowActionKilledPredicate(wfClient,jobId));
+    }
+
+    /**
+     * Builds the XML of a {@code <java>} action.
+     *
+     * @param launchMRAction when {@code true} the action runs {@code LauncherMainTester} with the
+     *                       {@code javamapreduce} argument plus the test case's input and output directories;
+     *                       otherwise it runs {@code JavaSleepAction} without arguments
+     * @return the action XML
+     */
+    protected String getJavaAction(final boolean launchMRAction) {
+        final String clusterXml = "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>";
+
+        if (!launchMRAction) {
+            return "<java>" + clusterXml +
+                    "<main-class>" + JavaSleepAction.class.getName() + "</main-class>" +
+                    "</java>";
+        }
+
+        final Path mrInput = new Path(getFsTestCaseDir(), "input");
+        final Path mrOutput = new Path(getFsTestCaseDir(), "output");
+        return "<java>" + clusterXml +
+                "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
+                "<arg>javamapreduce</arg>" +
+                "<arg>" + mrInput.toString() + "</arg>" +
+                "<arg>" + mrOutput.toString() + "</arg>" +
+                "</java>";
+    }
+
+    /**
+     * Kills the given YARN application through a YARN client created for the test user.
+     *
+     * @param configuration     configuration handed to the client factory
+     * @param yarnApplicationId ID of the application to kill
+     * @throws HadoopAccessorException if the client cannot be created
+     * @throws IOException             if the kill request or closing the client fails
+     * @throws YarnException           if the kill request fails
+     */
+    void killYarnApplication(final Configuration configuration, final ApplicationId yarnApplicationId)
+            throws HadoopAccessorException, IOException, YarnException {
+        // the client is a closeable service; release it once the kill request has been issued
+        try (final org.apache.hadoop.yarn.client.api.YarnClient yarnClient =
+                     getHadoopAccessorService().createYarnClient(getTestUser(), configuration)) {
+            yarnClient.killApplication(yarnApplicationId);
+        }
+    }
+
+    /**
+     * Looks up the {@link HadoopAccessorService} from the current {@link Services} instance.
+     *
+     * @return the value returned by {@code Services.get().get(HadoopAccessorService.class)}
+     */
+    HadoopAccessorService getHadoopAccessorService() {
+        return Services.get().get(HadoopAccessorService.class);
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index a31079a..784dc96 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -2616,11 +2616,6 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
         waitUntilYarnAppDoneAndAssertSuccess(applicationId);
     }
 
-    private HadoopAccessorService getHadoopAccessorService() {
-        return Services.get().get(HadoopAccessorService.class);
-    }
-
-
     public void testChildKill() throws Exception {
         final JobConf clusterConf = createJobConf();
         FileSystem fileSystem = FileSystem.get(clusterConf);

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java
 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java
new file mode 100644
index 0000000..aabb363
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@code MapReduceActionExecutor.YarnApplicationIdComparator}.
+ */
+public class TestYarnApplicationIdComparator {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    private MapReduceActionExecutor.YarnApplicationIdComparator comparator;
+
+    @Before
+    public void setUp() {
+        this.comparator = new MapReduceActionExecutor.YarnApplicationIdComparator();
+    }
+
+    // The three malformed-input cases used to share one test method. The first compare() call threw,
+    // which ended the method, so the remaining two expectations were never exercised.
+
+    @Test
+    public void whenNullParametersGivenNullPointerExceptionIsThrown() {
+        expectedException.expect(NullPointerException.class);
+        comparator.compare(null, null);
+    }
+
+    // NOTE(review): this expectation was unreachable before the split; confirm it against the comparator
+    @Test
+    public void whenNonNumericPartsGivenNumberFormatExceptionIsThrown() {
+        expectedException.expect(NumberFormatException.class);
+        comparator.compare("application_a_b", "application_c_d");
+    }
+
+    // NOTE(review): this expectation was unreachable before the split; confirm it against the comparator
+    @Test
+    public void whenPrefixIsMissingIndexOutOfBoundsExceptionIsThrown() {
+        expectedException.expect(IndexOutOfBoundsException.class);
+        comparator.compare("a_b_c", "d_e_f");
+    }
+
+    @Test
+    public void whenDifferentTimestampsGreaterTimestampWins() {
+        assertEquals("cluster timestamps are different, the one with bigger timestamp wins",
+                -1,
+                comparator.compare("application_1534164756526_0001", "application_1534164756527_0002"));
+    }
+
+    @Test
+    public void whenSameTimestampsGreaterSequenceWins() {
+        assertEquals("cluster timestamps are the same but sequences are different, left should be greater than right",
+                1,
+                comparator.compare("application_1534164756526_0002", "application_1534164756526_0001"));
+    }
+
+    @Test
+    public void whenSameTimestampsAndSameSequencesLeftEqualsRight() {
+        assertEquals("cluster timestamps and sequences are the same, left should equal right",
+                0,
+                comparator.compare("application_1534164756526_0001", "application_1534164756526_0001"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java
 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java
new file mode 100644
index 0000000..3fd7149
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Unit tests for {@code MapReduceActionExecutor.YarnApplicationIdFinder}, driven by mocked job ID finder,
+ * application report reader and workflow action bean.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestYarnApplicationIdFinder {
+    @Rule
+    public ExpectedException expectedException = ExpectedException.none();
+
+    @Mock
+    private MapReduceActionExecutor.HadoopJobIdFinder hadoopJobIdFinder;
+
+    @Mock
+    private MapReduceActionExecutor.YarnApplicationReportReader reader;
+
+    @Mock
+    private WorkflowActionBean workflowActionBean;
+
+    @Mock
+    private ApplicationReport applicationReport;
+
+    @Mock
+    private ApplicationId applicationId;
+
+    private MapReduceActionExecutor.YarnApplicationIdFinder yarnApplicationIdFinder;
+
+    @Before
+    public void setUp() throws Exception {
+        yarnApplicationIdFinder = new MapReduceActionExecutor.YarnApplicationIdFinder(hadoopJobIdFinder,
+                reader, workflowActionBean);
+    }
+
+    @Test
+    public void whenHadoopJobIdAndChildYarnApplicationAreNotPresentActionExternalIdIsFound() throws Exception {
+        when(hadoopJobIdFinder.find()).thenReturn(null);
+        when(reader.read()).thenReturn(Collections.emptyList());
+        when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0000");
+
+        assertEquals("no Hadoop Job ID nor YARN applications: WorkflowActionBean.externalId should be found",
+                "application_1534164756526_0000",
+                yarnApplicationIdFinder.find());
+
+        when(applicationReport.getApplicationType()).thenReturn("Oozie Launcher");
+        when(applicationReport.getApplicationId()).thenReturn(applicationId);
+        when(applicationId.toString()).thenReturn("application_1534164756526_0001");
+        when(reader.read()).thenReturn(Lists.newArrayList(applicationReport));
+
+        assertEquals(
+                "no Hadoop Job ID nor YARN applications of MAPREDUCE type: WorkflowActionBean.externalId should be found",
+                "application_1534164756526_0000",
+                yarnApplicationIdFinder.find());
+
+        when(applicationReport.getApplicationType()).thenReturn("MAPREDUCE");
+        when(workflowActionBean.getWfId()).thenReturn("workflowId");
+
+        assertEquals(
+                "no Hadoop Job ID nor YARN applications of the same workflow: WorkflowActionBean.externalId should be found",
+                "application_1534164756526_0000",
+                yarnApplicationIdFinder.find());
+    }
+
+    @Test
+    public void whenHadoopJobIdIsNotCorrectExceptionIsThrown() throws Exception {
+        when(hadoopJobIdFinder.find()).thenReturn("notAHadoopJobId");
+        expectedException.expect(IllegalArgumentException.class);
+
+        yarnApplicationIdFinder.find();
+    }
+
+    @Test
+    public void whenHadoopJobIdIsNotPresentChildYarnApplicationIdIsFound() throws Exception {
+        when(hadoopJobIdFinder.find()).thenReturn(null);
+        when(applicationReport.getApplicationType()).thenReturn("MAPREDUCE");
+        when(workflowActionBean.getWfId()).thenReturn("workflowId");
+        when(applicationReport.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING);
+        when(applicationId.toString()).thenReturn("application_1534164756526_0000");
+        when(applicationReport.getApplicationId()).thenReturn(applicationId);
+        when(reader.read()).thenReturn(Lists.newArrayList(applicationReport));
+
+        assertEquals("no Hadoop Job ID, but an appropriate YARN application: applicationId should be found",
+                "application_1534164756526_0000",
+                yarnApplicationIdFinder.find());
+    }
+
+    @Test
+    public void whenHadoopJobIsNotPresentAsYarnApplicationHadoopJobIdIsUsed() throws Exception {
+        setupMocks("job_1534164756526_0002", "application_1534164756526_0000", "application_1534164756526_0001");
+
+        assertEquals("Hadoop Job ID should be found when it is not present as a YARN application ID",
+                "application_1534164756526_0002",
+                yarnApplicationIdFinder.find());
+    }
+
+    @Test
+    public void whenHadoopJobIsPresentAsYarnApplicationAndDifferentFromItsUsed() throws Exception {
+        setupMocks("job_1534164756526_0002", "application_1534164756526_0001", "application_1534164756526_0003");
+
+        assertEquals("Hadoop Job ID should be found when different from the YARN application ID",
+                "application_1534164756526_0002",
+                yarnApplicationIdFinder.find());
+    }
+
+    @Test
+    public void whenHadoopJobIsPresentAsYarnApplicationAndContainWorkflowIdNotUsed() throws Exception {
+        setupMocks("job_1534164756526_0002", "application_1534164756526_0002", "application_1534164756526_0003");
+
+        assertEquals("YARN application ID should be found when greater than WorkflowActionBean.externalId",
+                "application_1534164756526_0003",
+                yarnApplicationIdFinder.find());
+    }
+
+    @Test
+    public void whenOldLauncherAndMRJobApplicationsAreFinishedAndNewLauncherPresentNewLauncherIsUsed() throws Exception {
+        final ApplicationReport oldLauncher = mockApplicationReport(
+                "Oozie Launcher", YarnApplicationState.FINISHED, "application_1534164756526_0001");
+        final ApplicationReport oldMRJob = mockApplicationReport(
+                "MAPREDUCE", YarnApplicationState.FINISHED, "application_1534164756526_0002");
+        final ApplicationReport newLauncher = mockApplicationReport(
+                "Oozie Launcher", YarnApplicationState.FINISHED, "application_1534164756526_0003");
+        final ApplicationReport newMRJob = mockApplicationReport(
+                "MAPREDUCE", YarnApplicationState.RUNNING, "application_1534164756526_0004");
+        when(reader.read()).thenReturn(Lists.newArrayList(oldLauncher, oldMRJob, newLauncher, newMRJob));
+
+        // the expected ID in the first two assertions is the one stubbed for newMRJob, not for newLauncher
+        when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0003");
+        assertEquals("newMRJob should be found", "application_1534164756526_0004", yarnApplicationIdFinder.find());
+
+        when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0004");
+        assertEquals("newMRJob should be found", "application_1534164756526_0004", yarnApplicationIdFinder.find());
+
+        when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0005");
+        assertEquals("workflowActionBean.externalId should be found",
+                "application_1534164756526_0005", yarnApplicationIdFinder.find());
+    }
+
+    @Test
+    public void testGetLastYarnIdOnNullThrows() {
+        expectedException.expect(NullPointerException.class);
+        yarnApplicationIdFinder.getLastYarnId(null);
+    }
+
+    @Test
+    public void testGetLastYarnIdOnEmptyListThrows() {
+        expectedException.expect(IllegalArgumentException.class);
+        yarnApplicationIdFinder.getLastYarnId(Collections.emptyList());
+    }
+
+    @Test
+    public void testGetLastYarnIdOnOneElementSuccess() {
+        when(applicationReport.getApplicationId()).thenReturn(applicationId);
+        when(applicationId.toString()).thenReturn("application_1534164756526_0000");
+
+        final String lastYarnId = yarnApplicationIdFinder.getLastYarnId(Collections.singletonList(applicationReport));
+        assertEquals("last YARN id should be the only element in the list", "application_1534164756526_0000", lastYarnId);
+    }
+
+    @Test
+    public void testGetLastYarnIdFromUnorderedListSuccess() {
+        final ApplicationReport newLauncher = mockApplicationReport(
+                "Oozie Launcher", YarnApplicationState.FINISHED, "application_1534164756526_0003");
+        final ApplicationReport newMRJob = mockApplicationReport(
+                "MAPREDUCE", YarnApplicationState.RUNNING, "application_1534164756526_0004");
+
+        final String lastYarnId = yarnApplicationIdFinder.getLastYarnId(Lists.newArrayList(newMRJob, newLauncher));
+        assertEquals("last YARN id should be the maximal element in the list", "application_1534164756526_0004", lastYarnId);
+    }
+
+    /** Stubs the shared field mocks for the single-application scenarios. */
+    private void setupMocks(final String mrJobId, final String wfExternalId, final String yarnApplicationId)
+            throws HadoopAccessorException, IOException, URISyntaxException, InterruptedException, YarnException {
+        when(hadoopJobIdFinder.find()).thenReturn(mrJobId);
+        when(applicationReport.getApplicationType()).thenReturn("MAPREDUCE");
+        when(workflowActionBean.getWfId()).thenReturn("workflowId");
+        when(workflowActionBean.getExternalId()).thenReturn(wfExternalId);
+        when(applicationReport.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING);
+        when(applicationId.toString()).thenReturn(yarnApplicationId);
+        when(applicationReport.getApplicationId()).thenReturn(applicationId);
+        when(reader.read()).thenReturn(Lists.newArrayList(applicationReport));
+    }
+
+    /** Creates a mocked application report with the given type and state whose application ID prints as {@code id}. */
+    private static ApplicationReport mockApplicationReport(final String type,
+                                                           final YarnApplicationState state,
+                                                           final String id) {
+        final ApplicationId mockedId = mock(ApplicationId.class);
+        when(mockedId.toString()).thenReturn(id);
+
+        final ApplicationReport report = mock(ApplicationReport.class);
+        when(report.getApplicationType()).thenReturn(type);
+        when(report.getYarnApplicationState()).thenReturn(state);
+        when(report.getApplicationId()).thenReturn(mockedId);
+        return report;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
 
b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
index 893405e..9c7f821 100644
--- 
a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
@@ -21,20 +21,14 @@ package org.apache.oozie.action.oozie;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.hadoop.ActionExecutorTestCase;
 import org.apache.oozie.action.hadoop.LauncherMainTester;
-import org.apache.oozie.action.hadoop.OozieJobInfo;
 import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
-import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.wf.KillXCommand;
 import org.apache.oozie.command.wf.SuspendXCommand;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -45,22 +39,13 @@ import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
-import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
 import java.io.StringReader;
 import java.io.Writer;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Properties;
 
 public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
-    private static final int JOB_TIMEOUT = 100 * 1000;
 
     public void testType() {
         SubWorkflowActionExecutor subWorkflow = new 
SubWorkflowActionExecutor();
@@ -506,7 +491,7 @@ public class TestSubWorkflowActionExecutor extends 
ActionExecutorTestCase {
             final OozieClient wfClient = LocalOozie.getClient();
             final String jobId = submitWorkflow(workflowUri, wfClient);
 
-            waitForSubWFtoStart(wfClient, jobId);
+            waitForWorkflowToStart(wfClient, jobId);
             WorkflowJob wf = wfClient.getJobInfo(jobId);
             // Suspending subworkflow
             new SuspendXCommand(wf.getActions().get(1).getExternalId()).call();
@@ -529,7 +514,7 @@ public class TestSubWorkflowActionExecutor extends 
ActionExecutorTestCase {
             final String jobId = submitWorkflow(workflowUri, wfClient);
             final Configuration conf = 
Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
 
-            waitForSubWFtoStart(wfClient, jobId);
+            waitForWorkflowToStart(wfClient, jobId);
 
             final ApplicationId externalChildJobId = 
getChildMRJobApplicationId(conf);
             killWorkflow(jobId);
@@ -540,70 +525,11 @@ public class TestSubWorkflowActionExecutor extends 
ActionExecutorTestCase {
 
     }
 
-    private void killWorkflow(String jobId) throws CommandException {
-        new KillXCommand(jobId).call();
-    }
-
-    private ApplicationId getChildMRJobApplicationId(Configuration conf) 
throws IOException {
-        final List<ApplicationId> applicationIdList  = new ArrayList<>();
-        final Path inputDir = new Path(getFsTestCaseDir(), "input");
-        final Path wfIDFile = new Path(inputDir, 
LauncherMainTester.JOB_ID_FILE_NAME);
-        final FileSystem fs = FileSystem.get(conf);
-
-        // wait until we have the running child MR job's ID from HDFS
-        waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile));
-
-        try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(fs.open(wfIDFile)))) {
-            String line = reader.readLine();
-            JobID.forName(line);
-            String jobID = line;
-            String appID = jobID.replace("job", "application");
-            ApplicationId id = ConverterUtils.toApplicationId(appID);
-            applicationIdList.add(id);
-        }
-
-        assertTrue("Application ID should've been found. No external Child ID 
was found in " + wfIDFile.toString(),
-                applicationIdList.size() == 1);
-        return applicationIdList.get(0);
-    }
-
-    private void waitForSubWFtoStart(OozieClient wfClient, String jobId) {
-        waitFor(JOB_TIMEOUT, new 
SubWorkflowActionRunningPredicate(wfClient,jobId));
-    }
-
-    private String submitWorkflow(String workflowUri, OozieClient wfClient) 
throws OozieClientException {
-        Properties conf = wfClient.createConfiguration();
-        conf.setProperty(OozieClient.APP_PATH, workflowUri);
-        conf.setProperty(OozieClient.USER_NAME, getTestUser());
-        conf.setProperty("appName", "var-app-name");
-        final String jobId = wfClient.submit(conf);
-        wfClient.start(jobId);
-        return jobId;
-    }
-
-    private void writeToFile(String appXml, String appPath) throws IOException 
{
-        // TODO Auto-generated method stub
-        File wf = new File(URI.create(appPath));
-        PrintWriter out = null;
-        try {
-            out = new PrintWriter(new FileWriter(wf));
-            out.println(appXml);
-        }
-        catch (IOException iex) {
-            throw iex;
-        }
-        finally {
-            if (out != null) {
-                out.close();
-            }
-        }
-    }
-
     public String getLazyWorkflow(boolean launchMRAction) {
         return  "<workflow-app xmlns='uri:oozie:workflow:0.4' name='app'>" +
                 "<start to='java' />" +
                 "       <action name='java'>" +
-                getAction(launchMRAction)
+                getJavaAction(launchMRAction)
                 + "<ok to='end' />"
                 + "<error to='fail' />"
                 + "</action>"
@@ -614,26 +540,6 @@ public class TestSubWorkflowActionExecutor extends 
ActionExecutorTestCase {
                 + "</workflow-app>";
     }
 
-    private String getAction(boolean launchMRAction) {
-        Path inputDir = new Path(getFsTestCaseDir(), "input");
-        Path outputDir = new Path(getFsTestCaseDir(), "output");
-        String javaActionXml = "<java>" +
-                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
-                "<name-node>" + getNameNodeUri() + "</name-node>" +
-                "<main-class>" + JavaSleepAction.class.getName()+ 
"</main-class>" +
-                "</java>";
-        String javaWithMRActionXml = "<java>" +
-                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
-                "<name-node>" + getNameNodeUri() + "</name-node>" +
-                "<main-class>" + LauncherMainTester.class.getName()+ 
"</main-class>" +
-                "<arg>javamapreduce</arg>" +
-                "<arg>"+inputDir.toString()+"</arg>" +
-                "<arg>"+outputDir.toString()+"</arg>" +
-                "</java>";
-        String actionXml = launchMRAction ? javaWithMRActionXml : 
javaActionXml;
-        return actionXml;
-    }
-
     public void testSubWorkflowRerun() throws Exception {
         try {
             String workflowUri = createSubWorkflowWithLazyAction(false);
@@ -646,7 +552,7 @@ public class TestSubWorkflowActionExecutor extends 
ActionExecutorTestCase {
             final String jobId = wfClient.submit(conf);
             wfClient.start(jobId);
 
-            waitForSubWFtoStart(wfClient, jobId);
+            waitForWorkflowToStart(wfClient, jobId);
 
             String subWorkflowExternalId = 
wfClient.getJobInfo(jobId).getActions().get(1).getExternalId();
             
wfClient.kill(wfClient.getJobInfo(jobId).getActions().get(1).getExternalId());
@@ -925,38 +831,4 @@ public class TestSubWorkflowActionExecutor extends 
ActionExecutorTestCase {
                 + "<end name='end' />"
                 + "</workflow-app>";
     }
-
-    private static class ApplicationIdExistsPredicate implements Predicate {
-
-        private final FileSystem fs;
-        private final Path wfIDFile;
-
-        public ApplicationIdExistsPredicate(FileSystem fs, Path wfIDFile) {
-            this.fs = fs;
-            this.wfIDFile = wfIDFile;
-        }
-
-        @Override
-        public boolean evaluate() throws Exception {
-            return fs.exists(wfIDFile) && fs.getFileStatus(wfIDFile).getLen() 
> 0;
-        }
-    }
-
-    private static class SubWorkflowActionRunningPredicate implements 
Predicate {
-        private final OozieClient wfClient;
-        private final String jobId;
-
-        public SubWorkflowActionRunningPredicate(OozieClient wfClient, String 
jobId) {
-            this.wfClient = wfClient;
-            this.jobId = jobId;
-        }
-
-        @Override
-        public boolean evaluate() throws Exception {
-            boolean isSubWfRunning = wfClient.getJobInfo(jobId).getStatus() == 
WorkflowJob.Status.RUNNING;
-            boolean isSubWfActionRunning = wfClient.getJobInfo(jobId)
-                    .getActions().get(1).getStatus() == 
WorkflowAction.Status.RUNNING;
-            return isSubWfRunning && isSubWfActionRunning;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 371a60d..b835951 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3298 [MapReduce action] External ID is not filled properly and failing 
MR job is treated as SUCCEEDED (andras.piros via pbacsko, asasvari, gezapeti)
 OOZIE-3317 amend [build] Fix false positive precommit reports (kmarton via 
andras.piros)
 OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due 
to busy waiting (pbacsko)
 OOZIE-2877 Git action (clayb, andras.piros via pbacsko, gezapeti)

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index c9e2a91..b6599f7 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -199,37 +199,9 @@ public abstract class LauncherMain {
 
     public static Set<ApplicationId> getChildYarnJobs(Configuration 
actionConf, ApplicationsRequestScope scope,
                                                       long startTime) {
-        Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
-        String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
-        if (tag == null) {
-            System.out.print("Could not find YARN tags property " + 
CHILD_MAPREDUCE_JOB_TAGS);
-            return childYarnJobs;
-        }
-
-        System.out.println("tag id : " + tag);
-        GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
-        gar.setScope(scope);
-        gar.setApplicationTags(Collections.singleton(tag));
-        long endTime = System.currentTimeMillis();
-        if (startTime > endTime) {
-            System.out.println("WARNING: Clock skew between the Oozie server 
host and this host detected.  Please fix this.  " +
-                    "Attempting to work around...");
-            // We don't know which one is wrong (relative to the RM), so to be 
safe, let's assume they're both wrong and add an
-            // offset in both directions
-            long diff = 2 * (startTime - endTime);
-            startTime = startTime - diff;
-            endTime = endTime + diff;
-        }
-        gar.setStartRange(startTime, endTime);
-        try {
-            ApplicationClientProtocol proxy = 
ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
-            GetApplicationsResponse apps = proxy.getApplications(gar);
-            List<ApplicationReport> appsList = apps.getApplicationList();
-            for(ApplicationReport appReport : appsList) {
-                childYarnJobs.add(appReport.getApplicationId());
-            }
-        } catch (YarnException | IOException ioe) {
-            throw new RuntimeException("Exception occurred while finding child 
jobs", ioe);
+        final Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+        for (final ApplicationReport applicationReport : 
getChildYarnApplications(actionConf, scope, startTime)) {
+            childYarnJobs.add(applicationReport.getApplicationId());
         }
 
         if (childYarnJobs.isEmpty()) {
@@ -254,6 +226,39 @@ public abstract class LauncherMain {
         return getChildYarnJobs(actionConf, scope, startTime);
     }
 
+    public static List<ApplicationReport> getChildYarnApplications(final 
Configuration actionConf,
+                                                                   final 
ApplicationsRequestScope scope,
+                                                                   long 
startTime) {
+        final String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+        if (tag == null) {
+            System.out.print("Could not find YARN tags property " + 
CHILD_MAPREDUCE_JOB_TAGS);
+            return Collections.emptyList();
+        }
+
+        System.out.println("tag id : " + tag);
+        final GetApplicationsRequest gar = 
GetApplicationsRequest.newInstance();
+        gar.setScope(scope);
+        gar.setApplicationTags(Collections.singleton(tag));
+        long endTime = System.currentTimeMillis();
+        if (startTime > endTime) {
+            System.out.println("WARNING: Clock skew between the Oozie server 
host and this host detected.  Please fix this.  " +
+                    "Attempting to work around...");
+            // We don't know which one is wrong (relative to the RM), so to be 
safe, let's assume they're both wrong and add an
+            // offset in both directions
+            final long diff = 2 * (startTime - endTime);
+            startTime = startTime - diff;
+            endTime = endTime + diff;
+        }
+        gar.setStartRange(startTime, endTime);
+        try {
+            final ApplicationClientProtocol proxy = 
ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
+            final GetApplicationsResponse apps = proxy.getApplications(gar);
+            return apps.getApplicationList();
+        } catch (final YarnException | IOException e) {
+            throw new RuntimeException("Exception occurred while finding child 
jobs", e);
+        }
+    }
+
     public static void killChildYarnJobs(Configuration actionConf) {
         try {
             Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);

http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
 
b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index f460b6b..68e83fa 100644
--- 
a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ 
b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -56,9 +56,11 @@ import org.apache.hadoop.streaming.StreamJob;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
@@ -67,6 +69,8 @@ import org.apache.oozie.command.wf.StartXCommand;
 import org.apache.oozie.command.wf.SubmitXCommand;
 import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
 import 
org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
+import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
@@ -74,10 +78,12 @@ import org.apache.oozie.util.ClassUtils;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.PropertiesUtils;
 import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 
 public class TestMapReduceActionExecutor extends ActionExecutorTestCase {
+    private static final XLog LOG = 
XLog.getLog(TestMapReduceActionExecutor.class);
 
     private static final String PIPES = "pipes";
     private static final String MAP_REDUCE = "map-reduce";
@@ -85,7 +91,8 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
     @Override
     protected void setSystemProps() throws Exception {
         super.setSystemProps();
-        setSystemProperty("oozie.service.ActionService.executor.classes", 
MapReduceActionExecutor.class.getName());
+        setSystemProperty("oozie.service.ActionService.executor.classes",
+                String.join(",", MapReduceActionExecutor.class.getName(), 
JavaActionExecutor.class.getName()));
         setSystemProperty("oozie.credentials.credentialclasses", 
"cred=org.apache.oozie.action.hadoop.CredentialForTest");
     }
 
@@ -803,10 +810,6 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         w.close();
     }
 
-    private HadoopAccessorService getHadoopAccessorService() {
-        return Services.get().get(HadoopAccessorService.class);
-    }
-
     public void testMapReduceWithUberJarEnabled() throws Exception {
         Services serv = Services.get();
         boolean originalUberJarDisabled = 
serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
@@ -1306,4 +1309,162 @@ public class TestMapReduceActionExecutor extends 
ActionExecutorTestCase {
         });
     }
 
+    public void testFailingMapReduceJobCausesOozieLauncherAMToFail() throws 
Exception {
+        final String workflowUri = createWorkflowWithMapReduceAction();
+
+        startWorkflowAndFailChildMRJob(workflowUri);
+    }
+
+    private String createWorkflowWithMapReduceAction() throws IOException {
+        final String workflowUri = getTestCaseFileUri("workflow.xml");
+        final String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" 
name=\"workflow\">" +
+                "   <start to=\"map-reduce\"/>" +
+                "   <action name=\"map-reduce\">" +
+                "       <map-reduce>" +
+                "           <resource-manager>" + getJobTrackerUri() + 
"</resource-manager>" +
+                "           <name-node>" + getNameNodeUri() + "</name-node>" +
+                "           <configuration>\n" +
+                "               <property>\n" +
+                "                   <name>mapred.job.queue.name</name>\n" +
+                "                   <value>default</value>\n" +
+                "               </property>\n" +
+                "               <property>\n" +
+                "                   <name>mapred.mapper.class</name>\n" +
+                "                   
<value>org.apache.oozie.action.hadoop.SleepMapperReducerForTest</value>\n" +
+                "               </property>\n" +
+                "               <property>\n" +
+                "                   <name>mapred.reducer.class</name>\n" +
+                "                   
<value>org.apache.oozie.action.hadoop.SleepMapperReducerForTest</value>\n" +
+                "               </property>\n" +
+                "                <property>\n" +
+                "                    <name>mapred.input.dir</name>\n" +
+                "                    <value>" + getFsTestCaseDir() + 
"/input</value>\n" +
+                "                </property>\n" +
+                "                <property>\n" +
+                "                    <name>mapred.output.dir</name>\n" +
+                "                    <value>" + getFsTestCaseDir() + 
"/output</value>\n" +
+                "                </property>\n" +
+                "           </configuration>\n" +
+                "       </map-reduce>" +
+                "       <ok to=\"end\"/>" +
+                "       <error to=\"fail\"/>" +
+                "   </action>" +
+                "   <kill name=\"fail\">" +
+                "       <message>Sub workflow failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
+                "   </kill>" +
+                "   <end name=\"end\"/>" +
+                "</workflow-app>";
+
+        writeToFile(appXml, workflowUri);
+
+        return workflowUri;
+    }
+
+    private void startWorkflowAndFailChildMRJob(final String workflowUri) 
throws Exception {
+        try {
+            LocalOozie.start();
+            final OozieClient wfClient = LocalOozie.getClient();
+            final String workflowId = submitWorkflow(workflowUri, wfClient);
+            final Configuration conf = 
Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
+
+            final Path inputFolder = createInputFolder(conf);
+
+            waitForWorkflowToStart(wfClient, workflowId);
+            
waitForChildYarnApplication(getHadoopAccessorService().createYarnClient(getTestUser(),
 conf), workflowId);
+            assertAndWriteNextMRJobId(workflowId, conf, inputFolder);
+
+            final ApplicationId externalChildJobId = 
getChildMRJobApplicationId(conf);
+
+            killYarnApplication(conf, externalChildJobId);
+            
waitUntilYarnAppKilledAndAssertSuccess(externalChildJobId.toString());
+            waitForWorkflowToKill(wfClient, workflowId);
+        } finally {
+            LocalOozie.stop();
+        }
+    }
+
+    /**
+     * Get all YARN application IDs, select the one of type {@code MAPREDUCE} 
that is relevant to {@code workflowId},
+     * and write to {@code inputFolder/jobID.txt}.
+     * <p>
+     * Simulates the functional parts of {@link 
LauncherMain#writeExternalChildIDs(String, Pattern[], String)} so that
+     * {@link MapReduceActionExecutor#check(ActionExecutor.Context, 
WorkflowAction)} can find it later in the call chain.
+     * <p>
+     * We need to write out our own file to {@link 
LauncherMainTester#JOB_ID_FILE_NAME} so that
+     * {@link 
ActionExecutorTestCase#getChildMRJobApplicationId(Configuration)} can find it. 
Unfortunately, we cannot rely on the
+     * original sequence file written by {@link 
LauncherMain#writeExternalChildIDs(String, Pattern[], String)} because we don't 
hold
+     * a reference to the original {@link ActionExecutor.Context}, as {@link 
MapReduceActionExecutor} does.
+     * @param workflowId the workflow ID
+     * @param conf the {@link Configuration} used for Hadoop Common / YARN API 
calls
+     * @param inputFolder where to write the output text file
+     * @throws IOException when the output text file cannot be written
+     * @throws YarnException when the list of YARN applications cannot be 
queried
+     * @throws HadoopAccessorException when {@link YarnClient} cannot be 
created
+     */
+    private void assertAndWriteNextMRJobId(final String workflowId, final 
Configuration conf, final Path inputFolder)
+            throws IOException, YarnException, HadoopAccessorException {
+        final Path wfIDFile = new Path(inputFolder, 
LauncherMainTester.JOB_ID_FILE_NAME);
+        try (final FileSystem fs = FileSystem.get(conf);
+             final Writer w = new OutputStreamWriter(fs.create(wfIDFile))) {
+            final List<ApplicationReport> allApplications =
+                    getHadoopAccessorService().createYarnClient(getTestUser(), 
conf).getApplications();
+
+            assertTrue("YARN applications number mismatch", 
allApplications.size() >= 2);
+
+            ApplicationReport mapReduce = null;
+            for (final ApplicationReport candidate : allApplications) {
+                if 
(candidate.getApplicationType().equals(MapReduceActionExecutor.YARN_APPLICATION_TYPE_MAPREDUCE)
+                        && candidate.getName().contains(workflowId)) {
+                    mapReduce = candidate;
+                }
+            }
+            assertNotNull("MAPREDUCE YARN application not found", mapReduce);
+
+            final String applicationId = 
mapReduce.getApplicationId().toString();
+            final String nextMRJobId = applicationId.replace("application", 
"job");
+
+            LOG.debug("Writing next MapReduce job ID: {0}", nextMRJobId);
+
+            w.write(nextMRJobId);
+        }
+    }
+
+    private Path createInputFolder(final Configuration conf) throws 
IOException {
+        final Path inputDir = new Path(getFsTestCaseDir(), "input");
+        try (final FileSystem fs = FileSystem.get(conf)) {
+             fs.mkdirs(inputDir);
+        }
+        return inputDir;
+    }
+
+    private void waitForChildYarnApplication(final YarnClient yarnClient, 
final String workflowId) {
+        waitFor(JOB_TIMEOUT, new 
ChildYarnApplicationPresentPredicate(yarnClient, workflowId));
+    }
+
+    private class ChildYarnApplicationPresentPredicate implements Predicate {
+        private final YarnClient yarnClient;
+        private final String workflowId;
+
+        ChildYarnApplicationPresentPredicate(final YarnClient yarnClient, 
final String workflowId) {
+            this.yarnClient = yarnClient;
+            this.workflowId = workflowId;
+        }
+
+        @Override
+        public boolean evaluate() throws Exception {
+            if (yarnClient.getApplications().isEmpty()) {
+                return false;
+            }
+
+            for (final ApplicationReport applicationReport : 
yarnClient.getApplications()) {
+                final String name = applicationReport.getName();
+                final String type = applicationReport.getApplicationType();
+                if 
(type.equals(MapReduceActionExecutor.YARN_APPLICATION_TYPE_MAPREDUCE) && 
name.contains(workflowId)) {
+                    return true;
+                }
+            }
+
+            return false;
+        }
+    }
 }

Reply via email to