Author: rkanter
Date: Fri Aug 9 17:26:39 2013
New Revision: 1512408
URL: http://svn.apache.org/r1512408
Log:
OOZIE-1448 A CoordActionUpdateXCommand gets queued for all workflows even if
they were not launched by a coordinator (rkanter)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java?rev=1512408&r1=1512407&r2=1512408&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
Fri Aug 9 17:26:39 2013
@@ -40,7 +40,6 @@ import org.apache.oozie.client.SLAEvent.
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
@@ -278,8 +277,7 @@ public class ActionStartXCommand extends
case FAILED:
try {
failJob(context);
- // update coordinator action
- new CoordActionUpdateXCommand(wfJob, 3).call();
+ updateParentIfNecessary(wfJob, 3);
new WfEndXCommand(wfJob).call(); // To delete the WF
temp dir
SLAEventBean slaEvent1 =
SLADbXOperations.createStatusEvent(wfAction.getSlaXml(), wfAction.getId(),
Status.FAILED,
SlaAppType.WORKFLOW_ACTION);
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java?rev=1512408&r1=1512407&r2=1512408&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
Fri Aug 9 17:26:39 2013
@@ -36,7 +36,6 @@ import org.apache.oozie.action.ActionExe
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.service.CallbackService;
import org.apache.oozie.service.ELService;
import org.apache.oozie.service.HadoopAccessorException;
@@ -129,8 +128,7 @@ public abstract class ActionXCommand<T>
throw new CommandException(ErrorCode.E0727, id, e.getMessage());
}
finally {
- // update coordinator action
- new CoordActionUpdateXCommand(workflow, 3).call();
+ updateParentIfNecessary(workflow, 3);
}
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java?rev=1512408&r1=1512407&r2=1512408&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/KillXCommand.java
Fri Aug 9 17:26:39 2013
@@ -29,7 +29,6 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
@@ -177,8 +176,7 @@ public class KillXCommand extends Workfl
if(wfJob.getStatus() == WorkflowJob.Status.KILLED) {
new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
}
- // update coordinator action
- new CoordActionUpdateXCommand(wfJob).call();
+ updateParentIfNecessary(wfJob);
}
LOG.info("ENDED WorkflowKillXCommand for jobId=" + wfId);
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1512408&r1=1512407&r2=1512408&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
Fri Aug 9 17:26:39 2013
@@ -153,8 +153,7 @@ public class ResumeXCommand extends Work
throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
}
finally {
- // update coordinator action
- new CoordActionUpdateXCommand(workflow).call();
+ updateParentIfNecessary(workflow);
}
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java?rev=1512408&r1=1512407&r2=1512408&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
Fri Aug 9 17:26:39 2013
@@ -338,8 +338,7 @@ public class SignalXCommand extends Work
LOG.debug(
"Updated the workflow status to " + wfJob.getId() + " status
=" + wfJob.getStatusStr());
if (wfJob.getStatus() != WorkflowJob.Status.RUNNING &&
wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) {
- // update coordinator action
- new CoordActionUpdateXCommand(wfJob).call(); //Note: Called
even if wf is not necessarily instantiated by coordinator
+ updateParentIfNecessary(wfJob);
new WfEndXCommand(wfJob).call(); //To delete the WF temp dir
}
LOG.debug("ENDED SignalCommand for jobid=" + jobId + ", actionId=" +
actionId);
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java?rev=1512408&r1=1512407&r2=1512408&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/SuspendXCommand.java
Fri Aug 9 17:26:39 2013
@@ -28,7 +28,6 @@ import org.apache.oozie.client.WorkflowJ
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionRetryManualGetJPAExecutor;
@@ -74,8 +73,7 @@ public class SuspendXCommand extends Wor
throw new CommandException(je);
}
finally {
- // update coordinator action
- new CoordActionUpdateXCommand(wfJobBean).call();
+ updateParentIfNecessary(wfJobBean);
}
return null;
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java?rev=1512408&r1=1512407&r2=1512408&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/wf/WorkflowXCommand.java
Fri Aug 9 17:26:39 2013
@@ -20,7 +20,9 @@ package org.apache.oozie.command.wf;
import org.apache.oozie.AppType;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.XCommand;
+import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
import org.apache.oozie.event.WorkflowActionEvent;
import org.apache.oozie.event.WorkflowJobEvent;
@@ -77,4 +79,17 @@ public abstract class WorkflowXCommand<T
}
}
+ protected void updateParentIfNecessary(WorkflowJobBean wfjob, int
maxRetries) throws CommandException {
+ // update coordinator action if the wf was actually started by a coord
+ if (wfjob.getParentId() != null &&
wfjob.getParentId().contains("-C@")) {
+ new CoordActionUpdateXCommand(wfjob, maxRetries).call();
+ }
+ }
+
+ protected void updateParentIfNecessary(WorkflowJobBean wfjob) throws
CommandException {
+ // update coordinator action if the wf was actually started by a coord
+ if (wfjob.getParentId() != null &&
wfjob.getParentId().contains("-C@")) {
+ new CoordActionUpdateXCommand(wfjob).call();
+ }
+ }
}
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java?rev=1512408&r1=1512407&r2=1512408&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/service/TestStatusTransitService.java
Fri Aug 9 17:26:39 2013
@@ -734,24 +734,6 @@ public class TestStatusTransitService ex
}
- @Override
- protected CoordinatorActionBean addRecordToCoordActionTable(String jobId,
int actionNum,
- CoordinatorAction.Status status, String resourceXmlName, int
pending) throws Exception {
- CoordinatorActionBean action = createCoordAction(jobId, actionNum,
status, resourceXmlName, pending);
- try {
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- CoordActionInsertJPAExecutor coordActionInsertCmd = new
CoordActionInsertJPAExecutor(action);
- jpaService.execute(coordActionInsertCmd);
- }
- catch (JPAExecutorException je) {
- je.printStackTrace();
- fail("Unable to insert the test coord action record to table");
- throw je;
- }
- return action;
- }
-
/**
* Test : all bundle actions are succeeded - bundle job's status will be
updated to succeeded.
*
@@ -925,37 +907,33 @@ public class TestStatusTransitService ex
assertNotNull(jpaService);
final String bundleId = bundleJob.getId();
- addRecordToBundleActionTable(bundleId, "action1", 1,
Job.Status.KILLED);
- addRecordToBundleActionTable(bundleId, "action2", 1,
Job.Status.KILLED);
+ addRecordToBundleActionTable(bundleId, "action1-C", 1,
Job.Status.KILLED);
+ addRecordToBundleActionTable(bundleId, "action2-C", 1,
Job.Status.KILLED);
String currentDatePlusMonth =
XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
- addRecordToCoordJobTableWithBundle(bundleId, "action1",
CoordinatorJob.Status.RUNNING, start, end, false, true, 2);
- addRecordToCoordJobTableWithBundle(bundleId, "action2",
CoordinatorJob.Status.RUNNING, start, end, false, true, 2);
+ addRecordToCoordJobTableWithBundle(bundleId, "action1-C",
CoordinatorJob.Status.RUNNING, start, end, false, true, 2);
+ addRecordToCoordJobTableWithBundle(bundleId, "action2-C",
CoordinatorJob.Status.RUNNING, start, end, false, true, 2);
- final CoordinatorActionBean coordAction1_1 =
addRecordToCoordActionTable("action1", 1,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
- final CoordinatorActionBean coordAction1_2 =
addRecordToCoordActionTable("action1", 2,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-
- final CoordinatorActionBean coordAction1_3 =
addRecordToCoordActionTable("action2", 1,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
- final CoordinatorActionBean coordAction1_4 =
addRecordToCoordActionTable("action2", 2,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-
- this.addRecordToWfJobTable(coordAction1_1.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
- this.addRecordToWfJobTable(coordAction1_2.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
- this.addRecordToWfJobTable(coordAction1_3.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
- this.addRecordToWfJobTable(coordAction1_4.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_1 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_2 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_3 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_4 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+
+ final CoordinatorActionBean coordAction1_1 =
addRecordToCoordActionTable("action1-C", 1,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_1.getId(), wfJob1_1.getStatusStr(), 0);
+ final CoordinatorActionBean coordAction1_2 =
addRecordToCoordActionTable("action1-C", 2,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_2.getId(), wfJob1_2.getStatusStr(), 0);
+
+ final CoordinatorActionBean coordAction1_3 =
addRecordToCoordActionTable("action2-C", 1,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_3.getId(), wfJob1_3.getStatusStr(), 0);
+ final CoordinatorActionBean coordAction1_4 =
addRecordToCoordActionTable("action2-C", 2,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_4.getId(), wfJob1_4.getStatusStr(), 0);
- new CoordKillXCommand("action1").call();
- new CoordKillXCommand("action2").call();
+ new CoordKillXCommand("action1-C").call();
+ new CoordKillXCommand("action2-C").call();
waitFor(5 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
@@ -979,19 +957,19 @@ public class TestStatusTransitService ex
assertFalse(bundleJob.isPending());
assertEquals(Job.Status.KILLED, bundleJob.getStatus());
- BundleActionBean bundleAction1 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action1"));
+ BundleActionBean bundleAction1 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action1-C"));
assertFalse(bundleAction1.isPending());
assertEquals(Job.Status.KILLED, bundleAction1.getStatus());
- CoordinatorJobBean coordJob1 = jpaService.execute(new
CoordJobGetJPAExecutor("action1"));
+ CoordinatorJobBean coordJob1 = jpaService.execute(new
CoordJobGetJPAExecutor("action1-C"));
assertFalse(coordJob1.isPending());
assertEquals(Job.Status.KILLED, coordJob1.getStatus());
- BundleActionBean bundleAction2 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action2"));
+ BundleActionBean bundleAction2 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action2-C"));
assertFalse(bundleAction2.isPending());
assertEquals(Job.Status.KILLED, bundleAction2.getStatus());
- CoordinatorJobBean coordJob2 = jpaService.execute(new
CoordJobGetJPAExecutor("action2"));
+ CoordinatorJobBean coordJob2 = jpaService.execute(new
CoordJobGetJPAExecutor("action2-C"));
assertFalse(coordJob2.isPending());
assertEquals(Job.Status.KILLED, coordJob2.getStatus());
}
@@ -1102,36 +1080,32 @@ public class TestStatusTransitService ex
assertNotNull(jpaService);
final String bundleId = bundleJob.getId();
- addRecordToBundleActionTable(bundleId, "action1", 1,
Job.Status.RUNNING);
- addRecordToBundleActionTable(bundleId, "action2", 1,
Job.Status.RUNNING);
+ addRecordToBundleActionTable(bundleId, "action1-C", 1,
Job.Status.RUNNING);
+ addRecordToBundleActionTable(bundleId, "action2-C", 1,
Job.Status.RUNNING);
String currentDatePlusMonth =
XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
- addRecordToCoordJobTableWithBundle(bundleId, "action1",
CoordinatorJob.Status.RUNNING, start, end, false, true, 2);
- addRecordToCoordJobTableWithBundle(bundleId, "action2",
CoordinatorJob.Status.RUNNING, start, end, true, false, 2);
+ addRecordToCoordJobTableWithBundle(bundleId, "action1-C",
CoordinatorJob.Status.RUNNING, start, end, false, true, 2);
+ addRecordToCoordJobTableWithBundle(bundleId, "action2-C",
CoordinatorJob.Status.RUNNING, start, end, true, false, 2);
- final CoordinatorActionBean coordAction1_1 =
addRecordToCoordActionTable("action1", 1,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
- final CoordinatorActionBean coordAction1_2 =
addRecordToCoordActionTable("action1", 2,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-
- final CoordinatorActionBean coordAction1_3 =
addRecordToCoordActionTable("action2", 1,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 1);
- final CoordinatorActionBean coordAction1_4 =
addRecordToCoordActionTable("action2", 2,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 1);
-
- this.addRecordToWfJobTable(coordAction1_1.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
- this.addRecordToWfJobTable(coordAction1_2.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
- this.addRecordToWfJobTable(coordAction1_3.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
- this.addRecordToWfJobTable(coordAction1_4.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_1 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_2 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_3 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_4 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+
+ final CoordinatorActionBean coordAction1_1 =
addRecordToCoordActionTable("action1-C", 1,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_1.getId(), wfJob1_1.getStatusStr(), 0);
+ final CoordinatorActionBean coordAction1_2 =
addRecordToCoordActionTable("action1-C", 2,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_2.getId(), wfJob1_2.getStatusStr(), 0);
+
+ final CoordinatorActionBean coordAction1_3 =
addRecordToCoordActionTable("action2-C", 1,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_3.getId(), wfJob1_3.getStatusStr(), 1);
+ final CoordinatorActionBean coordAction1_4 =
addRecordToCoordActionTable("action2-C", 2,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_4.getId(), wfJob1_4.getStatusStr(), 1);
- new CoordKillXCommand("action1").call();
+ new CoordKillXCommand("action1-C").call();
waitFor(5 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
@@ -1155,19 +1129,19 @@ public class TestStatusTransitService ex
assertTrue(bundleJob.isPending());
assertEquals(Job.Status.RUNNINGWITHERROR, bundleJob.getStatus());
- BundleActionBean bundleAction1 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action1"));
+ BundleActionBean bundleAction1 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action1-C"));
assertFalse(bundleAction1.isPending());
assertEquals(Job.Status.KILLED, bundleAction1.getStatus());
- CoordinatorJobBean coordJob1 = jpaService.execute(new
CoordJobGetJPAExecutor("action1"));
+ CoordinatorJobBean coordJob1 = jpaService.execute(new
CoordJobGetJPAExecutor("action1-C"));
assertFalse(coordJob1.isPending());
assertEquals(Job.Status.KILLED, coordJob1.getStatus());
- BundleActionBean bundleAction2 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action2"));
+ BundleActionBean bundleAction2 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action2-C"));
assertTrue(bundleAction2.isPending());
assertEquals(Job.Status.RUNNING, bundleAction2.getStatus());
- CoordinatorJobBean coordJob2 = jpaService.execute(new
CoordJobGetJPAExecutor("action2"));
+ CoordinatorJobBean coordJob2 = jpaService.execute(new
CoordJobGetJPAExecutor("action2-C"));
assertTrue(coordJob2.isPending());
assertEquals(Job.Status.RUNNING, coordJob2.getStatus());
}
@@ -1186,37 +1160,33 @@ public class TestStatusTransitService ex
assertNotNull(jpaService);
final String bundleId = bundleJob.getId();
- addRecordToBundleActionTable(bundleId, "action1", 1,
Job.Status.SUSPENDED);
- addRecordToBundleActionTable(bundleId, "action2", 1,
Job.Status.SUSPENDED);
+ addRecordToBundleActionTable(bundleId, "action1-C", 1,
Job.Status.SUSPENDED);
+ addRecordToBundleActionTable(bundleId, "action2-C", 1,
Job.Status.SUSPENDED);
String currentDatePlusMonth =
XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
Date start = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
Date end = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
- addRecordToCoordJobTableWithBundle(bundleId, "action1",
CoordinatorJob.Status.RUNNING, start, end, false, false, 2);
- addRecordToCoordJobTableWithBundle(bundleId, "action2",
CoordinatorJob.Status.RUNNING, start, end, false, false, 2);
+ addRecordToCoordJobTableWithBundle(bundleId, "action1-C",
CoordinatorJob.Status.RUNNING, start, end, false, false, 2);
+ addRecordToCoordJobTableWithBundle(bundleId, "action2-C",
CoordinatorJob.Status.RUNNING, start, end, false, false, 2);
- final CoordinatorActionBean coordAction1_1 =
addRecordToCoordActionTable("action1", 1,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
- final CoordinatorActionBean coordAction1_2 =
addRecordToCoordActionTable("action1", 2,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-
- final CoordinatorActionBean coordAction1_3 =
addRecordToCoordActionTable("action2", 1,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
- final CoordinatorActionBean coordAction1_4 =
addRecordToCoordActionTable("action2", 2,
- CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0);
-
- this.addRecordToWfJobTable(coordAction1_1.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
- this.addRecordToWfJobTable(coordAction1_2.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
- this.addRecordToWfJobTable(coordAction1_3.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
- this.addRecordToWfJobTable(coordAction1_4.getExternalId(),
WorkflowJob.Status.RUNNING,
- WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_1 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_2 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_3 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ WorkflowJobBean wfJob1_4 =
addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+
+ final CoordinatorActionBean coordAction1_1 =
addRecordToCoordActionTable("action1-C", 1,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_1.getId(), wfJob1_1.getStatusStr(), 0);
+ final CoordinatorActionBean coordAction1_2 =
addRecordToCoordActionTable("action1-C", 2,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_2.getId(), wfJob1_2.getStatusStr(), 0);
+
+ final CoordinatorActionBean coordAction1_3 =
addRecordToCoordActionTable("action2-C", 1,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_3.getId(), wfJob1_3.getStatusStr(), 0);
+ final CoordinatorActionBean coordAction1_4 =
addRecordToCoordActionTable("action2-C", 2,
+ CoordinatorAction.Status.RUNNING, "coord-action-get.xml",
wfJob1_4.getId(), wfJob1_4.getStatusStr(), 0);
- new CoordSuspendXCommand("action1").call();
- new CoordSuspendXCommand("action2").call();
+ new CoordSuspendXCommand("action1-C").call();
+ new CoordSuspendXCommand("action2-C").call();
waitFor(5 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
@@ -1240,19 +1210,19 @@ public class TestStatusTransitService ex
assertFalse(bundleJob.isPending());
assertEquals(Job.Status.SUSPENDED, bundleJob.getStatus());
- BundleActionBean bundleAction1 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action1"));
+ BundleActionBean bundleAction1 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action1-C"));
assertFalse(bundleAction1.isPending());
assertEquals(Job.Status.SUSPENDED, bundleAction1.getStatus());
- CoordinatorJobBean coordJob1 = jpaService.execute(new
CoordJobGetJPAExecutor("action1"));
+ CoordinatorJobBean coordJob1 = jpaService.execute(new
CoordJobGetJPAExecutor("action1-C"));
assertFalse(coordJob1.isPending());
assertEquals(Job.Status.SUSPENDED, coordJob1.getStatus());
- BundleActionBean bundleAction2 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action2"));
+ BundleActionBean bundleAction2 = jpaService.execute(new
BundleActionGetJPAExecutor(bundleId, "action2-C"));
assertFalse(bundleAction2.isPending());
assertEquals(Job.Status.SUSPENDED, bundleAction2.getStatus());
- CoordinatorJobBean coordJob2 = jpaService.execute(new
CoordJobGetJPAExecutor("action2"));
+ CoordinatorJobBean coordJob2 = jpaService.execute(new
CoordJobGetJPAExecutor("action2-C"));
assertFalse(coordJob2.isPending());
assertEquals(Job.Status.SUSPENDED, coordJob2.getStatus());
}
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1512408&r1=1512407&r2=1512408&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Fri Aug 9 17:26:39 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1448 A CoordActionUpdateXCommand gets queued for all workflows even if
they were not launched by a coordinator (rkanter)
OOZIE-1443 forkjoin validation should not allow a fork to go to the same node
multiple times (rkanter)
OOZIE-1471 Support glob in FS action and prepare blocks (ryota)
OOZIE-1403 forkjoin validation blocks some valid cases involving decision
nodes (rkanter)