Repository: oozie Updated Branches: refs/heads/branch-4.1 9b9dbc4b9 -> d7514d577
OOZIE-2064 coord job with frequency coord:endOfMonths doesn't materialize Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d7514d57 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d7514d57 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d7514d57 Branch: refs/heads/branch-4.1 Commit: d7514d5773a94095ad2e2057243c740e637b741e Parents: 9b9dbc4 Author: Purshotam Shah <[email protected]> Authored: Thu Nov 13 15:55:38 2014 -0800 Committer: Purshotam Shah <[email protected]> Committed: Thu Nov 13 15:55:38 2014 -0800 ---------------------------------------------------------------------- .../CoordMaterializeTransitionXCommand.java | 42 ++++++------ .../TestCoordMaterializeTransitionXCommand.java | 69 +++++++++++++++++++- .../command/coord/TestCoordSubmitXCommand.java | 18 ----- .../org/apache/oozie/test/XDataTestCase.java | 19 ++++++ release-log.txt | 1 + 5 files changed, 111 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java index 6c9d4ae..623c2d3 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java @@ -253,7 +253,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo endMatInstance = (Calendar) startInstance.clone(); endMatInstance.add(freqTU.getCalendarUnit(), i * frequency); if (endMatInstance.getTime().compareTo(new Date()) >= 0) { - if (previousInstance.after(currentMatTime)) { + if (previousInstance.getTime().after(currentMatTime)) { return previousInstance.getTime(); } else { @@ -424,41 +424,45 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo boolean isCronFrequency = false; + Calendar effStart = (Calendar) start.clone(); try { - Integer.parseInt(coordJob.getFrequency()); - } catch (NumberFormatException e) { + int intFrequency = Integer.parseInt(coordJob.getFrequency()); + effStart = (Calendar) origStart.clone(); + effStart.add(freqTU.getCalendarUnit(), lastActionNumber * intFrequency); + } + catch (NumberFormatException e) { isCronFrequency = true; } boolean firstMater = true; - while (start.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) { - if (pause != null && start.compareTo(pause) >= 0) { + while (effStart.compareTo(end) < 0 && (ignoreMaxActions || maxActionToBeCreated-- > 0)) { + if (pause != null && effStart.compareTo(pause) >= 0) { break; } - Date nextTime = start.getTime(); + Date nextTime = effStart.getTime(); if (isCronFrequency) { - if (start.getTime().compareTo(startMatdTime) == 0 && firstMater) { - start.add(Calendar.MINUTE, -1); + if (effStart.getTime().compareTo(startMatdTime) == 0 && firstMater) { + effStart.add(Calendar.MINUTE, -1); firstMater = false; } - nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(start.getTime(), coordJob); - start.setTime(nextTime); + nextTime = CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob); + effStart.setTime(nextTime); } - if (start.compareTo(end) < 0) { + if (effStart.compareTo(end) < 0) { - if (pause != null && start.compareTo(pause) >= 0) { + if (pause != null && effStart.compareTo(pause) >= 0) { break; } CoordinatorActionBean actionBean = new CoordinatorActionBean(); lastActionNumber++; int timeout = coordJob.getTimeout(); - LOG.debug("Materializing action for time=" + DateUtils.formatDateOozieTZ(start.getTime()) + ", lastactionnumber=" + lastActionNumber - + " timeout=" + timeout + " minutes"); + LOG.debug("Materializing action for time=" + DateUtils.formatDateOozieTZ(effStart.getTime()) + + ", lastactionnumber=" + lastActionNumber + " timeout=" + timeout + " minutes"); Date actualTime = new Date(); action = CoordCommandUtils.materializeOneInstance(jobId, dryrun, (Element) eJob.clone(), nextTime, actualTime, lastActionNumber, jobConf, actionBean); @@ -478,22 +482,22 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo } if (!isCronFrequency) { - start = (Calendar) origStart.clone(); - start.add(freqTU.getCalendarUnit(), lastActionNumber * Integer.parseInt(coordJob.getFrequency())); + effStart = (Calendar) origStart.clone(); + effStart.add(freqTU.getCalendarUnit(), lastActionNumber * Integer.parseInt(coordJob.getFrequency())); } } if (isCronFrequency) { - if (start.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) { + if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || maxActionToBeCreated-- > 0)) { //Since we exceed the throttle, we need to move the nextMadtime forward //to avoid creating duplicate actions if (!firstMater) { - start.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(start.getTime(), coordJob)); + effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), coordJob)); } } } - endMatdTime = start.getTime(); + endMatdTime = effStart.getTime(); if (!dryrun) { return action; http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java index 4a852cf..4286bcb 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java @@ -17,19 +17,25 @@ */ package org.apache.oozie.command.coord; +import java.io.File; import java.sql.Timestamp; import java.util.Arrays; +import java.util.Calendar; import java.util.Date; import java.util.List; +import java.util.TimeZone; +import org.apache.hadoop.conf.Configuration; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.SLAEventBean; import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.CoordinatorJob.Timeunit; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; import org.apache.oozie.coord.CoordELFunctions; +import org.apache.oozie.coord.TimeUnit; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor; @@ -46,6 +52,9 @@ import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XmlUtils; +import org.jdom.Element; @SuppressWarnings("deprecation") public class TestCoordMaterializeTransitionXCommand extends XDataTestCase { @@ -550,7 +559,7 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase { CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId())); - assertEquals(new Date(startTime.getTime() + TIME_IN_DAY ), job.getNextMaterializedTime()); + assertEquals(new Date(startTime.getTime() + TIME_IN_DAY * 3), job.getNextMaterializedTime()); // test with hours, time should not pass the current time. startTime = new Date(new Date().getTime()); @@ -627,6 +636,64 @@ public class TestCoordMaterializeTransitionXCommand extends XDataTestCase { Date pauseTime, int timeout, String freq) throws Exception { return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, timeout, freq, CoordinatorJob.Execution.FIFO); } + public void testMaterizationEndOfMonths() throws Exception { + Configuration conf = new XConfiguration(); + File appPathFile = new File(getTestCaseDir(), "coordinator.xml"); + String appXml = "<coordinator-app name=\"test\" frequency=\"${coord:endOfMonths(1)}\" start=\"2009-02-01T01:00Z\" " + + "end=\"2009-02-03T23:59Z\" timezone=\"UTC\" " + + "xmlns=\"uri:oozie:coordinator:0.2\"> <controls> " + + "<execution>LIFO</execution> </controls> <datasets> " + + "<dataset name=\"a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" " + + "timezone=\"UTC\"> <uri-template>" + + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}") + + "</uri-template> " + + "</dataset> " + + "<dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" " + + "timezone=\"UTC\"> <uri-template>" + + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}") + + "</uri-template> " + + " </dataset> " + + "</datasets> <input-events> " + + "<data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in> " + + "</input-events> " + + "<output-events> <data-out name=\"LOCAL_A\" dataset=\"local_a\"> " + + "<instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> " + + "<app-path>hdfs:///tmp/workflows/</app-path> " + + "<configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> " + + "<property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> " + + "</property></configuration> </workflow> </action> </coordinator-app>"; + writeToFile(appXml, appPathFile); + conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString()); + conf.set(OozieClient.USER_NAME, getTestUser()); + CoordSubmitXCommand sc = new CoordSubmitXCommand(conf); + String jobId = sc.call(); + + Date currentTime = new Date(); + Date startTime = org.apache.commons.lang.time.DateUtils.addMonths(currentTime, -3); + Date endTime = org.apache.commons.lang.time.DateUtils.addMonths(currentTime, 3); + CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId); + assertEquals(job.getLastActionNumber(), 0); + + job.setStartTime(startTime); + job.setEndTime(endTime); + job.setMatThrottling(10); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, job); + new CoordMaterializeTransitionXCommand(job.getId(), 3600).call(); + job = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, job.getId()); + assertEquals(job.getLastActionNumber(), 3); + + String jobXml = job.getJobXml(); + Element eJob = XmlUtils.parseXml(jobXml); + TimeZone appTz = DateUtils.getTimeZone(job.getTimeZone()); + TimeUnit endOfFlag = TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration")); + TimeUnit freqTU = TimeUnit.valueOf(job.getTimeUnitStr()); + Calendar origStart = Calendar.getInstance(appTz); + origStart.setTime(job.getStartTimestamp()); + // Move to the End of duration, if needed. + DateUtils.moveToEnd(origStart, endOfFlag); + origStart.add(freqTU.getCalendarUnit(), 3 * Integer.parseInt(job.getFrequency())); + assertEquals(job.getNextMaterializedTime(), origStart.getTime()); + } protected CoordinatorJobBean addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date endTime, Date pauseTime, int timeout, String freq, CoordinatorJob.Execution execution) throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java index fedf4a8..82c9e99 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java @@ -19,8 +19,6 @@ package org.apache.oozie.command.coord; import java.io.File; import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; import java.io.Reader; import java.io.Writer; import java.net.URI; @@ -1219,22 +1217,6 @@ public class TestCoordSubmitXCommand extends XDataTestCase { return null; } - private void writeToFile(String appXml, File appPathFile) throws Exception { - PrintWriter out = null; - try { - out = new PrintWriter(new FileWriter(appPathFile)); - out.println(appXml); - } - catch (IOException iex) { - throw iex; - } - finally { - if (out != null) { - out.close(); - } - } - } - /** * Test timeout setting * http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/core/src/test/java/org/apache/oozie/test/XDataTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java index 7614f03..b72c641 100644 --- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java @@ -18,9 +18,12 @@ package org.apache.oozie.test; import java.io.ByteArrayInputStream; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; +import java.io.PrintWriter; import java.io.Reader; import java.io.UnsupportedEncodingException; import java.io.Writer; @@ -1638,4 +1641,20 @@ public abstract class XDataTestCase extends XHCatTestCase { CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord); } + protected void writeToFile(String appXml, File appPathFile) throws Exception { + PrintWriter out = null; + try { + out = new PrintWriter(new FileWriter(appPathFile)); + out.println(appXml); + } + catch (IOException iex) { + throw iex; + } + finally { + if (out != null) { + out.close(); + } + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 65d454c..a50a3f5 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (4.1 - unreleased) +OOZIE-2064 coord job with frequency coord:endOfMonths doesn't materialize (puru) OOZIE-2063 Cron syntax creates duplicate actions (bzhang) OOZIE-2032 If using SSL, the port reported by Oozie is incorrect for HA tasks (rkanter) OOZIE-1959 TestZKUtilsWithSecurity fails (rkanter)
