Repository: oozie
Updated Branches:
  refs/heads/master 02c115693 -> 211553e38


OOZIE-2001 Workflow re-runs doesn't update coord action status 
(jaydeepvishwakarma via shwethags)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/211553e3
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/211553e3
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/211553e3

Branch: refs/heads/master
Commit: 211553e38a85acbb082896b6574860fe6de23d5a
Parents: 02c1156
Author: Shwetha GS <[email protected]>
Authored: Mon Nov 3 12:23:33 2014 +0530
Committer: Shwetha GS <[email protected]>
Committed: Mon Nov 3 12:26:57 2014 +0530

----------------------------------------------------------------------
 .../java/org/apache/oozie/WorkflowJobBean.java  |   2 +-
 .../coord/CoordActionUpdateXCommand.java        |   3 +-
 .../apache/oozie/command/wf/ReRunXCommand.java  |   3 +
 .../executor/jpa/WorkflowJobQueryExecutor.java  |  14 +-
 .../command/coord/TestCoordRerunXCommand.java   | 165 ++++++++++++++++---
 release-log.txt                                 |   1 +
 6 files changed, 153 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java 
b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
index f58bfdf..66a0f61 100644
--- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
+++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java
@@ -98,7 +98,7 @@ import org.json.simple.JSONObject;
 
     @NamedQuery(name = "GET_WORKFLOW_SUSPEND", query = "select w.id, w.user, 
w.group, w.appName, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, 
w.logToken, w.wfInstance  from WorkflowJobBean w where w.id = :id"),
 
-    @NamedQuery(name = "GET_WORKFLOW_RERUN", query = "select w.id, w.user, 
w.group, w.appName, w.statusStr, w.run, w.logToken, w.wfInstance from 
WorkflowJobBean w where w.id = :id"),
+    @NamedQuery(name = "GET_WORKFLOW_RERUN", query = "select w.id, w.user, 
w.group, w.appName, w.statusStr, w.run, w.logToken, w.wfInstance, w.parentId 
from WorkflowJobBean w where w.id = :id"),
 
     @NamedQuery(name = "GET_WORKFLOW_DEFINITION", query = "select w.id, 
w.user, w.group, w.appName, w.logToken, w.wfInstance from WorkflowJobBean w 
where w.id = :id"),
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
index 44b1c11..d628a9f 100644
--- 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java
@@ -98,7 +98,8 @@ public class CoordActionUpdateXCommand extends 
CoordinatorXCommand<Void> {
                 coordAction.setStatus(CoordinatorAction.Status.SUSPENDED);
                 coordAction.decrementAndGetPending();
             }
-            else if (workflow.getStatus() == WorkflowJob.Status.RUNNING) {
+            else if (workflow.getStatus() == WorkflowJob.Status.RUNNING ||
+                    workflow.getStatus() == WorkflowJob.Status.PREP) {
                 // resume workflow job and update coord action accordingly
                 coordAction.setStatus(CoordinatorAction.Status.RUNNING);
                 coordAction.decrementAndGetPending();

http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
index 4f2e975..460e90c 100644
--- a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java
@@ -253,6 +253,9 @@ public class ReRunXCommand extends WorkflowXCommand<Void> {
         catch (JPAExecutorException je) {
             throw new CommandException(je);
         }
+        finally {
+            updateParentIfNecessary(wfBean);
+        }
 
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
index e5f69ac..1acab4a 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java
@@ -18,13 +18,6 @@
 
 package org.apache.oozie.executor.jpa;
 
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.persistence.EntityManager;
-import javax.persistence.Query;
-
 import org.apache.oozie.BinaryBlob;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.StringBlob;
@@ -33,6 +26,12 @@ import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.DateUtils;
 
+import javax.persistence.EntityManager;
+import javax.persistence.Query;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Query Executor that provides API to run query for Workflow Job
  */
@@ -266,6 +265,7 @@ public class WorkflowJobQueryExecutor extends 
QueryExecutor<WorkflowJobBean, Wor
                 bean.setRun((Integer) arr[5]);
                 bean.setLogToken((String) arr[6]);
                 bean.setWfInstanceBlob((BinaryBlob) (arr[7]));
+                bean.setParentId((String)arr[8]);
                 break;
             case GET_WORKFLOW_DEFINITION:
                 bean = new WorkflowJobBean();

http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
index 65338a3..ac023ca 100644
--- 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java
@@ -18,43 +18,21 @@
 
 package org.apache.oozie.command.coord;
 
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.Reader;
-import java.io.Writer;
-import java.util.Date;
-import java.util.List;
-import java.util.Properties;
-import java.util.regex.Matcher;
-
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
-import org.apache.oozie.client.Job;
-import org.apache.oozie.client.OozieClient;
-import org.apache.oozie.client.OozieClientException;
+import org.apache.oozie.action.oozie.JavaSleepAction;
+import org.apache.oozie.client.*;
 import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.CoordELFunctions;
-import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.*;
 import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.SchemaService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.StatusTransitService;
-import org.apache.oozie.service.StoreService;
+import org.apache.oozie.service.*;
 import org.apache.oozie.store.CoordinatorStore;
 import org.apache.oozie.store.StoreException;
 import org.apache.oozie.test.XDataTestCase;
@@ -65,6 +43,12 @@ import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 import org.jdom.JDOMException;
 
+import java.io.*;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Matcher;
+
 public class TestCoordRerunXCommand extends XDataTestCase {
     private Services services;
 
@@ -1213,4 +1197,133 @@ public class TestCoordRerunXCommand extends 
XDataTestCase {
 
         return actionNomialTime;
     }
+
+    /**
+     * It will verify the Action status running when workflow triggered.
+     * @throws Exception
+     */
+    public void testActionStatusRunningWithWorkflow() throws Exception {
+        Date start = DateUtils.parseDateOozieTZ("2009-12-15T01:00Z");
+        Date end = DateUtils.parseDateOozieTZ("2009-12-16T01:00Z");
+        CoordinatorJobBean coordJob = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false,
+                false, 1);
+
+        CoordinatorActionBean action = 
addRecordToWithLazyAction(coordJob.getId(), 1,
+                CoordinatorAction.Status.SUBMITTED, "coord-rerun-action1.xml");
+
+        String actionId = action.getId();
+        new CoordActionStartXCommand(actionId, getTestUser(), "myapp", 
"myjob").call();
+
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        action = jpaService.execute(new CoordActionGetJPAExecutor(actionId));
+
+        if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) {
+            fail("CoordActionStartCommand didn't work because the status for 
action id" + actionId + " is :"
+                    + action.getStatus() + " expected to be NOT SUBMITTED 
(i.e. RUNNING)");
+        }
+
+        final String wfId = action.getExternalId();
+
+        final OozieClient wfClient = LocalOozie.getClient();
+
+        waitFor(15 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return wfClient.getJobInfo(wfId).getStatus() == 
WorkflowJob.Status.RUNNING;
+            }
+        });
+
+        wfClient.kill(wfId);
+
+        waitFor(15 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return wfClient.getJobInfo(wfId).getStatus() == 
WorkflowJob.Status.KILLED;
+            }
+        });
+        assertEquals(WorkflowJob.Status.KILLED, 
wfClient.getJobInfo(wfId).getStatus());
+
+        Properties conf = wfClient.createConfiguration();
+        conf.setProperty(OozieClient.RERUN_FAIL_NODES, "true");
+        wfClient.reRun(wfId,conf);
+
+        waitFor(15 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return wfClient.getJobInfo(wfId).getStatus() == 
WorkflowJob.Status.RUNNING;
+            }
+        });
+
+        assertEquals(WorkflowJob.Status.RUNNING, 
wfClient.getJobInfo(wfId).getStatus());
+        OozieClient coordActionClient = LocalOozie.getCoordClient();
+        
assertEquals(CoordinatorAction.Status.RUNNING,coordActionClient.getCoordActionInfo(actionId).getStatus());
+    }
+
+    private CoordinatorActionBean addRecordToWithLazyAction
+            (String jobId, int actionNum, CoordinatorAction.Status status, 
String resourceXmlName) throws IOException {
+        Path appPath = new Path(getFsTestCaseDir(), "coord");
+        String actionXml = getCoordActionXml(appPath, resourceXmlName);
+        String actionNomialTime = getActionNomialTime(actionXml);
+
+        CoordinatorActionBean action = new CoordinatorActionBean();
+        action.setJobId(jobId);
+        
action.setId(Services.get().get(UUIDService.class).generateChildId(jobId, 
actionNum + ""));
+        action.setActionNumber(actionNum);
+        try {
+            
action.setNominalTime(DateUtils.parseDateOozieTZ(actionNomialTime));
+        }
+        catch (Exception e) {
+            e.printStackTrace();
+            fail("Unable to get action nominal time");
+            throw new IOException(e);
+        }
+        action.setLastModifiedTime(new Date());
+        action.setStatus(status);
+        action.setActionXml(actionXml);
+
+        Properties conf = getLazyWorkflowProp(appPath);
+        String createdConf = XmlUtils.writePropToString(conf);
+        action.setCreatedConf(createdConf);
+
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        CoordActionInsertJPAExecutor coordActionInsertCmd = new 
CoordActionInsertJPAExecutor(action);
+        try {
+            jpaService.execute(coordActionInsertCmd);
+        } catch (JPAExecutorException e) {
+            e.printStackTrace();
+            fail("Unable to insert the test coord action record to table");
+        }
+        return action;
+    }
+
+    private Properties getLazyWorkflowProp(Path appPath) throws IOException {
+        Path wfAppPath = new Path(getFsTestCaseDir(), "workflow");
+        final OozieClient coordClient = LocalOozie.getCoordClient();
+        Properties conf = coordClient.createConfiguration();
+        conf.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath.toString());
+        conf.setProperty("jobTracker", getJobTrackerUri());
+        conf.setProperty("nameNode", getNameNodeUri());
+        conf.setProperty("wfAppPath", wfAppPath.toString());
+        conf.remove("user.name");
+        conf.setProperty("user.name", getTestUser());
+        writeToFile(getLazyWorkflow(), wfAppPath, "workflow.xml");
+        return conf;
+    }
+    public String getLazyWorkflow() {
+        return  "<workflow-app xmlns='uri:oozie:workflow:0.1'  
xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>" +
+                "<start to='java' />" +
+                "       <action name='java'>" +
+                "<java>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<main-class>" + JavaSleepAction.class.getName() + 
"</main-class>" +
+                "<arg>exit0</arg>" +
+                "</java>"
+                + "<ok to='end' />"
+                + "<error to='fail' />"
+                + "</action>"
+                + "<kill name='fail'>"
+                + "<message>shell action fail, error 
message[${wf:errorMessage(wf:lastErrorNode())}]</message>"
+                + "</kill>"
+                + "<end name='end' />"
+                + "</workflow-app>";
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 83f736e..8ed6dd8 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2001 Workflow re-runs doesn't update coord action status 
(jaydeepvishwakarma via shwethags)
 OOZIE-2048 HadoopAccessorService should also process ssl_client.xml 
(venkatnrangan via bzhang)
 OOZIE-2047 Oozie does not support Hive tables that use datatypes introduced 
since Hive 0.8 (venkatnrangan via bzhang)
 OOZIE-1808 Change DG_QuickStart.twiki to reflect changes in sharelib 
installation (ryota)

Reply via email to