Author: virag
Date: Mon Dec 10 18:34:34 2012
New Revision: 1419641
URL: http://svn.apache.org/viewvc?rev=1419641&view=rev
Log:
OOZIE-1071 latest EL function is based on action materialization time (rohini
via virag)
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
oozie/branches/hcat-intre/release-log.txt
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java?rev=1419641&r1=1419640&r2=1419641&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/coord/CoordELFunctions.java
Mon Dec 10 18:34:34 2012
@@ -47,6 +47,7 @@ public class CoordELFunctions {
final private static String DATASET = "oozie.coord.el.dataset.bean";
final private static String COORD_ACTION = "oozie.coord.el.app.bean";
final public static String CONFIGURATION = "oozie.coord.el.conf";
+ final public static String LATEST_EL_USE_CURRENT_TIME =
"oozie.service.ELService.latest-el.use-current-time";
// INSTANCE_SEPARATOR is used to separate multiple directories into one
tag.
final public static String INSTANCE_SEPARATOR = "#";
final public static String DIR_SEPARATOR = ",";
@@ -1111,7 +1112,14 @@ public class CoordELFunctions {
int datasetFrequency = (int) getDSFrequency();// in minutes
TimeUnit dsTimeUnit = getDSTimeUnit();
int[] instCount = new int[1];
- Calendar nominalInstanceCal = getCurrentInstance(getActualTime(),
instCount);
+ boolean useCurrentTime =
Services.get().getConf().getBoolean(LATEST_EL_USE_CURRENT_TIME, false);
+ Calendar nominalInstanceCal;
+ if (useCurrentTime) {
+ nominalInstanceCal = getCurrentInstance(new Date(), instCount);
+ }
+ else {
+ nominalInstanceCal = getCurrentInstance(getActualTime(),
instCount);
+ }
StringBuilder resolvedInstances = new StringBuilder();
StringBuilder resolvedURIPaths = new StringBuilder();
if (nominalInstanceCal != null) {
@@ -1139,7 +1147,7 @@ public class CoordELFunctions {
}
if (isPathAvailable(pathWithDoneFlag, user, null, conf)) {
LOG.debug("Found latest(" + available + "): " +
pathWithDoneFlag);
- if (available == endOffset) {
+ if (available == startOffset) {
LOG.debug("Matched latest(" + available + "): " +
pathWithDoneFlag);
resolved = true;
resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal));
@@ -1147,7 +1155,7 @@ public class CoordELFunctions {
retVal = resolvedInstances.toString();
eval.setVariable("resolved_path",
resolvedURIPaths.toString());
break;
- } else if (available <= startOffset) {
+ } else if (available <= endOffset) {
LOG.debug("Matched latest(" + available + "): " +
pathWithDoneFlag);
resolvedInstances.append(DateUtils.formatDateOozieTZ(nominalInstanceCal)).append(INSTANCE_SEPARATOR);
resolvedURIPaths.append(uriPath).append(INSTANCE_SEPARATOR);
Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml?rev=1419641&r1=1419640&r2=1419641&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
(original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml Mon Dec
10 18:34:34 2012
@@ -1031,6 +1031,14 @@
</description>
</property>
+ <property>
+ <name>oozie.service.ELService.latest-el.use-current-time</name>
+ <value>false</value>
+ <description>
+ Determine whether to use the current time to determine the latest
dependency or the action creation time.
+ This is for backward compatibility with older oozie behaviour.
+ </description>
+ </property>
<!-- UUIDService -->
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1419641&r1=1419640&r2=1419641&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
Mon Dec 10 18:34:34 2012
@@ -32,8 +32,11 @@ import org.apache.oozie.client.OozieClie
import org.apache.oozie.client.CoordinatorJob.Execution;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
import org.apache.oozie.command.CommandException;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
@@ -198,18 +201,19 @@ public class TestCoordActionInputCheckXC
}
}
- public void testActionInputCheckLatest() throws Exception {
+ public void testActionInputCheckLatestActionCreationTime() throws
Exception {
+
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME,
false);
+
String jobId = "0000000-" + new Date().getTime() +
"-TestCoordActionInputCheckXCommand-C";
Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime,
endTime, "latest");
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
- new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
CoordinatorActionBean action = null;
JPAService jpaService = Services.get().get(JPAService.class);
try {
- action = jpaService.execute(new
CoordActionGetJPAExecutor(job.getId() + "@1"));
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
}
catch (JPAExecutorException se) {
fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
@@ -217,13 +221,102 @@ public class TestCoordActionInputCheckXC
assertEquals(";${coord:latestRange(-3,0)}",
action.getMissingDependencies());
+ String actionXML = action.getActionXml();
+ String actionCreationTime = "2009-02-15T01:00" + TZ;
+ actionXML = actionXML.replaceAll("action-actual-time=\".*\">",
"action-actual-time=\"" + actionCreationTime
+ + "\">");
+ action.setActionXml(actionXML);
+ action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+
+ try {
+ jpaService.execute(new
CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
;
+ }
+ catch (JPAExecutorException se) {
+ fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
+ }
+
// providing some of the dataset dirs required as per coordinator
specification with holes
+ // before and after action creation time
+ createDir(getTestCaseDir() + "/2009/03/05/");
+ createDir(getTestCaseDir() + "/2009/02/19/");
+ createDir(getTestCaseDir() + "/2009/02/12/");
+ createDir(getTestCaseDir() + "/2009/02/05/");
+ createDir(getTestCaseDir() + "/2009/01/22/");
+ createDir(getTestCaseDir() + "/2009/01/08/");
+
+ new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
+ //Sleep for sometime as it gets requeued with 10ms delay on failure to
acquire write lock
+ Thread.sleep(1000);
+ try {
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ }
+ catch (JPAExecutorException se) {
+ fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
+ }
+
+ actionXML = action.getActionXml();
+ assertEquals("", action.getMissingDependencies());
+ // Datasets only before action creation/actual time should be picked
up.
+ String resolvedList = "file://" + getTestCaseDir() + "/2009/02/12" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/05" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/01/22" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/01/08";
+ System.out.println("Expected: " + resolvedList);
+ System.out.println("Actual: " +
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
+ assertEquals(resolvedList,
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
+ }
+
+ public void testActionInputCheckLatestCurrentTime() throws Exception {
+
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME,
true);
+
+ String jobId = "0000000-" + new Date().getTime() +
"-TestCoordActionInputCheckXCommand-C";
+ Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+ Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+ CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime,
endTime, "latest");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+ CoordinatorActionBean action = null;
+ JPAService jpaService = Services.get().get(JPAService.class);
+ try {
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ }
+ catch (JPAExecutorException se) {
+ fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
+ }
+
+ assertEquals(";${coord:latestRange(-3,0)}",
action.getMissingDependencies());
+
+ String actionXML = action.getActionXml();
+ String actionCreationTime = "2009-02-15T01:00" + TZ;
+ actionXML = actionXML.replaceAll("action-actual-time=\".*\">",
"action-actual-time=\"" + actionCreationTime
+ + "\">");
+ action.setActionXml(actionXML);
+ action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+
+ try {
+ jpaService.execute(new
CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
;
+ }
+ catch (JPAExecutorException se) {
+ fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
+ }
+
+ // providing some of the dataset dirs required as per coordinator
+ // specification with holes
+ // before and after action creation time
+ createDir(getTestCaseDir() + "/2009/03/05/");
+ createDir(getTestCaseDir() + "/2009/02/19/");
createDir(getTestCaseDir() + "/2009/02/12/");
createDir(getTestCaseDir() + "/2009/02/05/");
createDir(getTestCaseDir() + "/2009/01/22/");
createDir(getTestCaseDir() + "/2009/01/08/");
new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
+ //Sleep for sometime as it gets requeued with 10ms delay on failure to
acquire write lock
+ Thread.sleep(1000);
try {
action = jpaService.execute(new
CoordActionGetJPAExecutor(job.getId() + "@1"));
}
@@ -231,7 +324,14 @@ public class TestCoordActionInputCheckXC
fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
}
+ actionXML = action.getActionXml();
assertEquals("", action.getMissingDependencies());
+ // Datasets should be picked up based on current time and not action
creation/actual time.
+ String resolvedList = "file://" + getTestCaseDir() + "/2009/03/05" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/19" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/12" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/05";
+ assertEquals(resolvedList,
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
}
public void testActionInputCheckFuture() throws Exception {
@@ -270,6 +370,12 @@ public class TestCoordActionInputCheckXC
}
assertEquals("", action.getMissingDependencies());
+ String actionXML = action.getActionXml();
+ String resolvedList = "file://" + getTestCaseDir() + "/2009/02/12" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/26" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/03/05" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/03/12";
+ assertEquals(resolvedList,
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
}
/**
* Testing a non existing namenode path
Modified: oozie/branches/hcat-intre/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1419641&r1=1419640&r2=1419641&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Mon Dec 10 18:34:34 2012
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1071 latest EL function is based on action materialization time (rohini
via virag)
OOZIE-1106 latest and future function do not work correctly when oozie
processing timezone is non UTC (rohini via tucu)
OOZIE-1073 Optimize latest and future EL resolution in case of start-instance
and end-instance (rohini via virag)
OOZIE-1107 Change default done-flag from _SUCCESS to empty for Hcat (mohammad)