Repository: oozie
Updated Branches:
  refs/heads/master abb508780 -> bac6d005e


OOZIE-2029 Workflow re-run with RERUN_FAIL_NODES=true should re-run only the 
failed nodes of the sub-workflow (jaydeepvishwakarma via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/bac6d005
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/bac6d005
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/bac6d005

Branch: refs/heads/master
Commit: bac6d005e8718e9e6df94f47677c2d4d9726d5e5
Parents: abb5087
Author: shwethags <[email protected]>
Authored: Tue Dec 9 16:28:39 2014 +0530
Committer: shwethags <[email protected]>
Committed: Tue Dec 9 16:28:39 2014 +0530

----------------------------------------------------------------------
 .../org/apache/oozie/WorkflowActionBean.java    |  4 +-
 .../action/oozie/SubWorkflowActionExecutor.java | 12 +++-
 .../apache/oozie/command/wf/ReRunXCommand.java  |  6 +-
 .../apache/oozie/command/wf/SignalXCommand.java | 49 ++++++++++---
 .../jpa/WorkflowActionQueryExecutor.java        |  2 +
 .../oozie/TestSubWorkflowActionExecutor.java    | 72 ++++++++++++++++++++
 release-log.txt                                 |  1 +
 7 files changed, 129 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java 
b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
index d27f59b..06edf53 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowActionBean.java
@@ -88,7 +88,7 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "GET_ACTION_FAIL", query = "select a.id, a.wfId, 
a.name, a.statusStr, a.pending, a.type, a.logToken, a.transition, a.errorCode, 
a.errorMessage from WorkflowActionBean a where a.id = :id"),
 
-    @NamedQuery(name = "GET_ACTION_SIGNAL", query = "select a.id, a.wfId, 
a.name, a.statusStr, a.pending, a.pendingAgeTimestamp, a.type, a.logToken, 
a.transition, a.errorCode, a.errorMessage, a.executionPath, a.signalValue, 
a.slaXml from WorkflowActionBean a where a.id = :id"),
+    @NamedQuery(name = "GET_ACTION_SIGNAL", query = "select a.id, a.wfId, 
a.name, a.statusStr, a.pending, a.pendingAgeTimestamp, a.type, a.logToken, 
a.transition, a.errorCode, a.errorMessage, a.executionPath, a.signalValue, 
a.slaXml, a.externalId from WorkflowActionBean a where a.id = :id"),
 
     @NamedQuery(name = "GET_ACTION_CHECK", query = "select a.id, a.wfId, 
a.name, a.statusStr, a.pending, a.pendingAgeTimestamp, a.type, a.logToken, 
a.transition, a.retries, a.userRetryCount, a.userRetryMax, a.userRetryInterval, 
a.trackerUri, a.startTimestamp, a.endTimestamp, a.lastCheckTimestamp, 
a.errorCode, a.errorMessage, a.externalId, a.externalStatus, 
a.externalChildIDs, a.conf from WorkflowActionBean a where a.id = :id"),
 
@@ -110,7 +110,7 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "GET_RETRY_MANUAL_ACTIONS", query = "select OBJECT(a) 
from WorkflowActionBean a where a.wfId = :wfId AND (a.statusStr = 'START_RETRY' 
OR a.statusStr = 'START_MANUAL' OR a.statusStr = 'END_RETRY' OR a.statusStr = 
'END_MANUAL')"),
 
-    @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id, 
a.name, a.statusStr, a.endTimestamp from WorkflowActionBean a where a.wfId = 
:wfId order by a.startTimestamp") })
+    @NamedQuery(name = "GET_ACTIONS_FOR_WORKFLOW_RERUN", query = "select a.id, 
a.name, a.statusStr, a.endTimestamp, a.type from WorkflowActionBean a where 
a.wfId = :wfId order by a.startTimestamp") })
 @Table(name = "WF_ACTIONS")
 public class WorkflowActionBean implements Writable, WorkflowAction, JsonBean {
     @Id

http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
 
b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
index bda34b5..debbf90 100644
--- 
a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
@@ -124,7 +124,7 @@ public class SubWorkflowActionExecutor extends 
ActionExecutor {
     }
 
     protected void verifyAndInjectSubworkflowDepth(Configuration parentConf, 
Configuration conf) throws ActionExecutorException {
-        int depth = conf.getInt(SUBWORKFLOW_DEPTH, 0);
+        int depth = parentConf.getInt(SUBWORKFLOW_DEPTH, 0);
         int maxDepth = ConfigurationService.getInt(SUBWORKFLOW_MAX_DEPTH);
         if (depth >= maxDepth) {
             throw new 
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "SUBWF001",
@@ -180,8 +180,14 @@ public class SubWorkflowActionExecutor extends 
ActionExecutor {
                 //TODO: this has to be refactored later to be done in a single 
place for REST calls and this
                 JobUtils.normalizeAppPath(context.getWorkflow().getUser(), 
context.getWorkflow().getGroup(),
                                           subWorkflowConf);
-
-                subWorkflowId = 
oozieClient.run(subWorkflowConf.toProperties());
+                // If the rerun-failed-nodes option was provided with the rerun 
command and the action already has an external id, the old subworkflow is
+                // rerun instead of a new one being started.
+                if(action.getExternalId() != null && 
parentConf.getBoolean(OozieClient.RERUN_FAIL_NODES, false)) {
+                    oozieClient.reRun(action.getExternalId(), 
subWorkflowConf.toProperties());
+                    subWorkflowId = action.getExternalId();
+                } else {
+                    subWorkflowId = 
oozieClient.run(subWorkflowConf.toProperties());
+                }
             }
             else {
                 subWorkflowId = runningJobId;

http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
index 460e90c..19d3e8d 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
@@ -223,7 +223,11 @@ public class ReRunXCommand extends WorkflowXCommand<Void> {
         }
 
         for (int i = 0; i < actions.size(); i++) {
-            if (!nodesToSkip.contains(actions.get(i).getName())) {
+            // Do not delete the sub-workflow action when the rerun-failed-nodes 
option has been provided, because the same
+            // action will be used to rerun the job.
+            if (!nodesToSkip.contains(actions.get(i).getName()) &&
+                    !(conf.getBoolean(OozieClient.RERUN_FAIL_NODES, false) &&
+                    
SubWorkflowActionExecutor.ACTION_TYPE.equals(actions.get(i).getType()))) {
                 deleteList.add(actions.get(i));
                 LOG.info("Deleting Action[{0}] for re-run", 
actions.get(i).getId());
             }

http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
index 7ca8646..bccac51 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.control.ForkActionExecutor;
 import org.apache.oozie.action.control.StartActionExecutor;
+import org.apache.oozie.action.oozie.SubWorkflowActionExecutor;
+import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.SLAEvent.SlaAppType;
 import org.apache.oozie.client.SLAEvent.Status;
@@ -318,6 +320,25 @@ public class SignalXCommand extends WorkflowXCommand<Void> 
{
         }
         else {
             for (WorkflowActionBean newAction : 
WorkflowStoreService.getActionsToStart(workflowInstance)) {
+                boolean isOldWFAction = false;
+
+                // When a subworkflow is rerun with the rerun-failed-nodes option, 
the rerun command does not delete
+                // the old action. To avoid a duplicate entry for the same action, check 
in the DB whether the workflow action already exists.
+                
if(SubWorkflowActionExecutor.ACTION_TYPE.equals(newAction.getType())) {
+                    try {
+                        WorkflowActionBean oldAction = 
WorkflowActionQueryExecutor.getInstance()
+                                .get(WorkflowActionQuery.GET_ACTION_CHECK,
+                                        newAction.getId());
+                        newAction.setExternalId(oldAction.getExternalId());
+                        newAction.setCreatedTime(oldAction.getCreatedTime());
+                        isOldWFAction = true;
+                    } catch (JPAExecutorException e) {
+                        if(e.getErrorCode() != ErrorCode.E0605) {
+                            throw new CommandException(e);
+                        }
+                    }
+                }
+
                 String skipVar = workflowInstance.getVar(newAction.getName() + 
WorkflowInstance.NODE_VAR_SEPARATOR
                         + ReRunXCommand.TO_SKIP);
                 boolean skipNewAction = false, suspendNewAction = false;
@@ -334,23 +355,29 @@ public class SignalXCommand extends 
WorkflowXCommand<Void> {
                     queue(new SignalXCommand(jobId, oldAction.getId()));
                 }
                 else {
-                    try {
-                        // Make sure that transition node for a forked action
-                        // is inserted only once
-                        
WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK,
-                                newAction.getId());
-
-                        continue;
-                    }
-                    catch (JPAExecutorException jee) {
+                    if(!skipAction) {
+                        try {
+                            // Make sure that transition node for a forked 
action
+                            // is inserted only once
+                            
WorkflowActionQueryExecutor.getInstance().get(WorkflowActionQuery.GET_ACTION_ID_TYPE_LASTCHECK,
+                                    newAction.getId());
+
+                            continue;
+                        } catch (JPAExecutorException jee) {
+                        }
                     }
                     suspendNewAction = checkForSuspendNode(newAction);
                     newAction.setPending();
                     String actionSlaXml = getActionSLAXml(newAction.getName(), 
workflowInstance.getApp()
                             .getDefinition(), wfJob.getConf());
                     newAction.setSlaXml(actionSlaXml);
-                    newAction.setCreatedTime(new Date());
-                    insertList.add(newAction);
+                    if(!isOldWFAction) {
+                        newAction.setCreatedTime(new Date());
+                        insertList.add(newAction);
+                    } else {
+                        updateList.add(new 
UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_START,
+                                newAction));
+                    }
                     LOG.debug("SignalXCommand: Name: " + newAction.getName() + 
", Id: " + newAction.getId()
                             + ", Authcode:" + newAction.getCred());
                     if (wfAction != null) { // null during wf job submit

http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
index f91d7d4..2c459e4 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionQueryExecutor.java
@@ -273,6 +273,7 @@ public class WorkflowActionQueryExecutor extends
                 bean.setExecutionPath((String) arr[11]);
                 bean.setSignalValue((String) arr[12]);
                 bean.setSlaXmlBlob((StringBlob) arr[13]);
+                bean.setExternalId((String) arr[14]);
                 break;
             case GET_ACTION_CHECK:
                 bean = new WorkflowActionBean();
@@ -356,6 +357,7 @@ public class WorkflowActionQueryExecutor extends
                 bean.setName((String) arr[1]);
                 bean.setStatusStr((String) arr[2]);
                 bean.setEndTime(DateUtils.toDate((Timestamp) arr[3]));
+                bean.setType((String) arr[4]);
                 break;
             default:
                 throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor 
cannot construct action bean for "

http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
 
b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
index 0d7e926..9ab897a 100644
--- 
a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
@@ -562,4 +562,76 @@ public class TestSubWorkflowActionExecutor extends 
ActionExecutorTestCase {
                 + "<end name='end' />"
                 + "</workflow-app>";
     }
+
+    public void testSubWorkflowRerun() throws Exception {
+        try {
+            Path subWorkflowAppPath = getFsTestCaseDir();
+            FileSystem fs = getFileSystem();
+            Path subWorkflowPath = new Path(subWorkflowAppPath, 
"workflow.xml");
+            Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath));
+            writer.write(getLazyWorkflow());
+            writer.close();
+            String workflowUri = getTestCaseFileUri("workflow.xml");
+            String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" 
name=\"workflow\">" +
+                    "<start to=\"subwf\"/>" +
+                    "<action name=\"subwf\">" +
+                    "     <sub-workflow xmlns='uri:oozie:workflow:0.4'>" +
+                    "          <app-path>" + subWorkflowAppPath.toString() + 
"</app-path>" +
+                    "     </sub-workflow>" +
+                    "     <ok to=\"end\"/>" +
+                    "     <error to=\"fail\"/>" +
+                    "</action>" +
+                    "<kill name=\"fail\">" +
+                    "     <message>Sub workflow failed, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
+                    "</kill>" +
+                    "<end name=\"end\"/>" +
+                    "</workflow-app>";
+
+            writeToFile(appXml, workflowUri);
+            LocalOozie.start();
+            final OozieClient wfClient = LocalOozie.getClient();
+            Properties conf = wfClient.createConfiguration();
+            conf.setProperty(OozieClient.APP_PATH, workflowUri);
+            conf.setProperty(OozieClient.USER_NAME, getTestUser());
+            conf.setProperty("appName", "var-app-name");
+            final String jobId = wfClient.submit(conf);
+            wfClient.start(jobId);
+
+            waitFor(JOB_TIMEOUT, new Predicate() {
+                public boolean evaluate() throws Exception {
+                    return (wfClient.getJobInfo(jobId).getStatus() == 
WorkflowJob.Status.RUNNING) &&
+                            
(wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == 
WorkflowAction.Status.RUNNING);
+                }
+            });
+
+            String subWorkflowExternalId = 
wfClient.getJobInfo(jobId).getActions().get(1).getExternalId();
+            
wfClient.kill(wfClient.getJobInfo(jobId).getActions().get(1).getExternalId());
+
+            waitFor(JOB_TIMEOUT, new Predicate() {
+                public boolean evaluate() throws Exception {
+                    return (wfClient.getJobInfo(jobId).getStatus() == 
WorkflowJob.Status.KILLED) &&
+                            
(wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == 
WorkflowAction.Status.ERROR);
+                }
+            });
+
+            conf.setProperty(OozieClient.RERUN_FAIL_NODES, "true");
+            wfClient.reRun(jobId,conf);
+
+            waitFor(JOB_TIMEOUT, new Predicate() {
+                public boolean evaluate() throws Exception {
+                    return (wfClient.getJobInfo(jobId).getStatus() == 
WorkflowJob.Status.SUCCEEDED) &&
+                            
(wfClient.getJobInfo(jobId).getActions().get(2).getStatus() == 
WorkflowAction.Status.OK);
+
+                }
+            });
+
+            WorkflowJob job = 
wfClient.getJobInfo(wfClient.getJobInfo(jobId).getActions().get(2).getExternalId());
+            assertEquals(job.getStatus(), WorkflowJob.Status.SUCCEEDED);
+            assertEquals(job.getId(), subWorkflowExternalId);
+
+        } finally {
+            LocalOozie.stop();
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/bac6d005/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 9b8708e..2aab6a9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2029 Workflow re-run with RERUN_FAIL_NODES=true should re-run only the 
failed nodes of the sub-workflow (jaydeepvishwakarma via shwethags)
 OOZIE-2035 NotificationXCommand should support proxy (puru)
 OOZIE-2065 Oozie returns incorrect total action for coord dryrun (puru)
 OOZIE-2069 RecoveryService reads incorrect configuration (puru)

Reply via email to