Repository: oozie Updated Branches: refs/heads/master 4e015d45e -> d361ee4d4
OOZIE-1807 Make bundle change command synchronous (puru via rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d361ee4d Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d361ee4d Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d361ee4d Branch: refs/heads/master Commit: d361ee4d4f065eae4473300b1a961a464f2365b1 Parents: 4e015d4 Author: Rohini Palaniswamy <[email protected]> Authored: Wed Jun 18 09:55:36 2014 -0700 Committer: Rohini Palaniswamy <[email protected]> Committed: Wed Jun 18 09:55:36 2014 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/oozie/ErrorCode.java | 1 + .../command/bundle/BundleJobChangeXCommand.java | 25 ++- .../bundle/BundleStatusUpdateXCommand.java | 12 +- .../command/coord/CoordChangeXCommand.java | 9 +- .../bundle/TestBundleChangeXCommand.java | 165 ++++++++++++++++++- release-log.txt | 1 + 6 files changed, 198 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index d3c1b03..88a2c67 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -232,6 +232,7 @@ public enum ErrorCode { E1317(XLog.STD, "Invalid bundle job change value {0}, {1}"), E1318(XLog.STD, "No coord jobs for the bundle=[{0}], fail the bundle"), E1319(XLog.STD, "Invalid bundle coord job namespace, [{0}]"), + E1320(XLog.STD, "Bundle Job change error, [{0}]"), E1400(XLog.STD, "doAs (proxyuser) failure"), 
http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java index 1962748..41ad8ae 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java @@ -162,6 +162,7 @@ public class BundleJobChangeXCommand extends XCommand<Void> { */ @Override protected Void execute() throws CommandException { + StringBuffer changeReport = new StringBuffer(); try { if (isChangePauseTime || isChangeEndTime) { if (isChangePauseTime) { @@ -179,18 +180,28 @@ public class BundleJobChangeXCommand extends XCommand<Void> { for (BundleActionBean action : this.bundleActions) { // queue coord change commands; if (action.getStatus() != Job.Status.KILLED && action.getCoordId() != null) { - queue(new CoordChangeXCommand(action.getCoordId(), changeValue)); - LOG.info("Queuing CoordChangeXCommand coord job = " + action.getCoordId() + " to change " - + changeValue); - action.setPending(action.getPending() + 1); - updateList.add(new UpdateEntry<BundleActionQuery>( - BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, action)); + try { + new CoordChangeXCommand(action.getCoordId(), changeValue).call(); + } + catch (Exception e) { + String errorMsg = action.getCoordId() + " : " + e.getMessage(); + LOG.info("Change command failed " + errorMsg); + changeReport.append("[ ").append(errorMsg).append(" ]"); + } + } + else { + String errorMsg = action.getCoordId() + " : Coord is in killed state"; + LOG.info("Change command failed " + errorMsg); + changeReport.append("[ ").append(errorMsg).append(" ]"); } } updateList.add(new 
UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME, bundleJob)); BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); } + if(!changeReport.toString().isEmpty()){ + throw new CommandException(ErrorCode.E1320, changeReport.toString()); + } return null; } catch (XException ex) { @@ -265,4 +276,4 @@ public class BundleJobChangeXCommand extends XCommand<Void> { } } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java index 4e6e4ed..f32899b 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java @@ -43,6 +43,7 @@ public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { private JPAService jpaService = null; private BundleActionBean bundleaction; private final Job.Status prevStatus; + private final boolean ignorePending; /** * The constructor for class {@link BundleStatusUpdateXCommand} @@ -51,9 +52,14 @@ public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { * @param prevStatus coordinator job old status */ public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus) { + this(coordjob, prevStatus, false); + } + + public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, CoordinatorJob.Status prevStatus, boolean ignorePending) { super("BundleStatusUpdate", "BundleStatusUpdate", 1); this.coordjob = coordjob; this.prevStatus = prevStatus; + this.ignorePending = ignorePending; } @Override @@ -71,7 +77,7 @@ public class 
BundleStatusUpdateXCommand extends StatusUpdateXCommand { .isTerminalStatus())) { bundleaction.setStatus(coordCurrentStatus); } - if (bundleaction.isPending()) { + if (bundleaction.isPending() && !ignorePending) { bundleaction.decrementAndGetPending(); } // TODO - Uncomment this when bottom up rerun can change terminal state @@ -149,7 +155,7 @@ public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { coordjob.getStatus()); return; } - if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus())) { + if (bundleaction.isPending() && coordjob.getStatus().equals(bundleaction.getStatus()) && !ignorePending) { bundleaction.decrementAndGetPending(); } bundleaction.setLastModifiedTime(new Date()); @@ -175,4 +181,4 @@ public class BundleStatusUpdateXCommand extends StatusUpdateXCommand { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java index 805856c..a2748c4 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java @@ -359,13 +359,14 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> { LOG.info("Changing coord status to SUCCEEDED, because it's in " + coordJob.getStatus() + " and new end time is before start time. Startime is " + coordJob.getStartTime() + " and new end time is " + newEndTime); + coordJob.setStatus(CoordinatorJob.Status.SUCCEEDED); coordJob.resetPending(); } coordJob.setDoneMaterialization(); } else { - // move it to running iff newdtime is after starttime. + // move it to running iff new end time is after starttime. 
if (coordJob.getStatus() == CoordinatorJob.Status.SUCCEEDED) { coordJob.setStatus(CoordinatorJob.Status.RUNNING); } @@ -382,6 +383,7 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> { processLookaheadActions(coordJob, newEndTime); } } + else { LOG.info("Didn't change endtime. Endtime is in between coord end time and next materialization time." + "Coord endTime = " + DateUtils.formatDateOozieTZ(newEndTime) @@ -453,7 +455,8 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> { LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId); // update bundle action if (coordJob.getBundleId() != null) { - BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus); + //ignore pending as it'sync command + BundleStatusUpdateXCommand bundleStatusUpdate = new BundleStatusUpdateXCommand(coordJob, prevStatus, true); bundleStatusUpdate.call(); } } @@ -505,4 +508,4 @@ public class CoordChangeXCommand extends CoordinatorXCommand<Void> { protected boolean isLockRequired() { return true; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java index 6560db6..aac51b1 100644 --- a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java @@ -94,8 +94,13 @@ public class TestBundleChangeXCommand extends XDataTestCase { assertNotNull(jpaService); CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_BUNDLEID, coordJob); + coordJob.setAppName("COORD-TEST1"); + assertNotNull(jpaService); + 
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob); + + BundleActionBean bundleAction = new BundleActionBean(); - bundleAction.setBundleActionId("11111"); + bundleAction.setBundleActionId(bundleJob.getId() + "_COORD-TEST1"); bundleAction.setCoordId(coordJob.getId()); bundleAction.setBundleId(bundleJob.getId()); bundleAction.setStatus(Job.Status.SUCCEEDED); @@ -241,4 +246,160 @@ public class TestBundleChangeXCommand extends XDataTestCase { assertTrue(coord.getLastModifiedTime().after(lastMod)); } -} + + //check command report + public void testBundleChangeReport() throws Exception { + BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false); + coordJob1.setBundleId(bundleJob.getId()); + coordJob1.setAppName("COORD-TEST1"); + final JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob1); + BundleActionBean bundleAction1 = new BundleActionBean(); + bundleAction1.setBundleActionId(bundleJob.getId() + "_COORD-TEST1"); + bundleAction1.setCoordId(coordJob1.getId()); + bundleAction1.setBundleId(bundleJob.getId()); + bundleAction1.setStatus(Job.Status.SUCCEEDED); + jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction1)); + + CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false); + coordJob2.setBundleId(bundleJob.getId()); + coordJob2.setAppName("COORD-TEST2"); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob2); + BundleActionBean bundleAction2 = new BundleActionBean(); + bundleAction2.setBundleActionId(bundleJob.getId() + "_COORD-TEST2"); + bundleAction2.setCoordId(coordJob2.getId()); + bundleAction2.setBundleId(bundleJob.getId()); + 
bundleAction2.setStatus(Job.Status.KILLED); + jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction2)); + + CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + addRecordToCoordActionTable(coordJob3.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); + addRecordToCoordActionTable(coordJob3.getId(), 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", + 0, DateUtils.parseDateOozieTZ("2013-08-01T02:00Z")); + addRecordToCoordActionTable(coordJob3.getId(), 3, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0, + DateUtils.parseDateOozieTZ("2013-08-01T03:00Z")); + addRecordToCoordActionTable(coordJob3.getId(), 4, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, + DateUtils.parseDateOozieTZ("2013-08-01T04:00Z")); + + coordJob3.setBundleId(bundleJob.getId()); + coordJob3.setAppName("COORD-TEST3"); + coordJob3.setLastActionNumber(4); + coordJob3.setEndTime(DateUtils.parseDateOozieTZ("2013-08-01T04:00Z")); + coordJob3.setStartTime(DateUtils.parseDateOozieTZ("2013-08-01T00:00Z")); + + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob3); + BundleActionBean bundleAction3 = new BundleActionBean(); + bundleAction3.setBundleActionId(bundleJob.getId() + "_COORD-TEST3"); + bundleAction3.setCoordId(coordJob3.getId()); + bundleAction3.setBundleId(bundleJob.getId()); + bundleAction3.setStatus(Job.Status.RUNNING); + jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction3)); + BundleJobGetJPAExecutor bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId()); + + String dateStr = "2099-01-01T01:00Z"; + bundleJob = jpaService.execute(bundleJobGetCmd); + assertEquals(bundleJob.getPauseTime(), null); + String reports = null; + try { + new BundleJobChangeXCommand(bundleJob.getId(), "pausetime=" + dateStr).call(); + } + catch (Exception e) { + reports = e.getMessage(); + } + 
assertTrue(reports.contains(coordJob2.getId() + " : Coord is in killed state")); + + bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId()); + dateStr = "2013-08-01T03:00Z"; + + bundleJob = jpaService.execute(bundleJobGetCmd); + try { + new BundleJobChangeXCommand(bundleJob.getId(), "endtime=" + dateStr).call(); + } + catch (Exception e) { + reports = e.getMessage(); + } + assertTrue(reports.contains(coordJob2.getId() + " : Coord is in killed state")); + assertTrue(reports.contains(coordJob3.getId() + " : E1022: Cannot delete running/completed coordinator action")); + } + + // Check partial update. + // 3 coord job with status SUCCEEDED, PREP, RUNNING. + // Changing endtime of running coord will fail because end time is before one of running action nominal time. + public void testCheckBundleActionStatus() throws Exception { + BundleJobBean bundleJob = this.addRecordToBundleJobTable(Job.Status.RUNNING, false); + CoordinatorJobBean coordJob1 = addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false); + coordJob1.setBundleId(bundleJob.getId()); + coordJob1.setAppName("COORD-TEST1"); + coordJob1.setEndTime(DateUtils.parseDateOozieTZ("2013-08-01T02:00Z")); + coordJob1.setStartTime(DateUtils.parseDateOozieTZ("2013-08-01T00:00Z")); + + final JPAService jpaService = Services.get().get(JPAService.class); + assertNotNull(jpaService); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob1); + BundleActionBean bundleAction1 = new BundleActionBean(); + bundleAction1.setBundleActionId(bundleJob.getId() + "_COORD-TEST1"); + bundleAction1.setCoordId(coordJob1.getId()); + bundleAction1.setBundleId(bundleJob.getId()); + bundleAction1.setStatus(Job.Status.SUCCEEDED); + jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction1)); + + CoordinatorJobBean coordJob2 = addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false); + coordJob2.setBundleId(bundleJob.getId()); + coordJob2.setAppName("COORD-TEST2"); 
+ coordJob2.setStartTime(DateUtils.parseDateOozieTZ("2099-08-01T02:00Z")); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob2); + BundleActionBean bundleAction2 = new BundleActionBean(); + bundleAction2.setBundleActionId(bundleJob.getId() + "_COORD-TEST2"); + bundleAction2.setCoordId(coordJob2.getId()); + bundleAction2.setBundleId(bundleJob.getId()); + bundleAction2.setStatus(Job.Status.PREP); + jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction2)); + + CoordinatorJobBean coordJob3 = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + addRecordToCoordActionTable(coordJob3.getId(), 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); + addRecordToCoordActionTable(coordJob3.getId(), 2, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", + 0, DateUtils.parseDateOozieTZ("2013-08-01T02:00Z")); + addRecordToCoordActionTable(coordJob3.getId(), 3, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0, + DateUtils.parseDateOozieTZ("2013-08-01T03:00Z")); + addRecordToCoordActionTable(coordJob3.getId(), 4, CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0, + DateUtils.parseDateOozieTZ("2013-08-01T04:00Z")); + + coordJob3.setBundleId(bundleJob.getId()); + coordJob3.setAppName("COORD-TEST3"); + coordJob3.setLastActionNumber(4); + coordJob3.setEndTime(DateUtils.parseDateOozieTZ("2013-08-01T04:00Z")); + coordJob3.setStartTime(DateUtils.parseDateOozieTZ("2013-08-01T00:00Z")); + + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coordJob3); + BundleActionBean bundleAction3 = new BundleActionBean(); + bundleAction3.setBundleActionId(bundleJob.getId() + "_COORD-TEST3"); + bundleAction3.setPending(1); + bundleAction3.setCoordId(coordJob3.getId()); + bundleAction3.setBundleId(bundleJob.getId()); + bundleAction3.setStatus(Job.Status.RUNNING); + jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction3)); + + String dateStr = 
"2013-08-01T03:00Z"; + try { + new BundleJobChangeXCommand(bundleJob.getId(), "endtime=" + dateStr).call(); + fail("should throw exception"); + } + catch (Exception e) { + String reports = e.getMessage(); + assertTrue(reports.contains(coordJob3.getId() + " : E1022: Cannot delete running/completed coordinator action")); + } + BundleActionBean action1 = BundleActionQueryExecutor.getInstance().get(BundleActionQuery.GET_BUNDLE_ACTION, + bundleJob.getId() + "_COORD-TEST1"); + assertEquals(CoordinatorJob.Status.RUNNING, action1.getStatus()); + BundleActionBean action2 = BundleActionQueryExecutor.getInstance().get(BundleActionQuery.GET_BUNDLE_ACTION, + bundleJob.getId() + "_COORD-TEST2"); + assertEquals(CoordinatorJob.Status.SUCCEEDED, action2.getStatus()); + BundleActionBean action3 = BundleActionQueryExecutor.getInstance().get(BundleActionQuery.GET_BUNDLE_ACTION, + bundleJob.getId() + "_COORD-TEST3"); + //No change in running + assertEquals(CoordinatorJob.Status.RUNNING, action3.getStatus()); + assertEquals(action3.getPending(), 1); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 4ea53ec..f14f81d 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1807 Make bundle change command synchronous (puru via rohini) OOZIE-1678 HA support for SLA (ryota) OOZIE-1685 Oozie doesn’t process correctly workflows with a non-default name node (benjzh via rohini) OOZIE-1875 Add "NONE" to coordinator job execution_order (bzhang)
