Author: virag
Date: Wed Mar 6 23:28:21 2013
New Revision: 1453617
URL: http://svn.apache.org/r1453617
Log:
OOZIE-1253 latest() gets resolved before all push dependencies are resolved
(rohini via virag)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1453617&r1=1453616&r2=1453617&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java Wed Mar 6 23:28:21 2013
@@ -130,6 +130,13 @@ public class CoordActionInputCheckXComma
+ nonResolvedList.toString());
// Updating the list of data dependencies that are available and
those that are yet not
boolean status = checkInput(actionXml, existList, nonExistList,
actionConf);
+ String pushDeps = coordAction.getPushMissingDependencies();
+ // Resolve latest/future only when all current missingDependencies
and
+ // pushMissingDependencies are met
+ if (status) {
+ status = (pushDeps == null || pushDeps.length() == 0) ?
checkUnResolvedInput(actionXml, actionConf)
+ : false;
+ }
coordAction.setLastModifiedTime(currentTime);
coordAction.setActionXml(actionXml.toString());
if (nonResolvedList.length() > 0 && status == false) {
@@ -141,8 +148,7 @@ public class CoordActionInputCheckXComma
isChangeInDependency = true;
coordAction.setMissingDependencies(nonExistListStr);
}
- String pushDeps = coordAction.getPushMissingDependencies();
- if (status == true && (pushDeps == null || pushDeps.length() ==
0)) {
+ if (status) {
String newActionXml = resolveCoordConfiguration(actionXml,
actionConf, actionId);
actionXml.replace(0, actionXml.length(), newActionXml);
coordAction.setActionXml(actionXml.toString());
@@ -242,11 +248,13 @@ public class CoordActionInputCheckXComma
protected boolean checkInput(StringBuilder actionXml, StringBuilder
existList, StringBuilder nonExistList,
Configuration conf) throws Exception {
Element eAction = XmlUtils.parseXml(actionXml.toString());
- boolean allExist = checkResolvedUris(eAction, existList, nonExistList,
conf);
- if (allExist) {
- LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking
Latest/future");
- allExist = checkUnresolvedInstances(eAction, conf);
- }
+ return checkResolvedUris(eAction, existList, nonExistList, conf);
+ }
+
+ private boolean checkUnResolvedInput(StringBuilder actionXml,
Configuration conf) throws Exception {
+ Element eAction = XmlUtils.parseXml(actionXml.toString());
+ LOG.debug("[" + actionId + "]::ActionInputCheck:: Checking
Latest/future");
+ boolean allExist = checkUnresolvedInstances(eAction, conf);
if (allExist) {
actionXml.replace(0, actionXml.length(),
XmlUtils.prettyPrint(eAction).toString());
}
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1453617&r1=1453616&r2=1453617&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java Wed Mar 6 23:28:21 2013
@@ -37,6 +37,7 @@ import org.apache.oozie.executor.jpa.Coo
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionUpdateForInputCheckJPAExecutor;
+import
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.CallableQueueService;
@@ -213,33 +214,22 @@ public class TestCoordActionInputCheckXC
CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime,
endTime, "latest");
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
- CoordinatorActionBean action = null;
JPAService jpaService = Services.get().get(JPAService.class);
- try {
- action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
- }
-
+ CoordinatorActionBean action = jpaService
+ .execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR +
"${coord:latestRange(-3,0)}",
action.getMissingDependencies());
+ // Update action creation time
String actionXML = action.getActionXml();
String actionCreationTime = "2009-02-15T01:00" + TZ;
actionXML = actionXML.replaceAll("action-actual-time=\".*\">",
"action-actual-time=\"" + actionCreationTime
+ "\">");
action.setActionXml(actionXML);
action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-
- try {
- jpaService.execute(new
CoordActionUpdateForInputCheckJPAExecutor(action));
- action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
;
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
- }
+ jpaService.execute(new
CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
;
// providing some of the dataset dirs required as per coordinator
specification with holes
// before and after action creation time
@@ -253,13 +243,8 @@ public class TestCoordActionInputCheckXC
new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
//Sleep for sometime as it gets requeued with 10ms delay on failure to
acquire write lock
Thread.sleep(1000);
- try {
- action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
- }
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
actionXML = action.getActionXml();
assertEquals("", action.getMissingDependencies());
// Datasets only before action creation/actual time should be picked
up.
@@ -272,6 +257,81 @@ public class TestCoordActionInputCheckXC
assertEquals(resolvedList,
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
}
+ public void
testActionInputCheckLatestActionCreationTimeWithPushDependency() throws
Exception {
+
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME,
false);
+
+ String jobId = "0000000-" + new Date().getTime() +
"-TestCoordActionInputCheckXCommand-C";
+ Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+ Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+ CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime,
endTime, "latest");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+ // Set push missing dependency
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService
+ .execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ final String pushMissingDependency = "file://" + getTestCaseDir() +
"/2009/02/05";
+ action.setPushMissingDependencies(pushMissingDependency);
+ jpaService.execute(new
CoordActionUpdatePushInputCheckJPAExecutor(action));
+
+ // Update action creation time
+ String actionXML = action.getActionXml();
+ String actionCreationTime = "2009-02-15T01:00" + TZ;
+ actionXML = actionXML.replaceAll("action-actual-time=\".*\">",
"action-actual-time=\"" + actionCreationTime
+ + "\">");
+ action.setActionXml(actionXML);
+ action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+ jpaService.execute(new
CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
;
+
+ new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
+ new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR +
"${coord:latestRange(-3,0)}",
+ action.getMissingDependencies());
+ assertEquals(pushMissingDependency,
action.getPushMissingDependencies());
+
+ // providing some of the dataset dirs required as per coordinator
specification with holes
+ // before and after action creation time
+ createDir(getTestCaseDir() + "/2009/03/05/");
+ createDir(getTestCaseDir() + "/2009/02/19/");
+ createDir(getTestCaseDir() + "/2009/02/12/");
+ createDir(getTestCaseDir() + "/2009/01/22/");
+ createDir(getTestCaseDir() + "/2009/01/08/");
+ createDir(getTestCaseDir() + "/2009/01/01/");
+
+ // Run input check after making latest available
+ new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR +
"${coord:latestRange(-3,0)}",
+ action.getMissingDependencies());
+ assertEquals(pushMissingDependency,
action.getPushMissingDependencies());
+
+ // Run input check after making push dependencies available
+ createDir(getTestCaseDir() + "/2009/02/05");
+ new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals("", action.getPushMissingDependencies());
+ checkCoordAction(job.getId() + "@1",
CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+ + "${coord:latestRange(-3,0)}",
CoordinatorAction.Status.WAITING);
+ new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
+ //Sleep for sometime as it gets requeued with 10ms delay on failure to
acquire write lock
+ Thread.sleep(1000);
+
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals("", action.getMissingDependencies());
+ actionXML = action.getActionXml();
+ // Datasets only before action creation/actual time should be picked
up.
+ String resolvedList = "file://" + getTestCaseDir() + "/2009/02/12" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/05" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/01/22" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/01/08";
+ System.out.println("Expected: " + resolvedList);
+ System.out.println("Actual: " +
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
+ assertEquals(resolvedList,
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
+ }
+
public void testActionInputCheckLatestCurrentTime() throws Exception {
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME,
true);
@@ -281,33 +341,22 @@ public class TestCoordActionInputCheckXC
CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime,
endTime, "latest");
new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
- CoordinatorActionBean action = null;
JPAService jpaService = Services.get().get(JPAService.class);
- try {
- action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
- }
-
+ CoordinatorActionBean action = jpaService
+ .execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR +
"${coord:latestRange(-3,0)}",
action.getMissingDependencies());
+ // Update action creation time
String actionXML = action.getActionXml();
String actionCreationTime = "2009-02-15T01:00" + TZ;
actionXML = actionXML.replaceAll("action-actual-time=\".*\">",
"action-actual-time=\"" + actionCreationTime
+ "\">");
action.setActionXml(actionXML);
action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
-
- try {
- jpaService.execute(new
CoordActionUpdateForInputCheckJPAExecutor(action));
- action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
-
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
;
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
- }
+ jpaService.execute(new
CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
;
// providing some of the dataset dirs required as per coordinator
// specification with holes
@@ -322,13 +371,8 @@ public class TestCoordActionInputCheckXC
new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
//Sleep for sometime as it gets requeued with 10ms delay on failure to
acquire write lock
Thread.sleep(1000);
- try {
- action = jpaService.execute(new
CoordActionGetJPAExecutor(job.getId() + "@1"));
- }
- catch (JPAExecutorException se) {
- fail("Action ID " + job.getId() + "@1" + " was not stored properly
in db");
- }
+ action = jpaService.execute(new CoordActionGetJPAExecutor(job.getId()
+ "@1"));
actionXML = action.getActionXml();
assertEquals("", action.getMissingDependencies());
// Datasets should be picked up based on current time and not action
creation/actual time.
@@ -339,6 +383,81 @@ public class TestCoordActionInputCheckXC
assertEquals(resolvedList,
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
}
+ public void testActionInputCheckLatestCurrentTimeWithPushDependency()
throws Exception {
+
Services.get().getConf().setBoolean(CoordELFunctions.LATEST_EL_USE_CURRENT_TIME,
true);
+
+ String jobId = "0000000-" + new Date().getTime() +
"-TestCoordActionInputCheckXCommand-C";
+ Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
+ Date endTime = DateUtils.parseDateOozieTZ("2009-02-16T23:59" + TZ);
+ CoordinatorJobBean job = addRecordToCoordJobTable(jobId, startTime,
endTime, "latest");
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+
+ // Set push missing dependency
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService
+ .execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ final String pushMissingDependency = "file://" + getTestCaseDir() +
"/2009/02/05";
+ action.setPushMissingDependencies(pushMissingDependency);
+ jpaService.execute(new
CoordActionUpdatePushInputCheckJPAExecutor(action));
+
+ // Update action creation time
+ String actionXML = action.getActionXml();
+ String actionCreationTime = "2009-02-15T01:00" + TZ;
+ actionXML = actionXML.replaceAll("action-actual-time=\".*\">",
"action-actual-time=\"" + actionCreationTime
+ + "\">");
+ action.setActionXml(actionXML);
+ action.setCreatedTime(DateUtils.parseDateOozieTZ(actionCreationTime));
+ jpaService.execute(new
CoordActionUpdateForInputCheckJPAExecutor(action));
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+
assertTrue(action.getActionXml().contains("action-actual-time=\"2009-02-15T01:00"))
;
+
+ new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
+ new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR +
"${coord:latestRange(-3,0)}",
+ action.getMissingDependencies());
+ assertEquals(pushMissingDependency,
action.getPushMissingDependencies());
+
+ // providing some of the dataset dirs required as per coordinator
specification with holes
+ // before and after action creation time
+ createDir(getTestCaseDir() + "/2009/03/05/");
+ createDir(getTestCaseDir() + "/2009/02/19/");
+ createDir(getTestCaseDir() + "/2009/02/12/");
+ createDir(getTestCaseDir() + "/2009/01/22/");
+ createDir(getTestCaseDir() + "/2009/01/08/");
+ createDir(getTestCaseDir() + "/2009/01/01/");
+
+ // Run input check after making latest available
+ new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR +
"${coord:latestRange(-3,0)}",
+ action.getMissingDependencies());
+ assertEquals(pushMissingDependency,
action.getPushMissingDependencies());
+
+ // Run input check after making push dependencies available
+ createDir(getTestCaseDir() + "/2009/02/05");
+ new CoordPushDependencyCheckXCommand(job.getId() + "@1").call();
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals("", action.getPushMissingDependencies());
+ checkCoordAction(job.getId() + "@1",
CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+ + "${coord:latestRange(-3,0)}",
CoordinatorAction.Status.WAITING);
+ new CoordActionInputCheckXCommand(job.getId() + "@1",
job.getId()).call();
+ //Sleep for sometime as it gets requeued with 10ms delay on failure to
acquire write lock
+ Thread.sleep(1000);
+
+ action = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(job.getId() + "@1"));
+ assertEquals("", action.getMissingDependencies());
+ actionXML = action.getActionXml();
+ // Datasets should be picked up based on current time and not action
creation/actual time.
+ String resolvedList = "file://" + getTestCaseDir() + "/2009/03/05" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/19" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/12" +
CoordELFunctions.INSTANCE_SEPARATOR
+ + "file://" + getTestCaseDir() + "/2009/02/05";
+ System.out.println("Expected: " + resolvedList);
+ System.out.println("Actual: " +
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
+ assertEquals(resolvedList,
actionXML.substring(actionXML.indexOf("<uris>") + 6,
actionXML.indexOf("</uris>")));
+ }
+
public void testActionInputCheckFuture() throws Exception {
String jobId = "0000000-" + new Date().getTime() +
"-TestCoordActionInputCheckXCommand-C";
Date startTime = DateUtils.parseDateOozieTZ("2009-02-15T23:59" + TZ);
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1453617&r1=1453616&r2=1453617&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Wed Mar 6 23:28:21 2013
@@ -4,6 +4,7 @@ OOZIE-1239 Bump up trunk to 4.1.0-SNAPSH
-- Oozie 4.0.0 (unreleased)
+OOZIE-1253 latest() gets resolved before all push dependencies are resolved (rohini via virag)
OOZIE-1251 Log messages for DependencyChecker class show wrong jobid and actionid (rohini via mona)
OOZIE-1218 Create a HCatalog Integration Guide (rohini via virag)
OOZIE-1250 Coord action timeout not happening when there is a exception (rohini via mona)