Repository: oozie
Updated Branches:
  refs/heads/branch-4.1 9b9dbc4b9 -> d7514d577


OOZIE-2064 coord job with frequency coord:endOfMonths doesn't materialize


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d7514d57
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d7514d57
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d7514d57

Branch: refs/heads/branch-4.1
Commit: d7514d5773a94095ad2e2057243c740e637b741e
Parents: 9b9dbc4
Author: Purshotam Shah <[email protected]>
Authored: Thu Nov 13 15:55:38 2014 -0800
Committer: Purshotam Shah <[email protected]>
Committed: Thu Nov 13 15:55:38 2014 -0800

----------------------------------------------------------------------
 .../CoordMaterializeTransitionXCommand.java     | 42 ++++++------
 .../TestCoordMaterializeTransitionXCommand.java | 69 +++++++++++++++++++-
 .../command/coord/TestCoordSubmitXCommand.java  | 18 -----
 .../org/apache/oozie/test/XDataTestCase.java    | 19 ++++++
 release-log.txt                                 |  1 +
 5 files changed, 111 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
 
b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
index 6c9d4ae..623c2d3 100644
--- 
a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
+++ 
b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
@@ -253,7 +253,7 @@ public class CoordMaterializeTransitionXCommand extends 
MaterializeTransitionXCo
             endMatInstance = (Calendar) startInstance.clone();
             endMatInstance.add(freqTU.getCalendarUnit(), i * frequency);
             if (endMatInstance.getTime().compareTo(new Date()) >= 0) {
-                if (previousInstance.after(currentMatTime)) {
+                if (previousInstance.getTime().after(currentMatTime)) {
                     return previousInstance.getTime();
                 }
                 else {
@@ -424,41 +424,45 @@ public class CoordMaterializeTransitionXCommand extends 
MaterializeTransitionXCo
 
         boolean isCronFrequency = false;
 
+        Calendar effStart = (Calendar) start.clone();
         try {
-            Integer.parseInt(coordJob.getFrequency());
-        } catch (NumberFormatException e) {
+            int intFrequency = Integer.parseInt(coordJob.getFrequency());
+            effStart = (Calendar) origStart.clone();
+            effStart.add(freqTU.getCalendarUnit(), lastActionNumber * 
intFrequency);
+        }
+        catch (NumberFormatException e) {
             isCronFrequency = true;
         }
 
         boolean firstMater = true;
-        while (start.compareTo(end) < 0 && (ignoreMaxActions || 
maxActionToBeCreated-- > 0)) {
-            if (pause != null && start.compareTo(pause) >= 0) {
+        while (effStart.compareTo(end) < 0 && (ignoreMaxActions || 
maxActionToBeCreated-- > 0)) {
+            if (pause != null && effStart.compareTo(pause) >= 0) {
                 break;
             }
 
-            Date nextTime = start.getTime();
+            Date nextTime = effStart.getTime();
 
             if (isCronFrequency) {
-                if (start.getTime().compareTo(startMatdTime) == 0 && 
firstMater) {
-                    start.add(Calendar.MINUTE, -1);
+                if (effStart.getTime().compareTo(startMatdTime) == 0 && 
firstMater) {
+                    effStart.add(Calendar.MINUTE, -1);
                     firstMater = false;
                 }
 
-                nextTime = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(start.getTime(), 
coordJob);
-                start.setTime(nextTime);
+                nextTime = 
CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(), 
coordJob);
+                effStart.setTime(nextTime);
             }
 
-            if (start.compareTo(end) < 0) {
+            if (effStart.compareTo(end) < 0) {
 
-                if (pause != null && start.compareTo(pause) >= 0) {
+                if (pause != null && effStart.compareTo(pause) >= 0) {
                     break;
                 }
                 CoordinatorActionBean actionBean = new CoordinatorActionBean();
                 lastActionNumber++;
 
                 int timeout = coordJob.getTimeout();
-                LOG.debug("Materializing action for time=" + 
DateUtils.formatDateOozieTZ(start.getTime()) + ", lastactionnumber=" + 
lastActionNumber
-                        + " timeout=" + timeout + " minutes");
+                LOG.debug("Materializing action for time=" + 
DateUtils.formatDateOozieTZ(effStart.getTime())
+                        + ", lastactionnumber=" + lastActionNumber + " 
timeout=" + timeout + " minutes");
                 Date actualTime = new Date();
                 action = CoordCommandUtils.materializeOneInstance(jobId, 
dryrun, (Element) eJob.clone(),
                         nextTime, actualTime, lastActionNumber, jobConf, 
actionBean);
@@ -478,22 +482,22 @@ public class CoordMaterializeTransitionXCommand extends 
MaterializeTransitionXCo
             }
 
             if (!isCronFrequency) {
-                start = (Calendar) origStart.clone();
-                start.add(freqTU.getCalendarUnit(), lastActionNumber * 
Integer.parseInt(coordJob.getFrequency()));
+                effStart = (Calendar) origStart.clone();
+                effStart.add(freqTU.getCalendarUnit(), lastActionNumber * 
Integer.parseInt(coordJob.getFrequency()));
             }
         }
 
         if (isCronFrequency) {
-            if (start.compareTo(end) < 0 && !(ignoreMaxActions || 
maxActionToBeCreated-- > 0)) {
+            if (effStart.compareTo(end) < 0 && !(ignoreMaxActions || 
maxActionToBeCreated-- > 0)) {
                 //Since we exceed the throttle, we need to move the 
nextMadtime forward
                 //to avoid creating duplicate actions
                 if (!firstMater) {
-                    
start.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(start.getTime(),
 coordJob));
+                    
effStart.setTime(CoordCommandUtils.getNextValidActionTimeForCronFrequency(effStart.getTime(),
 coordJob));
                 }
             }
         }
 
-        endMatdTime = start.getTime();
+        endMatdTime = effStart.getTime();
 
         if (!dryrun) {
             return action;

http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
index 4a852cf..4286bcb 100644
--- 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
@@ -17,19 +17,25 @@
  */
 package org.apache.oozie.command.coord;
 
+import java.io.File;
 import java.sql.Timestamp;
 import java.util.Arrays;
+import java.util.Calendar;
 import java.util.Date;
 import java.util.List;
+import java.util.TimeZone;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.coord.TimeUnit;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
@@ -46,6 +52,9 @@ import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
 
 @SuppressWarnings("deprecation")
 public class TestCoordMaterializeTransitionXCommand extends XDataTestCase {
@@ -550,7 +559,7 @@ public class TestCoordMaterializeTransitionXCommand extends 
XDataTestCase {
         
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 job);
         new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
         job = jpaService.execute(new CoordJobGetJPAExecutor(job.getId()));
-        assertEquals(new Date(startTime.getTime() + TIME_IN_DAY ), 
job.getNextMaterializedTime());
+        assertEquals(new Date(startTime.getTime() + TIME_IN_DAY * 3), 
job.getNextMaterializedTime());
 
         // test with hours, time should not pass the current time.
         startTime = new Date(new Date().getTime());
@@ -627,6 +636,64 @@ public class TestCoordMaterializeTransitionXCommand 
extends XDataTestCase {
             Date pauseTime, int timeout, String freq) throws Exception {
         return addRecordToCoordJobTable(status, startTime, endTime, pauseTime, 
timeout, freq, CoordinatorJob.Execution.FIFO);
     }
+    public void testMaterizationEndOfMonths() throws Exception {
+        Configuration conf = new XConfiguration();
+        File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
+        String appXml = "<coordinator-app name=\"test\" 
frequency=\"${coord:endOfMonths(1)}\" start=\"2009-02-01T01:00Z\" "
+                + "end=\"2009-02-03T23:59Z\" timezone=\"UTC\" "
+                + "xmlns=\"uri:oozie:coordinator:0.2\"> <controls> "
+                + "<execution>LIFO</execution> </controls> <datasets> "
+                + "<dataset name=\"a\" frequency=\"${coord:days(7)}\" 
initial-instance=\"2009-02-01T01:00Z\" "
+                + "timezone=\"UTC\"> <uri-template>"
+                + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+                + "</uri-template>  "
+                + "</dataset> "
+                + "<dataset name=\"local_a\" frequency=\"${coord:days(7)}\" 
initial-instance=\"2009-02-01T01:00Z\" "
+                + "timezone=\"UTC\"> <uri-template>"
+                + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+                + "</uri-template> "
+                + " </dataset> "
+                + "</datasets> <input-events> "
+                + "<data-in name=\"A\" dataset=\"a\"> 
<instance>${coord:latest(0)}</instance> </data-in>  "
+                + "</input-events> "
+                + "<output-events> <data-out name=\"LOCAL_A\" 
dataset=\"local_a\"> "
+                + "<instance>${coord:current(-1)}</instance> </data-out> 
</output-events> <action> <workflow> "
+                + "<app-path>hdfs:///tmp/workflows/</app-path> "
+                + "<configuration> <property> <name>inputA</name> 
<value>${coord:dataIn('A')}</value> </property> "
+                + "<property> <name>inputB</name> 
<value>${coord:dataOut('LOCAL_A')}</value> "
+                + "</property></configuration> </workflow> </action> 
</coordinator-app>";
+        writeToFile(appXml, appPathFile);
+        conf.set(OozieClient.COORDINATOR_APP_PATH, 
appPathFile.toURI().toString());
+        conf.set(OozieClient.USER_NAME, getTestUser());
+        CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);
+        String jobId = sc.call();
+
+        Date currentTime = new Date();
+        Date startTime = 
org.apache.commons.lang.time.DateUtils.addMonths(currentTime, -3);
+        Date endTime = 
org.apache.commons.lang.time.DateUtils.addMonths(currentTime, 3);
+        CoordinatorJobBean job = 
CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, jobId);
+        assertEquals(job.getLastActionNumber(), 0);
+
+        job.setStartTime(startTime);
+        job.setEndTime(endTime);
+        job.setMatThrottling(10);
+        
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 job);
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        job = 
CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB, 
job.getId());
+        assertEquals(job.getLastActionNumber(), 3);
+
+        String jobXml = job.getJobXml();
+        Element eJob = XmlUtils.parseXml(jobXml);
+        TimeZone appTz = DateUtils.getTimeZone(job.getTimeZone());
+        TimeUnit endOfFlag = 
TimeUnit.valueOf(eJob.getAttributeValue("end_of_duration"));
+        TimeUnit freqTU = TimeUnit.valueOf(job.getTimeUnitStr());
+        Calendar origStart = Calendar.getInstance(appTz);
+        origStart.setTime(job.getStartTimestamp());
+        // Move to the End of duration, if needed.
+        DateUtils.moveToEnd(origStart, endOfFlag);
+        origStart.add(freqTU.getCalendarUnit(), 3 * 
Integer.parseInt(job.getFrequency()));
+        assertEquals(job.getNextMaterializedTime(), origStart.getTime());
+    }
 
     protected CoordinatorJobBean 
addRecordToCoordJobTable(CoordinatorJob.Status status, Date startTime, Date 
endTime,
             Date pauseTime, int timeout, String freq, CoordinatorJob.Execution 
execution) throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
index fedf4a8..82c9e99 100644
--- 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java
@@ -19,8 +19,6 @@ package org.apache.oozie.command.coord;
 
 import java.io.File;
 import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
 import java.io.Reader;
 import java.io.Writer;
 import java.net.URI;
@@ -1219,22 +1217,6 @@ public class TestCoordSubmitXCommand extends 
XDataTestCase {
         return null;
     }
 
-    private void writeToFile(String appXml, File appPathFile) throws Exception 
{
-        PrintWriter out = null;
-        try {
-            out = new PrintWriter(new FileWriter(appPathFile));
-            out.println(appXml);
-        }
-        catch (IOException iex) {
-            throw iex;
-        }
-        finally {
-            if (out != null) {
-                out.close();
-            }
-        }
-    }
-
     /**
      * Test timeout setting
      *

http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java 
b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index 7614f03..b72c641 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -18,9 +18,12 @@
 package org.apache.oozie.test;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
 import java.io.Reader;
 import java.io.UnsupportedEncodingException;
 import java.io.Writer;
@@ -1638,4 +1641,20 @@ public abstract class XDataTestCase extends 
XHCatTestCase {
         
CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB,
 coord);
     }
 
+    protected void writeToFile(String appXml, File appPathFile) throws 
Exception {
+        PrintWriter out = null;
+        try {
+            out = new PrintWriter(new FileWriter(appPathFile));
+            out.println(appXml);
+        }
+        catch (IOException iex) {
+            throw iex;
+        }
+        finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/d7514d57/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 65d454c..a50a3f5 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.1.0 release (4.1 - unreleased)
 
+OOZIE-2064 coord job with frequency coord:endOfMonths doesn't materialize 
(puru)
 OOZIE-2063 Cron syntax creates duplicate actions (bzhang)
 OOZIE-2032 If using SSL, the port reported by Oozie is incorrect for HA tasks 
(rkanter)
 OOZIE-1959 TestZKUtilsWithSecurity fails (rkanter)

Reply via email to