Author: virag
Date: Thu Mar 21 20:36:14 2013
New Revision: 1459513
URL: http://svn.apache.org/r1459513
Log:
OOZIE-1280 CoordPushDependencyCheck queued by Recovery Services doesn't remove
dependencies from cache (rohini via virag)
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
oozie/branches/branch-4.0/release-log.txt
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
(original)
+++
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
Thu Mar 21 20:36:14 2013
@@ -70,15 +70,22 @@ public class CoordPushDependencyCheckXCo
*/
private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
private boolean registerForNotification;
+ private boolean removeAvailDependencies;
public CoordPushDependencyCheckXCommand(String actionId) {
- this(actionId, false);
+ this(actionId, false, true);
}
public CoordPushDependencyCheckXCommand(String actionId, boolean
registerForNotification) {
+ this(actionId, registerForNotification, !registerForNotification);
+ }
+
+ public CoordPushDependencyCheckXCommand(String actionId, boolean
registerForNotification,
+ boolean removeAvailDependencies) {
super("coord_push_dep_check", "coord_push_dep_check", 0);
this.actionId = actionId;
this.registerForNotification = registerForNotification;
+ this.removeAvailDependencies = removeAvailDependencies;
}
protected CoordPushDependencyCheckXCommand(String actionName, String
actionId) {
@@ -145,7 +152,7 @@ public class CoordPushDependencyCheckXCo
if (registerForNotification) {
registerForNotification(actionDep.getMissingDependencies(), actionConf);
}
- else {
+ if (removeAvailDependencies) {
unregisterAvailableDependencies(actionDep.getAvailableDependencies());
}
if (timeout) {
@@ -164,8 +171,9 @@ public class CoordPushDependencyCheckXCo
&& coordAction.getMissingDependencies().length() > 0) {
// Queue again on exception as RecoveryService will not
queue this again with
// the action being updated regularly by
CoordActionInputCheckXCommand
- callableQueueService.queue(new
CoordPushDependencyCheckXCommand(coordAction.getId()), Services
-
.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) * 1000);
+ callableQueueService.queue(new
CoordPushDependencyCheckXCommand(coordAction.getId(),
+ registerForNotification, removeAvailDependencies),
+
Services.get().getConf().getInt(RecoveryService.CONF_COORD_OLDER_THAN, 600) *
1000);
}
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
Modified:
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java
(original)
+++
oozie/branches/branch-4.0/core/src/main/java/org/apache/oozie/service/RecoveryService.java
Thu Mar 21 20:36:14 2013
@@ -245,7 +245,7 @@ public class RecoveryService implements
+ caction.getId());
if (caction.getPushMissingDependencies() != null
&&
caction.getPushMissingDependencies().length() != 0) {
- queueCallable(new
CoordPushDependencyCheckXCommand(caction.getId(), true),
+ queueCallable(new
CoordPushDependencyCheckXCommand(caction.getId(), true, true),
pushMissingDepDelay);
pushMissingDepDelay = pushMissingDepDelay +
pushMissingDepInterval;
log.info("Recover a WAITING coord action and
resubmit CoordPushDependencyCheckX :"
Modified:
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
(original)
+++
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
Thu Mar 21 20:36:14 2013
@@ -395,9 +395,10 @@ public class TestCoordPushDependencyChec
// Should be requeued at the recovery service interval
final List<String> queueDump = callableQueueService.getQueueDump();
assertEquals(1, callableQueueService.getQueueDump().size());
- // Delay should be something like delay=5999999. Ignore last digit
- assertTrue(queueDump.get(0).contains("delay=599999"));
assertTrue(queueDump.get(0).contains(CoordPushDependencyCheckXCommand.class.getName()));
+ log.info("Queue dump is " + queueDump.toString());
+ // Delay should be something like delay=599999. Ignore last three
digits
+ assertTrue(queueDump.get(0).matches("delay=599[0-9]{3}, .*"));
}
@Test
Modified:
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
---
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
(original)
+++
oozie/branches/branch-4.0/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
Thu Mar 21 20:36:14 2013
@@ -356,6 +356,7 @@ public class TestRecoveryService extends
HCatAccessorService hcatService =
services.get(HCatAccessorService.class);
JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ PartitionDependencyManagerService pdms =
services.get(PartitionDependencyManagerService.class);
assertFalse(jmsService.isListeningToTopic(hcatService.getJMSConnectionInfo(new
URI(newHCatDependency1)), db
+ "." + table));
@@ -363,6 +364,10 @@ public class TestRecoveryService extends
String actionId = addInitRecords(newHCatDependency);
CoordinatorAction ca = checkCoordActionDependencies(actionId,
newHCatDependency);
assertEquals(CoordinatorAction.Status.WAITING, ca.getStatus());
+ // Register the missing dependencies to PDMS assuming
CoordPushDependencyCheckXCommand did this.
+ pdms.addMissingDependency(new HCatURI(newHCatDependency1), actionId);
+ pdms.addMissingDependency(new HCatURI(newHCatDependency2), actionId);
+
sleep(2000);
Runnable recoveryRunnable = new RecoveryRunnable(0, 1, 1);
recoveryRunnable.run();
@@ -374,7 +379,6 @@ public class TestRecoveryService extends
+ db + "." + table));
checkCoordActionDependencies(actionId, newHCatDependency1);
- PartitionDependencyManagerService pdms =
services.get(PartitionDependencyManagerService.class);
assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency2)));
Collection<String> waitingActions = pdms.getWaitingActions(new
HCatURI(newHCatDependency1));
assertEquals(1, waitingActions.size());
Modified: oozie/branches/branch-4.0/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-4.0/release-log.txt?rev=1459513&r1=1459512&r2=1459513&view=diff
==============================================================================
--- oozie/branches/branch-4.0/release-log.txt (original)
+++ oozie/branches/branch-4.0/release-log.txt Thu Mar 21 20:36:14 2013
@@ -1,5 +1,6 @@
-- Oozie 4.0.0 (unreleased)
+OOZIE-1280 CoordPushDependencyCheck queued by Recovery Services doesn't remove
dependencies from cache (rohini via virag)
OOZIE-1277 CoordActionInputCheck requeues itself even if only push missing
dependencies exist (virag)
OOZIE-1272 Two workflow jobs mapped to a single coordinator action (ryota via
virag)
OOZIE-1274 change recovery service interval to make it consistent with
oozie-default.xml (ryota via virag)