Repository: oozie Updated Branches: refs/heads/master abb508780 -> bac6d005e
OOZIE-2029 Workflow re-run with RERUN_FAIL_NODES=true should re-run only the failed nodes of the sub-workflow (jaydeepvishwakarma via shwethags) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/bac6d005 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/bac6d005 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/bac6d005 Branch: refs/heads/master Commit: bac6d005e8718e9e6df94f47677c2d4d9726d5e5 Parents: abb5087 Author: shwethags <[email protected]> Authored: Tue Dec 9 16:28:39 2014 +0530 Committer: shwethags <[email protected]> Committed: Tue Dec 9 16:28:39 2014 +0530 ---------------------------------------------------------------------- .../org/apache/oozie/WorkflowActionBean.java | 4 +- .../action/oozie/SubWorkflowActionExecutor.java | 12 +++- .../apache/oozie/command/wf/ReRunXCommand.java | 6 +- .../apache/oozie/command/wf/SignalXCommand.java | 49 ++++++++++--- .../jpa/WorkflowActionQueryExecutor.java | 2 + .../oozie/TestSubWorkflowActionExecutor.java | 72 ++++++++++++++++++++ release-log.txt | 1 + 7 files changed, 129 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/WorkflowActionBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java index d27f59b..06edf53 100644 --- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java +++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java @@ -88,7 +88,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_ACTION_FAIL", query = "select a.id, a.wfId, a.name, a.statusStr, a.pending, a.type, a.logToken, a.transition, a.errorCode, a.errorMessage from WorkflowActionBean a where a.id = :id"), - @NamedQuery(name = "GET_ACTION_SIGNAL", query = "select a.id, a.wfId, a.name, a.statusStr, a.pending, a.pendingAgeTimestamp, a.type, a.logToken, a.transition, a.errorCode, a.errorMessage, a.executionPath, a.signalValue, a.slaXml from WorkflowActionBean a where a.id = :id"), + @NamedQuery(name = "GET_ACTION_SIGNAL", query = "select a.id, a.wfId, a.name, a.statusStr, a.pending, a.pendingAgeTimestamp, a.type, a.logToken, a.transition, a.errorCode, a.errorMessage, a.executionPath, a.signalValue, a.slaXml, a.externalId from WorkflowActionBean a where a.id = :id"), @NamedQuery(name = "GET_ACTION_CHECK", query = "select a.id, a.wfId, a.name, a.statusStr, a.pending, a.pendingAgeTimestamp, a.type, a.logToken, a.transition, a.retries, a.userRetryCount, a.userRetryMax, a.userRetryInterval, a.trackerUri, a.startTimestamp, a.endTimestamp, a.lastCheckTimestamp, a.errorCode, a.errorMessage, a.externalId, a.externalStatus, a.externalChildIDs, a.conf from WorkflowActionBean a where a.id = :id"), @@ -110,7 +110,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) from WorkflowActionBean a where a.wfId = :wfId AND (a.statusStr = 'START_RETRY' OR a.statusStr = 'START_MANUAL' OR a.statusStr = 'END_RETRY' OR a.statusStr = 'END_MANUAL')"), - @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id, a.name, a.statusStr, a.endTimestamp from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp") }) + @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id, a.name, a.statusStr, a.endTimestamp, a.type from WorkflowActionBean a where a.wfId = :wfId order by a.startTimestamp") }) @Table(name = "WF_ACTIONS") public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean { @Id http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java index bda34b5..debbf90 100644 --- a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java @@ -124,7 +124,7 @@ public class SubWorkflowActionExecutor extends ActionExecutor { } protected void verifyAndInjectSubworkflowDepth(Configuration parentConf, Configuration conf) throws ActionExecutorException { - int depth = conf.getInt(SUBWORKFLOW_DEPTH, 0); + int depth = parentConf.getInt(SUBWORKFLOW_DEPTH, 0); int maxDepth = ConfigurationService.getInt(SUBWORKFLOW_MAX_DEPTH); if (depth >= maxDepth) { throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "SUBWF001", @@ -180,8 +180,14 @@ public class SubWorkflowActionExecutor extends ActionExecutor { //TODO: this has to be refactored later to be done in a single place for REST calls and this JobUtils.normalizeAppPath(context.getWorkflow().getUser(), context.getWorkflow().getGroup(), subWorkflowConf); - - subWorkflowId = oozieClient.run(subWorkflowConf.toProperties()); + // if the rerun failed node option is provided during the time of rerun command, old subworkflow will + // rerun again. + if(action.getExternalId() != null && parentConf.getBoolean(OozieClient.RERUN_FAIL_NODES, false)) { + oozieClient.reRun(action.getExternalId(), subWorkflowConf.toProperties()); + subWorkflowId = action.getExternalId(); + } else { + subWorkflowId = oozieClient.run(subWorkflowConf.toProperties()); + } } else { subWorkflowId = runningJobId; http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java index 460e90c..19d3e8d 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java @@ -223,7 +223,11 @@ public class ReRunXCommand extends WorkflowXCommand<Void> { } for (int i = 0; i < actions.size(); i++) { - if (!nodesToSkip.contains(actions.get(i).getName())) { + // Skipping to delete the sub workflow when rerun failed node option has been provided. As same + // action will be used to rerun the job. + if (!nodesToSkip.contains(actions.get(i).getName()) && + !(conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) && + SubWorkflowActionExecutor.ACTION_TYPE.equals(actions.get(i).getType()))) { deleteList.add(actions.get(i)); LOG.info("Deleting Action[{0}] for re-run", actions.get(i).getId()); } http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java index 7ca8646..bccac51 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java @@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.control.ForkActionExecutor; import org.apache.oozie.action.control.StartActionExecutor; +import org.apache.oozie.action.oozie.SubWorkflowActionExecutor; +import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.SLAEvent.SlaAppType; import org.apache.oozie.client.SLAEvent.Status; @@ -318,6 +320,25 @@ public class SignalXCommand extends WorkflowXCommand<Void> { } else { for (WorkflowActionBean newAction : WorkflowStoreService.getActionsToStart(workflowInstance)) { + boolean isOldWFAction = false; + + // In case of subworkflow rerun when failed option have been provided, rerun command do not delete + // old action. To avoid twice entry for same action, Checking in Db if the workflow action already exist. + if(SubWorkflowActionExecutor.ACTION_TYPE.equals(newAction.getType())) { + try { + WorkflowActionBean oldAction = WorkflowActionQueryExecutor.getInstance() + .get(WorkflowActionQuery.GET_ACTION_CHECK, + newAction.getId()); + newAction.setExternalId(oldAction.getExternalId()); + newAction.setCreatedTime(oldAction.getCreatedTime()); + isOldWFAction = true; + } catch (JPAExecutorException e) { + if(e.getErrorCode() != ErrorCode.E0605) { + throw new CommandException(e); + } + } + } + String skipVar = workflowInstance.getVar(newAction.getName() + WorkflowInstance.NODE_VAR_SEPARATOR + ReRunXCommand.TO_SKIP); boolean skipNewAction = false, suspendNewAction = false; @@ -334,23 +355,29 @@ public class SignalXCommand extends WorkflowXCommand<Void> { queue(new SignalXCommand(jobId, oldAction.getId())); } else { - try { - // Make sure that transition node for a forked action - // is inserted only once - WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK, - newAction.getId()); - - continue; - } - catch (JPAExecutorException jee) { + if(!skipAction) { + try { + // Make sure that transition node for a forked action + // is inserted only once + WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK, + newAction.getId()); + + continue; + } catch (JPAExecutorException jee) { + } } suspendNewAction = checkForSuspendNode(newAction); newAction.setPending(); String actionSlaXml = getActionSLAXml(newAction.getName(), workflowInstance.getApp() .getDefinition(), wfJob.getConf()); newAction.setSlaXml(actionSlaXml); - newAction.setCreatedTime(new Date()); - insertList.add(newAction); + if(!isOldWFAction) { + newAction.setCreatedTime(new Date()); + insertList.add(newAction); + } else { + updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START, + newAction)); + } LOG.debug("SignalXCommand: Name: " + newAction.getName() + ", Id: " + newAction.getId() + ", Authcode:" + newAction.getCred()); if (wfAction != null) { // null during wf job submit http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java index f91d7d4..2c459e4 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java @@ -273,6 +273,7 @@ public class WorkflowActionQueryExecutor extends bean.setExecutionPath((String) arr[11]); bean.setSignalValue((String) arr[12]); bean.setSlaXmlBlob((StringBlob) arr[13]); + bean.setExternalId((String) arr[14]); break; case GET_ACTION_CHECK: bean = new WorkflowActionBean(); @@ -356,6 +357,7 @@ public class WorkflowActionQueryExecutor extends bean.setName((String) arr[1]); bean.setStatusStr((String) arr[2]); bean.setEndTime(DateUtils.toDate((Timestamp) arr[3])); + bean.setType((String) arr[4]); break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java index 0d7e926..9ab897a 100644 --- a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java @@ -562,4 +562,76 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase { + "<end name='end' />" + "</workflow-app>"; } + + public void testSubWorkflowRerun() throws Exception { + try { + Path subWorkflowAppPath = getFsTestCaseDir(); + FileSystem fs = getFileSystem(); + Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml"); + Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath)); + writer.write(getLazyWorkflow()); + writer.close(); + String workflowUri = getTestCaseFileUri("workflow.xml"); + String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" + + "<start to=\"subwf\"/>" + + "<action name=\"subwf\">" + + " <sub-workflow xmlns='uri:oozie:workflow:0.4'>" + + " <app-path>" + subWorkflowAppPath.toString() + "</app-path>" + + " </sub-workflow>" + + " <ok to=\"end\"/>" + + " <error to=\"fail\"/>" + + "</action>" + + "<kill name=\"fail\">" + + " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + + "</kill>" + + "<end name=\"end\"/>" + + "</workflow-app>"; + + writeToFile(appXml, workflowUri); + LocalOozie.start(); + final OozieClient wfClient = LocalOozie.getClient(); + Properties conf = wfClient.createConfiguration(); + conf.setProperty(OozieClient.APP_PATH, workflowUri); + conf.setProperty(OozieClient.USER_NAME, getTestUser()); + conf.setProperty("appName", "var-app-name"); + final String jobId = wfClient.submit(conf); + wfClient.start(jobId); + + waitFor(JOB_TIMEOUT, new Predicate() { + public boolean evaluate() throws Exception { + return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) && + (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING); + } + }); + + String subWorkflowExternalId = wfClient.getJobInfo(jobId).getActions().get(1).getExternalId(); + wfClient.kill(wfClient.getJobInfo(jobId).getActions().get(1).getExternalId()); + + waitFor(JOB_TIMEOUT, new Predicate() { + public boolean evaluate() throws Exception { + return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.KILLED) && + (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.ERROR); + } + }); + + conf.setProperty(OozieClient.RERUN_FAIL_NODES, "true"); + wfClient.reRun(jobId,conf); + + waitFor(JOB_TIMEOUT, new Predicate() { + public boolean evaluate() throws Exception { + return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.SUCCEEDED) && + (wfClient.getJobInfo(jobId).getActions().get(2).getStatus() == WorkflowAction.Status.OK); + + } + }); + + WorkflowJob job = wfClient.getJobInfo(wfClient.getJobInfo(jobId).getActions().get(2).getExternalId()); + assertEquals(job.getStatus(), WorkflowJob.Status.SUCCEEDED); + assertEquals(job.getId(), subWorkflowExternalId); + + } finally { + LocalOozie.stop(); + } + + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 9b8708e..2aab6a9 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2029 Workflow re-run with RERUN_FAIL_NODES=true should re-run only the failed nodes of the sub-workflow (jaydeepvishwakarma via shwethags) OOZIE-2035 NotificationXCommand should support proxy (puru) OOZIE-2065 Oozie returns incorrect total action for coord dryrun (puru) OOZIE-2069 RecoveryService reads incorrect configuration (puru)
