Repository: oozie
Updated Branches:
  refs/heads/master cbb1eac9d -> 85ee1a268


OOZIE-1875 Add "NONE" to coordinator job execution_order (bzhang)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/85ee1a26
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/85ee1a26
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/85ee1a26

Branch: refs/heads/master
Commit: 85ee1a268ae2b2341ee641e75f8be18e79ad3a85
Parents: cbb1eac
Author: Bowen Zhang <[email protected]>
Authored: Mon Jun 16 18:43:20 2014 -0700
Committer: Bowen Zhang <[email protected]>
Committed: Mon Jun 16 18:44:32 2014 -0700

----------------------------------------------------------------------
 .../org/apache/oozie/client/CoordinatorJob.java |  2 +-
 .../coord/CoordActionInputCheckXCommand.java    | 23 ++++++++++-
 .../command/coord/CoordActionReadyXCommand.java |  2 +-
 .../CoordMaterializeTransitionXCommand.java     |  7 +++-
 .../command/coord/CoordSubmitXCommand.java      |  3 +-
 .../jpa/CoordJobGetReadyActionsJPAExecutor.java |  4 +-
 .../apache/oozie/store/CoordinatorStore.java    |  6 +--
 core/src/main/resources/oozie-default.xml       | 12 +++++-
 .../TestCoordActionInputCheckXCommand.java      | 40 ++++++++++++++++++++
 .../site/twiki/CoordinatorFunctionalSpec.twiki  | 15 +++++++-
 release-log.txt                                 |  1 +
 11 files changed, 102 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/client/src/main/java/org/apache/oozie/client/CoordinatorJob.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/oozie/client/CoordinatorJob.java 
