Author: virag
Date: Thu Jan  3 17:23:45 2013
New Revision: 1428491

URL: http://svn.apache.org/viewvc?rev=1428491&view=rev
Log:
OOZIE-1145 Modify Recovery Service to handle push missing dependencies (virag)

Modified:
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
    oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
    oozie/branches/hcat-intre/release-log.txt

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
 Thu Jan  3 17:23:45 2013
@@ -126,7 +126,7 @@ import org.apache.openjpa.persistence.jd
 
         @NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select 
a.id from CoordinatorActionBean a where a.status = 'RUNNING' AND 
a.lastModifiedTimestamp <= :lastModifiedTime"),
 
-        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", 
query = "select a.id, a.jobId, a.status, a.externalId from 
CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED') 
AND a.lastModifiedTimestamp <= :lastModifiedTime"),
+        @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN", 
query = "select a.id, a.jobId, a.status, a.externalId, 
a.pushMissingDependencies from CoordinatorActionBean a where (a.status = 
'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <= 
:lastModifiedTime"),
 
         @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query 
= "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a 
where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR 
a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
         // Select query used by rerun, requires almost all columns so select * 
is used

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
 Thu Jan  3 17:23:45 2013
@@ -143,18 +143,18 @@ public class CoordActionInputCheckXComma
                 // pass jobID to the CoordActionReadyXCommand
                 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 
100);
             }
-            else {
-                long waitingTime = (currentTime.getTime() - 
Math.max(coordAction.getNominalTime().getTime(), coordAction
-                        .getCreatedTime().getTime()))
-                        / (60 * 1000);
-                int timeOut = coordAction.getTimeOut();
-                if ((timeOut >= 0) && (waitingTime > timeOut)) {
-                    queue(new CoordActionTimeOutXCommand(coordAction), 100);
+            else if (!isTimeout(currentTime)) {
+                if (status == false) {
+                    queue(new 
CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
+                            getCoordInputCheckRequeueInterval());
                 }
                 else {
-                    queue(new 
CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()), 
getCoordInputCheckRequeueInterval());
+                    queue(new 
CoordPushDependencyCheckXCommand(coordAction.getId()));
                 }
             }
+            else {
+                queue(new CoordActionTimeOutXCommand(coordAction), 100);
+            }
         }
         catch (Exception e) {
             throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
@@ -174,6 +174,18 @@ public class CoordActionInputCheckXComma
         return null;
     }
 
+
+    private boolean isTimeout(Date currentTime) {
+        long waitingTime = (currentTime.getTime() - 
Math.max(coordAction.getNominalTime().getTime(), coordAction
+                .getCreatedTime().getTime()))
+                / (60 * 1000);
+        int timeOut = coordAction.getTimeOut();
+        if ((timeOut >= 0) && (waitingTime > timeOut)) {
+            return true;
+        }
+        return false;
+    }
+
     /**
      * This function reads the value of re-queue interval for coordinator input
      * check command from the Oozie configuration provided by Configuration

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
 Thu Jan  3 17:23:45 2013
@@ -40,6 +40,7 @@ import org.apache.oozie.executor.jpa.Coo
 import 
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIAccessorException;
 import org.apache.oozie.service.URIHandlerService;
@@ -52,6 +53,18 @@ public class CoordPushDependencyCheckXCo
     private JPAService jpaService = null;
     private CoordinatorActionBean coordAction = null;
 
+    /**
+     * Property name of command re-queue interval for coordinator push check in
+     * milliseconds.
+     */
+    public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL = 
Service.CONF_PREFIX
+            + "coord.push.check.requeue.interval";
+    /**
+     * Default re-queue interval in ms. It is applied when no value defined in
+     * the oozie configuration.
+     */
+    private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
+
     public CoordPushDependencyCheckXCommand(String actionId) {
         super("coord_push_dep_check", "coord_push_dep_check", 0);
         this.actionId = actionId;
@@ -79,7 +92,12 @@ public class CoordPushDependencyCheckXCo
             pushDeps = StringUtils.join(missingDeps, 
CoordELFunctions.INSTANCE_SEPARATOR);
             coordAction.setPushMissingDependencies(pushDeps);
             // Checking for timeout
-            handleTimeout();
+            if (!isTimeout()) {
+                queue(new 
CoordPushDependencyCheckXCommand(coordAction.getId()), 
getCoordPushCheckRequeueInterval());
+            }
+            else {
+                queue(new CoordActionTimeOutXCommand(coordAction), 100);
+            }
         }
         else { // All push-based dependencies are available
             coordAction.setPushMissingDependencies("");
@@ -93,6 +111,16 @@ public class CoordPushDependencyCheckXCo
         return null;
     }
 
+    /**
+     * Return the re-queue interval for coord push dependency check
+     * @return the re-queue interval in milliseconds
+     */
+    public long getCoordPushCheckRequeueInterval() {
+        long requeueInterval = 
Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL,
+                DEFAULT_COMMAND_REQUEUE_INTERVAL);
+        return requeueInterval;
+    }
+
     private List<String> getMissingDependencies(String[] dependencies, 
Configuration conf, String user)
             throws CommandException {
         List<String> missingDeps = new ArrayList<String>();
@@ -127,14 +155,16 @@ public class CoordPushDependencyCheckXCo
         }
     }
 
-    private void handleTimeout() {
+    // returns true if the action has waited longer than its timeout; the caller queues the timeout command
+    private boolean isTimeout() {
         long waitingTime = (new Date().getTime() - 
Math.max(coordAction.getNominalTime().getTime(), coordAction
                 .getCreatedTime().getTime()))
                 / (60 * 1000);
         int timeOut = coordAction.getTimeOut();
         if ((timeOut >= 0) && (waitingTime > timeOut)) {
-            queue(new CoordActionTimeOutXCommand(coordAction), 100);
+            return true;
         }
+        return false;
     }
 
     /*

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
 Thu Jan  3 17:23:45 2013
@@ -60,7 +60,7 @@ public class CoordActionsGetForRecoveryJ
             q.setParameter("lastModifiedTime", ts);
             List<Object[]> objectArrList = q.getResultList();
             for (Object[] arr : objectArrList) {
-                CoordinatorActionBean caa = 
getBeanForCoordinatorActionFromArray(arr);
+                CoordinatorActionBean caa = 
getBeanForCoordinatorActionFromArrayForRecovery(arr);
                 allActions.add(caa);
             }
 
@@ -68,7 +68,7 @@ public class CoordActionsGetForRecoveryJ
             q.setParameter("lastModifiedTime", ts);
             objectArrList = q.getResultList();
             for (Object[] arr : objectArrList) {
-                CoordinatorActionBean caa = 
getBeanForCoordinatorActionFromArray(arr);
+                CoordinatorActionBean caa = 
getBeanForCoordinatorActionFromArrayForWaiting(arr);
                 allActions.add(caa);
             }
 
@@ -79,7 +79,7 @@ public class CoordActionsGetForRecoveryJ
         }
     }
 
-    private CoordinatorActionBean 
getBeanForCoordinatorActionFromArray(Object[] arr) {
+    private CoordinatorActionBean 
getBeanForCoordinatorActionFromArrayForRecovery(Object[] arr) {
         CoordinatorActionBean bean = new CoordinatorActionBean();
         if (arr[0] != null) {
             bean.setId((String) arr[0]);
@@ -96,4 +96,25 @@ public class CoordActionsGetForRecoveryJ
         return bean;
     }
 
+
+    private CoordinatorActionBean 
getBeanForCoordinatorActionFromArrayForWaiting(Object[] arr){
+        CoordinatorActionBean bean = new CoordinatorActionBean();
+        if (arr[0] != null) {
+            bean.setId((String) arr[0]);
+        }
+        if (arr[1] != null){
+            bean.setJobId((String) arr[1]);
+        }
+        if (arr[2] != null) {
+            bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
+        }
+        if (arr[3] != null) {
+            bean.setExternalId((String) arr[3]);
+        }
+        if (arr[4] != null) {
+            bean.setPushMissingDependencies((String) arr[4]);
+        }
+        return bean;
+    }
+
 }

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
 Thu Jan  3 17:23:45 2013
@@ -321,7 +321,7 @@ public class JMSAccessorService implemen
             });
 
         }
-        catch (Exception e1){
+        catch (Exception e1) {
             LOG.error(e1.getMessage(), e1);
             if (conn != null) {
                 try {
@@ -331,6 +331,7 @@ public class JMSAccessorService implemen
                     LOG.error(e2.getMessage(), e2);
                 }
             }
+            throw new ServiceException(ErrorCode.E0100, getClass().getName(), 
e1.getMessage(), e1);
         }
         return conn;
     }

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
 Thu Jan  3 17:23:45 2013
@@ -146,6 +146,40 @@ public class PartitionDependencyManagerS
     }
 
     /**
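+     * Checks whether the table exists in the map or not
+     * @param hcatURI HCatalog URI string identifying the server, database and table
+     * @return true if the table is present in the map, false otherwise
+     * @throws MetadataServiceException if the URI cannot be parsed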
+     */
+    public boolean containsTable(String hcatURI) throws 
MetadataServiceException {
+        HCatURI uri;
+        try {
+            uri = new HCatURI(hcatURI);
+        }
+        catch (URISyntaxException e) {
+            throw new MetadataServiceException(ErrorCode.E1025, 
e.getMessage());
+        }
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), 
uri.getDb(), uri.getTable(),
+                uri.getPartitionMap());
+        return containsTable(partition);
+
+    }
+
+    private boolean containsTable(PartitionWrapper partition) {
+        String prefix = PartitionWrapper.makePrefix(partition.getServerName(), 
partition.getDbName());
+        Map<String, PartitionsGroup> tablePartitionsMap;
+        String tableName = partition.getTableName();
+        if (hcatInstanceMap.containsKey(prefix)) {
+            tablePartitionsMap = hcatInstanceMap.get(prefix);
+            if (tablePartitionsMap.containsKey(tableName)) {
+                return true;
+            }
+        }
+        return false;
+
+    }
+
+    /**
      * Adding missing partition entry specified by PartitionWrapper object
      *
      * @param partition
@@ -445,8 +479,7 @@ public class PartitionDependencyManagerS
         return containsPartition(partition);
     }
 
-
-    /**
+   /**
      * Determine if a partition entry exists in cache
      *
      * @param partition
@@ -474,7 +507,8 @@ public class PartitionDependencyManagerS
         PartitionsGroup missingPartitions = tableMap.get(tableName);
         if (missingPartitions != null && 
missingPartitions.getPartitionsMap().containsKey(partition)) {
             actionsList = missingPartitions.getPartitionsMap().get(partition);
-            if (actionsList != null) {
+            // TODO - check whether set will be better than list
+            if (actionsList != null && 
!actionsList.getActions().contains(actionId)) {
                 // partition exists, therefore append action
                 actionsList.addAndUpdate(actionId);
             }

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
 Thu Jan  3 17:23:45 2013
@@ -37,6 +37,7 @@ import org.apache.oozie.command.coord.Co
 import org.apache.oozie.command.coord.CoordActionReadyXCommand;
 import org.apache.oozie.command.coord.CoordActionStartXCommand;
 import org.apache.oozie.command.coord.CoordKillXCommand;
+import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
 import org.apache.oozie.command.coord.CoordResumeXCommand;
 import org.apache.oozie.command.coord.CoordSubmitXCommand;
 import org.apache.oozie.command.coord.CoordSuspendXCommand;
@@ -46,6 +47,7 @@ import org.apache.oozie.command.wf.KillX
 import org.apache.oozie.command.wf.ResumeXCommand;
 import org.apache.oozie.command.wf.SignalXCommand;
 import org.apache.oozie.command.wf.SuspendXCommand;
+import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
@@ -80,6 +82,12 @@ public class RecoveryService implements 
      * The number of callables to be queued in a batch.
      */
     public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX + 
"callable.batch.size";
+
+    /**
+     * Delay for the push missing dependencies in milliseconds.
+     */
+    public static final String CONF_PUSH_DEPENDENCY_DELAY = CONF_PREFIX + 
"push.dependency.delay";
+
     /**
      * Age of actions to queue, in seconds.
      */
@@ -113,6 +121,10 @@ public class RecoveryService implements 
         private List<XCallable<?>> delayedCallables;
         private StringBuilder msg = null;
         private JPAService jpaService = null;
+        URIHandlerService uriService = 
Services.get().get(URIHandlerService.class);
+        JMSAccessorService jmsService = 
Services.get().get(JMSAccessorService.class);
+        PartitionDependencyManagerService pdms = 
Services.get().get(PartitionDependencyManagerService.class);
+
 
         public RecoveryRunnable(long olderThan, long coordOlderThan,long 
bundleOlderThan) {
             this.olderThan = olderThan;
@@ -210,6 +222,31 @@ public class RecoveryService implements 
                 }
             }
 
+        }
+
+        private void registerPartitions(CoordinatorActionBean actionBean) 
throws URIAccessorException,
+                MetadataServiceException {
+            String pushDeps = actionBean.getPushMissingDependencies();
+            String[] pushDepsArr = 
pushDeps.split(CoordELFunctions.INSTANCE_SEPARATOR, -1);
+
+            String firstURI = pushDepsArr[0];
+            String uriWithSchemeAuthority = 
uriService.getAuthorityWithScheme(firstURI).toString();
+            if (jmsService.isExistsConnection(uriWithSchemeAuthority)) {
+                if (pdms.containsTable(firstURI)) {
+                    return; // assuming that all partitions are registered as
+                            // connection and table exists
+                }
+                else {
+                    for (String uri : pushDepsArr) {
+                        pdms.addMissingPartition(uri, actionBean.getId());
+                    }
+                }
+            }
+            else if (jmsService.getOrCreateConnection(uriWithSchemeAuthority)) 
{
+                for (String uri : pushDepsArr) {
+                    pdms.addMissingPartition(uri, actionBean.getId());
+                }
+            }
 
         }
 
@@ -219,6 +256,7 @@ public class RecoveryService implements 
         private void runCoordActionRecovery() {
             XLog.Info.get().clear();
             XLog log = XLog.getLog(getClass());
+            long pushMissingDepDelay = 
Services.get().getConf().getLong(CONF_PUSH_DEPENDENCY_DELAY, 60000);
             List<CoordinatorActionBean> cactions = null;
             try {
                 cactions = jpaService.execute(new 
CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
@@ -234,8 +272,11 @@ public class RecoveryService implements 
                             .incr(INSTRUMENTATION_GROUP, 
INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
                     if (caction.getStatus() == 
CoordinatorActionBean.Status.WAITING) {
                         queueCallable(new 
CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
-
-                        log.info("Recover a WAITTING coord action and resubmit 
CoordActionInputCheckXCommand :"
+                        log.info("Recover a WAITING coord action and resubmit 
CoordActionInputCheckXCommand :"
+                                + caction.getId());
+                        queueCallable(new 
CoordPushDependencyCheckXCommand(caction.getId()), pushMissingDepDelay);
+                        registerPartitions(caction);
+                        log.info("Recover a WAITING coord action and resubmit 
CoordPushDependencyCheckX :"
                                 + caction.getId());
                     }
                     else if (caction.getStatus() == 
CoordinatorActionBean.Status.SUBMITTED) {

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
 Thu Jan  3 17:23:45 2013
@@ -34,6 +34,7 @@ public class WaitingActions {
      * Empty (default) constructor
      */
     public WaitingActions() {
+        // TODO - as Writes are frequent, check whether this is correct or not?
         this(new CopyOnWriteArrayList<String>());
     }
 

Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml 
(original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml Thu Jan 
 3 17:23:45 2013
@@ -262,6 +262,15 @@
     </property>
 
     <property>
+        <name>oozie.service.RecoveryService.push.dependency.delay</name>
+        <value>60000</value>
+        <description>
+            This value determines the delay (in milliseconds) for the push missing
+            dependency command in Recovery Service
+        </description>
+    </property>
+
+    <property>
         <name>oozie.service.RecoveryService.interval</name>
         <value>60</value>
         <description>
@@ -396,6 +405,14 @@
        </property>
 
        <property>
+               <name>oozie.service.coord.push.check.requeue.interval
+               </name>
+               <value>600000</value>
+               <description>Command re-queue interval for push dependencies 
(in milliseconds).
+        </description>
+       </property>
+
+       <property>
                <name>oozie.service.coord.default.concurrency
                </name>
                <value>1</value>

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
 Thu Jan  3 17:23:45 2013
@@ -17,35 +17,24 @@
  */
 package org.apache.oozie.command.coord;
 
-import java.io.IOException;
-import java.io.Reader;
-import java.util.Date;
 import java.util.List;
 import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JMSAccessorService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.HCatURI;
-import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.PartitionWrapper;
-import org.apache.oozie.util.XLog;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestCoordActionUpdatePushMissingDependency extends XDataTestCase {
-    private String TZ;
     private Services services;
 
     @Before
@@ -56,8 +45,6 @@ public class TestCoordActionUpdatePushMi
         
setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY, 
"100");
         services = super.setupServicesForHCatalog();
         services.init();
-        TZ = 
(getProcessingTZ().equals(DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT)) ? "Z" : 
getProcessingTZ()
-                .substring(3);
     }
 
     @After
@@ -157,71 +144,4 @@ public class TestCoordActionUpdatePushMi
         }
     }
 
-    private String addInitRecords(String pushMissingDependencies) throws 
Exception {
-        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59" + TZ);
-        Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59" + TZ);
-        CoordinatorJobBean job = 
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
-                CoordinatorJob.Status.RUNNING, startTime, endTime, false, 
true, 3);
-
-        CoordinatorActionBean action1 = 
addRecordToCoordActionTableForWaiting(job.getId(), 1,
-                CoordinatorAction.Status.WAITING, 
"coord-action-for-action-input-check.xml", pushMissingDependencies);
-        return action1.getId();
-    }
-
-    protected CoordinatorActionBean 
addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
-            CoordinatorAction.Status status, String resourceXmlName, String 
pushMissingDependencies) throws Exception {
-        CoordinatorActionBean action = createCoordAction(jobId, actionNum, 
status, resourceXmlName, 0, TZ);
-        action.setPushMissingDependencies(pushMissingDependencies);
-        try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
-            CoordActionInsertJPAExecutor coordActionInsertCmd = new 
CoordActionInsertJPAExecutor(action);
-            jpaService.execute(coordActionInsertCmd);
-        }
-        catch (JPAExecutorException je) {
-            je.printStackTrace();
-            fail("Unable to insert the test coord action record to table");
-            throw je;
-        }
-        return action;
-    }
-
-    protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String 
testFileName, CoordinatorJob.Status status,
-            Date start, Date end, boolean pending, boolean doneMatd, int 
lastActionNum) throws Exception {
-
-        String testDir = getTestCaseDir();
-        CoordinatorJobBean coordJob = createCoordJob(testFileName, status, 
start, end, pending, doneMatd, lastActionNum);
-        String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
-        coordJob.setJobXml(appXml);
-
-        try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
-            CoordJobInsertJPAExecutor coordInsertCmd = new 
CoordJobInsertJPAExecutor(coordJob);
-            jpaService.execute(coordInsertCmd);
-        }
-        catch (JPAExecutorException je) {
-            je.printStackTrace();
-            fail("Unable to insert the test coord job record to table");
-            throw je;
-        }
-
-        return coordJob;
-    }
-
-    protected String getCoordJobXmlForWaiting(String testFileName, String 
testDir) {
-        try {
-            Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
-            String appXml = IOUtils.getReaderAsString(reader, -1);
-            appXml = appXml.replaceAll("#testDir", testDir);
-            return appXml;
-        }
-        catch (IOException ioe) {
-            throw new RuntimeException(XLog.format("Could not get " + 
testFileName, ioe));
-        }
-    }
-
-    protected String getProcessingTZ() {
-        return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
-    }
 }

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
 Thu Jan  3 17:23:45 2013
@@ -17,34 +17,23 @@
  */
 package org.apache.oozie.command.coord;
 
-import java.io.IOException;
-import java.io.Reader;
-import java.util.Date;
 import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.dependency.FSURIHandler;
 import org.apache.oozie.dependency.HCatURIHandler;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.PartitionWrapper;
-import org.apache.oozie.util.XLog;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 public class TestCoordPushDependencyCheckXCommand extends XDataTestCase {
-    private String TZ;
     private String server;
     private Services services = null;
 
@@ -56,8 +45,6 @@ public class TestCoordPushDependencyChec
                 FSURIHandler.class.getName() + "," + 
HCatURIHandler.class.getName());
         services.init();
         server = getMetastoreAuthority();
-        TZ = 
(getProcessingTZ().equals(DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT)) ? "Z" : 
getProcessingTZ()
-                .substring(3);
     }
 
     @After
@@ -162,72 +149,4 @@ public class TestCoordPushDependencyChec
         }
     }
 
-    private String addInitRecords(String pushMissingDependencies) throws 
Exception {
-        Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59" + TZ);
-        Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59" + TZ);
-        CoordinatorJobBean job = 
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
-                CoordinatorJob.Status.RUNNING, startTime, endTime, false, 
true, 3);
-
-        CoordinatorActionBean action1 = 
addRecordToCoordActionTableForWaiting(job.getId(), 1,
-                CoordinatorAction.Status.WAITING, 
"coord-action-for-action-input-check.xml", pushMissingDependencies);
-        return action1.getId();
-    }
-
-    protected CoordinatorActionBean 
addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
-            CoordinatorAction.Status status, String resourceXmlName, String 
pushMissingDependencies) throws Exception {
-        CoordinatorActionBean action = createCoordAction(jobId, actionNum, 
status, resourceXmlName, 0, TZ);
-        action.setPushMissingDependencies(pushMissingDependencies);
-        try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
-            CoordActionInsertJPAExecutor coordActionInsertCmd = new 
CoordActionInsertJPAExecutor(action);
-            jpaService.execute(coordActionInsertCmd);
-        }
-        catch (JPAExecutorException je) {
-            je.printStackTrace();
-            fail("Unable to insert the test coord action record to table");
-            throw je;
-        }
-        return action;
-    }
-
-    protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String 
testFileName, CoordinatorJob.Status status,
-            Date start, Date end, boolean pending, boolean doneMatd, int 
lastActionNum) throws Exception {
-
-        String testDir = getTestCaseDir();
-        CoordinatorJobBean coordJob = createCoordJob(testFileName, status, 
start, end, pending, doneMatd, lastActionNum);
-        String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
-        coordJob.setJobXml(appXml);
-
-        try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
-            CoordJobInsertJPAExecutor coordInsertCmd = new 
CoordJobInsertJPAExecutor(coordJob);
-            jpaService.execute(coordInsertCmd);
-        }
-        catch (JPAExecutorException je) {
-            je.printStackTrace();
-            fail("Unable to insert the test coord job record to table");
-            throw je;
-        }
-
-        return coordJob;
-    }
-
-    protected String getCoordJobXmlForWaiting(String testFileName, String 
testDir) {
-        try {
-            Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
-            String appXml = IOUtils.getReaderAsString(reader, -1);
-            appXml = appXml.replaceAll("#testDir", testDir);
-            return appXml;
-        }
-        catch (IOException ioe) {
-            throw new RuntimeException(XLog.format("Could not get " + 
testFileName, ioe));
-        }
-    }
-
-    protected String getProcessingTZ() {
-        return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
-    }
-
 }

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
 Thu Jan  3 17:23:45 2013
@@ -199,8 +199,6 @@ public class TestPartitionDependencyMana
         assertFalse(actions.getActions().contains(actionId2));
     }
 
-
-
     /**
      * Test removal of partitions from Available map
      */
@@ -248,5 +246,26 @@ public class TestPartitionDependencyMana
         }
     }
 
+    /**
+     * Test table available from the Map
+     * @throws URISyntaxException
+     * @throws MetadataServiceException
+     */
+    @Test
+    public void testMapContainsTable() throws URISyntaxException, 
MetadataServiceException{
+        PartitionDependencyManagerService pdms = 
services.get(PartitionDependencyManagerService.class);
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+        jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
+        String newHCatDependency1 = 
"hcat://hcat.server.com:5080/mydb/clicks/datastamp=12";
+
+        // +ve test
+        pdms.addMissingPartition(newHCatDependency1, "1");
+        assertTrue(pdms.containsTable(newHCatDependency1));
+        // -ve test
+        pdms.removePartition(newHCatDependency1, true);
+        assertFalse(pdms.containsTable(newHCatDependency1));
+
+    }
+
 
 }

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
 Thu Jan  3 17:23:45 2013
@@ -25,8 +25,13 @@ import java.io.PrintWriter;
 import java.io.Reader;
 import java.io.StringReader;
 import java.io.Writer;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -52,6 +57,9 @@ import org.apache.oozie.client.WorkflowJ
 import org.apache.oozie.client.CoordinatorJob.Execution;
 import org.apache.oozie.command.wf.ActionXCommand;
 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.dependency.FSURIHandler;
+import org.apache.oozie.dependency.HCatURIHandler;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
@@ -66,6 +74,7 @@ import org.apache.oozie.store.WorkflowSt
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PartitionWrapper;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.XmlUtils;
@@ -73,15 +82,18 @@ import org.apache.oozie.workflow.Workflo
 
 public class TestRecoveryService extends XDataTestCase {
     private Services services;
+    private String server;
 
     @Override
     protected void setUp() throws Exception {
         super.setUp();
+        server = getMetastoreAuthority();
         setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS, 
"wf-ext-schema.xsd");
         services = new Services();
         services.init();
         cleanUpDBTables();
         
services.get(ActionService.class).register(ForTestingActionExecutor.class);
+
     }
 
     @Override
@@ -186,9 +198,9 @@ public class TestRecoveryService extends
         store3.commitTrx();
         store3.closeTrx();
     }
-    
+
     /**
-     * Tests functionality of the Recovery Service Runnable command. </p> 
Starts an action with USER_RETRY status. 
+     * Tests functionality of the Recovery Service Runnable command. </p> 
Starts an action with USER_RETRY status.
      * Runs the recovery runnable, and ensures the state changes to OK and the 
job completes successfully.
      *
      * @throws Exception
@@ -197,11 +209,11 @@ public class TestRecoveryService extends
         final JPAService jpaService = Services.get().get(JPAService.class);
         WorkflowJobBean job = 
this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, 
WorkflowInstance.Status.RUNNING);
         WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), 
"1", WorkflowAction.Status.USER_RETRY);
-        
+
         Runnable recoveryRunnable = new RecoveryRunnable(0, 60, 60);
         recoveryRunnable.run();
         sleep(3000);
-        
+
         final WorkflowActionGetJPAExecutor wfActionGetCmd = new 
WorkflowActionGetJPAExecutor(action.getId());
 
         waitFor(5000, new Predicate() {
@@ -233,7 +245,7 @@ public class TestRecoveryService extends
         assertTrue(launcherJob.isSuccessful());
         assertTrue(LauncherMapper.hasIdSwap(launcherJob));
     }
-    
+
 
     /**
      * Tests functionality of the Recovery Service Runnable command. </p> 
Insert a coordinator job with RUNNING and
@@ -294,11 +306,8 @@ public class TestRecoveryService extends
      */
     public void testCoordActionRecoveryServiceForWaiting() throws Exception {
 
-        String currentDatePlusMonth = 
XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
-        Date startTime = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
-        Date endTime = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
         CoordinatorJobBean job = 
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
-                CoordinatorJob.Status.RUNNING, startTime, endTime, false, 
true, 0);
+                CoordinatorJob.Status.RUNNING, false, true);
 
         CoordinatorActionBean action = 
addRecordToCoordActionTableForWaiting(job.getId(), 1,
                 CoordinatorAction.Status.WAITING, 
"coord-action-for-action-input-check.xml");
@@ -332,6 +341,109 @@ public class TestRecoveryService extends
         }
     }
 
+
+    public void testCoordActionRecoveryServiceForWaitingRegisterPartition() 
throws Exception {
+        services.destroy();
+        services = super.setupServicesForHCatalog();
+        services.getConf().set(URIHandlerService.URI_HANDLERS,
+                FSURIHandler.class.getName() + "," + 
HCatURIHandler.class.getName());
+        services.init();
+
+        String db = "default";
+        String table = "tablename";
+
+        String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + 
table + "/dt=20120412;country=brazil";
+        String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + 
table + "/dt=20120430;country=usa";
+        String newHCatDependency = newHCatDependency1 + 
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+
+        String actionId = addInitRecords(newHCatDependency);
+
+        CoordinatorAction ca = checkCoordActionDependencies(actionId, 
newHCatDependency, 0);
+        assertEquals(CoordinatorAction.Status.WAITING, ca.getStatus());
+
+        sleep(2000);
+
+        Runnable recoveryRunnable = new RecoveryRunnable(0, 1, 1);
+        recoveryRunnable.run();
+
+        sleep(2000);
+
+        PartitionDependencyManagerService pdms = 
services.get(PartitionDependencyManagerService.class);
+        JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+
+        // Recovery service should have created the partitions and JMS 
Connection should exist
+        assertTrue(jmsService.isExistsConnection("hcat://" + server));
+        assertTrue(pdms.containsPartition(newHCatDependency1));
+        assertTrue(pdms.containsPartition(newHCatDependency2));
+
+    }
+
+    public void testCoordActionRecoveryServiceForWaitingPushMissingDeps() 
throws Exception {
+        services.destroy();
+        services = new Services();
+        Configuration conf = services.getConf();
+        conf.set(URIHandlerService.URI_HANDLERS,
+                FSURIHandler.class.getName() + "," + 
HCatURIHandler.class.getName());
+        conf.setLong(RecoveryService.CONF_PUSH_DEPENDENCY_DELAY, 10);
+
+        services.init();
+
+        String db = "default";
+        String table = "tablename";
+
+
+        String newHCatDependency1 = "hcat://" + server + "/" + db + "/" + 
table + "/dt=20120412;country=brazil";
+        String newHCatDependency2 = "hcat://" + server + "/" + db + "/" + 
table + "/dt=20120430;country=usa";
+        String newHCatDependency = newHCatDependency1 + 
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+
+        populateTable(db, table);
+        String actionId = addInitRecords(newHCatDependency);
+
+        CoordinatorAction ca = checkCoordActionDependencies(actionId, 
newHCatDependency, 0);
+        assertEquals(CoordinatorAction.Status.WAITING, ca.getStatus());
+
+        sleep(3000);
+
+        Runnable recoveryRunnable = new RecoveryRunnable(0, 1, 1);
+        recoveryRunnable.run();
+
+        sleep(3000);
+
+        ca = checkCoordActionDependencies(actionId, "", 0);
+        assertFalse(ca.getStatus().equals(CoordinatorAction.Status.WAITING));
+    }
+
+    private void populateTable(String db, String table) throws Exception {
+        dropTable(db, table, true);
+        dropDatabase(db, true);
+        createDatabase(db);
+        createTable(db, table, "dt,country");
+        addPartition(db, table, "dt=20120430;country=usa");
+        addPartition(db, table, "dt=20120412;country=brazil");
+        addPartition(db, table, "dt=20120413;country=brazil");
+    }
+
+
+    private CoordinatorActionBean checkCoordActionDependencies(String 
actionId, String expDeps,
+            int type) throws Exception {
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            CoordinatorActionBean action = jpaService.execute(new 
CoordActionGetJPAExecutor(actionId));
+            String missDeps = action.getPushMissingDependencies();
+            if (type != 0) {
+                assertEquals(new PartitionWrapper(missDeps), new 
PartitionWrapper(expDeps));
+            }
+            else {
+                assertEquals(missDeps, expDeps);
+            }
+
+            return action;
+        }
+        catch (JPAExecutorException se) {
+            throw new Exception("Action ID " + actionId + " was not stored 
properly in db");
+        }
+    }
+
     /**
      * Tests functionality of the Recovery Service Runnable command. </p> 
Insert a coordinator job with SUSPENDED and
      * action with SUSPENDED and workflow with RUNNING. Then, runs the 
recovery runnable and ensures the workflow status changes to SUSPENDED.
@@ -441,29 +553,6 @@ public class TestRecoveryService extends
         assertEquals(WorkflowJob.Status.RUNNING, ret.getStatus());
     }
 
-    protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String 
testFileName, CoordinatorJob.Status status, Date start, Date end,
-            boolean pending, boolean doneMatd, int lastActionNum) throws 
Exception {
-
-        String testDir = getTestCaseDir();
-        CoordinatorJobBean coordJob = createCoordJob(testFileName, status, 
start, end, pending, doneMatd, lastActionNum);
-        String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
-        coordJob.setJobXml(appXml);
-
-        try {
-            JPAService jpaService = Services.get().get(JPAService.class);
-            assertNotNull(jpaService);
-            CoordJobInsertJPAExecutor coordInsertCmd = new 
CoordJobInsertJPAExecutor(coordJob);
-            jpaService.execute(coordInsertCmd);
-        }
-        catch (JPAExecutorException je) {
-            je.printStackTrace();
-            fail("Unable to insert the test coord job record to table");
-            throw je;
-        }
-
-        return coordJob;
-    }
-
     protected CoordinatorActionBean 
addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
             CoordinatorAction.Status status, String resourceXmlName) throws 
Exception {
         CoordinatorActionBean action = createCoordAction(jobId, actionNum, 
status, resourceXmlName, 0);
@@ -500,18 +589,6 @@ public class TestRecoveryService extends
         }
     }
 
-    protected String getCoordJobXmlForWaiting(String testFileName, String 
testDir) {
-        try {
-            Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
-            String appXml = IOUtils.getReaderAsString(reader, -1);
-            appXml = appXml.replaceAll("#testDir", testDir);
-            return appXml;
-        }
-        catch (IOException ioe) {
-            throw new RuntimeException(XLog.format("Could not get "+ 
testFileName, ioe));
-        }
-    }
-
     private void addRecordToActionTable(String jobId, int actionNum, String 
actionId, CoordinatorStore store, String baseDir) throws StoreException, 
IOException {
         CoordinatorActionBean action = new CoordinatorActionBean();
         action.setJobId(jobId);
@@ -699,7 +776,7 @@ public class TestRecoveryService extends
             throw se;
         }
     }
-    
+
     @Override
     protected WorkflowActionBean addRecordToWfActionTable(String wfId, String 
actionName, WorkflowAction.Status status)
             throws Exception {

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
 Thu Jan  3 17:23:45 2013
@@ -1269,4 +1269,65 @@ public abstract class XDataTestCase exte
         return DateUtils.formatDateOozieTZ(currentDate);
     }
 
+    protected String addInitRecords(String pushMissingDependencies) throws 
Exception {
+        CoordinatorJobBean job = 
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+                CoordinatorJob.Status.RUNNING, false, true);
+
+        CoordinatorActionBean action1 = 
addRecordToCoordActionTableForWaiting(job.getId(), 1,
+                CoordinatorAction.Status.WAITING, 
"coord-action-for-action-input-check.xml", pushMissingDependencies);
+        return action1.getId();
+    }
+
+    protected CoordinatorActionBean 
addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
+            CoordinatorAction.Status status, String resourceXmlName, String 
pushMissingDependencies) throws Exception {
+        CoordinatorActionBean action = createCoordAction(jobId, actionNum, 
status, resourceXmlName, 0);
+        action.setPushMissingDependencies(pushMissingDependencies);
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            CoordActionInsertJPAExecutor coordActionInsertCmd = new 
CoordActionInsertJPAExecutor(action);
+            jpaService.execute(coordActionInsertCmd);
+        }
+        catch (JPAExecutorException je) {
+            je.printStackTrace();
+            fail("Unable to insert the test coord action record to table");
+            throw je;
+        }
+        return action;
+    }
+
+    protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String 
testFileName, CoordinatorJob.Status status,
+             boolean pending, boolean doneMatd) throws Exception {
+
+        String testDir = getTestCaseDir();
+        CoordinatorJobBean coordJob = createCoordJob(status, pending, 
doneMatd);
+        String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
+        coordJob.setJobXml(appXml);
+
+        try {
+            JPAService jpaService = Services.get().get(JPAService.class);
+            assertNotNull(jpaService);
+            CoordJobInsertJPAExecutor coordInsertCmd = new 
CoordJobInsertJPAExecutor(coordJob);
+            jpaService.execute(coordInsertCmd);
+        }
+        catch (JPAExecutorException je) {
+            je.printStackTrace();
+            fail("Unable to insert the test coord job record to table");
+            throw je;
+        }
+
+        return coordJob;
+    }
+
+    protected String getCoordJobXmlForWaiting(String testFileName, String 
testDir) {
+        try {
+            Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
+            String appXml = IOUtils.getReaderAsString(reader, -1);
+            appXml = appXml.replaceAll("#testDir", testDir);
+            return appXml;
+        }
+        catch (IOException ioe) {
+            throw new RuntimeException(XLog.format("Could not get " + 
testFileName, ioe));
+        }
+    }
 }

Modified: oozie/branches/hcat-intre/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Thu Jan  3 17:23:45 2013
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1145 Modify Recovery Service to handle push missing dependencies (virag)
 OOZIE-1135 Display missing partition dependencies via job -info command on CLI 
(mona)
 OOZIE-1125 Prepare actions for hcat (rohini via virag)
 OOZIE-1123 EL Functions for hcatalog (mona)


Reply via email to