Repository: oozie Updated Branches: refs/heads/master 1c4d56164 -> 2322d496c
OOZIE-2436 Fork/join workflow fails with oozie.action.yarn.tag must not be null Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/2322d496 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/2322d496 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/2322d496 Branch: refs/heads/master Commit: 2322d496c73dac43859771a9564776c098289e5e Parents: 1c4d561 Author: Purshotam Shah <[email protected]> Authored: Fri Jul 1 13:50:53 2016 -0700 Committer: Purshotam Shah <[email protected]> Committed: Fri Jul 1 13:50:53 2016 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/action/ActionExecutor.java | 26 ++++++++ .../oozie/action/hadoop/JavaActionExecutor.java | 43 ++++++------- .../action/oozie/SubWorkflowActionExecutor.java | 9 ++- .../oozie/command/wf/ActionStartXCommand.java | 30 ++++----- .../apache/oozie/command/wf/ActionXCommand.java | 40 +++++++++++- .../command/wf/ForkedActionStartXCommand.java | 7 +++ .../apache/oozie/command/wf/SignalXCommand.java | 65 +++++++++++++------- .../oozie/workflow/lite/ControlNodeHandler.java | 6 ++ .../action/hadoop/TestJavaActionExecutor.java | 6 ++ release-log.txt | 1 + 10 files changed, 162 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/action/ActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java index 2be4549..3f978fd 100644 --- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java @@ -56,6 +56,9 @@ public abstract class ActionExecutor { public static final String ACTION_RETRY_POLICY = CONF_PREFIX + 
"retry.policy"; + public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; + + /** * Error code used by {@link #convertException} when there is not register error information for an exception. */ @@ -581,4 +584,27 @@ public abstract class ActionExecutor { public boolean supportsConfigurationJobXML() { return false; } + + /** + * Creating and forwarding the tag, It will be useful during repeat attempts of Launcher, to ensure only + * one child job is running. Tag is formed as follows: + * For workflow job, tag = action-id + * For Coord job, tag = coord-action-id@action-name (if not part of sub flow), else + * coord-action-id@subflow-action-name@action-name. + * @param conf the conf + * @param wfJob the wf job + * @param action the action + * @return the action yarn tag + */ + public String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) { + if (conf.get(OOZIE_ACTION_YARN_TAG) != null) { + return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName(); + } + else if (wfJob.getParentId() != null) { + return wfJob.getParentId() + "@" + action.getName(); + } + else { + return action.getId(); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 639003e..f2273d6 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -62,7 +62,6 @@ import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.command.coord.CoordActionStartXCommand; -import 
org.apache.oozie.command.wf.ActionStartXCommand; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.HadoopAccessorService; @@ -125,6 +124,8 @@ public class JavaActionExecutor extends ActionExecutor { public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first"; public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip"; + public XConfiguration workflowConf = null; + static { DISALLOWED_PROPERTIES.add(HADOOP_USER); DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER); @@ -852,7 +853,7 @@ public class JavaActionExecutor extends ActionExecutor { throws ActionExecutorException { XConfiguration wfJobConf = null; try { - wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); + wfJobConf = getWorkflowConf(context); } catch (IOException ioe) { throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", @@ -915,14 +916,6 @@ public class JavaActionExecutor extends ActionExecutor { launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true); setupLauncherConf(launcherJobConf, actionXml, appPathRoot, context); - String launcherTag = null; - // Extracting tag and appending action name to maintain the uniqueness. - if (context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) { - launcherTag = context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG); - } else { //Keeping it to maintain backward compatibly with test cases. - launcherTag = action.getId(); - } - // Properties for when a launcher job's AM gets restarted if (ConfigurationService.getBoolean(HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART)) { // launcher time filter is required to prune the search of launcher tag. @@ -930,14 +923,16 @@ public class JavaActionExecutor extends ActionExecutor { // time. 
Workflow created time is good enough when workflow is running independently or workflow is // rerunning from failed node. long launcherTime = System.currentTimeMillis(); - String coordActionNominalTime = context.getProtoActionConf() - .get(CoordActionStartXCommand.OOZIE_COORD_ACTION_NOMINAL_TIME); + String coordActionNominalTime = context.getProtoActionConf().get( + CoordActionStartXCommand.OOZIE_COORD_ACTION_NOMINAL_TIME); if (coordActionNominalTime != null) { launcherTime = Long.parseLong(coordActionNominalTime); - } else if (context.getWorkflow().getCreatedTime() != null) { + } + else if (context.getWorkflow().getCreatedTime() != null) { launcherTime = context.getWorkflow().getCreatedTime().getTime(); } - LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, launcherTag, launcherTime); + String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action); + LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime); } else { LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties", @@ -1237,13 +1232,7 @@ public class JavaActionExecutor extends ActionExecutor { HashMap<String, CredentialsProperties> credPropertiesMap = null; if (context != null && action != null) { if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) { - XConfiguration wfJobConf = null; - try { - wfJobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); - } catch (IOException ioe) { - throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "It should never happen", - ioe.getMessage()); - } + XConfiguration wfJobConf = getWorkflowConf(context); if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) || !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) { credPropertiesMap = getActionCredentialsProperties(context, action); @@ -1327,7 +1316,7 @@ public class 
JavaActionExecutor extends ActionExecutor { throws Exception { CredentialsProperties credProp = null; String workflowXml = ((WorkflowJobBean) context.getWorkflow()).getWorkflowInstance().getApp().getDefinition(); - XConfiguration wfjobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); + XConfiguration wfjobConf = getWorkflowConf(context); Element elementJob = XmlUtils.parseXml(workflowXml); Element credentials = elementJob.getChild("credentials", elementJob.getNamespace()); if (credentials != null) { @@ -1675,7 +1664,7 @@ public class JavaActionExecutor extends ActionExecutor { String[] names = conf.getStrings(ACTION_SHARELIB_FOR + getType()); if (names == null || names.length == 0) { try { - XConfiguration jobConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); + XConfiguration jobConf = getWorkflowConf(context); names = jobConf.getStrings(ACTION_SHARELIB_FOR + getType()); if (names == null || names.length == 0) { names = Services.get().getConf().getStrings(ACTION_SHARELIB_FOR + getType()); @@ -1745,4 +1734,12 @@ public class JavaActionExecutor extends ActionExecutor { public boolean supportsConfigurationJobXML() { return true; } + + private XConfiguration getWorkflowConf(Context context) throws IOException { + if (workflowConf == null) { + workflowConf = new XConfiguration(new StringReader(context.getWorkflow().getConf())); + } + return workflowConf; + + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java index f77e52c..1ea7097 100644 --- a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java +++ 
b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java @@ -58,6 +58,8 @@ public class SubWorkflowActionExecutor extends ActionExecutor { public static final String SUBWORKFLOW_RERUN = "oozie.action.subworkflow.rerun"; private static final Set<String> DISALLOWED_DEFAULT_PROPERTIES = new HashSet<String>(); + public XLog LOG = XLog.getLog(getClass()); + static { String[] badUserProps = {PropertiesUtils.DAYS, PropertiesUtils.HOURS, PropertiesUtils.MINUTES, @@ -220,11 +222,7 @@ public class SubWorkflowActionExecutor extends ActionExecutor { JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(), subWorkflowConf); - // pushing the tag to conf for using by Launcher. - if(context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG) != null) { - subWorkflowConf.set(ActionStartXCommand.OOZIE_ACTION_YARN_TAG, - context.getVar(ActionStartXCommand.OOZIE_ACTION_YARN_TAG)); - } + subWorkflowConf.set(OOZIE_ACTION_YARN_TAG, getActionYarnTag(parentConf, context.getWorkflow(), action)); // if the rerun failed node option is provided during the time of rerun command, old subworkflow will // rerun again. 
@@ -247,6 +245,7 @@ public class SubWorkflowActionExecutor extends ActionExecutor { } } catch (Exception ex) { + LOG.error(ex); throw convertException(ex); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java index 8b0be9c..41f4430 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java @@ -67,11 +67,10 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command public static final String COULD_NOT_START = "COULD_NOT_START"; public static final String START_DATA_MISSING = "START_DATA_MISSING"; public static final String EXEC_DATA_MISSING = "EXEC_DATA_MISSING"; - public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; private String jobId = null; protected String actionId = null; - private WorkflowJobBean wfJob = null; + protected WorkflowJobBean wfJob = null; protected WorkflowActionBean wfAction = null; private JPAService jpaService = null; private ActionExecutor executor = null; @@ -185,7 +184,7 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command isUserRetry = true; prepareForRetry(wfAction); } - context = new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); + context = getContext(isRetry, isUserRetry); boolean caught = false; try { if (!(executor instanceof ControlNodeActionExecutor)) { @@ -230,21 +229,6 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command Instrumentation.Cron cron = new Instrumentation.Cron(); cron.start(); context.setStartTime(); - /* - Creating and forwarding the tag, It will be useful during repeat 
attempts of Launcher, to ensure only - one child job is running. Tag is formed as follows: - For workflow job, tag = action-id - For Coord job, tag = coord-action-id@action-name (if not part of sub flow), else - coord-action-id@subflow-action-name@action-name. - */ - if (conf.get(OOZIE_ACTION_YARN_TAG) != null) { - context.setVar(OOZIE_ACTION_YARN_TAG, conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName()); - } else if (wfJob.getParentId() != null) { - context.setVar(OOZIE_ACTION_YARN_TAG, wfJob.getParentId() + "@" + wfAction.getName()); - } else { - context.setVar(OOZIE_ACTION_YARN_TAG, wfAction.getId()); - } - executor.start(context, wfAction); cron.stop(); FaultInjection.activate("org.apache.oozie.command.SkipCommitFaultInjection"); @@ -356,6 +340,16 @@ public class ActionStartXCommand extends ActionXCommand<org.apache.oozie.command new ActionEndXCommand(wfAction.getId(), wfAction.getType()).call(); } + /** + * Get action executor context + * @param isRetry + * @param isUserRetry + * @return + */ + protected ActionExecutorContext getContext(boolean isRetry, boolean isUserRetry) { + return new ActionXCommand.ActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); + } + protected void updateJobLastModified(){ wfJob.setLastModifiedTime(new Date()); updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java index 525ef94..e65c3bf 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java @@ -23,6 +23,8 @@ import java.io.StringReader; import java.net.URI; import 
java.net.URISyntaxException; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.Set; @@ -277,9 +279,9 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> { * */ public static class ActionExecutorContext implements ActionExecutor.Context { - private final WorkflowJobBean workflow; + protected final WorkflowJobBean workflow; private Configuration protoConf; - private final WorkflowActionBean action; + protected final WorkflowActionBean action; private final boolean isRetry; private final boolean isUserRetry; private boolean started; @@ -353,6 +355,15 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> { * @see org.apache.oozie.action.ActionExecutor.Context#setVar(java.lang.String, java.lang.String) */ public void setVar(String name, String value) { + setVarToWorkflow(name, value); + } + + /** + * This is not thread safe, don't use if workflowjob is shared among multiple actions command + * @param name + * @param value + */ + public void setVarToWorkflow(String name, String value) { name = action.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + name; WorkflowInstance wfInstance = workflow.getWorkflowInstance(); wfInstance.setVar(name, value); @@ -520,7 +531,32 @@ public abstract class ActionXCommand<T> extends WorkflowXCommand<T> { public void setJobStatus(Job.Status jobStatus) { this.jobStatus = jobStatus; } + } + + public static class ForkedActionExecutorContext extends ActionExecutorContext { + private Map<String, String> contextVariableMap = new HashMap<String, String>(); + + public ForkedActionExecutorContext(WorkflowJobBean workflow, WorkflowActionBean action, boolean isRetry, + boolean isUserRetry) { + super(workflow, action, isRetry, isUserRetry); + } + + public void setVar(String name, String value) { + if (value == null) { + contextVariableMap.remove(name); + } + else { + contextVariableMap.put(name, value); + } + } + public String getVar(String name) { 
+ return contextVariableMap.get(name); + } + + public Map<String, String> getContextMap() { + return contextVariableMap; + } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java index 47dca75..91da0b8 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ForkedActionStartXCommand.java @@ -25,6 +25,7 @@ import org.apache.oozie.client.Job; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.command.CommandException; import org.apache.oozie.command.XCommand; +import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; public class ForkedActionStartXCommand extends ActionStartXCommand { @@ -96,4 +97,10 @@ public class ForkedActionStartXCommand extends ActionStartXCommand { protected void callActionEnd() { queue(new ActionEndXCommand(wfAction.getId(), wfAction.getType())); } + + @Override + protected ActionExecutorContext getContext(boolean isRetry, boolean isUserRetry){ + return new ActionXCommand.ForkedActionExecutorContext(wfJob, wfAction, isRetry, isUserRetry); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java index d2bb403..e95a60a 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java @@ -39,6 +39,7 @@ import 
org.apache.oozie.XException; import org.apache.oozie.command.CommandException; import org.apache.oozie.command.PreconditionException; import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; +import org.apache.oozie.command.wf.ActionXCommand.ForkedActionExecutorContext; import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; @@ -448,8 +449,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> { } // Changing to synchronous call from asynchronous queuing to prevent // undue delay from between end of previous and start of next action - if (wfJob.getStatus() != WorkflowJob.Status.RUNNING - && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) { + if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) { // only for asynchronous actions, parent coord action's external id will // persisted and following update will succeed. 
updateParentIfNecessary(wfJob); @@ -458,7 +458,7 @@ public class SignalXCommand extends WorkflowXCommand<Void> { else if (syncAction != null) { new ActionStartXCommand(wfJob, syncAction.getId(), syncAction.getType()).call(); } - else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)){ + else if (!workflowActionBeanListForForked.isEmpty() && !checkForSuspendNode(workflowActionBeanListForForked)) { startForkedActions(workflowActionBeanListForForked); } LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" + actionId); @@ -467,9 +467,12 @@ public class SignalXCommand extends WorkflowXCommand<Void> { public void startForkedActions(List<WorkflowActionBean> workflowActionBeanListForForked) throws CommandException { - List<CallableWrapper<ActionExecutorContext>> tasks = - new ArrayList<CallableWrapper<ActionExecutorContext>>(); - boolean updateLastModified = true; + List<CallableWrapper<ActionExecutorContext>> tasks = new ArrayList<CallableWrapper<ActionExecutorContext>>(); + List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); + List<JsonBean> insertList = new ArrayList<JsonBean>(); + + boolean endWorkflow = false; + boolean submitJobByQueuing = false; for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) { LOG.debug("Starting forked actions parallely : " + workflowActionBean.getId()); tasks.add(Services.get().get(CallableQueueService.class).new CallableWrapper<ActionExecutorContext>( @@ -477,18 +480,26 @@ public class SignalXCommand extends WorkflowXCommand<Void> { } try { - List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class).invokeAll(tasks); + List<Future<ActionExecutorContext>> futures = Services.get().get(CallableQueueService.class) + .invokeAll(tasks); for (Future<ActionExecutorContext> result : futures) { + if (result == null) { + submitJobByQueuing = true; + continue; + } ActionExecutorContext context = result.get(); + 
Map<String, String> contextVariableMap = ((ForkedActionExecutorContext) context).getContextMap(); + LOG.debug("contextVariableMap size of action " + context.getAction().getId() + " is " + contextVariableMap.size()); + for (String key : contextVariableMap.keySet()) { + context.setVarToWorkflow(key, contextVariableMap.get(key)); + } if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.FAILED)) { LOG.warn("Action has failed, failing job" + context.getAction().getId()); new ActionStartXCommand(context.getAction().getId(), null).failJob(context); updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, (WorkflowActionBean) context.getAction())); if (context.isShouldEndWF()) { - endWF(); - updateLastModified = false; - break; + endWorkflow = true; } } if (context.getJobStatus() != null && context.getJobStatus().equals(Job.Status.SUSPENDED)) { @@ -498,20 +509,34 @@ public class SignalXCommand extends WorkflowXCommand<Void> { updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, (WorkflowActionBean) context.getAction())); if (context.isShouldEndWF()) { - endWF(); - updateLastModified = false; - break; + endWorkflow = true; } } } + if (endWorkflow) { + endWF(insertList); + } + } catch (Exception e) { LOG.error("Error running forked jobs parallely", e); startForkedActionsByQueuing(workflowActionBeanListForForked); + submitJobByQueuing = false; + } + if (submitJobByQueuing && !endWorkflow) { + LOG.error("There is error in running forked jobs parallely"); + startForkedActionsByQueuing(workflowActionBeanListForForked); + } + wfJob.setLastModifiedTime(new Date()); + updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, + wfJob)); + try { + BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, updateList, null); } - if (updateLastModified) { - updateJobLastModified(); + catch (JPAExecutorException e) { + 
throw new CommandException(e); } + LOG.debug("forked actions submitted parallely"); } @@ -519,17 +544,11 @@ public class SignalXCommand extends WorkflowXCommand<Void> { //queuing all jobs, submitted job will fail in precondition for (WorkflowActionBean workflowActionBean : workflowActionBeanListForForked) { LOG.debug("Queuing fork action " + workflowActionBean.getId()); - queue(new ActionStartXCommand(wfJob, workflowActionBean.getId(), workflowActionBean.getType())); + queue(new ActionStartXCommand(workflowActionBean.getId(), workflowActionBean.getType())); } } - protected void updateJobLastModified() { - wfJob.setLastModifiedTime(new Date()); - updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, - wfJob)); - } - - protected void endWF() throws CommandException { + private void endWF(List<JsonBean> insertList) throws CommandException { updateParentIfNecessary(wfJob, 3); new WfEndXCommand(wfJob).call(); // To delete the WF temp dir SLAEventBean slaEvent2 = SLADbXOperations.createStatusEvent(wfJob.getSlaXml(), wfJob.getId(), Status.FAILED, http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java b/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java index c1f7cb1..8da8f03 100644 --- a/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java +++ b/core/src/main/java/org/apache/oozie/workflow/lite/ControlNodeHandler.java @@ -19,6 +19,7 @@ package org.apache.oozie.workflow.lite; import org.apache.oozie.ErrorCode; +import org.apache.oozie.util.XLog; import org.apache.oozie.workflow.WorkflowException; import java.util.ArrayList; @@ -31,6 +32,8 @@ import java.util.List; public abstract class ControlNodeHandler extends NodeHandler { public static final 
String FORK_COUNT_PREFIX = "workflow.fork."; + public XLog LOG = XLog.getLog(getClass()); + /** * Called by {@link #enter(Context)} when returning TRUE. @@ -62,6 +65,7 @@ public abstract class ControlNodeHandler extends NodeHandler { else if (nodeClass.equals(JoinNodeDef.class)) { String parentExecutionPath = context.getParentExecutionPath(context.getExecutionPath()); String forkCount = context.getVar(FORK_COUNT_PREFIX + parentExecutionPath); + if (forkCount == null) { throw new WorkflowException(ErrorCode.E0720, context.getNodeDef().getName()); } @@ -73,6 +77,8 @@ public abstract class ControlNodeHandler extends NodeHandler { else { context.setVar(FORK_COUNT_PREFIX + parentExecutionPath, null); } + LOG.debug("count = " + count + " for parent execution path " + parentExecutionPath); + doTouch = (count == 0); } else { http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 85bb993..5f9e29a 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -1256,6 +1256,12 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { + "<value>java-job-conf</value>" + "</property>" + "</configuration>"; wfBean.setConf(jobConf); + ae = new JavaActionExecutor() { + @Override + protected String getDefaultShareLibName(Element actionXml) { + return "java-action-executor"; + } + }; Assert.assertArrayEquals(new String[] { "java-job-conf" }, ae.getShareLibNames(context, new Element("java"), actionConf)); http://git-wip-us.apache.org/repos/asf/oozie/blob/2322d496/release-log.txt 
---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 8612854..ab44c24 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2436 Fork/join workflow fails with "oozie.action.yarn.tag must not be null" (puru) OOZIE-2578 Oozie example distcp job fails to run within an encrypted zone with checksum match error (pbacsko via rkanter) OOZIE-2362 SQL injection in BulkJPAExecutor (pbacsko via rkanter) OOZIE-2577 Flaky tests TestCoordActionInputCheckXCommand.testTimeout and testTimeoutWithException (pbacsko via rkanter)
