Author: virag
Date: Sat Mar 16 00:45:06 2013
New Revision: 1457184

URL: http://svn.apache.org/r1457184
Log:
OOZIE-1269 Exception in push dependency check when there is also a pull 
dependency leaves it in waiting till timeout (rohini via virag)

Modified:
    oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
    oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
    oozie/trunk/release-log.txt

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1457184&r1=1457183&r2=1457184&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java Sat Mar 16 00:45:06 2013
@@ -43,6 +43,7 @@ import org.apache.oozie.executor.jpa.Coo
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.RecoveryService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIHandlerService;
@@ -152,12 +153,21 @@ public class CoordPushDependencyCheckXCo
                 }
             }
             catch (Exception e) {
+                final CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
                 if (isTimeout()) {
                     LOG.debug("Queueing timeout command");
                     // XCommand.queue() will not work when there is a Exception
-                    Services.get().get(CallableQueueService.class).queue(new CoordActionTimeOutXCommand(coordAction));
+                    callableQueueService.queue(new CoordActionTimeOutXCommand(coordAction));
                     unregisterMissingDependencies(Arrays.asList(missingDepsArray), actionId);
                 }
+                else if (coordAction.getMissingDependencies() != null
+                        && coordAction.getMissingDependencies().length() > 0) {
+                    // Queue again on exception as RecoveryService will not queue this again with
+                    // the action being updated regularly by CoordActionInputCheckXCommand
+                    final RecoveryService recoveryService = Services.get().get(RecoveryService.class);
+                    callableQueueService.queue(new CoordPushDependencyCheckXCommand(coordAction.getId()),
+                            recoveryService.getRecoveryServiceInterval(Services.get().getConf()) * 1000);
+                }
                 throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
             }
         }

Modified: oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1457184&r1=1457183&r2=1457184&view=diff
==============================================================================
--- oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java (original)
+++ oozie/trunk/core/src/main/java/org/apache/oozie/service/RecoveryService.java Sat Mar 16 00:45:06 2013
@@ -427,10 +427,14 @@ public class RecoveryService implements 
         Configuration conf = services.getConf();
         Runnable recoveryRunnable = new RecoveryRunnable(conf.getInt(CONF_WF_ACTIONS_OLDER_THAN, 120), conf.getInt(
                 CONF_COORD_OLDER_THAN, 600),conf.getInt(CONF_BUNDLE_OLDER_THAN, 600));
-        services.get(SchedulerService.class).schedule(recoveryRunnable, 10, conf.getInt(CONF_SERVICE_INTERVAL, 600),
+        services.get(SchedulerService.class).schedule(recoveryRunnable, 10, getRecoveryServiceInterval(conf),
                                                       SchedulerService.Unit.SEC);
     }
 
+    public int getRecoveryServiceInterval(Configuration conf){
+        return conf.getInt(CONF_SERVICE_INTERVAL, 600);
+    }
+
     /**
      * Destroy the Recovery Service.
      */

Modified: oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1457184&r1=1457183&r2=1457184&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java (original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java Sat Mar 16 00:45:06 2013
@@ -33,9 +33,11 @@ import org.apache.oozie.coord.CoordELFun
 import org.apache.oozie.dependency.DependencyChecker;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.CallableQueueService;
 import org.apache.oozie.service.HCatAccessorService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.RecoveryService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.HCatURI;
@@ -349,6 +351,43 @@ public class TestCoordPushDependencyChec
     }
 
     @Test
+    public void testRequeueOnException() throws Exception {
+        Services.get().getConf().setInt(RecoveryService.CONF_SERVICE_INTERVAL, 6000);
+        // Test timeout when table containing missing dependencies is dropped in between
+        String newHCatDependency1 = "hcat://" + server + "/nodb/notable/dt=20120430;country=brazil";
+        String newHCatDependency2 = "hcat://" + server + "/nodb/notable/dt=20120430;country=usa";
+        String newHCatDependency = newHCatDependency1 + CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+
+        String actionId = addInitRecords(newHCatDependency);
+        checkCoordAction(actionId, newHCatDependency, CoordinatorAction.Status.WAITING);
+        try {
+            new CoordPushDependencyCheckXCommand(actionId, true).call();
+            fail();
+        }
+        catch (Exception e) {
+            assertTrue(e.getMessage().contains("NoSuchObjectException"));
+        }
+        // Nothing should be queued as there are no pull dependencies
+        CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class);
+        assertEquals(0, callableQueueService.getQueueDump().size());
+
+        setMissingDependencies(actionId, newHCatDependency1);
+        try {
+            new CoordPushDependencyCheckXCommand(actionId, true).call();
+            fail();
+        }
+        catch (Exception e) {
+            assertTrue(e.getMessage().contains("NoSuchObjectException"));
+        }
+        // Should be requeued at the recovery service interval
+        final List<String> queueDump = callableQueueService.getQueueDump();
+        assertEquals(1, callableQueueService.getQueueDump().size());
+        // Delay should be something like delay=5999999. Ignore last digit
+        assertTrue(queueDump.get(0).contains("delay=599999"));
+        assertTrue(queueDump.get(0).contains(CoordPushDependencyCheckXCommand.class.getName()));
+    }
+
+    @Test
     public void testLogMessagePrefix() throws Exception {
         Logger logger = Logger.getLogger(DependencyChecker.class);
         ByteArrayOutputStream out = new ByteArrayOutputStream();

Modified: oozie/trunk/release-log.txt
URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1457184&r1=1457183&r2=1457184&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Sat Mar 16 00:45:06 2013
@@ -7,6 +7,7 @@ OOZIE-1239 Bump up trunk to 4.1.0-SNAPSH
 
 -- Oozie 4.0.0 (unreleased)
 
+OOZIE-1269 Exception in push dependency check when there is also a pull dependency leaves it in waiting till timeout (rohini via virag)
 OOZIE-1267 Dryrun option for push missing deps (virag)
 OOZIE-1263 Fix few HCat dependency check issues (rohini via virag)
 OOZIE-1261 Registered push dependencies are not removed on Coord Kill command (virag)


Reply via email to