Repository: oozie
Updated Branches:
  refs/heads/master 4e015d45e -> d361ee4d4


OOZIE-1807 Make bundle change command synchronous (puru via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d361ee4d
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d361ee4d
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d361ee4d

Branch: refs/heads/master
Commit: d361ee4d4f065eae4473300b1a961a464f2365b1
Parents: 4e015d4
Author: Rohini Palaniswamy <[email protected]>
Authored: Wed Jun 18 09:55:36 2014 -0700
Committer: Rohini Palaniswamy <[email protected]>
Committed: Wed Jun 18 09:55:36 2014 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/oozie/ErrorCode.java   |   1 +
 .../command/bundle/BundleJobChangeXCommand.java |  25 ++-
 .../bundle/BundleStatusUpdateXCommand.java      |  12 +-
 .../command/coord/CoordChangeXCommand.java      |   9 +-
 .../bundle/TestBundleChangeXCommand.java        | 165 ++++++++++++++++++-
 release-log.txt                                 |   1 +
 6 files changed, 198 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/main/java/org/apache/oozie/ErrorCode.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java 
b/core/src/main/java/org/apache/oozie/ErrorCode.java
index d3c1b03..88a2c67 100644
--- a/core/src/main/java/org/apache/oozie/ErrorCode.java
+++ b/core/src/main/java/org/apache/oozie/ErrorCode.java
@@ -232,6 +232,7 @@ public enum ErrorCode {
     E1317(XLog.STD, "Invalid bundle job change value {0}, {1}"),
     E1318(XLog.STD, "No coord jobs for the bundle=[{0}], fail the bundle"),
     E1319(XLog.STD, "Invalid bundle coord job namespace, [{0}]"),
+    E1320(XLog.STD, "Bundle Job change error, [{0}]"),
 
     E1400(XLog.STD, "doAs (proxyuser) failure"),
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
index 1962748..41ad8ae 100644
--- 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleJobChangeXCommand.java
@@ -162,6 +162,7 @@ public class BundleJobChangeXCommand extends XCommand<Void> 
{
      */
     @Override
     protected Void execute() throws CommandException {
+        StringBuffer changeReport = new StringBuffer();
         try {
             if (isChangePauseTime || isChangeEndTime) {
                 if (isChangePauseTime) {
@@ -179,18 +180,28 @@ public class BundleJobChangeXCommand extends 
XCommand<Void> {
                 for (BundleActionBean action : this.bundleActions) {
                     // queue coord change commands;
                     if (action.getStatus() != Job.Status.KILLED && 
action.getCoordId() != null) {
-                        queue(new CoordChangeXCommand(action.getCoordId(), 
changeValue));
-                        LOG.info("Queuing CoordChangeXCommand coord job = " + 
action.getCoordId() + " to change "
-                                + changeValue);
-                        action.setPending(action.getPending() + 1);
-                        updateList.add(new UpdateEntry<BundleActionQuery>(
-                                
BundleActionQuery.UPDATE_BUNDLE_ACTION_PENDING_MODTIME, action));
+                        try {
+                            new CoordChangeXCommand(action.getCoordId(), 
changeValue).call();
+                        }
+                        catch (Exception e) {
+                            String errorMsg = action.getCoordId() + " : " + 
e.getMessage();
+                            LOG.info("Change command failed " + errorMsg);
+                            changeReport.append("[ 
").append(errorMsg).append(" ]");
+                        }
+                    }
+                    else {
+                        String errorMsg = action.getCoordId() + " : Coord is 
in killed state";
+                        LOG.info("Change command failed " + errorMsg);
+                        changeReport.append("[ ").append(errorMsg).append(" 
]");
                     }
                 }
                 updateList.add(new 
UpdateEntry<BundleJobQuery>(BundleJobQuery.UPDATE_BUNDLE_JOB_STATUS_PAUSE_ENDTIME,
                         bundleJob));
                 
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, 
updateList, null);
             }
+            if(!changeReport.toString().isEmpty()){
+                throw new CommandException(ErrorCode.E1320, 
changeReport.toString());
+            }
             return null;
         }
         catch (XException ex) {
@@ -265,4 +276,4 @@ public class BundleJobChangeXCommand extends XCommand<Void> 
{
             }
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
index 4e6e4ed..f32899b 100644
--- 
a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusUpdateXCommand.java
@@ -43,6 +43,7 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
     private JPAService jpaService = null;
     private BundleActionBean bundleaction;
     private final Job.Status prevStatus;
+    private final boolean ignorePending;
 
     /**
      * The constructor for class {@link BundleStatusUpdateXCommand}
@@ -51,9 +52,14 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
      * @param prevStatus coordinator job old status
      */
     public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, 
CoordinatorJob.Status prevStatus) {
+        this(coordjob, prevStatus, false);
+    }
+
+    public BundleStatusUpdateXCommand(CoordinatorJobBean coordjob, 
CoordinatorJob.Status prevStatus, boolean ignorePending) {
         super("BundleStatusUpdate", "BundleStatusUpdate", 1);
         this.coordjob = coordjob;
         this.prevStatus = prevStatus;
+        this.ignorePending = ignorePending;
     }
 
     @Override
@@ -71,7 +77,7 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
                             .isTerminalStatus())) {
                 bundleaction.setStatus(coordCurrentStatus);
             }
-            if (bundleaction.isPending()) {
+            if (bundleaction.isPending() && !ignorePending) {
                 bundleaction.decrementAndGetPending();
             }
             // TODO - Uncomment this when bottom up rerun can change terminal 
state
@@ -149,7 +155,7 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
                         coordjob.getStatus());
                 return;
             }
-            if (bundleaction.isPending() && 
coordjob.getStatus().equals(bundleaction.getStatus())) {
+            if (bundleaction.isPending() && 
coordjob.getStatus().equals(bundleaction.getStatus()) && !ignorePending) {
                 bundleaction.decrementAndGetPending();
             }
             bundleaction.setLastModifiedTime(new Date());
@@ -175,4 +181,4 @@ public class BundleStatusUpdateXCommand extends 
StatusUpdateXCommand {
         }
     }
 
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java 
b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
index 805856c..a2748c4 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordChangeXCommand.java
@@ -359,13 +359,14 @@ public class CoordChangeXCommand extends 
CoordinatorXCommand<Void> {
                             LOG.info("Changing coord status to SUCCEEDED, 
because it's in " + coordJob.getStatus()
                                     + " and new end time is before start time. 
Startime is " + coordJob.getStartTime()
                                     + " and new end time is " + newEndTime);
+
                             
coordJob.setStatus(CoordinatorJob.Status.SUCCEEDED);
                             coordJob.resetPending();
                         }
                         coordJob.setDoneMaterialization();
                     }
                     else {
-                        // move it to running iff newdtime is after starttime.
+                        // move it to running iff new end time is after 
starttime.
                         if (coordJob.getStatus() == 
CoordinatorJob.Status.SUCCEEDED) {
                             coordJob.setStatus(CoordinatorJob.Status.RUNNING);
                         }
@@ -382,6 +383,7 @@ public class CoordChangeXCommand extends 
CoordinatorXCommand<Void> {
                         processLookaheadActions(coordJob, newEndTime);
                     }
                 }
+
                 else {
                     LOG.info("Didn't change endtime. Endtime is in between 
coord end time and next materialization time."
                             + "Coord endTime = " + 
DateUtils.formatDateOozieTZ(newEndTime)
@@ -453,7 +455,8 @@ public class CoordChangeXCommand extends 
CoordinatorXCommand<Void> {
             LOG.info("ENDED CoordChangeXCommand for jobId=" + jobId);
             // update bundle action
             if (coordJob.getBundleId() != null) {
-                BundleStatusUpdateXCommand bundleStatusUpdate = new 
BundleStatusUpdateXCommand(coordJob, prevStatus);
+                //ignore pending as it's a sync command
+                BundleStatusUpdateXCommand bundleStatusUpdate = new 
BundleStatusUpdateXCommand(coordJob, prevStatus, true);
                 bundleStatusUpdate.call();
             }
         }
@@ -505,4 +508,4 @@ public class CoordChangeXCommand extends 
CoordinatorXCommand<Void> {
     protected boolean isLockRequired() {
         return true;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java
 
b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java
index 6560db6..aac51b1 100644
--- 
a/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/bundle/TestBundleChangeXCommand.java
@@ -94,8 +94,13 @@ public class TestBundleChangeXCommand extends XDataTestCase {
         assertNotNull(jpaService);
         
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB_BUNDLEID,
 coordJob);
 
+        coordJob.setAppName("COORD-TEST1");
+        assertNotNull(jpaService);
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 coordJob);
+
+
         BundleActionBean bundleAction = new BundleActionBean();
-        bundleAction.setBundleActionId("11111");
+        bundleAction.setBundleActionId(bundleJob.getId() + "_COORD-TEST1");
         bundleAction.setCoordId(coordJob.getId());
         bundleAction.setBundleId(bundleJob.getId());
         bundleAction.setStatus(Job.Status.SUCCEEDED);
@@ -241,4 +246,160 @@ public class TestBundleChangeXCommand extends 
XDataTestCase {
         assertTrue(coord.getLastModifiedTime().after(lastMod));
 
     }
-}
+
+    //check command report
+    public void testBundleChangeReport() throws Exception {
+        BundleJobBean bundleJob = 
this.addRecordToBundleJobTable(Job.Status.RUNNING, false);
+        CoordinatorJobBean coordJob1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob1.setBundleId(bundleJob.getId());
+        coordJob1.setAppName("COORD-TEST1");
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 coordJob1);
+        BundleActionBean bundleAction1 = new BundleActionBean();
+        bundleAction1.setBundleActionId(bundleJob.getId() + "_COORD-TEST1");
+        bundleAction1.setCoordId(coordJob1.getId());
+        bundleAction1.setBundleId(bundleJob.getId());
+        bundleAction1.setStatus(Job.Status.SUCCEEDED);
+        jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction1));
+
+        CoordinatorJobBean coordJob2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.KILLED, false, false);
+        coordJob2.setBundleId(bundleJob.getId());
+        coordJob2.setAppName("COORD-TEST2");
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 coordJob2);
+        BundleActionBean bundleAction2 = new BundleActionBean();
+        bundleAction2.setBundleActionId(bundleJob.getId() + "_COORD-TEST2");
+        bundleAction2.setCoordId(coordJob2.getId());
+        bundleAction2.setBundleId(bundleJob.getId());
+        bundleAction2.setStatus(Job.Status.KILLED);
+        jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction2));
+
+        CoordinatorJobBean coordJob3 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        addRecordToCoordActionTable(coordJob3.getId(), 1, 
CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+        addRecordToCoordActionTable(coordJob3.getId(), 2, 
CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml",
+                0, DateUtils.parseDateOozieTZ("2013-08-01T02:00Z"));
+        addRecordToCoordActionTable(coordJob3.getId(), 3, 
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0,
+                DateUtils.parseDateOozieTZ("2013-08-01T03:00Z"));
+        addRecordToCoordActionTable(coordJob3.getId(), 4, 
CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0,
+                DateUtils.parseDateOozieTZ("2013-08-01T04:00Z"));
+
+        coordJob3.setBundleId(bundleJob.getId());
+        coordJob3.setAppName("COORD-TEST3");
+        coordJob3.setLastActionNumber(4);
+        coordJob3.setEndTime(DateUtils.parseDateOozieTZ("2013-08-01T04:00Z"));
+        
coordJob3.setStartTime(DateUtils.parseDateOozieTZ("2013-08-01T00:00Z"));
+
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 coordJob3);
+        BundleActionBean bundleAction3 = new BundleActionBean();
+        bundleAction3.setBundleActionId(bundleJob.getId() + "_COORD-TEST3");
+        bundleAction3.setCoordId(coordJob3.getId());
+        bundleAction3.setBundleId(bundleJob.getId());
+        bundleAction3.setStatus(Job.Status.RUNNING);
+        jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction3));
+        BundleJobGetJPAExecutor bundleJobGetCmd = new 
BundleJobGetJPAExecutor(bundleJob.getId());
+
+        String dateStr = "2099-01-01T01:00Z";
+        bundleJob = jpaService.execute(bundleJobGetCmd);
+        assertEquals(bundleJob.getPauseTime(), null);
+        String reports = null;
+        try {
+            new BundleJobChangeXCommand(bundleJob.getId(), "pausetime=" + 
dateStr).call();
+        }
+        catch (Exception e) {
+            reports = e.getMessage();
+        }
+        assertTrue(reports.contains(coordJob2.getId() + " : Coord is in killed 
state"));
+
+        bundleJobGetCmd = new BundleJobGetJPAExecutor(bundleJob.getId());
+        dateStr = "2013-08-01T03:00Z";
+
+        bundleJob = jpaService.execute(bundleJobGetCmd);
+        try {
+            new BundleJobChangeXCommand(bundleJob.getId(), "endtime=" + 
dateStr).call();
+        }
+        catch (Exception e) {
+            reports = e.getMessage();
+        }
+        assertTrue(reports.contains(coordJob2.getId() + " : Coord is in killed 
state"));
+        assertTrue(reports.contains(coordJob3.getId() + " : E1022: Cannot 
delete running/completed coordinator action"));
+    }
+
+    // Check partial update.
+    // 3 coord jobs with status SUCCEEDED, PREP, RUNNING.
+    // Changing endtime of the running coord will fail because the new end time is before the nominal time of one of its running actions.
+    public void testCheckBundleActionStatus() throws Exception {
+        BundleJobBean bundleJob = 
this.addRecordToBundleJobTable(Job.Status.RUNNING, false);
+        CoordinatorJobBean coordJob1 = 
addRecordToCoordJobTable(CoordinatorJob.Status.SUCCEEDED, false, false);
+        coordJob1.setBundleId(bundleJob.getId());
+        coordJob1.setAppName("COORD-TEST1");
+        coordJob1.setEndTime(DateUtils.parseDateOozieTZ("2013-08-01T02:00Z"));
+        
coordJob1.setStartTime(DateUtils.parseDateOozieTZ("2013-08-01T00:00Z"));
+
+        final JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 coordJob1);
+        BundleActionBean bundleAction1 = new BundleActionBean();
+        bundleAction1.setBundleActionId(bundleJob.getId() + "_COORD-TEST1");
+        bundleAction1.setCoordId(coordJob1.getId());
+        bundleAction1.setBundleId(bundleJob.getId());
+        bundleAction1.setStatus(Job.Status.SUCCEEDED);
+        jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction1));
+
+        CoordinatorJobBean coordJob2 = 
addRecordToCoordJobTable(CoordinatorJob.Status.PREP, false, false);
+        coordJob2.setBundleId(bundleJob.getId());
+        coordJob2.setAppName("COORD-TEST2");
+        
coordJob2.setStartTime(DateUtils.parseDateOozieTZ("2099-08-01T02:00Z"));
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 coordJob2);
+        BundleActionBean bundleAction2 = new BundleActionBean();
+        bundleAction2.setBundleActionId(bundleJob.getId() + "_COORD-TEST2");
+        bundleAction2.setCoordId(coordJob2.getId());
+        bundleAction2.setBundleId(bundleJob.getId());
+        bundleAction2.setStatus(Job.Status.PREP);
+        jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction2));
+
+        CoordinatorJobBean coordJob3 = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false);
+        addRecordToCoordActionTable(coordJob3.getId(), 1, 
CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0);
+        addRecordToCoordActionTable(coordJob3.getId(), 2, 
CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml",
+                0, DateUtils.parseDateOozieTZ("2013-08-01T02:00Z"));
+        addRecordToCoordActionTable(coordJob3.getId(), 3, 
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0,
+                DateUtils.parseDateOozieTZ("2013-08-01T03:00Z"));
+        addRecordToCoordActionTable(coordJob3.getId(), 4, 
CoordinatorAction.Status.RUNNING, "coord-action-get.xml", 0,
+                DateUtils.parseDateOozieTZ("2013-08-01T04:00Z"));
+
+        coordJob3.setBundleId(bundleJob.getId());
+        coordJob3.setAppName("COORD-TEST3");
+        coordJob3.setLastActionNumber(4);
+        coordJob3.setEndTime(DateUtils.parseDateOozieTZ("2013-08-01T04:00Z"));
+        
coordJob3.setStartTime(DateUtils.parseDateOozieTZ("2013-08-01T00:00Z"));
+
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 coordJob3);
+        BundleActionBean bundleAction3 = new BundleActionBean();
+        bundleAction3.setBundleActionId(bundleJob.getId() + "_COORD-TEST3");
+        bundleAction3.setPending(1);
+        bundleAction3.setCoordId(coordJob3.getId());
+        bundleAction3.setBundleId(bundleJob.getId());
+        bundleAction3.setStatus(Job.Status.RUNNING);
+        jpaService.execute(new BundleActionInsertJPAExecutor(bundleAction3));
+
+        String dateStr = "2013-08-01T03:00Z";
+        try {
+            new BundleJobChangeXCommand(bundleJob.getId(), "endtime=" + 
dateStr).call();
+            fail("should throw exception");
+        }
+        catch (Exception e) {
+            String reports = e.getMessage();
+            assertTrue(reports.contains(coordJob3.getId() + " : E1022: Cannot 
delete running/completed coordinator action"));
+        }
+        BundleActionBean action1 = 
BundleActionQueryExecutor.getInstance().get(BundleActionQuery.GET_BUNDLE_ACTION,
+                bundleJob.getId() + "_COORD-TEST1");
+        assertEquals(CoordinatorJob.Status.RUNNING, action1.getStatus());
+        BundleActionBean action2 = 
BundleActionQueryExecutor.getInstance().get(BundleActionQuery.GET_BUNDLE_ACTION,
+                bundleJob.getId() + "_COORD-TEST2");
+        assertEquals(CoordinatorJob.Status.SUCCEEDED, action2.getStatus());
+        BundleActionBean action3 = 
BundleActionQueryExecutor.getInstance().get(BundleActionQuery.GET_BUNDLE_ACTION,
+                bundleJob.getId() + "_COORD-TEST3");
+        //No change in running
+        assertEquals(CoordinatorJob.Status.RUNNING, action3.getStatus());
+        assertEquals(action3.getPending(), 1);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/d361ee4d/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4ea53ec..f14f81d 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1807 Make bundle change command synchronous (puru via rohini)
 OOZIE-1678 HA support for SLA (ryota)
 OOZIE-1685 Oozie doesn’t process correctly workflows with a non-default name 
node (benjzh via rohini)
 OOZIE-1875 Add "NONE" to coordinator job execution_order (bzhang)

Reply via email to