Repository: oozie Updated Branches: refs/heads/master 02c115693 -> 211553e38
OOZIE-2001 Workflow re-runs doesn't update coord action status (jaydeepvishwakarma via shwethags) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/211553e3 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/211553e3 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/211553e3 Branch: refs/heads/master Commit: 211553e38a85acbb082896b6574860fe6de23d5a Parents: 02c1156 Author: Shwetha GS <[email protected]> Authored: Mon Nov 3 12:23:33 2014 +0530 Committer: Shwetha GS <[email protected]> Committed: Mon Nov 3 12:26:57 2014 +0530 ---------------------------------------------------------------------- .../java/org/apache/oozie/WorkflowJobBean.java | 2 +- .../coord/CoordActionUpdateXCommand.java | 3 +- .../apache/oozie/command/wf/ReRunXCommand.java | 3 + .../executor/jpa/WorkflowJobQueryExecutor.java | 14 +- .../command/coord/TestCoordRerunXCommand.java | 165 ++++++++++++++++--- release-log.txt | 1 + 6 files changed, 153 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/main/java/org/apache/oozie/WorkflowJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java index f58bfdf..66a0f61 100644 --- a/core/src/main/java/org/apache/oozie/WorkflowJobBean.java +++ b/core/src/main/java/org/apache/oozie/WorkflowJobBean.java @@ -98,7 +98,7 @@ import org.json.simple.JSONObject; @NamedQuery(name = "GET_WORKFLOW_SUSPEND", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.parentId, w.startTimestamp, w.endTimestamp, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"), - @NamedQuery(name = "GET_WORKFLOW_RERUN", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.run, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"), + @NamedQuery(name = "GET_WORKFLOW_RERUN", query = "select w.id, w.user, w.group, w.appName, w.statusStr, w.run, w.logToken, w.wfInstance, w.parentId from WorkflowJobBean w where w.id = :id"), @NamedQuery(name = "GET_WORKFLOW_DEFINITION", query = "select w.id, w.user, w.group, w.appName, w.logToken, w.wfInstance from WorkflowJobBean w where w.id = :id"), http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java index 44b1c11..d628a9f 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java @@ -98,7 +98,8 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { coordAction.setStatus(CoordinatorAction.Status.SUSPENDED); coordAction.decrementAndGetPending(); } - else if (workflow.getStatus() == WorkflowJob.Status.RUNNING) { + else if (workflow.getStatus() == WorkflowJob.Status.RUNNING || + workflow.getStatus() == WorkflowJob.Status.PREP) { // resume workflow job and update coord action accordingly coordAction.setStatus(CoordinatorAction.Status.RUNNING); coordAction.decrementAndGetPending(); http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java index 4f2e975..460e90c 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ReRunXCommand.java @@ -253,6 +253,9 @@ public class ReRunXCommand extends WorkflowXCommand<Void> { catch (JPAExecutorException je) { throw new CommandException(je); } + finally { + updateParentIfNecessary(wfBean); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java index e5f69ac..1acab4a 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobQueryExecutor.java @@ -18,13 +18,6 @@ package org.apache.oozie.executor.jpa; -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.List; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - import org.apache.oozie.BinaryBlob; import org.apache.oozie.ErrorCode; import org.apache.oozie.StringBlob; @@ -33,6 +26,12 @@ import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.util.DateUtils; +import javax.persistence.EntityManager; +import javax.persistence.Query; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; + /** * Query Executor that provides API to run query for Workflow Job */ @@ -266,6 +265,7 @@ public class WorkflowJobQueryExecutor extends QueryExecutor<WorkflowJobBean, Wor bean.setRun((Integer) arr[5]); bean.setLogToken((String) arr[6]); bean.setWfInstanceBlob((BinaryBlob) (arr[7])); + bean.setParentId((String)arr[8]); break; case GET_WORKFLOW_DEFINITION: bean = new WorkflowJobBean(); http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java index 65338a3..ac023ca 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordRerunXCommand.java @@ -18,43 +18,21 @@ package org.apache.oozie.command.coord; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.Reader; -import java.io.Writer; -import java.util.Date; -import java.util.List; -import java.util.Properties; -import java.util.regex.Matcher; - import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.Job; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.OozieClientException; +import org.apache.oozie.action.oozie.JavaSleepAction; +import org.apache.oozie.client.*; import org.apache.oozie.client.CoordinatorJob.Execution; import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; import org.apache.oozie.coord.CoordELFunctions; -import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; -import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; -import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor; -import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; -import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.*; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.local.LocalOozie; -import org.apache.oozie.service.JPAService; -import org.apache.oozie.service.SchemaService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.StatusTransitService; -import org.apache.oozie.service.StoreService; +import org.apache.oozie.service.*; import org.apache.oozie.store.CoordinatorStore; import org.apache.oozie.store.StoreException; import org.apache.oozie.test.XDataTestCase; @@ -65,6 +43,12 @@ import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.JDOMException; +import java.io.*; +import java.util.Date; +import java.util.List; +import java.util.Properties; +import java.util.regex.Matcher; + public class TestCoordRerunXCommand extends XDataTestCase { private Services services; @@ -1213,4 +1197,133 @@ public class TestCoordRerunXCommand extends XDataTestCase { return actionNomialTime; } + + /** + * It will verify the Action status running when workflow triggered. + * @throws Exception + */ + public void testActionStatusRunningWithWorkflow() throws Exception { + Date start = DateUtils.parseDateOozieTZ("2009-12-15T01:00Z"); + Date end = DateUtils.parseDateOozieTZ("2009-12-16T01:00Z"); + CoordinatorJobBean coordJob = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, start, end, false, + false, 1); + + CoordinatorActionBean action = addRecordToWithLazyAction(coordJob.getId(), 1, + CoordinatorAction.Status.SUBMITTED, "coord-rerun-action1.xml"); + + String actionId = action.getId(); + new CoordActionStartXCommand(actionId, getTestUser(), "myapp", "myjob").call(); + + final JPAService jpaService = Services.get().get(JPAService.class); + action = jpaService.execute(new CoordActionGetJPAExecutor(actionId)); + + if (action.getStatus() == CoordinatorAction.Status.SUBMITTED) { + fail("CoordActionStartCommand didn't work because the status for action id" + actionId + " is :" + + action.getStatus() + " expected to be NOT SUBMITTED (i.e. RUNNING)"); + } + + final String wfId = action.getExternalId(); + + final OozieClient wfClient = LocalOozie.getClient(); + + waitFor(15 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return wfClient.getJobInfo(wfId).getStatus() == WorkflowJob.Status.RUNNING; + } + }); + + wfClient.kill(wfId); + + waitFor(15 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return wfClient.getJobInfo(wfId).getStatus() == WorkflowJob.Status.KILLED; + } + }); + assertEquals(WorkflowJob.Status.KILLED, wfClient.getJobInfo(wfId).getStatus()); + + Properties conf = wfClient.createConfiguration(); + conf.setProperty(OozieClient.RERUN_FAIL_NODES, "true"); + wfClient.reRun(wfId,conf); + + waitFor(15 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return wfClient.getJobInfo(wfId).getStatus() == WorkflowJob.Status.RUNNING; + } + }); + + assertEquals(WorkflowJob.Status.RUNNING, wfClient.getJobInfo(wfId).getStatus()); + OozieClient coordActionClient = LocalOozie.getCoordClient(); + assertEquals(CoordinatorAction.Status.RUNNING,coordActionClient.getCoordActionInfo(actionId).getStatus()); + } + + private CoordinatorActionBean addRecordToWithLazyAction + (String jobId, int actionNum, CoordinatorAction.Status status, String resourceXmlName) throws IOException { + Path appPath = new Path(getFsTestCaseDir(), "coord"); + String actionXml = getCoordActionXml(appPath, resourceXmlName); + String actionNomialTime = getActionNomialTime(actionXml); + + CoordinatorActionBean action = new CoordinatorActionBean(); + action.setJobId(jobId); + action.setId(Services.get().get(UUIDService.class).generateChildId(jobId, actionNum + "")); + action.setActionNumber(actionNum); + try { + action.setNominalTime(DateUtils.parseDateOozieTZ(actionNomialTime)); + } + catch (Exception e) { + e.printStackTrace(); + fail("Unable to get action nominal time"); + throw new IOException(e); + } + action.setLastModifiedTime(new Date()); + action.setStatus(status); + action.setActionXml(actionXml); + + Properties conf = getLazyWorkflowProp(appPath); + String createdConf = XmlUtils.writePropToString(conf); + action.setCreatedConf(createdConf); + + JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + CoordActionInsertJPAExecutor coordActionInsertCmd = new CoordActionInsertJPAExecutor(action); + try { + jpaService.execute(coordActionInsertCmd); + } catch (JPAExecutorException e) { + e.printStackTrace(); + fail("Unable to insert the test coord action record to table"); + } + return action; + } + + private Properties getLazyWorkflowProp(Path appPath) throws IOException { + Path wfAppPath = new Path(getFsTestCaseDir(), "workflow"); + final OozieClient coordClient = LocalOozie.getCoordClient(); + Properties conf = coordClient.createConfiguration(); + conf.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath.toString()); + conf.setProperty("jobTracker", getJobTrackerUri()); + conf.setProperty("nameNode", getNameNodeUri()); + conf.setProperty("wfAppPath", wfAppPath.toString()); + conf.remove("user.name"); + conf.setProperty("user.name", getTestUser()); + writeToFile(getLazyWorkflow(), wfAppPath, "workflow.xml"); + return conf; + } + public String getLazyWorkflow() { + return "<workflow-app xmlns='uri:oozie:workflow:0.1' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>" + + "<start to='java' />" + + " <action name='java'>" + + "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<main-class>" + JavaSleepAction.class.getName() + "</main-class>" + + "<arg>exit0</arg>" + + "</java>" + + "<ok to='end' />" + + "<error to='fail' />" + + "</action>" + + "<kill name='fail'>" + + "<message>shell action fail, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" + + "</kill>" + + "<end name='end' />" + + "</workflow-app>"; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/211553e3/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 83f736e..8ed6dd8 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2001 Workflow re-runs doesn't update coord action status (jaydeepvishwakarma via shwethags) OOZIE-2048 HadoopAccessorService should also process ssl_client.xml (venkatnrangan via bzhang) OOZIE-2047 Oozie does not support Hive tables that use datatypes introduced since Hive 0.8 (venkatnrangan via bzhang) OOZIE-1808 Change DG_QuickStart.twiki to reflect changes in sharelib installation (ryota)
