Repository: oozie Updated Branches: refs/heads/master 7f59c8769 -> d5c4f3b7b
OOZIE-3298 [MapReduce action] External ID is not filled properly and failing MR job is treated as SUCCEEDED (andras.piros via pbacsko, asasvari, gezapeti) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5c4f3b7 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5c4f3b7 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5c4f3b7 Branch: refs/heads/master Commit: d5c4f3b7bf3dd83b986a375016e7805a3c079086 Parents: 7f59c87 Author: Andras Piros <andras.pi...@cloudera.com> Authored: Tue Sep 11 16:19:13 2018 +0200 Committer: Andras Piros <andras.pi...@cloudera.com> Committed: Tue Sep 11 16:19:13 2018 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 69 +++-- .../action/hadoop/MapReduceActionExecutor.java | 289 ++++++++++++++++++- .../action/hadoop/ActionExecutorTestCase.java | 170 +++++++++++ .../action/hadoop/TestJavaActionExecutor.java | 5 - .../hadoop/TestYarnApplicationIdComparator.java | 71 +++++ .../hadoop/TestYarnApplicationIdFinder.java | 242 ++++++++++++++++ .../oozie/TestSubWorkflowActionExecutor.java | 136 +-------- release-log.txt | 1 + .../oozie/action/hadoop/LauncherMain.java | 67 +++-- .../hadoop/TestMapReduceActionExecutor.java | 171 ++++++++++- 10 files changed, 1014 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 05fac39..0385c77 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -977,7 +977,8 @@ public class JavaActionExecutor extends ActionExecutor { } } - public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException { + public void submitLauncher(final FileSystem actionFs, final Context context, final WorkflowAction action) + throws ActionExecutorException { YarnClient yarnClient = null; try { Path appPathRoot = new Path(context.getWorkflow().getAppPath()); @@ -993,15 +994,13 @@ public class JavaActionExecutor extends ActionExecutor { // action job configuration Configuration actionConf = loadHadoopDefaultResources(context, actionXml); setupActionConf(actionConf, context, actionXml, appPathRoot); - addAppNameContext(action, context); + addAppNameContext(context, action); LOG.debug("Setting LibFilesArchives "); setLibFilesArchives(context, actionXml, appPathRoot, actionConf); String jobName = actionConf.get(HADOOP_JOB_NAME); if (jobName == null || jobName.isEmpty()) { - jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}", - getType(), context.getWorkflow().getAppName(), - action.getName(), context.getWorkflow().getId()); + jobName = getYarnApplicationName(context, action, "oozie:action"); actionConf.set(HADOOP_JOB_NAME, jobName); } @@ -1067,8 +1066,7 @@ public class JavaActionExecutor extends ActionExecutor { YarnClientApplication newApp = yarnClient.createApplication(); ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId(); ApplicationSubmissionContext appContext = - createAppSubmissionContext(appId, launcherConf, context, actionConf, action.getName(), - credentials, actionXml); + createAppSubmissionContext(appId, launcherConf, context, actionConf, action, credentials, actionXml); yarnClient.submitApplication(appContext); launcherId = appId.toString(); @@ -1090,6 +1088,15 @@ public class JavaActionExecutor extends ActionExecutor { } } + private String 
getYarnApplicationName(final Context context, final WorkflowAction action, final String prefix) { + return XLog.format("{0}:T={1}:W={2}:A={3}:ID={4}", + prefix, + getType(), + context.getWorkflow().getAppName(), + action.getName(), + context.getWorkflow().getId()); + } + private void removeHBaseSettingFromOozieDefaultResource(final Configuration jobConf) { final String[] propertySources = jobConf.getPropertySources(HbaseCredentials.HBASE_USE_DYNAMIC_JARS); if (propertySources != null && propertySources.length > 0 && @@ -1100,12 +1107,8 @@ public class JavaActionExecutor extends ActionExecutor { } } - protected void addAppNameContext(WorkflowAction action, Context context) { - String oozieActionName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s", - getType(), - context.getWorkflow().getAppName(), - action.getName(), - context.getWorkflow().getId()); + private void addAppNameContext(final Context context, final WorkflowAction action) { + final String oozieActionName = getYarnApplicationName(context, action, "oozie:launcher"); context.setVar(OOZIE_ACTION_NAME, oozieActionName); } @@ -1128,7 +1131,7 @@ public class JavaActionExecutor extends ActionExecutor { final Configuration launcherJobConf, final Context actionContext, final Configuration actionConf, - final String actionName, + final WorkflowAction action, final Credentials credentials, final Element actionXml) throws IOException, HadoopAccessorException, URISyntaxException { @@ -1139,7 +1142,7 @@ public class JavaActionExecutor extends ActionExecutor { setPriority(launcherJobConf, appContext); setQueue(launcherJobConf, appContext); appContext.setApplicationId(appId); - setApplicationName(actionContext, actionName, appContext); + setApplicationName(actionContext, action, appContext); appContext.setApplicationType("Oozie Launcher"); setMaxAttempts(launcherJobConf, appContext); @@ -1286,10 +1289,10 @@ public class JavaActionExecutor extends ActionExecutor { return oldJavaOpts; } - private void 
setApplicationName(Context context, String actionName, ApplicationSubmissionContext appContext) { - String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(), - context.getWorkflow().getAppName(), actionName, - context.getWorkflow().getId()); + private void setApplicationName(final Context context, + final WorkflowAction action, + final ApplicationSubmissionContext appContext) { + final String jobName = getYarnApplicationName(context, action, "oozie:launcher"); appContext.setApplicationName(jobName); } @@ -1642,15 +1645,18 @@ public class JavaActionExecutor extends ActionExecutor { yarnClient = createYarnClient(context, jobConf); FinalApplicationStatus appStatus = null; try { - ApplicationReport appReport = - yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId())); - YarnApplicationState appState = appReport.getYarnApplicationState(); + final String effectiveApplicationId = findYarnApplicationId(context, action); + final ApplicationId applicationId = ConverterUtils.toApplicationId(effectiveApplicationId); + final ApplicationReport appReport = yarnClient.getApplicationReport(applicationId); + final YarnApplicationState appState = appReport.getYarnApplicationState(); if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED || appState == YarnApplicationState.KILLED) { appStatus = appReport.getFinalApplicationStatus(); } - - } catch (Exception ye) { + } catch (final ActionExecutorException aae) { + LOG.warn("Foreseen Exception occurred while action execution; rethrowing ", aae); + throw aae; + } catch (final Exception ye) { LOG.warn("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye); // Fallback to action data file if we can't find the Launcher AM (maybe it got purged) fallback = true; @@ -1750,6 +1756,19 @@ public class JavaActionExecutor extends ActionExecutor { } /** + * For every {@link JavaActionExecutor} that is not {@link 
MapReduceActionExecutor}, the effective YARN application ID of the + * action is the one where {@link LauncherAM} is run, hence this default implementation. + * @param context the execution context + * @param action the workflow action + * @return a {@code String} that depicts the application ID of the launcher ApplicationMaster of this action + * @throws ActionExecutorException + */ + protected String findYarnApplicationId(final Context context, final WorkflowAction action) + throws ActionExecutorException { + return action.getExternalId(); + } + + /** * Get the output data of an action. Subclasses should override this method * to get action specific output data. * @param actionFs the FileSystem object @@ -1861,7 +1880,7 @@ public class JavaActionExecutor extends ActionExecutor { } } - private String getActionYarnTag(Context context, WorkflowAction action) { + protected String getActionYarnTag(Context context, WorkflowAction action) { return LauncherHelper.getActionYarnTag(context.getProtoActionConf(), context.getWorkflow().getParentId(), action); } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java index 83a23f5..a4dd13b 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java @@ -20,11 +20,22 @@ package org.apache.oozie.action.hadoop; import java.io.IOException; import java.io.StringReader; +import java.net.URISyntaxException; +import java.security.NoSuchAlgorithmException; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.function.Predicate; import 
com.google.common.base.Charsets; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import com.google.common.collect.Iterables; +import com.google.common.collect.Ordering; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,15 +45,22 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; +import org.jdom.JDOMException; import org.jdom.Namespace; +import static org.apache.oozie.action.hadoop.LauncherMain.CHILD_MAPREDUCE_JOB_TAGS; + public class MapReduceActionExecutor extends JavaActionExecutor { public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write"; @@ -51,6 +69,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor { private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain"; public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url"; private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name"; + static final String YARN_APPLICATION_TYPE_MAPREDUCE = "MAPREDUCE"; private XLog log = XLog.getLog(getClass()); public 
MapReduceActionExecutor() { @@ -341,33 +360,35 @@ public class MapReduceActionExecutor extends JavaActionExecutor { Map<String, String> actionData; Configuration jobConf; + // Need to emit jobConf and actionData for later usage try { - FileSystem actionFs = context.getAppFileSystem(); - Element actionXml = XmlUtils.parseXml(action.getConf()); + final FileSystem actionFs = context.getAppFileSystem(); + final Element actionXml = XmlUtils.parseXml(action.getConf()); jobConf = createBaseHadoopConf(context, actionXml); - Path actionDir = context.getActionDir(); + final Path actionDir = context.getActionDir(); actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf); - } catch (Exception e) { + } catch (final Exception e) { LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e); throw convertException(e); } - final String newId = actionData.get(LauncherAMUtils.ACTION_DATA_NEW_ID); + final String newJobId = findNewHadoopJobId(context, action); // check the Hadoop job if newID is defined (which should be the case here) - otherwise perform the normal check() - if (newId != null) { + if (newJobId != null) { boolean jobCompleted; JobClient jobClient = null; boolean exception = false; try { jobClient = createJobClient(context, new JobConf(jobConf)); - RunningJob runningJob = jobClient.getJob(JobID.forName(newId)); + final JobID jobid = JobID.forName(newJobId); + final RunningJob runningJob = jobClient.getJob(jobid); if (runningJob == null) { context.setExternalStatus(FAILED); throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", - "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, + "Unknown hadoop job [{0}] associated with action [{1}]. 
Failing this action!", newJobId, action.getId()); } @@ -396,7 +417,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor { super.check(context, action); } else { context.setExternalStatus(RUNNING); - String externalAppId = TypeConverter.toYarn(JobID.forName(newId)).getAppId().toString(); + final String externalAppId = TypeConverter.toYarn(JobID.forName(newJobId)).getAppId().toString(); context.setExternalChildIDs(externalAppId); } } else { @@ -409,4 +430,254 @@ public class MapReduceActionExecutor extends JavaActionExecutor { injectCallback(context, actionConf); } + private String findNewHadoopJobId(final Context context, final WorkflowAction action) throws ActionExecutorException { + try { + final Configuration jobConf = createJobConfFromActionConf(context, action); + + return new HadoopJobIdFinder(jobConf, context).find(); + } catch (final HadoopAccessorException | IOException | JDOMException | URISyntaxException | InterruptedException | + NoSuchAlgorithmException e) { + LOG.warn("Exception while trying to find new Hadoop job id(). Message[{0}]", e.getMessage(), e); + throw convertException(e); + } + } + + private Configuration createJobConfFromActionConf(final Context context, final WorkflowAction action) + throws JDOMException, NoSuchAlgorithmException { + final Element actionXml = XmlUtils.parseXml(action.getConf()); + final Configuration jobConf = createBaseHadoopConf(context, actionXml); + + final String launcherTag = getActionYarnTag(context, action); + jobConf.set(CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag)); + + return jobConf; + } + + /** + * Find YARN application ID only for {@link MapReduceActionExecutor} delegating to {@link YarnApplicationIdFinder}. 
+ * @param context the execution context + * @param action the workflow action + * @return the YARN application ID as a {@code String} + * @throws ActionExecutorException when the YARN application ID could not be found + */ + @Override + protected String findYarnApplicationId(final Context context, final WorkflowAction action) throws ActionExecutorException { + try { + final Configuration jobConf = createJobConfFromActionConf(context, action); + final HadoopJobIdFinder hadoopJobIdFinder = new HadoopJobIdFinder(jobConf, context); + + return new YarnApplicationIdFinder(hadoopJobIdFinder, + new YarnApplicationReportReader(jobConf), (WorkflowActionBean) action).find(); + } + catch (final IOException | HadoopAccessorException | JDOMException | InterruptedException | URISyntaxException | + NoSuchAlgorithmException e) { + LOG.warn("Exception while finding YARN application id. Message[{0}]", e.getMessage(), e); + throw convertException(e); + } + } + + /** + * Finds a Hadoop job ID based on {@code action-data.seq} file stored on HDFS by {@link MapReduceMain}. + */ + @VisibleForTesting + static class HadoopJobIdFinder { + private final Configuration jobConf; + private final Context executorContext; + + HadoopJobIdFinder(final Configuration jobConf, final Context executorContext) { + this.jobConf = jobConf; + this.executorContext = executorContext; + } + + String find() throws HadoopAccessorException, IOException, URISyntaxException, InterruptedException { + final FileSystem actionFs = executorContext.getAppFileSystem(); + final Path actionDir = executorContext.getActionDir(); + final Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf); + + return actionData.get(LauncherAMUtils.ACTION_DATA_NEW_ID); + } + } + + /** + * Find YARN application ID in three stages: + * <ul> + * <li>based on {@code action-data.seq} written by {@link MapReduceMain}, if already present. 
If present and is not the + * Oozie Launcher's application ID ({@link WorkflowAction#getExternalId()}), gets used. Else, fall back to following:</li> + * <li>if not found, look up the appropriate YARN child ID</li> + * <li>if an appropriate YARN application ID is not found, go with Oozie Launcher's application ID + * ({@link WorkflowAction#getExternalId()})</li> + * </ul> + */ + @VisibleForTesting + static class YarnApplicationIdFinder { + private static final XLog LOG = XLog.getLog(YarnApplicationIdFinder.class); + + private final HadoopJobIdFinder hadoopJobIdFinder; + private final YarnApplicationReportReader reader; + private final WorkflowActionBean workflowActionBean; + + YarnApplicationIdFinder(final HadoopJobIdFinder hadoopJobIdFinder, + final YarnApplicationReportReader reader, + final WorkflowActionBean workflowActionBean) { + this.hadoopJobIdFinder = hadoopJobIdFinder; + this.reader = reader; + this.workflowActionBean = workflowActionBean; + } + + String find() throws IOException, HadoopAccessorException, URISyntaxException, InterruptedException { + final String newJobId = hadoopJobIdFinder.find(); + if (Strings.isNullOrEmpty(newJobId) && !isHadoopJobId(newJobId)) { + LOG.trace("Is not a Hadoop Job Id, falling back."); + return fallbackToYarnChildOrExternalId(); + } + + final String effectiveApplicationId; + final String newApplicationId = TypeConverter.toYarn(JobID.forName(newJobId)).getAppId().toString(); + + if (workflowActionBean.getExternalId().equals(newApplicationId) || newApplicationId == null) { + LOG.trace("New YARN application ID {0} is empty or is the same as {1}, falling back.", + newApplicationId, workflowActionBean.getExternalId()); + effectiveApplicationId = fallbackToYarnChildOrExternalId(); + } + else { + LOG.trace("New YARN application ID {0} is different, using it.", newApplicationId); + effectiveApplicationId = newApplicationId; + } + + return effectiveApplicationId; + } + + /** + * When a Hadoop could not be found, fall back finding 
the YARN child application ID, or the workflow's {@code externalId}: + * <ul> + * <li>look for YARN children of the actual {@code WorkflowActionBean}</li> + * <li>filter for type {@code MAPREDUCE}. Note that those will be the YARN application children of the original + * {@code Oozie Launcher} type. Filter also for the ones not in YARN applications' terminal states. What remains is the + * one we call YARN child ID</li> + * <li>if not found, go with {@link WorkflowActionBean#externalId}</li> + * <li>if the found one is not newer than the one already stored, go with {@link WorkflowActionBean#externalId}</li> + * <li>if found and there is no {@link WorkflowActionBean#externalId}, go with the YARN child ID</li> + * <li>else, go with the YARN child ID</li> + * </ul> + * @return the YARN child application's ID, or the workflow action's external ID + */ + private String fallbackToYarnChildOrExternalId() { + final List<ApplicationReport> childYarnApplications = reader.read(); + childYarnApplications.removeIf(new Predicate<ApplicationReport>() { + @Override + public boolean test(ApplicationReport applicationReport) { + return !applicationReport.getApplicationType().equals(YARN_APPLICATION_TYPE_MAPREDUCE); + } + }); + + if (childYarnApplications.isEmpty()) { + LOG.trace("No child YARN applications present, returning {0} instead", workflowActionBean.getExternalId()); + return workflowActionBean.getExternalId(); + } + + final String yarnChildId = getLastYarnId(childYarnApplications); + + if (Strings.isNullOrEmpty(yarnChildId)) { + LOG.trace("yarnChildId is empty, returning {0} instead", workflowActionBean.getExternalId()); + return workflowActionBean.getExternalId(); + } + + if (Strings.isNullOrEmpty(workflowActionBean.getExternalId())) { + LOG.trace("workflowActionBean.externalId is empty, returning {0} instead", yarnChildId); + return yarnChildId; + } + + if (new YarnApplicationIdComparator().compare(yarnChildId, workflowActionBean.getExternalId()) > 0) { + 
LOG.trace("yarnChildId is newer, returning {0}", yarnChildId); + return yarnChildId; + } + + LOG.trace("yarnChildId is not newer, returning {0}", workflowActionBean.getExternalId()); + return workflowActionBean.getExternalId(); + } + + /** + * Get the biggest YARN application ID given {@link YarnApplicationIdComparator}. + * @param yarnApplications the YARN application reports + * @return the biggest {@link ApplicationReport#getApplicationId()#toString()} + */ + @VisibleForTesting + protected String getLastYarnId(final List<ApplicationReport> yarnApplications) { + Preconditions.checkNotNull(yarnApplications, "YARN application list should be filled"); + Preconditions.checkArgument(!yarnApplications.isEmpty(), "no YARN applications in the list"); + + final Iterable<String> unorderedApplicationIds = + Iterables.transform(yarnApplications, new Function<ApplicationReport, String>() { + @Override + public String apply(final ApplicationReport input) { + Preconditions.checkNotNull(input, "YARN application should be filled"); + return input.getApplicationId().toString(); + } + }); + + return Ordering.from(new YarnApplicationIdComparator()).max(unorderedApplicationIds); + } + + private boolean isHadoopJobId(final String jobIdCandidate) { + try { + return JobID.forName(jobIdCandidate) != null; + } catch (final IllegalArgumentException e) { + LOG.warn("Job ID candidate is not a Hadoop Job ID.", e); + return false; + } + } + } + + /** + * Compares two YARN application IDs in the sense: + * <ul> + * <li>originating from different cluster timestamps the one with the bigger timestamp is considered greater</li> + * <li>originating from the same cluster timestamp the one with the higher sequence number is considered greater</li> + * <li>originating from the same cluster timestamp and with the same sequence number both are considered equal</li> + * </ul> + */ + @VisibleForTesting + @SuppressFBWarnings(value = "SE_COMPARATOR_SHOULD_BE_SERIALIZABLE", justification = "instances will 
never be serialized") + static class YarnApplicationIdComparator implements Comparator<String> { + private static final String PREFIX = "application_"; + private static final String SEPARATOR = "_"; + + @Override + public int compare(final String left, final String right) { + // Application IDs with different cluster timestamps are ordered by timestamp: the bigger timestamp is greater + final int middleLongPartComparisonResult = Long.compare(getMiddleLongPart(left), getMiddleLongPart(right)); + if (middleLongPartComparisonResult != 0) { + return middleLongPartComparisonResult; + } + + // Else we compare the sequence number + return Integer.compare(getLastIntegerPart(left), getLastIntegerPart(right)); + } + + private long getMiddleLongPart(final String applicationId) { + return Long.parseLong(applicationId.substring(applicationId.indexOf(PREFIX) + PREFIX.length(), + applicationId.lastIndexOf(SEPARATOR))); + } + + private int getLastIntegerPart(final String applicationId) { + return Integer.parseInt(applicationId.substring(applicationId.lastIndexOf(SEPARATOR) + SEPARATOR.length())); + } + } + + /** + * Encapsulates call to the static method + * {@link LauncherMain#getChildYarnApplications(Configuration, ApplicationsRequestScope, long)} for better testability. 
+ */ + @VisibleForTesting + static class YarnApplicationReportReader { + private final Configuration jobConf; + + YarnApplicationReportReader(final Configuration jobConf) { + this.jobConf = jobConf; + } + + List<ApplicationReport> read() { + return LauncherMain.getChildYarnApplications(jobConf, ApplicationsRequestScope.OWN, 0L); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java index f39bba2..05511e4 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java @@ -21,15 +21,25 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.oozie.DagELFunctions; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; +import org.apache.oozie.action.oozie.JavaSleepAction; import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.wf.KillXCommand; import org.apache.oozie.service.CallbackService; import org.apache.oozie.service.ELService; +import org.apache.oozie.service.HadoopAccessorException; +import 
org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.LiteWorkflowStoreService; import org.apache.oozie.service.Services; import org.apache.oozie.service.UUIDService; @@ -47,18 +57,27 @@ import org.apache.oozie.workflow.lite.EndNodeDef; import org.apache.oozie.workflow.lite.LiteWorkflowApp; import org.apache.oozie.workflow.lite.StartNodeDef; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; +import java.io.InputStreamReader; import java.io.OutputStreamWriter; +import java.io.PrintWriter; import java.io.Reader; import java.io.StringReader; import java.io.Writer; +import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; public abstract class ActionExecutorTestCase extends XHCatTestCase { + protected static final int JOB_TIMEOUT = 100_000; @Override protected void setUp() throws Exception { @@ -325,4 +344,155 @@ public abstract class ActionExecutorTestCase extends XHCatTestCase { writer.close(); } + protected void writeToFile(final String appXml, final String appPath) throws IOException { + final File wf = new File(URI.create(appPath)); + PrintWriter out = null; + try { + out = new PrintWriter(new FileWriter(wf)); + out.println(appXml); + } + catch (final IOException iex) { + throw iex; + } + finally { + if (out != null) { + out.close(); + } + } + } + + protected String submitWorkflow(final String workflowUri, final OozieClient wfClient) throws OozieClientException { + final Properties conf = wfClient.createConfiguration(); + conf.setProperty(OozieClient.APP_PATH, workflowUri); + conf.setProperty(OozieClient.USER_NAME, getTestUser()); + conf.setProperty("appName", "var-app-name"); + + final String jobId = wfClient.submit(conf); + wfClient.start(jobId); + + return jobId; + } + + protected ApplicationId getChildMRJobApplicationId(final 
Configuration conf) throws IOException { + final List<ApplicationId> applicationIdList = new ArrayList<>(); + final Path inputDir = new Path(getFsTestCaseDir(), "input"); + final Path wfIDFile = new Path(inputDir, LauncherMainTester.JOB_ID_FILE_NAME); + final FileSystem fs = FileSystem.get(conf); + + // wait until we have the running child MR job's ID from HDFS + waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile)); + if (!fs.exists(wfIDFile) || !fs.isFile(wfIDFile)) { + throw new IOException("Workflow ID file does not exist: " + wfIDFile.toString()); + } + + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(wfIDFile)))) { + final String line = reader.readLine(); + JobID.forName(line); + final String jobID = line; + final String appID = jobID.replace("job", "application"); + final ApplicationId id = ConverterUtils.toApplicationId(appID); + applicationIdList.add(id); + } + + assertTrue("Application ID should've been found. No external Child ID was found in " + wfIDFile.toString(), + applicationIdList.size() == 1); + + return applicationIdList.get(0); + } + + private static class ApplicationIdExistsPredicate implements Predicate { + private final FileSystem fs; + private final Path wfIDFile; + + ApplicationIdExistsPredicate(final FileSystem fs, final Path wfIDFile) { + this.fs = fs; + this.wfIDFile = wfIDFile; + } + + @Override + public boolean evaluate() throws Exception { + return fs.exists(wfIDFile) && fs.getFileStatus(wfIDFile).getLen() > 0; + } + } + + protected static class WorkflowActionRunningPredicate extends WorkflowActionStatusPredicate { + WorkflowActionRunningPredicate(final OozieClient wfClient, final String jobId) { + super(wfClient, jobId, WorkflowJob.Status.RUNNING, WorkflowAction.Status.RUNNING); + } + } + + protected static class WorkflowActionKilledPredicate extends WorkflowActionStatusPredicate { + WorkflowActionKilledPredicate(final OozieClient wfClient, final String jobId) { + super(wfClient, 
jobId, WorkflowJob.Status.KILLED, WorkflowAction.Status.KILLED); + } + } + + private static abstract class WorkflowActionStatusPredicate implements Predicate { + private final OozieClient wfClient; + private final String jobId; + private final WorkflowJob.Status expectedWorkflowJobStatus; + private final WorkflowAction.Status expectedWorkflowActionStatus; + + WorkflowActionStatusPredicate(final OozieClient wfClient, + final String jobId, + final WorkflowJob.Status expectedWorkflowJobStatus, + final WorkflowAction.Status expectedWorkflowActionStatus) { + this.wfClient = wfClient; + this.jobId = jobId; + this.expectedWorkflowJobStatus = expectedWorkflowJobStatus; + this.expectedWorkflowActionStatus = expectedWorkflowActionStatus; + } + + @Override + public boolean evaluate() throws Exception { + final WorkflowJob.Status actualWorkflowJobStatus = wfClient.getJobInfo(jobId).getStatus(); + final boolean isWorkflowInState = actualWorkflowJobStatus.equals(expectedWorkflowJobStatus); + + final WorkflowAction.Status actualWorkflowActionStatus = wfClient.getJobInfo(jobId).getActions().get(1).getStatus(); + final boolean isWorkflowActionInState = actualWorkflowActionStatus.equals(expectedWorkflowActionStatus); + + return isWorkflowInState && isWorkflowActionInState; + } + } + + protected void killWorkflow(final String jobId) throws CommandException { + new KillXCommand(jobId).call(); + } + + protected void waitForWorkflowToStart(final OozieClient wfClient, final String jobId) { + waitFor(JOB_TIMEOUT, new WorkflowActionRunningPredicate(wfClient,jobId)); + } + + protected void waitForWorkflowToKill(final OozieClient wfClient, final String jobId) { + waitFor(JOB_TIMEOUT, new WorkflowActionKilledPredicate(wfClient,jobId)); + } + + protected String getJavaAction(final boolean launchMRAction) { + final Path inputDir = new Path(getFsTestCaseDir(), "input"); + final Path outputDir = new Path(getFsTestCaseDir(), "output"); + final String javaActionXml = "<java>" + + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<main-class>" + JavaSleepAction.class.getName()+ "</main-class>" + + "</java>"; + final String javaWithMRActionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<main-class>" + LauncherMainTester.class.getName()+ "</main-class>" + + "<arg>javamapreduce</arg>" + + "<arg>"+inputDir.toString()+"</arg>" + + "<arg>"+outputDir.toString()+"</arg>" + + "</java>"; + + return launchMRAction ? javaWithMRActionXml : javaActionXml; + } + + void killYarnApplication(final Configuration configuration, final ApplicationId yarnApplicationId) + throws HadoopAccessorException, IOException, YarnException { + getHadoopAccessorService().createYarnClient(getTestUser(), configuration).killApplication(yarnApplicationId); + } + + HadoopAccessorService getHadoopAccessorService() { + return Services.get().get(HadoopAccessorService.class); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index a31079a..784dc96 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -2616,11 +2616,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { waitUntilYarnAppDoneAndAssertSuccess(applicationId); } - private HadoopAccessorService getHadoopAccessorService() { - return Services.get().get(HadoopAccessorService.class); - } - - public void testChildKill() throws Exception { final JobConf clusterConf = createJobConf(); FileSystem fileSystem = 
FileSystem.get(clusterConf); http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java new file mode 100644 index 0000000..aabb363 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdComparator.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.junit.Assert.assertEquals; + +public class TestYarnApplicationIdComparator { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private MapReduceActionExecutor.YarnApplicationIdComparator comparator; + + @Before + public void setUp() { + this.comparator = new MapReduceActionExecutor.YarnApplicationIdComparator(); + } + + @Test + public void whenWrongParametersGivenExceptionIsThrown() { + expectedException.expect(NullPointerException.class); + comparator.compare(null, null); + + expectedException.expect(NumberFormatException.class); + comparator.compare("application_a_b", "application_c_d"); + + expectedException.expect(IndexOutOfBoundsException.class); + comparator.compare("a_b_c", "d_e_f"); + } + + @Test + public void whenDifferentTimestampsLeftEqualsRight() { + assertEquals("cluster timestamps are different, the one with bigger timestamp wins", + -1, + comparator.compare("application_1534164756526_0001", "application_1534164756527_0002")); + } + + @Test + public void whenSameTimestampsGreaterSequenceWins() { + assertEquals("cluster timestamps are the same but sequences are different, left should be greater than right", + 1, + comparator.compare("application_1534164756526_0002", "application_1534164756526_0001")); + } + + @Test + public void whenSameTimestampsAndSameSequencesLeftEqualsRight() { + assertEquals("cluster timestamps and sequences are the same, left should equal right", + 0, + comparator.compare("application_1534164756526_0001", "application_1534164756526_0001")); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java ---------------------------------------------------------------------- diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java new file mode 100644 index 0000000..3fd7149 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestYarnApplicationIdFinder.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import com.google.common.collect.Lists; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.service.HadoopAccessorException; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +@RunWith(MockitoJUnitRunner.class) +public class TestYarnApplicationIdFinder { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Mock + private MapReduceActionExecutor.HadoopJobIdFinder hadoopJobIdFinder; + + @Mock + private MapReduceActionExecutor.YarnApplicationReportReader reader; + + @Mock + private WorkflowActionBean workflowActionBean; + + @Mock + private ApplicationReport applicationReport; + + @Mock + private ApplicationId applicationId; + + private MapReduceActionExecutor.YarnApplicationIdFinder yarnApplicationIdFinder; + + @Before + public void setUp() throws Exception { + yarnApplicationIdFinder = new MapReduceActionExecutor.YarnApplicationIdFinder(hadoopJobIdFinder, + reader, workflowActionBean); + } + + @Test + public void whenHadoopJobIdAndChildYarnApplicationAreNotPresentActionExternalIdIsFound() throws Exception { + when(hadoopJobIdFinder.find()).thenReturn(null); + when(reader.read()).thenReturn(Collections.emptyList()); + when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0000"); + + assertEquals("no Hadoop 
Job ID nor YARN applications: WorkflowActionBean.externalId should be found", + "application_1534164756526_0000", + yarnApplicationIdFinder.find()); + + when(applicationReport.getApplicationType()).thenReturn("Oozie Launcher"); + when(applicationReport.getApplicationId()).thenReturn(applicationId); + when(applicationId.toString()).thenReturn("application_1534164756526_0001"); + when(reader.read()).thenReturn(Lists.newArrayList(applicationReport)); + + assertEquals( + "no Hadoop Job ID nor YARN applications of MAPREDUCE type: WorkflowActionBean.externalId should be found", + "application_1534164756526_0000", + yarnApplicationIdFinder.find()); + + when(applicationReport.getApplicationType()).thenReturn("MAPREDUCE"); + when(workflowActionBean.getWfId()).thenReturn("workflowId"); + + assertEquals( + "no Hadoop Job ID nor YARN applications of the same workflow: WorkflowActionBean.externalId should be found", + "application_1534164756526_0000", + yarnApplicationIdFinder.find()); + } + + @Test + public void whenHadoopJobIdIsNotCorrectExceptionIsThrown() throws Exception { + when(hadoopJobIdFinder.find()).thenReturn("notAHadoopJobId"); + expectedException.expect(IllegalArgumentException.class); + + yarnApplicationIdFinder.find(); + } + + @Test + public void whenHadoopJobIdIsNotPresentChildYarnApplicationIdIsFound() throws Exception { + when(hadoopJobIdFinder.find()).thenReturn(null); + when(applicationReport.getApplicationType()).thenReturn("MAPREDUCE"); + when(workflowActionBean.getWfId()).thenReturn("workflowId"); + when(applicationReport.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING); + when(applicationId.toString()).thenReturn("application_1534164756526_0000"); + when(applicationReport.getApplicationId()).thenReturn(applicationId); + when(reader.read()).thenReturn(Lists.newArrayList(applicationReport)); + + assertEquals("no Hadoop Job ID, but an appropriate YARN application: applicationId should be found", + "application_1534164756526_0000", + 
yarnApplicationIdFinder.find()); + } + + @Test + public void whenHadoopJobIsNotPresentAsYarnApplicationHadoopJobIdIsUsed() throws Exception { + setupMocks("job_1534164756526_0002", "application_1534164756526_0000", "application_1534164756526_0001"); + + assertEquals("Hadoop Job ID should be found when it is not present as a YARN application ID", + "application_1534164756526_0002", + yarnApplicationIdFinder.find()); + } + + private void setupMocks(final String mrJobId, final String wfExternalId, final String yarnApplicationId) + throws HadoopAccessorException, IOException, URISyntaxException, InterruptedException, YarnException { + when(hadoopJobIdFinder.find()).thenReturn(mrJobId); + when(applicationReport.getApplicationType()).thenReturn("MAPREDUCE"); + when(workflowActionBean.getWfId()).thenReturn("workflowId"); + when(workflowActionBean.getExternalId()).thenReturn(wfExternalId); + when(applicationReport.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING); + when(applicationId.toString()).thenReturn(yarnApplicationId); + when(applicationReport.getApplicationId()).thenReturn(applicationId); + when(reader.read()).thenReturn(Lists.newArrayList(applicationReport)); + } + + @Test + public void whenHadoopJobIsPresentAsYarnApplicationAndDifferentFromItsUsed() throws Exception { + setupMocks("job_1534164756526_0002", "application_1534164756526_0001", "application_1534164756526_0003"); + + assertEquals("Hadoop Job ID should be found when different from the YARN application ID", + "application_1534164756526_0002", + yarnApplicationIdFinder.find()); + } + + @Test + public void whenHadoopJobIsPresentAsYarnApplicationAndContainWorkflowIdNotUsed() throws Exception { + setupMocks("job_1534164756526_0002", "application_1534164756526_0002", "application_1534164756526_0003"); + + assertEquals("YARN application ID should be found when greater than WorkflowActionBean.externalId", + "application_1534164756526_0003", + yarnApplicationIdFinder.find()); + } + + @Test + 
public void whenOldLauncherAndMRobApplicationsAreFinishedAndNewLauncherPresentNewLauncherIsUsed() throws Exception { + final ApplicationReport oldLauncher = mock(ApplicationReport.class); + when(oldLauncher.getApplicationType()).thenReturn("Oozie Launcher"); + when(oldLauncher.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED); + final ApplicationId oldLauncherId = mock(ApplicationId.class); + when(oldLauncherId.toString()).thenReturn("application_1534164756526_0001"); + when(oldLauncher.getApplicationId()).thenReturn(oldLauncherId); + final ApplicationReport oldMRJob = mock(ApplicationReport.class); + when(oldMRJob.getApplicationType()).thenReturn("MAPREDUCE"); + when(oldMRJob.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED); + final ApplicationId oldMRJobId = mock(ApplicationId.class); + when(oldMRJobId.toString()).thenReturn("application_1534164756526_0002"); + when(oldMRJob.getApplicationId()).thenReturn(oldMRJobId); + final ApplicationReport newLauncher = mock(ApplicationReport.class); + when(newLauncher.getApplicationType()).thenReturn("Oozie Launcher"); + when(newLauncher.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED); + final ApplicationId newLauncherId = mock(ApplicationId.class); + when(newLauncherId.toString()).thenReturn("application_1534164756526_0003"); + when(newLauncher.getApplicationId()).thenReturn(newLauncherId); + final ApplicationReport newMRJob = mock(ApplicationReport.class); + when(newMRJob.getApplicationType()).thenReturn("MAPREDUCE"); + when(newMRJob.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING); + final ApplicationId newMRJobId = mock(ApplicationId.class); + when(newMRJobId.toString()).thenReturn("application_1534164756526_0004"); + when(newMRJob.getApplicationId()).thenReturn(newMRJobId); + when(reader.read()).thenReturn(Lists.newArrayList(oldLauncher, oldMRJob, newLauncher, newMRJob)); + + 
when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0003"); + assertEquals("newLauncher should be found", "application_1534164756526_0004", yarnApplicationIdFinder.find()); + + when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0004"); + assertEquals("newLauncher should be found", "application_1534164756526_0004", yarnApplicationIdFinder.find()); + + when(workflowActionBean.getExternalId()).thenReturn("application_1534164756526_0005"); + assertEquals("workflowActionBean.externalId should be found", + "application_1534164756526_0005", yarnApplicationIdFinder.find()); + } + + @Test + public void testGetLastYarnIdOnNullThrows() { + expectedException.expect(NullPointerException.class); + yarnApplicationIdFinder.getLastYarnId(null); + } + + @Test + public void testGetLastYarnIdOnEmptyListThrows() { + expectedException.expect(IllegalArgumentException.class); + yarnApplicationIdFinder.getLastYarnId(Collections.emptyList()); + } + + @Test + public void testGetLastYarnIdOnOneElementSuccess() { + when(applicationReport.getApplicationId()).thenReturn(applicationId); + when(applicationId.toString()).thenReturn("application_1534164756526_0000"); + + final String lastYarnId = yarnApplicationIdFinder.getLastYarnId(Collections.singletonList(applicationReport)); + assertEquals("last YARN id should be the only element in the list", "application_1534164756526_0000", lastYarnId); + } + + @Test + public void testGetLastYarnIdFromUnorderedListSuccess() { + final ApplicationReport newLauncher = mock(ApplicationReport.class); + when(newLauncher.getApplicationType()).thenReturn("Oozie Launcher"); + when(newLauncher.getYarnApplicationState()).thenReturn(YarnApplicationState.FINISHED); + final ApplicationId newLauncherId = mock(ApplicationId.class); + when(newLauncherId.toString()).thenReturn("application_1534164756526_0003"); + when(newLauncher.getApplicationId()).thenReturn(newLauncherId); + final ApplicationReport newMRJob = 
mock(ApplicationReport.class); + when(newMRJob.getApplicationType()).thenReturn("MAPREDUCE"); + when(newMRJob.getYarnApplicationState()).thenReturn(YarnApplicationState.RUNNING); + final ApplicationId newMRJobId = mock(ApplicationId.class); + when(newMRJobId.toString()).thenReturn("application_1534164756526_0004"); + when(newMRJob.getApplicationId()).thenReturn(newMRJobId); + + final String lastYarnId = yarnApplicationIdFinder.getLastYarnId(Lists.newArrayList(newMRJob, newLauncher)); + assertEquals("last YARN id should be the maximal element in the list", "application_1534164756526_0004", lastYarnId); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java index 893405e..9c7f821 100644 --- a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java @@ -21,20 +21,14 @@ package org.apache.oozie.action.oozie; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.hadoop.ActionExecutorTestCase; import org.apache.oozie.action.hadoop.LauncherMainTester; -import org.apache.oozie.action.hadoop.OozieJobInfo; import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.WorkflowAction; import 
org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.command.CommandException; -import org.apache.oozie.command.wf.KillXCommand; import org.apache.oozie.command.wf.SuspendXCommand; import org.apache.oozie.local.LocalOozie; import org.apache.oozie.service.HadoopAccessorService; @@ -45,22 +39,13 @@ import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; -import java.io.BufferedReader; -import java.io.File; -import java.io.FileWriter; import java.io.IOException; -import java.io.InputStreamReader; import java.io.OutputStreamWriter; -import java.io.PrintWriter; import java.io.StringReader; import java.io.Writer; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; import java.util.Properties; public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { - private static final int JOB_TIMEOUT = 100 * 1000; public void testType() { SubWorkflowActionExecutor subWorkflow = new SubWorkflowActionExecutor(); @@ -506,7 +491,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { final OozieClient wfClient = LocalOozie.getClient(); final String jobId = submitWorkflow(workflowUri, wfClient); - waitForSubWFtoStart(wfClient, jobId); + waitForWorkflowToStart(wfClient, jobId); WorkflowJob wf = wfClient.getJobInfo(jobId); // Suspending subworkflow new SuspendXCommand(wf.getActions().get(1).getExternalId()).call(); @@ -529,7 +514,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { final String jobId = submitWorkflow(workflowUri, wfClient); final Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri()); - waitForSubWFtoStart(wfClient, jobId); + waitForWorkflowToStart(wfClient, jobId); final ApplicationId externalChildJobId = getChildMRJobApplicationId(conf); killWorkflow(jobId); @@ -540,70 +525,11 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { } - private 
void killWorkflow(String jobId) throws CommandException { - new KillXCommand(jobId).call(); - } - - private ApplicationId getChildMRJobApplicationId(Configuration conf) throws IOException { - final List<ApplicationId> applicationIdList = new ArrayList<>(); - final Path inputDir = new Path(getFsTestCaseDir(), "input"); - final Path wfIDFile = new Path(inputDir, LauncherMainTester.JOB_ID_FILE_NAME); - final FileSystem fs = FileSystem.get(conf); - - // wait until we have the running child MR job's ID from HDFS - waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile)); - - try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(wfIDFile)))) { - String line = reader.readLine(); - JobID.forName(line); - String jobID = line; - String appID = jobID.replace("job", "application"); - ApplicationId id = ConverterUtils.toApplicationId(appID); - applicationIdList.add(id); - } - - assertTrue("Application ID should've been found. No external Child ID was found in " + wfIDFile.toString(), - applicationIdList.size() == 1); - return applicationIdList.get(0); - } - - private void waitForSubWFtoStart(OozieClient wfClient, String jobId) { - waitFor(JOB_TIMEOUT, new SubWorkflowActionRunningPredicate(wfClient,jobId)); - } - - private String submitWorkflow(String workflowUri, OozieClient wfClient) throws OozieClientException { - Properties conf = wfClient.createConfiguration(); - conf.setProperty(OozieClient.APP_PATH, workflowUri); - conf.setProperty(OozieClient.USER_NAME, getTestUser()); - conf.setProperty("appName", "var-app-name"); - final String jobId = wfClient.submit(conf); - wfClient.start(jobId); - return jobId; - } - - private void writeToFile(String appXml, String appPath) throws IOException { - // TODO Auto-generated method stub - File wf = new File(URI.create(appPath)); - PrintWriter out = null; - try { - out = new PrintWriter(new FileWriter(wf)); - out.println(appXml); - } - catch (IOException iex) { - throw iex; - } - finally { - if (out != 
null) { - out.close(); - } - } - } - public String getLazyWorkflow(boolean launchMRAction) { return "<workflow-app xmlns='uri:oozie:workflow:0.4' name='app'>" + "<start to='java' />" + " <action name='java'>" + - getAction(launchMRAction) + getJavaAction(launchMRAction) + "<ok to='end' />" + "<error to='fail' />" + "</action>" @@ -614,26 +540,6 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { + "</workflow-app>"; } - private String getAction(boolean launchMRAction) { - Path inputDir = new Path(getFsTestCaseDir(), "input"); - Path outputDir = new Path(getFsTestCaseDir(), "output"); - String javaActionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<main-class>" + JavaSleepAction.class.getName()+ "</main-class>" + - "</java>"; - String javaWithMRActionXml = "<java>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<main-class>" + LauncherMainTester.class.getName()+ "</main-class>" + - "<arg>javamapreduce</arg>" + - "<arg>"+inputDir.toString()+"</arg>" + - "<arg>"+outputDir.toString()+"</arg>" + - "</java>"; - String actionXml = launchMRAction ? 
javaWithMRActionXml : javaActionXml; - return actionXml; - } - public void testSubWorkflowRerun() throws Exception { try { String workflowUri = createSubWorkflowWithLazyAction(false); @@ -646,7 +552,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { final String jobId = wfClient.submit(conf); wfClient.start(jobId); - waitForSubWFtoStart(wfClient, jobId); + waitForWorkflowToStart(wfClient, jobId); String subWorkflowExternalId = wfClient.getJobInfo(jobId).getActions().get(1).getExternalId(); wfClient.kill(wfClient.getJobInfo(jobId).getActions().get(1).getExternalId()); @@ -925,38 +831,4 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { + "<end name='end' />" + "</workflow-app>"; } - - private static class ApplicationIdExistsPredicate implements Predicate { - - private final FileSystem fs; - private final Path wfIDFile; - - public ApplicationIdExistsPredicate(FileSystem fs, Path wfIDFile) { - this.fs = fs; - this.wfIDFile = wfIDFile; - } - - @Override - public boolean evaluate() throws Exception { - return fs.exists(wfIDFile) && fs.getFileStatus(wfIDFile).getLen() > 0; - } - } - - private static class SubWorkflowActionRunningPredicate implements Predicate { - private final OozieClient wfClient; - private final String jobId; - - public SubWorkflowActionRunningPredicate(OozieClient wfClient, String jobId) { - this.wfClient = wfClient; - this.jobId = jobId; - } - - @Override - public boolean evaluate() throws Exception { - boolean isSubWfRunning = wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING; - boolean isSubWfActionRunning = wfClient.getJobInfo(jobId) - .getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING; - return isSubWfRunning && isSubWfActionRunning; - } - } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 371a60d..b835951 
100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3298 [MapReduce action] External ID is not filled properly and failing MR job is treated as SUCCEEDED (andras.piros via pbacsko, asasvari, gezapeti) OOZIE-3317 amend [build] Fix false positive precommit reports (kmarton via andras.piros) OOZIE-3160 PriorityDelayQueue put()/take() can cause significant CPU load due to busy waiting (pbacsko) OOZIE-2877 Git action (clayb, andras.piros via pbacsko, gezapeti) http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java index c9e2a91..b6599f7 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java @@ -199,37 +199,9 @@ public abstract class LauncherMain { public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope, long startTime) { - Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>(); - String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS); - if (tag == null) { - System.out.print("Could not find YARN tags property " + CHILD_MAPREDUCE_JOB_TAGS); - return childYarnJobs; - } - - System.out.println("tag id : " + tag); - GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); - gar.setScope(scope); - gar.setApplicationTags(Collections.singleton(tag)); - long endTime = System.currentTimeMillis(); - if (startTime > endTime) { - System.out.println("WARNING: Clock skew between the Oozie server host and this host detected. Please fix this. 
" + - "Attempting to work around..."); - // We don't know which one is wrong (relative to the RM), so to be safe, let's assume they're both wrong and add an - // offset in both directions - long diff = 2 * (startTime - endTime); - startTime = startTime - diff; - endTime = endTime + diff; - } - gar.setStartRange(startTime, endTime); - try { - ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class); - GetApplicationsResponse apps = proxy.getApplications(gar); - List<ApplicationReport> appsList = apps.getApplicationList(); - for(ApplicationReport appReport : appsList) { - childYarnJobs.add(appReport.getApplicationId()); - } - } catch (YarnException | IOException ioe) { - throw new RuntimeException("Exception occurred while finding child jobs", ioe); + final Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>(); + for (final ApplicationReport applicationReport : getChildYarnApplications(actionConf, scope, startTime)) { + childYarnJobs.add(applicationReport.getApplicationId()); } if (childYarnJobs.isEmpty()) { @@ -254,6 +226,39 @@ public abstract class LauncherMain { return getChildYarnJobs(actionConf, scope, startTime); } + public static List<ApplicationReport> getChildYarnApplications(final Configuration actionConf, + final ApplicationsRequestScope scope, + long startTime) { + final String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS); + if (tag == null) { + System.out.print("Could not find YARN tags property " + CHILD_MAPREDUCE_JOB_TAGS); + return Collections.emptyList(); + } + + System.out.println("tag id : " + tag); + final GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); + gar.setScope(scope); + gar.setApplicationTags(Collections.singleton(tag)); + long endTime = System.currentTimeMillis(); + if (startTime > endTime) { + System.out.println("WARNING: Clock skew between the Oozie server host and this host detected. Please fix this. 
" + + "Attempting to work around..."); + // We don't know which one is wrong (relative to the RM), so to be safe, let's assume they're both wrong and add an + // offset in both directions + final long diff = 2 * (startTime - endTime); + startTime = startTime - diff; + endTime = endTime + diff; + } + gar.setStartRange(startTime, endTime); + try { + final ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class); + final GetApplicationsResponse apps = proxy.getApplications(gar); + return apps.getApplicationList(); + } catch (final YarnException | IOException e) { + throw new RuntimeException("Exception occurred while finding child jobs", e); + } + } + public static void killChildYarnJobs(Configuration actionConf) { try { Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf); http://git-wip-us.apache.org/repos/asf/oozie/blob/d5c4f3b7/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java index f460b6b..68e83fa 100644 --- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java +++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java @@ -56,9 +56,11 @@ import org.apache.hadoop.streaming.StreamJob; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.action.ActionExecutor; import 
org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; @@ -67,6 +69,8 @@ import org.apache.oozie.command.wf.StartXCommand; import org.apache.oozie.command.wf.SubmitXCommand; import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; +import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; @@ -74,10 +78,12 @@ import org.apache.oozie.util.ClassUtils; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.PropertiesUtils; import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XLog; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; public class TestMapReduceActionExecutor extends ActionExecutorTestCase { + private static final XLog LOG = XLog.getLog(TestMapReduceActionExecutor.class); private static final String PIPES = "pipes"; private static final String MAP_REDUCE = "map-reduce"; @@ -85,7 +91,8 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { @Override protected void setSystemProps() throws Exception { super.setSystemProps(); - setSystemProperty("oozie.service.ActionService.executor.classes", MapReduceActionExecutor.class.getName()); + setSystemProperty("oozie.service.ActionService.executor.classes", + String.join(",", MapReduceActionExecutor.class.getName(), JavaActionExecutor.class.getName())); setSystemProperty("oozie.credentials.credentialclasses", "cred=org.apache.oozie.action.hadoop.CredentialForTest"); } @@ -803,10 +810,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { w.close(); } - private HadoopAccessorService getHadoopAccessorService() { - return 
Services.get().get(HadoopAccessorService.class); - } - public void testMapReduceWithUberJarEnabled() throws Exception { Services serv = Services.get(); boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false); @@ -1306,4 +1309,162 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { }); } + /** Creates a workflow containing a single map-reduce action, then hands it to {@code startWorkflowAndFailChildMRJob} which starts it and kills its child MapReduce job. */ public void testFailingMapReduceJobCausesOozieLauncherAMToFail() throws Exception { + final String workflowUri = createWorkflowWithMapReduceAction(); + + startWorkflowAndFailChildMRJob(workflowUri); + } + + /** Builds the XML of a workflow with one map-reduce action that uses {@code SleepMapperReducerForTest} as both mapper and reducer, passes it to {@code writeToFile} for the test case's {@code workflow.xml}, and returns that file's URI. */ private String createWorkflowWithMapReduceAction() throws IOException { + final String workflowUri = getTestCaseFileUri("workflow.xml"); + final String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"workflow\">" + + " <start to=\"map-reduce\"/>" + + " <action name=\"map-reduce\">" + + " <map-reduce>" + + " <resource-manager>" + getJobTrackerUri() + "</resource-manager>" + + " <name-node>" + getNameNodeUri() + "</name-node>" + + " <configuration>\n" + + " <property>\n" + + " <name>mapred.job.queue.name</name>\n" + + " <value>default</value>\n" + + " </property>\n" + + " <property>\n" + + " <name>mapred.mapper.class</name>\n" + + " <value>org.apache.oozie.action.hadoop.SleepMapperReducerForTest</value>\n" + + " </property>\n" + + " <property>\n" + + " <name>mapred.reducer.class</name>\n" + + " <value>org.apache.oozie.action.hadoop.SleepMapperReducerForTest</value>\n" + + " </property>\n" + + " <property>\n" + + " <name>mapred.input.dir</name>\n" + + " <value>" + getFsTestCaseDir() + "/input</value>\n" + + " </property>\n" + + " <property>\n" + + " <name>mapred.output.dir</name>\n" + + " <value>" + getFsTestCaseDir() + "/output</value>\n" + + " </property>\n" + + " </configuration>\n" + + " </map-reduce>" + + " <ok to=\"end\"/>" + + " <error to=\"fail\"/>" + + " </action>" + + " <kill name=\"fail\">" + + " <message>Sub workflow failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + + " </kill>" + + " <end name=\"end\"/>" + + "</workflow-app>"; + + writeToFile(appXml, workflowUri); + + return workflowUri; + } + + /** + * Starts {@link LocalOozie}, submits the workflow, waits for its child MapReduce YARN application to show up, kills that + * application, and then waits for the workflow to be killed. {@link LocalOozie} is stopped in every case. + * + * @param workflowUri URI of the workflow definition to submit + * @throws Exception when any of the submit / wait / kill steps fails + */ + private void startWorkflowAndFailChildMRJob(final String workflowUri) throws Exception { + try { + LocalOozie.start(); + final OozieClient wfClient = LocalOozie.getClient(); + final String workflowId = submitWorkflow(workflowUri, wfClient); + final Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri()); + + final Path inputFolder = createInputFolder(conf); + + waitForWorkflowToStart(wfClient, workflowId); + /* Close the YarnClient as soon as polling is over instead of leaking it for the rest of the test run */ + try (final YarnClient yarnClient = getHadoopAccessorService().createYarnClient(getTestUser(), conf)) { + waitForChildYarnApplication(yarnClient, workflowId); + } + assertAndWriteNextMRJobId(workflowId, conf, inputFolder); + + final ApplicationId externalChildJobId = getChildMRJobApplicationId(conf); + + killYarnApplication(conf, externalChildJobId); + waitUntilYarnAppKilledAndAssertSuccess(externalChildJobId.toString()); + waitForWorkflowToKill(wfClient, workflowId); + } finally { + LocalOozie.stop(); + } + } + + /** + * Get all YARN application IDs, select the one of type {@code MAPREDUCE} that is relevant to {@code workflowId}, + * and write to {@code inputFolder/jobID.txt}. + * <p> + * Simulating functional parts of {@link LauncherMain#writeExternalChildIDs(String, Pattern[], String)} so that + * {@link MapReduceActionExecutor#check(ActionExecutor.Context, WorkflowAction)} can find it later on the call chain. + * <p> + * We need to write out our own sequence file to {@link LauncherMainTester#JOB_ID_FILE_NAME} so that + * {@link ActionExecutorTestCase#getChildMRJobApplicationId(Configuration)} can find it. We unfortunately cannot rely on the + * original sequence file written by {@link LauncherMain#writeExternalChildIDs(String, Pattern[], String)} because we don't own + * a reference to the original {@link ActionExecutor.Context} as in {@link MapReduceActionExecutor}. 
+ * @param workflowId the workflow ID + * @param conf the {@link Configuration} used for Hadoop Common / YARN API calls + * @param inputFolder where to write the output text file + * @throws IOException when the output text file cannot be written or the {@link YarnClient} cannot be closed + * @throws YarnException when the list of YARN applications cannot be queried + * @throws HadoopAccessorException when {@link YarnClient} cannot be created + */ + private void assertAndWriteNextMRJobId(final String workflowId, final Configuration conf, final Path inputFolder) + throws IOException, YarnException, HadoopAccessorException { + final Path wfIDFile = new Path(inputFolder, LauncherMainTester.JOB_ID_FILE_NAME); + /* The YarnClient is a managed resource as well, so it is closed even when an assertion below fails */ + try (final FileSystem fs = FileSystem.get(conf); + final Writer w = new OutputStreamWriter(fs.create(wfIDFile)); + final YarnClient yarnClient = getHadoopAccessorService().createYarnClient(getTestUser(), conf)) { + final List<ApplicationReport> allApplications = yarnClient.getApplications(); + + assertTrue("YARN applications number mismatch", allApplications.size() >= 2); + + /* No early exit: when several applications match, the last one reported is used */ + ApplicationReport mapReduce = null; + for (final ApplicationReport candidate : allApplications) { + if (MapReduceActionExecutor.YARN_APPLICATION_TYPE_MAPREDUCE.equals(candidate.getApplicationType()) + && candidate.getName().contains(workflowId)) { + mapReduce = candidate; + } + } + assertNotNull("MAPREDUCE YARN application not found", mapReduce); + + final String applicationId = mapReduce.getApplicationId().toString(); + final String nextMRJobId = applicationId.replace("application", "job"); + + LOG.debug("Writing next MapReduce job ID: {0}", nextMRJobId); + + w.write(nextMRJobId); + } + } + + /** + * Creates the {@code input} folder below the test case directory on the file system given by {@code conf}. + * + * @param conf the {@link Configuration} used to obtain the {@link FileSystem} + * @return the path of the input folder + * @throws IOException when the folder cannot be created + */ + private Path createInputFolder(final Configuration conf) throws IOException { + final Path inputDir = new Path(getFsTestCaseDir(), "input"); + try (final FileSystem fs = FileSystem.get(conf)) { + fs.mkdirs(inputDir); + } + return inputDir; + } + + private void waitForChildYarnApplication(final YarnClient yarnClient, final String workflowId) { + waitFor(JOB_TIMEOUT, new 
ChildYarnApplicationPresentPredicate(yarnClient, workflowId)); + } + + /** + * Predicate that holds once YARN reports an application whose type equals + * {@link MapReduceActionExecutor#YARN_APPLICATION_TYPE_MAPREDUCE} and whose name contains the workflow ID. + */ + private static class ChildYarnApplicationPresentPredicate implements Predicate { + private final YarnClient yarnClient; + private final String workflowId; + + ChildYarnApplicationPresentPredicate(final YarnClient yarnClient, final String workflowId) { + this.yarnClient = yarnClient; + this.workflowId = workflowId; + } + + /** + * Queries the application list exactly once per poll; an empty list simply skips the loop. + * + * @return {@code true} if a matching application is present, {@code false} otherwise + * @throws Exception when the list of YARN applications cannot be queried + */ + @Override + public boolean evaluate() throws Exception { + for (final ApplicationReport applicationReport : yarnClient.getApplications()) { + final String name = applicationReport.getName(); + final String type = applicationReport.getApplicationType(); + /* Constant-first comparison and the name check keep a report with missing fields from aborting the wait */ + if (MapReduceActionExecutor.YARN_APPLICATION_TYPE_MAPREDUCE.equals(type) && name != null && name.contains(workflowId)) { + return true; + } + } + + return false; + } + } }