b/client/src/main/java/org/apache/oozie/client/CoordinatorJob.java
index 6a60d13..7dc8007 100644
--- a/client/src/main/java/org/apache/oozie/client/CoordinatorJob.java
+++ b/client/src/main/java/org/apache/oozie/client/CoordinatorJob.java
@@ -29,7 +29,7 @@ public interface CoordinatorJob extends Job {
      * Defines the possible execution order of an Oozie application.
      */
     public static enum Execution {
-        FIFO, LIFO, LAST_ONLY
+        FIFO, LIFO, LAST_ONLY, NONE
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
index 19f867e..0dc7e33 100644
--- 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
@@ -150,8 +150,8 @@ public class CoordActionInputCheckXCommand extends 
CoordinatorXCommand<Void> {
         try {
             Configuration actionConf = new XConfiguration(new 
StringReader(coordAction.getRunConf()));
             cron.start();
+            Date now = new Date();
             if 
(coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.LAST_ONLY)) {
-                Date now = new Date();
                 Date nextNominalTime = computeNextNominalTime();
                 if (nextNominalTime != null) {
                     // If the current time is after the next action's nominal 
time, then we've passed the window where this action
@@ -169,6 +169,27 @@ public class CoordActionInputCheckXCommand extends 
CoordinatorXCommand<Void> {
                     }
                 }
             }
+            else if 
(coordJob.getExecutionOrder().equals(CoordinatorJobBean.Execution.NONE)) {
+                // If the current time is after the nominal time of this 
action plus some tolerance,
+                // then we've passed the window where this action
+                // should be started; so set it to SKIPPED
+                Calendar cal = 
Calendar.getInstance(DateUtils.getTimeZone(coordJob.getTimeZone()));
+                cal.setTime(nominalTime);
+                cal.add(Calendar.MINUTE, 
Services.get().getConf().getInt("oozie.coord.execution.none.tolerance", 1));
+                nominalTime = cal.getTime();
+                if (now.after(nominalTime)) {
+                    LOG.info("NONE execution: Preparing to skip action [{0}] 
because the current time [{1}] is later than "
+                            + "the nominal time [{2}] of the current action]", 
coordAction.getId(),
+                            DateUtils.formatDateOozieTZ(now), 
DateUtils.formatDateOozieTZ(nominalTime));
+                    queue(new CoordActionSkipXCommand(coordAction, 
coordJob.getUser(), coordJob.getAppName()));
+                    return null;
+                } else {
+                    LOG.debug("NONE execution: Not skipping action [{0}] 
because the current time [{1}] is earlier than "
+                            + "the nominal time [{2}] of the current action]", 
coordAction.getId(),
+                            DateUtils.formatDateOozieTZ(now), 
DateUtils.formatDateOozieTZ(coordAction.getNominalTime()));
+                }
+            }
+
             StringBuilder existList = new StringBuilder();
             StringBuilder nonExistList = new StringBuilder();
             StringBuilder nonResolvedList = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
index d97dc4d..ffe85f0 100644
--- 
a/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/coord/CoordActionReadyXCommand.java
@@ -53,7 +53,7 @@ public class CoordActionReadyXCommand extends 
CoordinatorXCommand<Void> {
     /**
      * Check for READY actions and change state to SUBMITTED by a command to 
submit the job to WF engine.
      * This method checks all the actions associated with a jobId to figure 
out which actions
-     * to start (based on concurrency and execution order [FIFO, LIFO, 
LAST_ONLY])
+     * to start (based on concurrency and execution order [FIFO, LIFO, 
LAST_ONLY, NONE])
      *
      */
     protected Void execute() throws CommandException {

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 515e247..a562c77 100644
--- 
a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -231,7 +231,8 @@ public class CoordMaterializeTransitionXCommand extends 
MaterializeTransitionXCo
         if (currentMatTime.after(new Date())) {
             return currentMatTime;
         }
-        if 
(coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY)) {
+        if 
(coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
+                
coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE)) {
             return new Date();
         }
         int frequency = 0;
@@ -414,7 +415,9 @@ public class CoordMaterializeTransitionXCommand extends 
MaterializeTransitionXCo
         int maxActionToBeCreated = coordJob.getMatThrottling() - 
numWaitingActions;
         // If LAST_ONLY and all materialization is in the past, ignore 
maxActionsToBeCreated
         boolean ignoreMaxActions =
-                
coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) && 
endMatdTime.before(new Date());
+                
(coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.LAST_ONLY) ||
+                        
coordJob.getExecutionOrder().equals(CoordinatorJob.Execution.NONE))
+                        && endMatdTime.before(new Date());
         LOG.debug("Coordinator job :" + coordJob.getId() + ", 
maxActionToBeCreated :" + maxActionToBeCreated
                 + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", 
numWaitingActions :" + numWaitingActions);
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java 
b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
index 91fb5da..11fde6f 100644
--- a/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSubmitXCommand.java
@@ -780,7 +780,8 @@ public class CoordSubmitXCommand extends 
SubmitTransitionXCommand {
             val = Execution.FIFO.toString();
         }
         coordJob.setExecutionOrder(Execution.valueOf(val));
-        String[] acceptedVals = { Execution.LIFO.toString(), 
Execution.FIFO.toString(), Execution.LAST_ONLY.toString() };
+        String[] acceptedVals = { Execution.LIFO.toString(), 
Execution.FIFO.toString(), Execution.LAST_ONLY.toString(),
+            Execution.NONE.toString()};
         ParamChecker.isMember(val, acceptedVals, "execution");
 
         // datasets

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
index 42dbca8..30f9c9f 100644
--- 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetReadyActionsJPAExecutor.java
@@ -55,11 +55,11 @@ public class CoordJobGetReadyActionsJPAExecutor implements 
JPAExecutor<List<Coor
         List<CoordinatorActionBean> actionBeans = null;
         try {
             Query q;
-            // check if executionOrder is FIFO, LIFO, or LAST_ONLY
+            // check if executionOrder is FIFO, LIFO, LAST_ONLY, or NONE
             if (executionOrder.equalsIgnoreCase("FIFO")) {
                 q = em.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
             }
-            else {      // LIFO or LAST_ONLY
+            else {      // LIFO, LAST_ONLY, or NONE
                 q = em.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_LIFO");
             }
             q.setParameter("jobId", coordJobId);

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java 
b/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
index 8fd5fc3..9021778 100644
--- a/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
+++ b/core/src/main/java/org/apache/oozie/store/CoordinatorStore.java
@@ -267,11 +267,11 @@ public class CoordinatorStore extends Store {
 
     /**
      * Return CoordinatorActions for a jobID. Action should be in READY state. 
Number of returned actions should be <=
-     * concurrency number. Sort returned actions based on execution order 
(FIFO, LIFO, LAST_ONLY)
+     * concurrency number. Sort returned actions based on execution order 
(FIFO, LIFO, LAST_ONLY, NONE)
      *
      * @param id job ID
      * @param numResults number of results to return
-     * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY
+     * @param executionOrder execution for this job - FIFO, LIFO, LAST_ONLY, 
NONE
      * @return List of CoordinatorActionBean
      * @throws StoreException
      */
@@ -284,7 +284,7 @@ public class CoordinatorStore extends Store {
 
                                                                   
List<CoordinatorActionBean> caBeans;
                                                                   Query q;
-                                                                  // check if 
executionOrder is FIFO, LIFO, or LAST_ONLY
+                                                                  // check if 
executionOrder is FIFO, LIFO, NONE or LAST_ONLY
                                                                   if 
(executionOrder.equalsIgnoreCase("FIFO")) {
                                                                       q = 
entityManager.createNamedQuery("GET_COORD_ACTIONS_FOR_JOB_FIFO");
                                                                   }

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml 
b/core/src/main/resources/oozie-default.xml
index c9d9591..b944d3d 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1843,7 +1843,17 @@
         </description>
     </property>
 
-       <!-- Coordinator Actions default length -->
+    <!-- Coordinator "NONE" execution order default time tolerance -->
+    <property>
+        <name>oozie.coord.execution.none.tolerance</name>
+        <value>1</value>
+        <description>
+            Default time tolerance in minutes after action nominal time for an 
action to be skipped
+            when execution order is "NONE"
+        </description>
+    </property>
+
+    <!-- Coordinator Actions default length -->
        <property>
                <name>oozie.coord.actions.default.length</name>
                <value>1000</value>

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
index 0eafacf..fdeec63 100644
--- 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
@@ -831,6 +831,46 @@ public class TestCoordActionInputCheckXCommand extends 
XDataTestCase {
         checkCoordActionStatus(actionId5, CoordinatorAction.Status.WAITING);
     }
 
+    @Test
+    public void testNone() throws Exception {
+        CoordinatorJobBean job = 
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+                CoordinatorJob.Status.RUNNING, false, true);
+        job.setExecutionOrder(CoordinatorJobBean.Execution.NONE);
+        job.setFrequency("10");
+        job.setTimeUnit(Timeunit.MINUTE);
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB,
 job);
+        String missingDeps = "hdfs:///dirx/filex";
+
+        // Nominal time is one hour in the past, so the action will be skipped
+        String actionId1 = addInitRecords(missingDeps, null, TZ, job, 1);
+        Date nomTime = new Date(new Date().getTime() - 60 * 60 * 1000);     // 
1 hour ago
+        setCoordActionNominalTime(actionId1, nomTime.getTime());
+        new CoordActionInputCheckXCommand(actionId1, job.getId()).call();
+        checkCoordActionStatus(actionId1, CoordinatorAction.Status.SKIPPED);
+
+        // Nominal time is 60 minutes from now, action should be in WAITING 
state
+        String actionId2 = addInitRecords(missingDeps, null, TZ, job, 2);
+        nomTime = new Date(new Date().getTime() + 60 * 60 * 1000);          // 
1 hour from now
+        setCoordActionNominalTime(actionId2, nomTime.getTime());
+        new CoordActionInputCheckXCommand(actionId2, job.getId()).call();
+        checkCoordActionStatus(actionId2, CoordinatorAction.Status.WAITING);
+
+        // Nominal times of both actions are more than one minute past, so 
both will be skipped
+        String actionId3 = addInitRecords(missingDeps, null, TZ, job, 3);
+        nomTime = new Date(new Date().getTime() - 5 * 60 * 1000);           // 
5 minutes ago
+        setCoordActionNominalTime(actionId3, nomTime.getTime());
+        new CoordActionInputCheckXCommand(actionId3, job.getId()).call();
+        checkCoordActionStatus(actionId3, CoordinatorAction.Status.SKIPPED);
+
+        String actionId4 = addInitRecords(missingDeps, null, TZ, job, 4);
+        nomTime = new Date(new Date().getTime() - 2 * 60 * 1000);           // 
2 minutes ago
+        setCoordActionNominalTime(actionId4, nomTime.getTime());
+        new CoordActionInputCheckXCommand(actionId4, job.getId()).call();
+        checkCoordActionStatus(actionId4, CoordinatorAction.Status.SKIPPED);
+
+
+    }
+
     protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String 
testFileName, CoordinatorJob.Status status,
             Date start, Date end, boolean pending, boolean doneMatd, int 
lastActionNum) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki 
b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
index 11018b0..961377e 100644
--- a/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
+++ b/docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
@@ -881,6 +881,9 @@ A coordinator action may remain in *READY* status for a 
while, without starting
 A coordinator action in *READY* or *WAITING* status changes to *SKIPPED* 
status if the execution strategy is LAST_ONLY and the
 current time is past the next action's nominal time.  See section 6.3 for more 
details.
 
+A coordinator action in *READY* or *WAITING* status changes to *SKIPPED* 
status if the execution strategy is NONE and the
+current time is past the action's nominal time plus the configured tolerance (1 minute by default).  See section 6.3 
for more details.
+
 A coordinator action in *READY* status changes to *SUBMITTED* status if total 
current *RUNNING* and *SUBMITTED* actions are less than concurrency execution 
limit.
 
 A coordinator action in *SUBMITTED* status changes to *RUNNING* status when 
the workflow engine start execution of the coordinator action.
@@ -931,7 +934,7 @@ The execution policies for the actions of a coordinator job 
can be defined in th
 
    * Timeout: A coordinator job can specify the timeout for its coordinator 
actions, this is, how long the coordinator action will be in *WAITING* or 
*READY* status before giving up on its execution.
    * Concurrency: A coordinator job can specify the concurrency for its 
coordinator actions, this is, how many coordinator actions are allowed to run 
concurrently ( *RUNNING* status) before the coordinator engine starts 
throttling them.
-   * Execution strategy: A coordinator job can specify the execution strategy 
of its coordinator actions when there is backlog of coordinator actions in the 
coordinator engine. The different execution strategies are 'oldest first', 
'newest first' and 'last one only'. A backlog normally happens because of 
delayed input data, concurrency control or because manual re-runs of 
coordinator jobs.
+   * Execution strategy: A coordinator job can specify the execution strategy 
of its coordinator actions when there is backlog of coordinator actions in the 
coordinator engine. The different execution strategies are 'oldest first', 
'newest first', 'none' and 'last one only'. A backlog normally happens because 
of delayed input data, concurrency control or because manual re-runs of 
coordinator jobs.
    * Throttle: A coordinator job can specify the materialization or creation 
throttle value for its coordinator actions, this is, how many maximum 
coordinator actions are allowed to be in WAITING state concurrently.
 
 ---++++ 6.1.7. Data Pipeline Application
@@ -979,6 +982,7 @@ A synchronous coordinator definition is a is defined by a 
name, start time and e
          * =FIFO= (oldest first) *default*.
          * =LIFO= (newest first).
          * =LAST_ONLY= (see explanation below).
+         * =NONE= (discards all older materialization, see explanation below).
       * *%BLUE% throttle: %ENDCOLOR%* The maximum coordinator actions are 
allowed to be in WAITING state concurrently. The default value is =12=.
    * *%BLUE% datasets: %ENDCOLOR%* The datasets coordinator application uses.
    * *%BLUE% input-events: %ENDCOLOR%* The coordinator job input events.
@@ -1008,6 +1012,15 @@ always want the latest action.  For example, if you have 
a coordinator running e
 Oozie comes back, there would normally be 6 actions =READY= to run.  However, 
with =LAST_ONLY=, only the current one will go
 to =SUBMITTED= and =RUNNING=; the others will go to SKIPPED.
 
+*NONE:* Similar to LAST_ONLY except all older materializations are skipped. 
When =NONE= is set, an action that is =WAITING=
+or =READY= will be =SKIPPED= when the current time is more than a certain 
configured number of minutes (tolerance) past the action's
+nominal time. By default, the tolerance is 1 minute. For example, suppose 
actions 1 and 2
+are both =WAITING=, the current time is 5:20pm, and both actions' nominal 
times are before 5:19pm. Both actions
+will become SKIPPED, assuming they don't transition to =SUBMITTED= (or a 
terminal state) before then.  Another way of thinking about
+this is to view it as similar to setting the =timeout= equal to 1 minute which 
is the smallest time unit, except that the =SKIPPED= status doesn't cause the
+coordinator job to eventually become =DONEWITHERROR= and can actually become 
=SUCCEEDED= (i.e. it's a "good" version
+of =TIMEDOUT=).
+
 *%PURPLE% Syntax: %ENDCOLOR%*
 
 <verbatim>

http://git-wip-us.apache.org/repos/asf/oozie/blob/85ee1a26/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 43a5aad..ba350e1 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1875 Add "NONE" to coordinator job execution_order (bzhang)
 OOZIE-1879 Workflow Rerun causes error depending on the order of forked nodes 
(rkanter)
 OOZIE-1659 oozie-site is missing email-action-0.2 schema (jagatsingh via 
rkanter)
 OOZIE-1492 Make sure HA works with HCat (ryota)

Reply via email to