Author: rohini
Date: Fri Oct  4 17:49:15 2013
New Revision: 1529239

URL: http://svn.apache.org/r1529239
Log:
OOZIE-1306 add flexibility to oozie coordinator job scheduling (bowenzhangusa 
via rohini)

Added:
    oozie/trunk/examples/src/main/apps/cron-schedule/
    oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml
    oozie/trunk/examples/src/main/apps/cron-schedule/job.properties
    oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml
Modified:
    
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
    
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
    oozie/trunk/release-log.txt

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
 Fri Oct  4 17:49:15 2013
@@ -19,11 +19,13 @@ package org.apache.oozie.command.coord;
 
 import java.io.StringReader;
 import java.net.URI;
-import java.util.Calendar;
-import java.util.Date;
+import java.text.ParseException;
+import java.util.TimeZone;
+import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
+import java.util.Date;
+import java.util.Calendar;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
@@ -48,6 +50,9 @@ import org.apache.oozie.util.ELEvaluator
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
+import org.quartz.CronExpression;
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.CoordinatorJobBean;
 
 public class CoordCommandUtils {
     public static int CURRENT = 0;
@@ -627,4 +632,78 @@ public class CoordCommandUtils {
         return resolved.toString();
     }
 
+    /**
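+     * Get the next valid action time after a given time for a cron-style frequency
+     *
+     * @param targetDate the time after which the next valid action time is searched
+     * @param coordJob the coordinator job whose frequency holds the cron expression
+     * @return the next valid action time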
+     */
+    public static Date getNextValidActionTimeForCronFrequency(Date targetDate, 
CoordinatorJobBean coordJob) throws ParseException {
+
+        String freq = coordJob.getFrequency();
+        TimeZone tz = DateUtils.getOozieProcessingTimeZone();
+        String[] cronArray = freq.split(" ");
+        Date nextTime = null;
+
+        // CronExpression currently doesn't support expressions
+        // where both day-of-month and day-of-week are specified.
+        // As a result, we need to split this scenario into two cases
+        // and return the earlier time
+        if (!cronArray[2].trim().equals("?") && 
!cronArray[4].trim().equals("?")) {
+
+            // When either the day-of-month or the day-of-week field is a wildcard,
+            // we need to replace that wildcard with "?"
+            if (cronArray[2].trim().equals("*") || 
cronArray[4].trim().equals("*")) {
+                if (cronArray[2].trim().equals("*")) {
+                    cronArray[2] = "?";
+                }
+                else {
+                    cronArray[4] = "?";
+                }
+                freq= StringUtils.join(cronArray, " ");
+
+                // The CronExpression class takes seconds
+                // as the first field, whereas oozie operates on
+                // a minute basis, so a "0" seconds field is prepended
+                CronExpression expr = new CronExpression("0 " + freq);
+                expr.setTimeZone(tz);
+                nextTime = expr.getNextValidTimeAfter(targetDate);
+            }
+            // If both fields are specified by non-wildcards,
+            // we need to split it into two expressions
+            else {
+                String[] cronArray1 = freq.split(" ");
+                String[] cronArray2 = freq.split(" ");
+
+                cronArray1[2] = "?";
+                cronArray2[4] = "?";
+
+                String freq1 = StringUtils.join(cronArray1, " ");
+                String freq2 = StringUtils.join(cronArray2, " ");
+
+                // The CronExpression class takes seconds
+                // as the first field, whereas oozie operates on
+                // a minute basis, so a "0" seconds field is prepended
+                CronExpression expr1 = new CronExpression("0 " + freq1);
+                expr1.setTimeZone(tz);
+                CronExpression expr2 = new CronExpression("0 " + freq2);
+                expr2.setTimeZone(tz);
+                nextTime = expr1.getNextValidTimeAfter(targetDate);
+                Date nextTime2 = expr2.getNextValidTimeAfter(targetDate);
+                nextTime = nextTime.compareTo(nextTime2) < 0 ? nextTime: 
nextTime2;
+            }
+        }
+        else {
+            // The CronExpression class takes seconds
+            // as the first field, whereas oozie operates on
+            // a minute basis, so a "0" seconds field is prepended
+            CronExpression expr  = new CronExpression("0 " + freq);
+            expr.setTimeZone(tz);
+            nextTime = expr.getNextValidTimeAfter(targetDate);
+        }
+
+        return nextTime;
+    }
+
 }

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
 Fri Oct  4 17:49:15 2013
@@ -309,7 +309,8 @@ public class CoordMaterializeTransitionX
         String jobXml = coordJob.getJobXml();
         Element eJob = XmlUtils.parseXml(jobXml);
         TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
-        int frequency = Integer.valueOf(coordJob.getFrequency());
+
+        String frequency = coordJob.getFrequency();
         TimeUnit freqTU = 
TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
         TimeUnit endOfFlag = 
TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
         Calendar start = Calendar.getInstance(appTz);
@@ -326,11 +327,8 @@ public class CoordMaterializeTransitionX
         origStart.setTime(coordJob.getStartTimestamp());
         // Move to the End of duration, if needed.
         DateUtils.moveToEnd(origStart, endOfFlag);
-        // Cloning the start time to be used in loop iteration
-        Calendar effStart = (Calendar) origStart.clone();
-        // Move the time when the previous action finished
-        effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
 
+        Date effStart = (Date) startMatdTime.clone();
         StringBuilder actionStrings = new StringBuilder();
         Date jobPauseTime = coordJob.getPauseTime();
         Calendar pause = null;
@@ -346,35 +344,69 @@ public class CoordMaterializeTransitionX
         LOG.debug("Coordinator job :" + coordJob.getId() + ", 
maxActionToBeCreated :" + maxActionToBeCreated
                 + ", Mat_Throttle :" + coordJob.getMatThrottling() + ", 
numWaitingActions :" + numWaitingActions);
 
-        while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
-            if (pause != null && effStart.compareTo(pause) >= 0) {
+        boolean isCronFrequency = false;
+
+        try {
+            Integer.parseInt(coordJob.getFrequency());
+        } catch (NumberFormatException e) {
+            isCronFrequency = true;
+        }
+
+        boolean firstMater = true;
+        while (start.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
+            if (pause != null && start.compareTo(pause) >= 0) {
                 break;
             }
-            CoordinatorActionBean actionBean = new CoordinatorActionBean();
-            lastActionNumber++;
 
-            int timeout = coordJob.getTimeout();
-            LOG.debug("Materializing action for time=" + effStart.getTime() + 
", lastactionnumber=" + lastActionNumber
-                    + " timeout=" + timeout + " minutes");
-            Date actualTime = new Date();
-            action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, 
(Element) eJob.clone(),
-                    effStart.getTime(), actualTime, lastActionNumber, jobConf, 
actionBean);
-            actionBean.setTimeOut(timeout);
+            Date nextTime = start.getTime();
 
-            if (!dryrun) {
-                storeToDB(actionBean, action); // Storing to table
+            if (isCronFrequency) {
+                if (start.getTime().compareTo(startMatdTime) == 0 && 
firstMater) {
+                    start.add(Calendar.MINUTE, -1);
+                    firstMater = false;
+                }
 
+                nextTime = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(start.getTime(), 
coordJob);
+                start.setTime(nextTime);
+            }
+
+            if (start.compareTo(end) < 0) {
+
+                if (pause != null && start.compareTo(pause) >= 0) {
+                    break;
+                }
+                CoordinatorActionBean actionBean = new CoordinatorActionBean();
+                lastActionNumber++;
+
+                int timeout = coordJob.getTimeout();
+                LOG.debug("Materializing action for time=" + start.getTime() + 
", lastactionnumber=" + lastActionNumber
+                        + " timeout=" + timeout + " minutes");
+                Date actualTime = new Date();
+                action = CoordCommandUtils.materializeOneInstance(jobId, 
dryrun, (Element) eJob.clone(),
+                        nextTime, actualTime, lastActionNumber, jobConf, 
actionBean);
+                actionBean.setTimeOut(timeout);
+
+                if (!dryrun) {
+                    storeToDB(actionBean, action); // Storing to table
+
+                }
+                else {
+                    actionStrings.append("action for new instance");
+                    actionStrings.append(action);
+                }
             }
             else {
-                actionStrings.append("action for new instance");
-                actionStrings.append(action);
+                break;
+            }
+
+            if (!isCronFrequency) {
+                start = (Calendar) origStart.clone();
+                start.add(freqTU.getCalendarUnit(), lastActionNumber * 
Integer.parseInt(coordJob.getFrequency()));
             }
-            // Restore the original start time
-            effStart = (Calendar) origStart.clone();
-            effStart.add(freqTU.getCalendarUnit(), lastActionNumber * 
frequency);
         }
 
-        endMatdTime = new Date(effStart.getTimeInMillis());
+        endMatdTime = start.getTime();
+
         if (!dryrun) {
             return action;
         }

Modified: 
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
--- 
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
 (original)
+++ 
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
 Fri Oct  4 17:49:15 2013
@@ -712,9 +712,8 @@ public class StatusTransitService implem
                 List<String> coordJobIdList = jpaService
                         .execute(new 
CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
                 Set<String> coordIds = new HashSet<String>();
-                for (String coordJobId : coordJobIdList) {
-                    coordIds.add(coordJobId);
-                }
+                coordIds.addAll(coordJobIdList);
+
                 pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
                 for (String coordId : coordIds.toArray(new 
String[coordIds.size()])) {
                     CoordinatorJobBean coordJob;

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
 (original)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
 Fri Oct  4 17:49:15 2013
@@ -22,11 +22,15 @@ import java.io.StringReader;
 import java.text.ParseException;
 import java.util.Date;
 import java.util.List;
+import java.util.TimeZone;
 
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.test.XDataTestCase;
@@ -246,6 +250,79 @@ public class TestCoordCommandUtils exten
         }
     }
 
+    public void testGetNextValidActionTime() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
"10,20 * * * *");
+        Date actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:15Z");
+        Date expectedDate = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+        Date retDate = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+        assertEquals(expectedDate, retDate);
+
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, 
startTime, endTime, "10/20 * * 5-7 4,5");
+        actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:15Z");
+        expectedDate = DateUtils.parseDateOozieTZ("2013-07-18T00:30Z");
+        retDate = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+        assertEquals(expectedDate, retDate);
+
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, 
startTime, endTime, "20-30 * 20 5-7 4,5");
+        actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+        expectedDate = DateUtils.parseDateOozieTZ("2013-07-18T00:21Z");
+        retDate = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+        assertEquals(expectedDate, retDate);
+
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, 
startTime, endTime, "30 * 20 5-7 ?");
+        actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+        expectedDate = DateUtils.parseDateOozieTZ("2013-07-20T00:30Z");
+        retDate = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+        assertEquals(expectedDate, retDate);
+
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, 
startTime, endTime, "0 9-16 * * 2-6");
+        actionTime = DateUtils.parseDateOozieTZ("2013-07-20T00:20Z");
+        expectedDate = DateUtils.parseDateOozieTZ("2013-07-22T09:00Z");
+        retDate = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+        assertEquals(expectedDate, retDate);
+        retDate = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(retDate, job);
+        expectedDate = DateUtils.parseDateOozieTZ("2013-07-22T10:00Z");
+        assertEquals(expectedDate, retDate);
+
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, 
startTime, endTime, "20-30 * * 1 *");
+        actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+        expectedDate = DateUtils.parseDateOozieTZ("2014-01-01T00:20Z");
+        retDate = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+        assertEquals(expectedDate, retDate);
+
+        job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, 
startTime, endTime, "20-30 10 * * MON,WED");
+        actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+        expectedDate = DateUtils.parseDateOozieTZ("2013-07-22T10:20Z");
+        retDate = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+        assertEquals(expectedDate, retDate);
+    }
+
+    protected CoordinatorJobBean 
addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date 
endTime,
+                                                          String freq) throws 
Exception {
+        CoordinatorJobBean coordJob = createCoordJob(status, startTime, 
endTime, false, false, 0);
+        coordJob.setStartTime(startTime);
+        coordJob.setEndTime(endTime);
+        coordJob.setFrequency(freq);
+        coordJob.setTimeUnit(CoordinatorJob.Timeunit.MINUTE);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            CoordJobInsertJPAExecutor coordInsertCmd = new 
CoordJobInsertJPAExecutor(coordJob);
+            jpaService.execute(coordInsertCmd);
+        }
+        catch (JPAExecutorException ex) {
+            ex.printStackTrace();
+            fail("Unable to insert the test coord job record to table");
+            throw ex;
+        }
+
+        return coordJob;
+    }
+
     private String getPullMissingDependencies(String testDir) {
         String missDeps = getTestCaseFileUri("2009/29/_SUCCESS") + "#"
                 + getTestCaseFileUri("2009/22/_SUCCESS") + "#"

Modified: 
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
--- 
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
 (original)
+++ 
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
 Fri Oct  4 17:49:15 2013
@@ -18,6 +18,7 @@
 package org.apache.oozie.command.coord;
 
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 
@@ -35,6 +36,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor;
 import org.apache.oozie.executor.jpa.SLAEventsGetForSeqIdJPAExecutor;
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.JPAService;
@@ -115,22 +117,248 @@ public class TestCoordMaterializeTransit
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
     }
 
+    public void testActionMaterWithCronFrequency1() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
null,
+                "10,20 * * * *");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:20Z")};
+        checkCoordActionsNominalTime(job.getId(), 2, nominalTimes);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue(job.isDoneMaterialization());
+            assertEquals(job.getLastActionNumber(), 2);
+            assertEquals(job.getNextMaterializedTime(), 
DateUtils.parseDateOozieTZ("2013-07-18T01:10Z"));
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testActionMaterWithCronFrequency2() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
null,
+                "10-20 * * * *");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:11Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:12Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:13Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:14Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:15Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:16Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:17Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:18Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:19Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),};
+        checkCoordActionsNominalTime(job.getId(), 11, nominalTimes);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue(job.isDoneMaterialization());
+            assertEquals(job.getLastActionNumber(), 11);
+            assertEquals(job.getNextMaterializedTime(), 
DateUtils.parseDateOozieTZ("2013-07-18T01:10Z"));
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testActionMaterWithCronFrequency3() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
null,
+                "0/15 2 * 5-7 4,5");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        checkCoordActionsNominalTime(job.getId(), 0, new Date[]{});
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue(job.isDoneMaterialization());
+            assertEquals(job.getLastActionNumber(), 0);
+            assertEquals(job.getNextMaterializedTime(), 
DateUtils.parseDateOozieTZ("2013-07-18T02:00Z"));
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testActionMaterWithCronFrequency4() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
null,
+                "0/15 * * 5-7 4,5");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:15Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:30Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:45Z"),};
+        checkCoordActionsNominalTime(job.getId(), 4, nominalTimes);
+
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue(job.isDoneMaterialization());
+            assertEquals(job.getLastActionNumber(), 4);
+            assertEquals(job.getNextMaterializedTime(), 
DateUtils.parseDateOozieTZ("2013-07-18T01:00Z"));
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testActionMaterWithCronFrequency5() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
null,
+                "20/15 * * 5-7 4,5");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
+        checkCoordActionsNominalTime(job.getId(), 3, nominalTimes);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue(job.isDoneMaterialization());
+            assertEquals(job.getLastActionNumber(), 3);
+            assertEquals(job.getNextMaterializedTime(), 
DateUtils.parseDateOozieTZ("2013-07-18T01:20Z"));
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testActionMaterWithCronFrequency6() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
null,
+                "20");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:40Z"),};
+        checkCoordActionsNominalTime(job.getId(), 3, nominalTimes);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue(job.isDoneMaterialization());
+            assertEquals(job.getLastActionNumber(), 3);
+            assertEquals(job.getNextMaterializedTime(), 
DateUtils.parseDateOozieTZ("2013-07-18T01:00Z"));
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testActionMaterWithCronFrequency7() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
null,
+                "20/15 * * 7,10 THU");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
+                DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
+        checkCoordActionsNominalTime(job.getId(), 3, nominalTimes);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue(job.isDoneMaterialization());
+            assertEquals(job.getLastActionNumber(), 3);
+            assertEquals(job.getNextMaterializedTime(), 
DateUtils.parseDateOozieTZ("2013-07-18T01:20Z"));
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testActionMaterWithDST1() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2013-03-10T12:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
null,
+                "0 * * * *");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600*4).call();
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"),
+                DateUtils.parseDateOozieTZ("2013-03-10T09:00Z"),
+                DateUtils.parseDateOozieTZ("2013-03-10T10:00Z"),
+                DateUtils.parseDateOozieTZ("2013-03-10T11:00Z"),};
+        checkCoordActionsNominalTime(job.getId(), 4, nominalTimes);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue(job.isDoneMaterialization());
+            assertEquals(job.getLastActionNumber(), 4);
+            assertEquals(job.getNextMaterializedTime(), 
DateUtils.parseDateOozieTZ("2013-03-10T12:00Z"));
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
+
+    public void testActionMaterWithDST2() throws Exception {
+        Date startTime = DateUtils.parseDateOozieTZ("2012-11-04T07:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2012-11-04T11:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
null,
+                "0 * * * *");
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600*4).call();
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2012-11-04T07:00Z"),
+                DateUtils.parseDateOozieTZ("2012-11-04T08:00Z"),
+                DateUtils.parseDateOozieTZ("2012-11-04T09:00Z"),
+                DateUtils.parseDateOozieTZ("2012-11-04T10:00Z"),};
+        checkCoordActionsNominalTime(job.getId(), 4, nominalTimes);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            job =  jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+            assertTrue(job.isDoneMaterialization());
+            assertEquals(job.getLastActionNumber(), 4);
+            assertEquals(job.getNextMaterializedTime(), 
DateUtils.parseDateOozieTZ("2012-11-04T11:00Z"));
+        }
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + job.getId() + " was not stored properly in db");
+        }
+    }
     public void testActionMaterWithPauseTime1() throws Exception {
         Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
         Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:04Z");
-        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
pauseTime);
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
pauseTime, "5");
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
-        checkCoordActions(job.getId(), 1, null);
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2009-03-06T10:00Z")};
+        checkCoordActionsNominalTime(job.getId(), 1, nominalTimes);
     }
 
     public void testActionMaterWithPauseTime2() throws Exception {
         Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
         Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:08Z");
-        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
pauseTime);
+        CoordinatorJobBean job = 
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, 
pauseTime, "5");
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
-        checkCoordActions(job.getId(), 2, null);
+        Date[] nominalTimes = new Date[] 
{DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"),
+                DateUtils.parseDateOozieTZ("2009-03-06T10:05Z")};
+        checkCoordActionsNominalTime(job.getId(), 2, nominalTimes);
     }
 
     /**
@@ -142,7 +370,7 @@ public class TestCoordMaterializeTransit
         Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
         Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T09:58Z");
-        final CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime);
+        final CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, pauseTime, "5");
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
         waitFor(1000*60, new Predicate() {
             public boolean evaluate() throws Exception {
@@ -157,7 +385,7 @@ public class TestCoordMaterializeTransit
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
         Date pauseTime = null;
         CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
-                pauseTime, 300);
+                pauseTime, 300, "5");
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
         checkCoordActionsTimeout(job.getId() + "@1", 300);
     }
@@ -247,21 +475,21 @@ public class TestCoordMaterializeTransit
     }
 
     protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
-            Date pauseTime) throws Exception {
-        return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1);
+            Date pauseTime, String freq) throws Exception {
+        return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, -1, freq);
     }
 
     protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime,
-            Date pauseTime, int timeout) throws Exception {
+            Date pauseTime, int timeout, String freq) throws Exception {
         CoordinatorJobBean coordJob = createCoordJob(status, startTime, endTime, false, false, 0);
         coordJob.setStartTime(startTime);
         coordJob.setEndTime(endTime);
         coordJob.setPauseTime(pauseTime);
-        coordJob.setFrequency("5");
+        coordJob.setFrequency(freq);
         coordJob.setTimeUnit(Timeunit.MINUTE);
         coordJob.setTimeout(timeout);
         coordJob.setConcurrency(3);
-        coordJob.setMatThrottling(3);
+        coordJob.setMatThrottling(20);
 
         try {
             JPAService jpaService = Services.get().get(JPAService.class);
@@ -359,6 +587,27 @@ public class TestCoordMaterializeTransit
         }
     }
 
+    private void checkCoordActionsNominalTime(String jobId, int number, Date[] nominalTimes) {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            List<CoordinatorActionBean> actions = jpaService.execute(new CoordJobGetActionsSubsetJPAExecutor(
+                    jobId, new ArrayList<String>(), 1, 1000, false));
+
+            if (actions.size() != number) {
+                fail("Should have " + number + " actions created for job " + jobId + ", but has " + actions.size() + " actions.");
+            }
+
+            for (int i=0; i < nominalTimes.length; i++ ) {
+                assertEquals(nominalTimes[i], actions.get(i).getNominalTime());
+            }
+        }
+
+        catch (JPAExecutorException se) {
+            se.printStackTrace();
+            fail("Job ID " + jobId + " was not stored properly in db");
+        }
+    }
+
     private void checkCoordActionsTimeout(String actionId, int expected) {
         try {
             JPAService jpaService = Services.get().get(JPAService.class);

Added: oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml?rev=1529239&view=auto
==============================================================================
--- oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml (added)
+++ oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml Fri Oct  4 17:49:15 2013
@@ -0,0 +1,39 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<coordinator-app name="cron-coord" frequency="0/10 * * * *" start="${start}" end="${end}" timezone="UTC"
+                 xmlns="uri:oozie:coordinator:0.2">
+        <action>
+        <workflow>
+            <app-path>${workflowAppUri}</app-path>
+            <configuration>
+                <property>
+                    <name>jobTracker</name>
+                    <value>${jobTracker}</value>
+                </property>
+                <property>
+                    <name>nameNode</name>
+                    <value>${nameNode}</value>
+                </property>
+                <property>
+                    <name>queueName</name>
+                    <value>${queueName}</value>
+                </property>
+            </configuration>
+        </workflow>
+    </action>
+</coordinator-app>

Added: oozie/trunk/examples/src/main/apps/cron-schedule/job.properties
URL: http://svn.apache.org/viewvc/oozie/trunk/examples/src/main/apps/cron-schedule/job.properties?rev=1529239&view=auto
==============================================================================
--- oozie/trunk/examples/src/main/apps/cron-schedule/job.properties (added)
+++ oozie/trunk/examples/src/main/apps/cron-schedule/job.properties Fri Oct  4 17:49:15 2013
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+nameNode=hdfs://localhost:8020
+jobTracker=localhost:8021
+queueName=default
+examplesRoot=examples
+
+oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron-schedule
+start=2010-01-01T00:00Z
+end=2010-01-01T01:00Z
+workflowAppUri=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron-schedule
+

Added: oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml
URL: http://svn.apache.org/viewvc/oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml?rev=1529239&view=auto
==============================================================================
--- oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml (added)
+++ oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml Fri Oct  4 17:49:15 2013
@@ -0,0 +1,21 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.2" name="no-op-wf">
+    <start to="end"/>
+    <end name="end"/>
+</workflow-app>

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Fri Oct  4 17:49:15 2013
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (trunk - unreleased)
 
+OOZIE-1306 add flexibility to oozie coordinator job scheduling (bowenzhangusa via rohini)
 OOZIE-1526 Oozie does not work with a secure HA JobTracker or ResourceManager (rkanter)
 OOZIE-1500 Fix many OS-specific issues on Windows (dwann via rohini)
 OOZIE-1556 Change Bundle SELECT query to fetch only necessary columns and consolidate JPA Executors (ryota)


Reply via email to