Author: rohini
Date: Fri Oct 4 17:49:15 2013
New Revision: 1529239
URL: http://svn.apache.org/r1529239
Log:
OOZIE-1306 add flexibility to oozie coordinator job scheduling (bowenzhangusa
via rohini)
Added:
oozie/trunk/examples/src/main/apps/cron-schedule/
oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml
oozie/trunk/examples/src/main/apps/cron-schedule/job.properties
oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
Fri Oct 4 17:49:15 2013
@@ -19,11 +19,13 @@ package org.apache.oozie.command.coord;
import java.io.StringReader;
import java.net.URI;
-import java.util.Calendar;
-import java.util.Date;
+import java.text.ParseException;
+import java.util.TimeZone;
+import java.util.Map;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Date;
+import java.util.Calendar;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
@@ -48,6 +50,9 @@ import org.apache.oozie.util.ELEvaluator
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
+import org.quartz.CronExpression;
+import org.apache.commons.lang.StringUtils;
+import org.apache.oozie.CoordinatorJobBean;
public class CoordCommandUtils {
public static int CURRENT = 0;
@@ -627,4 +632,78 @@ public class CoordCommandUtils {
return resolved.toString();
}
+ /**
+ * Get the next action time after a given time
+ *
+ * @param targetDate
+ * @param coordJob
+ * @return the next valid action time
+ */
+ public static Date getNextValidActionTimeForCronFrequency(Date targetDate,
CoordinatorJobBean coordJob) throws ParseException {
+
+ String freq = coordJob.getFrequency();
+ TimeZone tz = DateUtils.getOozieProcessingTimeZone();
+ String[] cronArray = freq.split(" ");
+ Date nextTime = null;
+
+ // Current CronExpression doesn't support operations
+ // where both date of months and day of weeks are specified.
+ // As a result, we need to split this scenario into two cases
+ // and return the earlier time
+ if (!cronArray[2].trim().equals("?") &&
!cronArray[4].trim().equals("?")) {
+
+ // When any one of day of month or day of week fields is a wildcard
+ // we need to replace the wildcard with "?"
+ if (cronArray[2].trim().equals("*") ||
cronArray[4].trim().equals("*")) {
+ if (cronArray[2].trim().equals("*")) {
+ cronArray[2] = "?";
+ }
+ else {
+ cronArray[4] = "?";
+ }
+ freq= StringUtils.join(cronArray, " ");
+
+ // The cronExpression class takes second
+ // as the first field where oozie is operating on
+ // minute basis
+ CronExpression expr = new CronExpression("0 " + freq);
+ expr.setTimeZone(tz);
+ nextTime = expr.getNextValidTimeAfter(targetDate);
+ }
+ // If both fields are specified by non-wildcards,
+ // we need to split it into two expressions
+ else {
+ String[] cronArray1 = freq.split(" ");
+ String[] cronArray2 = freq.split(" ");
+
+ cronArray1[2] = "?";
+ cronArray2[4] = "?";
+
+ String freq1 = StringUtils.join(cronArray1, " ");
+ String freq2 = StringUtils.join(cronArray2, " ");
+
+ // The cronExpression class takes second
+ // as the first field where oozie is operating on
+ // minute basis
+ CronExpression expr1 = new CronExpression("0 " + freq1);
+ expr1.setTimeZone(tz);
+ CronExpression expr2 = new CronExpression("0 " + freq2);
+ expr2.setTimeZone(tz);
+ nextTime = expr1.getNextValidTimeAfter(targetDate);
+ Date nextTime2 = expr2.getNextValidTimeAfter(targetDate);
+ nextTime = nextTime.compareTo(nextTime2) < 0 ? nextTime:
nextTime2;
+ }
+ }
+ else {
+ // The cronExpression class takes second
+ // as the first field where oozie is operating on
+ // minute basis
+ CronExpression expr = new CronExpression("0 " + freq);
+ expr.setTimeZone(tz);
+ nextTime = expr.getNextValidTimeAfter(targetDate);
+ }
+
+ return nextTime;
+ }
+
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
Fri Oct 4 17:49:15 2013
@@ -309,7 +309,8 @@ public class CoordMaterializeTransitionX
String jobXml = coordJob.getJobXml();
Element eJob = XmlUtils.parseXml(jobXml);
TimeZone appTz = DateUtils.getTimeZone(coordJob.getTimeZone());
- int frequency = Integer.valueOf(coordJob.getFrequency());
+
+ String frequency = coordJob.getFrequency();
TimeUnit freqTU =
TimeUnit.valueOf(eJob.getAttributeValue("freq_timeunit"));
TimeUnit endOfFlag =
TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
Calendar start = Calendar.getInstance(appTz);
@@ -326,11 +327,8 @@ public class CoordMaterializeTransitionX
origStart.setTime(coordJob.getStartTimestamp());
// Move to the End of duration, if needed.
DateUtils.moveToEnd(origStart, endOfFlag);
- // Cloning the start time to be used in loop iteration
- Calendar effStart = (Calendar) origStart.clone();
- // Move the time when the previous action finished
- effStart.add(freqTU.getCalendarUnit(), lastActionNumber * frequency);
+ Date effStart = (Date) startMatdTime.clone();
StringBuilder actionStrings = new StringBuilder();
Date jobPauseTime = coordJob.getPauseTime();
Calendar pause = null;
@@ -346,35 +344,69 @@ public class CoordMaterializeTransitionX
LOG.debug("Coordinator job :" + coordJob.getId() + ",
maxActionToBeCreated :" + maxActionToBeCreated
+ ", Mat_Throttle :" + coordJob.getMatThrottling() + ",
numWaitingActions :" + numWaitingActions);
- while (effStart.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
- if (pause != null && effStart.compareTo(pause) >= 0) {
+ boolean isCronFrequency = false;
+
+ try {
+ Integer.parseInt(coordJob.getFrequency());
+ } catch (NumberFormatException e) {
+ isCronFrequency = true;
+ }
+
+ boolean firstMater = true;
+ while (start.compareTo(end) < 0 && maxActionToBeCreated-- > 0) {
+ if (pause != null && start.compareTo(pause) >= 0) {
break;
}
- CoordinatorActionBean actionBean = new CoordinatorActionBean();
- lastActionNumber++;
- int timeout = coordJob.getTimeout();
- LOG.debug("Materializing action for time=" + effStart.getTime() +
", lastactionnumber=" + lastActionNumber
- + " timeout=" + timeout + " minutes");
- Date actualTime = new Date();
- action = CoordCommandUtils.materializeOneInstance(jobId, dryrun,
(Element) eJob.clone(),
- effStart.getTime(), actualTime, lastActionNumber, jobConf,
actionBean);
- actionBean.setTimeOut(timeout);
+ Date nextTime = start.getTime();
- if (!dryrun) {
- storeToDB(actionBean, action); // Storing to table
+ if (isCronFrequency) {
+ if (start.getTime().compareTo(startMatdTime) == 0 &&
firstMater) {
+ start.add(Calendar.MINUTE, -1);
+ firstMater = false;
+ }
+ nextTime =
CoordCommandUtils.getNextValidActionTimeForCronFrequency(start.getTime(),
coordJob);
+ start.setTime(nextTime);
+ }
+
+ if (start.compareTo(end) < 0) {
+
+ if (pause != null && start.compareTo(pause) >= 0) {
+ break;
+ }
+ CoordinatorActionBean actionBean = new CoordinatorActionBean();
+ lastActionNumber++;
+
+ int timeout = coordJob.getTimeout();
+ LOG.debug("Materializing action for time=" + start.getTime() +
", lastactionnumber=" + lastActionNumber
+ + " timeout=" + timeout + " minutes");
+ Date actualTime = new Date();
+ action = CoordCommandUtils.materializeOneInstance(jobId,
dryrun, (Element) eJob.clone(),
+ nextTime, actualTime, lastActionNumber, jobConf,
actionBean);
+ actionBean.setTimeOut(timeout);
+
+ if (!dryrun) {
+ storeToDB(actionBean, action); // Storing to table
+
+ }
+ else {
+ actionStrings.append("action for new instance");
+ actionStrings.append(action);
+ }
}
else {
- actionStrings.append("action for new instance");
- actionStrings.append(action);
+ break;
+ }
+
+ if (!isCronFrequency) {
+ start = (Calendar) origStart.clone();
+ start.add(freqTU.getCalendarUnit(), lastActionNumber *
Integer.parseInt(coordJob.getFrequency()));
}
- // Restore the original start time
- effStart = (Calendar) origStart.clone();
- effStart.add(freqTU.getCalendarUnit(), lastActionNumber *
frequency);
}
- endMatdTime = new Date(effStart.getTimeInMillis());
+ endMatdTime = start.getTime();
+
if (!dryrun) {
return action;
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/service/StatusTransitService.java
Fri Oct 4 17:49:15 2013
@@ -712,9 +712,8 @@ public class StatusTransitService implem
List<String> coordJobIdList = jpaService
.execute(new
CoordActionsGetByLastModifiedTimeJPAExecutor(lastInstanceStartTime));
Set<String> coordIds = new HashSet<String>();
- for (String coordJobId : coordJobIdList) {
- coordIds.add(coordJobId);
- }
+ coordIds.addAll(coordJobIdList);
+
pendingJobCheckList = new ArrayList<CoordinatorJobBean>();
for (String coordId : coordIds.toArray(new
String[coordIds.size()])) {
CoordinatorJobBean coordJob;
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordCommandUtils.java
Fri Oct 4 17:49:15 2013
@@ -22,11 +22,15 @@ import java.io.StringReader;
import java.text.ParseException;
import java.util.Date;
import java.util.List;
+import java.util.TimeZone;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.test.XDataTestCase;
@@ -246,6 +250,79 @@ public class TestCoordCommandUtils exten
}
}
+ public void testGetNextValidActionTime() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
"10,20 * * * *");
+ Date actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:15Z");
+ Date expectedDate = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+ Date retDate =
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+ assertEquals(expectedDate, retDate);
+
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING,
startTime, endTime, "10/20 * * 5-7 4,5");
+ actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:15Z");
+ expectedDate = DateUtils.parseDateOozieTZ("2013-07-18T00:30Z");
+ retDate =
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+ assertEquals(expectedDate, retDate);
+
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING,
startTime, endTime, "20-30 * 20 5-7 4,5");
+ actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+ expectedDate = DateUtils.parseDateOozieTZ("2013-07-18T00:21Z");
+ retDate =
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+ assertEquals(expectedDate, retDate);
+
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING,
startTime, endTime, "30 * 20 5-7 ?");
+ actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+ expectedDate = DateUtils.parseDateOozieTZ("2013-07-20T00:30Z");
+ retDate =
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+ assertEquals(expectedDate, retDate);
+
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING,
startTime, endTime, "0 9-16 * * 2-6");
+ actionTime = DateUtils.parseDateOozieTZ("2013-07-20T00:20Z");
+ expectedDate = DateUtils.parseDateOozieTZ("2013-07-22T09:00Z");
+ retDate =
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+ assertEquals(expectedDate, retDate);
+ retDate =
CoordCommandUtils.getNextValidActionTimeForCronFrequency(retDate, job);
+ expectedDate = DateUtils.parseDateOozieTZ("2013-07-22T10:00Z");
+ assertEquals(expectedDate, retDate);
+
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING,
startTime, endTime, "20-30 * * 1 *");
+ actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+ expectedDate = DateUtils.parseDateOozieTZ("2014-01-01T00:20Z");
+ retDate =
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+ assertEquals(expectedDate, retDate);
+
+ job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING,
startTime, endTime, "20-30 10 * * MON,WED");
+ actionTime = DateUtils.parseDateOozieTZ("2013-07-18T00:20Z");
+ expectedDate = DateUtils.parseDateOozieTZ("2013-07-22T10:20Z");
+ retDate =
CoordCommandUtils.getNextValidActionTimeForCronFrequency(actionTime, job);
+ assertEquals(expectedDate, retDate);
+ }
+
+ protected CoordinatorJobBean
addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date
endTime,
+ String freq) throws
Exception {
+ CoordinatorJobBean coordJob = createCoordJob(status, startTime,
endTime, false, false, 0);
+ coordJob.setStartTime(startTime);
+ coordJob.setEndTime(endTime);
+ coordJob.setFrequency(freq);
+ coordJob.setTimeUnit(CoordinatorJob.Timeunit.MINUTE);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordJobInsertJPAExecutor coordInsertCmd = new
CoordJobInsertJPAExecutor(coordJob);
+ jpaService.execute(coordInsertCmd);
+ }
+ catch (JPAExecutorException ex) {
+ ex.printStackTrace();
+ fail("Unable to insert the test coord job record to table");
+ throw ex;
+ }
+
+ return coordJob;
+ }
+
private String getPullMissingDependencies(String testDir) {
String missDeps = getTestCaseFileUri("2009/29/_SUCCESS") + "#"
+ getTestCaseFileUri("2009/22/_SUCCESS") + "#"
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
Fri Oct 4 17:49:15 2013
@@ -18,6 +18,7 @@
package org.apache.oozie.command.coord;
import java.sql.Timestamp;
+import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -35,6 +36,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordJobGetRunningActionsCountJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.executor.jpa.CoordJobGetActionsSubsetJPAExecutor;
import org.apache.oozie.executor.jpa.SLAEventsGetForSeqIdJPAExecutor;
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.JPAService;
@@ -115,22 +117,248 @@ public class TestCoordMaterializeTransit
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
}
+ public void testActionMaterWithCronFrequency1() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
null,
+ "10,20 * * * *");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:20Z")};
+ checkCoordActionsNominalTime(job.getId(), 2, nominalTimes);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue(job.isDoneMaterialization());
+ assertEquals(job.getLastActionNumber(), 2);
+ assertEquals(job.getNextMaterializedTime(),
DateUtils.parseDateOozieTZ("2013-07-18T01:10Z"));
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + job.getId() + " was not stored properly in db");
+ }
+ }
+
+ public void testActionMaterWithCronFrequency2() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
null,
+ "10-20 * * * *");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2013-07-18T00:10Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:11Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:12Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:13Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:14Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:15Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:16Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:17Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:18Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:19Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),};
+ checkCoordActionsNominalTime(job.getId(), 11, nominalTimes);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue(job.isDoneMaterialization());
+ assertEquals(job.getLastActionNumber(), 11);
+ assertEquals(job.getNextMaterializedTime(),
DateUtils.parseDateOozieTZ("2013-07-18T01:10Z"));
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + job.getId() + " was not stored properly in db");
+ }
+ }
+
+ public void testActionMaterWithCronFrequency3() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
null,
+ "0/15 2 * 5-7 4,5");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ checkCoordActionsNominalTime(job.getId(), 0, new Date[]{});
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue(job.isDoneMaterialization());
+ assertEquals(job.getLastActionNumber(), 0);
+ assertEquals(job.getNextMaterializedTime(),
DateUtils.parseDateOozieTZ("2013-07-18T02:00Z"));
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + job.getId() + " was not stored properly in db");
+ }
+ }
+
+ public void testActionMaterWithCronFrequency4() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
null,
+ "0/15 * * 5-7 4,5");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:15Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:30Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:45Z"),};
+ checkCoordActionsNominalTime(job.getId(), 4, nominalTimes);
+
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue(job.isDoneMaterialization());
+ assertEquals(job.getLastActionNumber(), 4);
+ assertEquals(job.getNextMaterializedTime(),
DateUtils.parseDateOozieTZ("2013-07-18T01:00Z"));
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + job.getId() + " was not stored properly in db");
+ }
+ }
+
+ public void testActionMaterWithCronFrequency5() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
null,
+ "20/15 * * 5-7 4,5");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
+ checkCoordActionsNominalTime(job.getId(), 3, nominalTimes);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue(job.isDoneMaterialization());
+ assertEquals(job.getLastActionNumber(), 3);
+ assertEquals(job.getNextMaterializedTime(),
DateUtils.parseDateOozieTZ("2013-07-18T01:20Z"));
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + job.getId() + " was not stored properly in db");
+ }
+ }
+
+ public void testActionMaterWithCronFrequency6() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
null,
+ "20");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2013-07-18T00:00Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:40Z"),};
+ checkCoordActionsNominalTime(job.getId(), 3, nominalTimes);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue(job.isDoneMaterialization());
+ assertEquals(job.getLastActionNumber(), 3);
+ assertEquals(job.getNextMaterializedTime(),
DateUtils.parseDateOozieTZ("2013-07-18T01:00Z"));
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + job.getId() + " was not stored properly in db");
+ }
+ }
+
+ public void testActionMaterWithCronFrequency7() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-07-18T00:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-07-18T01:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
null,
+ "20/15 * * 7,10 THU");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2013-07-18T00:20Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:35Z"),
+ DateUtils.parseDateOozieTZ("2013-07-18T00:50Z"),};
+ checkCoordActionsNominalTime(job.getId(), 3, nominalTimes);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue(job.isDoneMaterialization());
+ assertEquals(job.getLastActionNumber(), 3);
+ assertEquals(job.getNextMaterializedTime(),
DateUtils.parseDateOozieTZ("2013-07-18T01:20Z"));
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + job.getId() + " was not stored properly in db");
+ }
+ }
+
+ public void testActionMaterWithDST1() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2013-03-10T08:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2013-03-10T12:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
null,
+ "0 * * * *");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600*4).call();
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2013-03-10T08:00Z"),
+ DateUtils.parseDateOozieTZ("2013-03-10T09:00Z"),
+ DateUtils.parseDateOozieTZ("2013-03-10T10:00Z"),
+ DateUtils.parseDateOozieTZ("2013-03-10T11:00Z"),};
+ checkCoordActionsNominalTime(job.getId(), 4, nominalTimes);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue(job.isDoneMaterialization());
+ assertEquals(job.getLastActionNumber(), 4);
+ assertEquals(job.getNextMaterializedTime(),
DateUtils.parseDateOozieTZ("2013-03-10T12:00Z"));
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + job.getId() + " was not stored properly in db");
+ }
+ }
+
+ public void testActionMaterWithDST2() throws Exception {
+ Date startTime = DateUtils.parseDateOozieTZ("2012-11-04T07:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2012-11-04T11:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
null,
+ "0 * * * *");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600*4).call();
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2012-11-04T07:00Z"),
+ DateUtils.parseDateOozieTZ("2012-11-04T08:00Z"),
+ DateUtils.parseDateOozieTZ("2012-11-04T09:00Z"),
+ DateUtils.parseDateOozieTZ("2012-11-04T10:00Z"),};
+ checkCoordActionsNominalTime(job.getId(), 4, nominalTimes);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
+ assertTrue(job.isDoneMaterialization());
+ assertEquals(job.getLastActionNumber(), 4);
+ assertEquals(job.getNextMaterializedTime(),
DateUtils.parseDateOozieTZ("2012-11-04T11:00Z"));
+ }
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + job.getId() + " was not stored properly in db");
+ }
+ }
public void testActionMaterWithPauseTime1() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:04Z");
- CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
pauseTime);
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
pauseTime, "5");
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
- checkCoordActions(job.getId(), 1, null);
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2009-03-06T10:00Z")};
+ checkCoordActionsNominalTime(job.getId(), 1, nominalTimes);
}
public void testActionMaterWithPauseTime2() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T10:08Z");
- CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
pauseTime);
+ CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
pauseTime, "5");
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
- checkCoordActions(job.getId(), 2, null);
+ Date[] nominalTimes = new Date[]
{DateUtils.parseDateOozieTZ("2009-03-06T10:00Z"),
+ DateUtils.parseDateOozieTZ("2009-03-06T10:05Z")};
+ checkCoordActionsNominalTime(job.getId(), 2, nominalTimes);
}
/**
@@ -142,7 +370,7 @@ public class TestCoordMaterializeTransit
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = DateUtils.parseDateOozieTZ("2009-03-06T09:58Z");
- final CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
pauseTime);
+ final CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
pauseTime, "5");
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
waitFor(1000*60, new Predicate() {
public boolean evaluate() throws Exception {
@@ -157,7 +385,7 @@ public class TestCoordMaterializeTransit
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
Date pauseTime = null;
CoordinatorJobBean job =
addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime,
- pauseTime, 300);
+ pauseTime, 300, "5");
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
checkCoordActionsTimeout(job.getId() + "@1", 300);
}
@@ -247,21 +475,21 @@ public class TestCoordMaterializeTransit
}
protected CoordinatorJobBean
addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date
endTime,
- Date pauseTime) throws Exception {
- return addRecordToCoordJobTable(status, startTime, endTime, pauseTime,
-1);
+ Date pauseTime, String freq) throws Exception {
+ return addRecordToCoordJobTable(status, startTime, endTime, pauseTime,
-1, freq);
}
protected CoordinatorJobBean
addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date
endTime,
- Date pauseTime, int timeout) throws Exception {
+ Date pauseTime, int timeout, String freq) throws Exception {
CoordinatorJobBean coordJob = createCoordJob(status, startTime,
endTime, false, false, 0);
coordJob.setStartTime(startTime);
coordJob.setEndTime(endTime);
coordJob.setPauseTime(pauseTime);
- coordJob.setFrequency("5");
+ coordJob.setFrequency(freq);
coordJob.setTimeUnit(Timeunit.MINUTE);
coordJob.setTimeout(timeout);
coordJob.setConcurrency(3);
- coordJob.setMatThrottling(3);
+ coordJob.setMatThrottling(20);
try {
JPAService jpaService = Services.get().get(JPAService.class);
@@ -359,6 +587,27 @@ public class TestCoordMaterializeTransit
}
}
+ private void checkCoordActionsNominalTime(String jobId, int number, Date[]
nominalTimes) {
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ List<CoordinatorActionBean> actions = jpaService.execute(new
CoordJobGetActionsSubsetJPAExecutor(
+ jobId, new ArrayList<String>(), 1, 1000, false));
+
+ if (actions.size() != number) {
+ fail("Should have " + number + " actions created for job " +
jobId + ", but has " + actions.size() + " actions.");
+ }
+
+ for (int i=0; i < nominalTimes.length; i++ ) {
+ assertEquals(nominalTimes[i], actions.get(i).getNominalTime());
+ }
+ }
+
+ catch (JPAExecutorException se) {
+ se.printStackTrace();
+ fail("Job ID " + jobId + " was not stored properly in db");
+ }
+ }
+
private void checkCoordActionsTimeout(String actionId, int expected) {
try {
JPAService jpaService = Services.get().get(JPAService.class);
Added: oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml
URL:
http://svn.apache.org/viewvc/oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml?rev=1529239&view=auto
==============================================================================
--- oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml (added)
+++ oozie/trunk/examples/src/main/apps/cron-schedule/coordinator.xml Fri Oct 4
17:49:15 2013
@@ -0,0 +1,39 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<coordinator-app name="cron-coord" frequency="0/10 * * * *" start="${start}"
end="${end}" timezone="UTC"
+ xmlns="uri:oozie:coordinator:0.2">
+ <action>
+ <workflow>
+ <app-path>${workflowAppUri}</app-path>
+ <configuration>
+ <property>
+ <name>jobTracker</name>
+ <value>${jobTracker}</value>
+ </property>
+ <property>
+ <name>nameNode</name>
+ <value>${nameNode}</value>
+ </property>
+ <property>
+ <name>queueName</name>
+ <value>${queueName}</value>
+ </property>
+ </configuration>
+ </workflow>
+ </action>
+</coordinator-app>
Added: oozie/trunk/examples/src/main/apps/cron-schedule/job.properties
URL:
http://svn.apache.org/viewvc/oozie/trunk/examples/src/main/apps/cron-schedule/job.properties?rev=1529239&view=auto
==============================================================================
--- oozie/trunk/examples/src/main/apps/cron-schedule/job.properties (added)
+++ oozie/trunk/examples/src/main/apps/cron-schedule/job.properties Fri Oct 4
17:49:15 2013
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+nameNode=hdfs://localhost:8020
+jobTracker=localhost:8021
+queueName=default
+examplesRoot=examples
+
+oozie.coord.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron-schedule
+start=2010-01-01T00:00Z
+end=2010-01-01T01:00Z
+workflowAppUri=${nameNode}/user/${user.name}/${examplesRoot}/apps/cron-schedule
+
Added: oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml
URL:
http://svn.apache.org/viewvc/oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml?rev=1529239&view=auto
==============================================================================
--- oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml (added)
+++ oozie/trunk/examples/src/main/apps/cron-schedule/workflow.xml Fri Oct 4
17:49:15 2013
@@ -0,0 +1,21 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<workflow-app xmlns="uri:oozie:workflow:0.2" name="no-op-wf">
+ <start to="end"/>
+ <end name="end"/>
+</workflow-app>
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1529239&r1=1529238&r2=1529239&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Fri Oct 4 17:49:15 2013
@@ -1,5 +1,6 @@
-- Oozie 4.1.0 release (trunk - unreleased)
+OOZIE-1306 add flexibility to oozie coordinator job scheduling (bowenzhangusa
via rohini)
OOZIE-1526 Oozie does not work with a secure HA JobTracker or ResourceManager
(rkanter)
OOZIE-1500 Fix many OS-specific issues on Windows (dwann via rohini)
OOZIE-1556 Change Bundle SELECT query to fetch only necessary columns and
consolidate JPA Executors (ryota)