Author: virag
Date: Thu Jan 10 20:37:15 2013
New Revision: 1431622

URL: http://svn.apache.org/viewvc?rev=1431622&view=rev
Log:
OOZIE-1156 Make all the latest/future instances as pull dependences (virag)

Added:
    
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml
    
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
Modified:
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
    oozie/branches/hcat-intre/release-log.txt

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
 Thu Jan 10 20:37:15 2013
@@ -6,14 +6,17 @@ import java.util.Date;
 import java.util.List;
 
 import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
 import org.apache.oozie.coord.CoordELFunctions;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
 import 
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.MetadataServiceException;
@@ -21,12 +24,14 @@ import org.apache.oozie.service.Partitio
 import org.apache.oozie.service.Services;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.StatusUtils;
 
 public class CoordActionUpdatePushMissingDependency extends 
CoordinatorXCommand<Void> {
 
     private String actionId;
     private JPAService jpaService = null;
     private CoordinatorActionBean coordAction = null;
+    private CoordinatorJobBean coordJob = null;
 
     public CoordActionUpdatePushMissingDependency(String actionId) {
         super("coord_action_push_md", "coord_action_push_md", 0);
@@ -158,12 +163,16 @@ public class CoordActionUpdatePushMissin
         return getName() + "_" + actionId;
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#eagerLoadState()
+     */
     @Override
-    protected void loadState() throws CommandException {
+    protected void eagerLoadState() throws CommandException {
         try {
             jpaService = Services.get().get(JPAService.class);
             if (jpaService != null) {
                 coordAction = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(actionId));
+                coordJob = jpaService.execute(new 
CoordJobGetJPAExecutor(coordAction.getJobId()));
                 LogUtils.setLogInfo(coordAction, logInfo);
             }
             else {
@@ -176,14 +185,47 @@ public class CoordActionUpdatePushMissin
 
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
+     */
     @Override
-    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
+    protected void eagerVerifyPrecondition() throws CommandException, 
PreconditionException {
         if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
             throw new PreconditionException(ErrorCode.E1100, "[" + actionId
-                    + "]::CoordActionInputCheck:: Ignoring action. Should be 
in WAITING state, but state="
+                    + "]::CoordActionUpdatePushDependency:: Ignoring action. 
Should be in WAITING state, but state="
                     + coordAction.getStatus());
         }
-        // TODO: check the parent coordinator job?
+        // Return early if the job is eligible for the action input check
+        // when running with backward support enabled.
+        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
+            return;
+        }
+
+        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() 
!= Job.Status.RUNNINGWITHERROR
+                && coordJob.getStatus() != Job.Status.PAUSED && 
coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
+            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+                    + "]::CoordActionUpdatePushDependency:: Ignoring action."
+                    + " Coordinator job is not in 
RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
+                    + coordJob.getStatus());
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#loadState()
+     */
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, 
PreconditionException {
     }
 
 }

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
 Thu Jan 10 20:37:15 2013
@@ -286,11 +286,11 @@ public class CoordCommandUtils {
      * @param instances
      * @throws Exception
      */
-    public static StringBuffer separateResolvedAndUnresolved(Element event, 
StringBuilder instances)
+    public static StringBuilder separateResolvedAndUnresolved(Element event, 
StringBuilder instances)
             throws Exception {
         StringBuilder unresolvedInstances = new StringBuilder();
         StringBuilder urisWithDoneFlag = new StringBuilder();
-        StringBuffer depList = new StringBuffer();
+        StringBuilder depList = new StringBuilder();
         String uris = createEarlyURIs(event, instances.toString(), 
unresolvedInstances, urisWithDoneFlag);
         if (uris.length() > 0) {
             Element uriInstance = new Element("uris", event.getNamespace());
@@ -426,28 +426,20 @@ public class CoordCommandUtils {
         
appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
         
appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
 
-        HashMap<String, StringBuffer> dependencyList = new HashMap<String, 
StringBuffer>();
-        StringBuffer pushDepsList = new StringBuffer();
-        StringBuffer pullDepsList = new StringBuffer();
-        dependencyList.put("push", pushDepsList);
-        dependencyList.put("pull", pullDepsList);
+        Map<String, StringBuilder> dependencyMap = new HashMap<String, 
StringBuilder>();
 
         Element inputList = eAction.getChild("input-events", 
eAction.getNamespace());
         List<Element> dataInList = null;
         if (inputList != null) {
             dataInList = inputList.getChildren("data-in", 
eAction.getNamespace());
-            materializeDataEvents(dataInList, appInst, conf, dependencyList);
+            dependencyMap = materializeDataEvents(dataInList, appInst, conf);
         }
 
         Element outputList = eAction.getChild("output-events", 
eAction.getNamespace());
         List<Element> dataOutList = null;
         if (outputList != null) {
             dataOutList = outputList.getChildren("data-out", 
eAction.getNamespace());
-            // no dependency checks
-            HashMap<String, StringBuffer> emptyDepsList = new HashMap<String, 
StringBuffer>();
-            emptyDepsList.put("pull", new StringBuffer());
-            emptyDepsList.put("push", new StringBuffer());
-            materializeDataEvents(dataOutList, appInst, conf, emptyDepsList);
+            materializeDataEvents(dataOutList, appInst, conf);
         }
 
         eAction.removeAttribute("start");
@@ -469,8 +461,14 @@ public class CoordCommandUtils {
         actionBean.setLastModifiedTime(new Date());
         actionBean.setStatus(CoordinatorAction.Status.WAITING);
         actionBean.setActionNumber(instanceCount);
-        
actionBean.setMissingDependencies(dependencyList.get("pull").toString());
-        
actionBean.setPushMissingDependencies(dependencyList.get("push").toString());
+        StringBuilder sbPull = dependencyMap.get("pull");
+        if (sbPull != null) {
+            actionBean.setMissingDependencies(sbPull.toString());
+        }
+        StringBuilder sbPush = dependencyMap.get("push");
+        if (sbPush != null) {
+            actionBean.setPushMissingDependencies(sbPush.toString());
+        }
         actionBean.setNominalTime(nominalTime);
         if (isSla == true) {
             actionBean.setSlaXml(XmlUtils.prettyPrint(
@@ -511,16 +509,14 @@ public class CoordCommandUtils {
      * @param conf
      * @throws Exception
      */
-    public static void materializeDataEvents(List<Element> events, 
SyncCoordAction appInst, Configuration conf,
-            Map<String, StringBuffer> dependencyList) throws Exception {
+    public static Map<String, StringBuilder> 
materializeDataEvents(List<Element> events, SyncCoordAction appInst, 
Configuration conf
+            ) throws Exception {
 
         if (events == null) {
-            return;
+            return null;
         }
-        HashMap<String, StringBuffer> unresolvedList = new HashMap<String, 
StringBuffer>();
-        unresolvedList.put("push", new StringBuffer());
-        unresolvedList.put("pull", new StringBuffer());
-
+        StringBuilder unresolvedList = new StringBuilder();
+        Map<String, StringBuilder> dependencyMap = new HashMap<String, 
StringBuilder>();
         URIHandlerService uriService = 
Services.get().get(URIHandlerService.class);
 
         for (Element event : events) {
@@ -535,29 +531,31 @@ public class CoordCommandUtils {
             // Handle start-instance and end-instance
             resolveInstanceRange(event, instances, appInst, conf, eval);
             // Separate out the unresolved instances
-            StringBuffer depList = separateResolvedAndUnresolved(event, 
instances);
+            StringBuilder depList = separateResolvedAndUnresolved(event, 
instances);
             URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
             URIHandler handler = uriService.getURIHandler(baseURI);
             if 
(handler.getDependencyType(baseURI).equals(DependencyType.PUSH)) {
                 pullOrPush = "push";
             }
-            dependencyList.put(pullOrPush, depList);
+            dependencyMap.put(pullOrPush, depList);
             String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, 
event.getNamespace());
             if (tmpUnresolved != null) {
-                if (unresolvedList.get(pullOrPush).length() > 0) {
-                    
unresolvedList.get(pullOrPush).append(CoordELFunctions.INSTANCE_SEPARATOR);
+                if (unresolvedList.length() > 0) {
+                    unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
                 }
-                unresolvedList.get(pullOrPush).append(tmpUnresolved);
+                unresolvedList.append(tmpUnresolved);
             }
         }
-        if (unresolvedList.get("push").length() > 0) {
-            dependencyList.get("push").append(RESOLVED_UNRESOLVED_SEPARATOR);
-            dependencyList.get("push").append(unresolvedList.get("push"));
-        }
-        if (unresolvedList.get("pull").length() > 0) {
-            dependencyList.get("pull").append(RESOLVED_UNRESOLVED_SEPARATOR);
-            dependencyList.get("pull").append(unresolvedList.get("pull"));
+        if (unresolvedList.length() > 0) {
+            StringBuilder sb = dependencyMap.get("pull");
+            if (sb == null) {
+                dependencyMap.put("pull", new 
StringBuilder(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList));
+            }
+            else {
+                
sb.append(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList);
+            }
         }
+        return dependencyMap;
     }
 
     /**
@@ -589,19 +587,14 @@ public class CoordCommandUtils {
      * @throws Exception
      */
     public static void registerPartition(CoordinatorActionBean actionBean) 
throws MetadataServiceException {
-
-        String resolved = 
getResolvedList(actionBean.getPushMissingDependencies(), new StringBuilder(),
-                new StringBuilder());
-        if (resolved.length() == 0)
-            return;
-        String[] resolvedList = 
resolved.split(CoordELFunctions.INSTANCE_SEPARATOR, -1);
+        String missDeps = actionBean.getPushMissingDependencies();
+        String[] missDepsList = 
missDeps.split(CoordELFunctions.INSTANCE_SEPARATOR, -1);
         PartitionDependencyManagerService pdms = 
Services.get().get(PartitionDependencyManagerService.class);
 
         // always register action ID to missing partition in PDMS before
         // asking HCat to avoid corner case where JMS notification msg
         // arrives while asking HCat for existence of partition
-        if (resolvedList != null && resolvedList.length > 0) {
-            pdms.addMissingPartitions(resolvedList, actionBean.getId());
+            pdms.addMissingPartitions(missDepsList, actionBean.getId());
             // after Hcat answers, two things need to be done
             // 1. if partition exists, remove actionId from missing
             // partition in PDMS
@@ -610,10 +603,5 @@ public class CoordCommandUtils {
             // 2. if partition not exists, no-op
             // we probably delegate these tasks to other separate command,
             // so end up with queuing the new command
-        }
-        else {
-            XLog.getLog(CoordCommandUtils.class).info("no resolved push data 
dependency");
-        }
-
     }
 }

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
 Thu Jan 10 20:37:15 2013
@@ -112,8 +112,10 @@ public class CoordMaterializeTransitionX
             for (JsonBean actionBean : insertList) {
                 if (actionBean instanceof CoordinatorActionBean) {
                     CoordinatorActionBean coordAction = 
(CoordinatorActionBean) actionBean;
-                    CoordCommandUtils.registerPartition(coordAction);
-                    queue(new 
CoordPushDependencyCheckXCommand(coordAction.getId()));
+                    if (coordAction.getPushMissingDependencies() != null) {
+                        CoordCommandUtils.registerPartition(coordAction);
+                        queue(new 
CoordPushDependencyCheckXCommand(coordAction.getId()));
+                    }
                 }
             }
         }

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
 Thu Jan 10 20:37:15 2013
@@ -22,15 +22,18 @@ import java.io.StringReader;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.PreconditionException;
@@ -38,20 +41,25 @@ import org.apache.oozie.coord.CoordELFun
 import org.apache.oozie.dependency.URIHandler;
 import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
 import 
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.MetadataServiceException;
+import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Service;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.URIAccessorException;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.StatusUtils;
 import org.apache.oozie.util.XConfiguration;
 
 public class CoordPushDependencyCheckXCommand extends 
CoordinatorXCommand<Void> {
     private String actionId;
     private JPAService jpaService = null;
     private CoordinatorActionBean coordAction = null;
+    private CoordinatorJobBean coordJob = null;
 
     /**
      * Property name of command re-queue interval for coordinator push check in
@@ -88,12 +96,16 @@ public class CoordPushDependencyCheckXCo
         }
         String user = 
ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME), 
OozieClient.USER_NAME);
         List<String> missingDeps = getMissingDependencies(pushDepList, 
actionConf, user);
+        List<String> availableDepList = new 
ArrayList<String>(Arrays.asList(pushDepList));
+        availableDepList.removeAll(missingDeps);
+
         if (missingDeps.size() > 0) {
             pushDeps = StringUtils.join(missingDeps, 
CoordELFunctions.INSTANCE_SEPARATOR);
             coordAction.setPushMissingDependencies(pushDeps);
             // Checking for timeout
             if (!isTimeout()) {
-                queue(new 
CoordPushDependencyCheckXCommand(coordAction.getId()), 
getCoordPushCheckRequeueInterval());
+                queue(new 
CoordPushDependencyCheckXCommand(coordAction.getId()),
+                        getCoordPushCheckRequeueInterval());
             }
             else {
                 queue(new CoordActionTimeOutXCommand(coordAction), 100);
@@ -107,7 +119,8 @@ public class CoordPushDependencyCheckXCo
                 queue(new CoordActionReadyXCommand(coordAction.getJobId()), 
100);
             }
         }
-        updateCoordAction(coordAction);
+
+        updateCoordAction(coordAction, availableDepList);
         return null;
     }
 
@@ -143,15 +156,26 @@ public class CoordPushDependencyCheckXCo
         return missingDeps;
     }
 
-    private void updateCoordAction(CoordinatorActionBean coordAction2) throws 
CommandException {
+    private void updateCoordAction(CoordinatorActionBean coordAction2, 
List<String> availPartitionList) throws CommandException {
         coordAction.setLastModifiedTime(new Date());
         if (jpaService != null) {
             try {
                 jpaService.execute(new 
CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+                PartitionDependencyManagerService pdms = 
Services.get().get(PartitionDependencyManagerService.class);
+                if (pdms.removeAvailablePartitions(
+                        
PartitionDependencyManagerService.createPartitionWrappers(availPartitionList), 
actionId)) {
+                    LOG.debug("Succesfully removed partitions for actionId: 
[{0}] from available Map ", actionId);
+                }
+                else {
+                    LOG.warn("Unable to remove partitions for actionId: [{0}] 
from available Map ", actionId);
+                }
             }
             catch (JPAExecutorException jex) {
                 throw new CommandException(ErrorCode.E1023, jex.getMessage(), 
jex);
             }
+            catch (MetadataServiceException e) {
+                throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
+            }
         }
     }
 
@@ -173,9 +197,8 @@ public class CoordPushDependencyCheckXCo
      * @see org.apache.oozie.command.XCommand#getEntityKey()
      */
     @Override
-    // TODO - Check whether the entityKey should be JobId or actionId
     public String getEntityKey() {
-        return actionId;
+        return coordAction.getJobId();
     }
 
     /* (non-Javadoc)
@@ -196,18 +219,17 @@ public class CoordPushDependencyCheckXCo
         return true;
     }
 
-    /*
-     * (non-Javadoc)
-     *
-     * @see org.apache.oozie.command.XCommand#loadState()
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#eagerLoadState()
      */
     @Override
-    protected void loadState() throws CommandException {
+    protected void eagerLoadState() throws CommandException {
         try {
             jpaService = Services.get().get(JPAService.class);
 
             if (jpaService != null) {
                 coordAction = jpaService.execute(new 
CoordActionGetForInputCheckJPAExecutor(actionId));
+                coordJob = jpaService.execute(new 
CoordJobGetJPAExecutor(coordAction.getJobId()));
                 LogUtils.setLogInfo(coordAction, logInfo);
             }
             else {
@@ -219,6 +241,41 @@ public class CoordPushDependencyCheckXCo
         }
     }
 
+    /* (non-Javadoc)
+     * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
+     */
+    @Override
+    protected void eagerVerifyPrecondition() throws CommandException, 
PreconditionException {
+        if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
+            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+                    + "]::CoordPushDependencyCheck:: Ignoring action. Should 
be in WAITING state, but state="
+                    + coordAction.getStatus());
+        }
+
+        // Return early if the job is eligible for the action input check
+        // when running with backward support enabled.
+        if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
+            return;
+        }
+
+        if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus() 
!= Job.Status.RUNNINGWITHERROR
+                && coordJob.getStatus() != Job.Status.PAUSED && 
coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
+            throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+                    + "]::CoordPushDependencyCheck:: Ignoring action."
+                    + " Coordinator job is not in 
RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
+                    + coordJob.getStatus());
+        }
+    }
+
+    /*
+     * (non-Javadoc)
+     *
+     * @see org.apache.oozie.command.XCommand#loadState()
+     */
+    @Override
+    protected void loadState() throws CommandException {
+    }
+
     /*
      * (non-Javadoc)
      *
@@ -226,9 +283,6 @@ public class CoordPushDependencyCheckXCo
      */
     @Override
     protected void verifyPrecondition() throws CommandException, 
PreconditionException {
-        if (coordAction.getStatus().equals(CoordinatorAction.Status.WAITING) 
== false) {
-            throw new PreconditionException(ErrorCode.E1100);
-        }
     }
 
 }

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
 Thu Jan 10 20:37:15 2013
@@ -18,6 +18,7 @@
 package org.apache.oozie.service;
 
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
@@ -31,6 +32,7 @@ import com.googlecode.concurrentlinkedha
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.CommandException;
 import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
 import org.apache.oozie.jms.HCatMessageHandler;
 import org.apache.oozie.jms.MessageReceiver;
@@ -589,4 +591,17 @@ public class PartitionDependencyManagerS
         }
     }
 
+    public static List<PartitionWrapper> createPartitionWrappers(List<String> 
partitions) throws CommandException {
+        List<PartitionWrapper> ret = new ArrayList<PartitionWrapper>();
+        for (String partURI : partitions) {
+            try {
+                ret.add(new PartitionWrapper(partURI));
+            }
+            catch (URISyntaxException e) {
+                throw new CommandException(ErrorCode.E1025, e);
+            }
+        }
+        return ret;
+    }
+
 }

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
 Thu Jan 10 20:37:15 2013
@@ -22,9 +22,13 @@ import java.util.Date;
 import java.util.List;
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
 import org.apache.oozie.SLAEventBean;
 import org.apache.oozie.client.CoordinatorJob;
 import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.dependency.FSURIHandler;
+import org.apache.oozie.dependency.HCatURIHandler;
 import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
@@ -35,6 +39,7 @@ import org.apache.oozie.executor.jpa.SLA
 import org.apache.oozie.local.LocalOozie;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.DateUtils;
 
@@ -65,6 +70,48 @@ public class TestCoordMaterializeTransit
         checkCoordAction(job.getId() + "@1");
     }
 
+    public void testActionMaterForHcatalog() throws Exception {
+        services.destroy();
+        services = super.setupServicesForHCatalog();
+        services.getConf().set(URIHandlerService.URI_HANDLERS,
+                FSURIHandler.class.getName() + "," + 
HCatURIHandler.class.getName());
+        services.init();
+        Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTableForWaiting("coord-job-for-matd-hcat.xml",
+                CoordinatorJob.Status.RUNNING, startTime, endTime, false, 
false, 0);
+        new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+        CoordinatorActionBean actionBean = getCoordAction(job.getId() + "@1");
+        assertEquals("file://dummyhdfs/2009/05/_SUCCESS" + 
CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+                + "${coord:latestRange(-1,0)}", 
actionBean.getMissingDependencies());
+
+        assertEquals("hcat://dummyhcat:1000/db/table/ds=2009-12", 
actionBean.getPushMissingDependencies());
+    }
+
+
+    public void testActionMaterForHcatalogIncorrectURI() throws Exception {
+        services.destroy();
+        services = super.setupServicesForHCatalog();
+        services.getConf().set(URIHandlerService.URI_HANDLERS,
+                FSURIHandler.class.getName() + "," + 
HCatURIHandler.class.getName());
+        services.init();
+        Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
+        Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
+        CoordinatorJobBean job = 
addRecordToCoordJobTableForWaiting("coord-job-for-matd-neg-hcat.xml",
+                CoordinatorJob.Status.RUNNING, startTime, endTime, false, 
false, 0);
+        try {
+            new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+            fail("Expected Command exception but didn't catch any");
+        }
+        catch (CommandException e) {
+            e.printStackTrace();
+            assertEquals(ErrorCode.E1001, e.getErrorCode());
+        }
+        catch (Exception e) {
+            fail("Unexpected exception " + e.getMessage());
+        }
+    }
+
     public void testActionMaterWithPauseTime1() throws Exception {
         Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
         Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
@@ -240,6 +287,13 @@ public class TestCoordMaterializeTransit
         return actionBean;
     }
 
+    private CoordinatorActionBean getCoordAction(String actionId) throws 
JPAExecutorException {
+        JPAService jpaService = Services.get().get(JPAService.class);
+        CoordinatorActionBean actionBean;
+        actionBean = jpaService.execute(new 
CoordActionGetJPAExecutor(actionId));
+        return actionBean;
+    }
+
     private CoordinatorJob.Status getStatus(String jobId){
         CoordinatorJob job = null;
         try {
@@ -284,4 +338,27 @@ public class TestCoordMaterializeTransit
             fail("Action ID " + actionId + " was not stored properly in db");
         }
     }
+
+    /**
+     * Builds a coordinator job bean via {@code createCoordJob}, replaces its job XML with the result of
+     * {@code getCoordJobXmlForWaiting} for the same resource, and inserts the bean through the
+     * {@link JPAService}. The test fails if the insert raises a {@link JPAExecutorException}.
+     *
+     * @param testFileName name of the coordinator application XML test resource
+     * @param status status passed on to {@code createCoordJob}
+     * @param start start time passed on to {@code createCoordJob}
+     * @param end end time passed on to {@code createCoordJob}
+     * @param pending pending flag passed on to {@code createCoordJob}
+     * @param doneMatd done-materialization flag passed on to {@code createCoordJob}
+     * @param lastActionNum last action number passed on to {@code createCoordJob}
+     * @return the job bean that was inserted
+     * @throws Exception if the bean or its job XML cannot be created
+     */
+    private CoordinatorJobBean addRecordToCoordJobTableForWaiting(String testFileName, CoordinatorJob.Status status,
+            Date start, Date end, boolean pending, boolean doneMatd, int lastActionNum) throws Exception {
+
+        String testDir = getTestCaseDir();
+        CoordinatorJobBean coordJob = createCoordJob(testFileName, status, start, end, pending, doneMatd,
+                lastActionNum);
+        coordJob.setJobXml(getCoordJobXmlForWaiting(testFileName, testDir));
+
+        // Looked up outside the try block: neither call can raise the JPAExecutorException handled below.
+        JPAService jpaService = Services.get().get(JPAService.class);
+        assertNotNull(jpaService);
+        try {
+            jpaService.execute(new CoordJobInsertJPAExecutor(coordJob));
+        }
+        catch (JPAExecutorException je) {
+            // fail() reports only its message, so dump the trace first; fail() always throws,
+            // which is why no rethrow follows it.
+            je.printStackTrace();
+            fail("Unable to insert the test coord job record to table");
+        }
+
+        return coordJob;
+    }
 }

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
 Thu Jan 10 20:37:15 2013
@@ -29,6 +29,7 @@ import org.apache.oozie.service.Services
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.PartitionWrapper;
+import org.apache.hadoop.conf.Configuration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -41,8 +42,10 @@ public class TestCoordPushDependencyChec
     protected void setUp() throws Exception {
         super.setUp();
         services = new Services();
-        services.getConf().set(URIHandlerService.URI_HANDLERS,
+        Configuration conf = services.getConf();
+        conf.set(URIHandlerService.URI_HANDLERS,
                 FSURIHandler.class.getName() + "," + 
HCatURIHandler.class.getName());
+        conf.set(Services.CONF_SERVICE_EXT_CLASSES, 
"org.apache.oozie.service.PartitionDependencyManagerService");
         services.init();
         server = getMetastoreAuthority();
     }
@@ -138,7 +141,7 @@ public class TestCoordPushDependencyChec
                 assertEquals(new PartitionWrapper(missDeps), new 
PartitionWrapper(expDeps));
             }
             else {
-                assertEquals(missDeps, expDeps);
+                assertEquals(expDeps, missDeps);
             }
             assertEquals(action.getStatus(), stat);
 

Added: 
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml?rev=1431622&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml 
(added)
+++ 
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml 
Thu Jan 10 20:37:15 2013
@@ -0,0 +1,64 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+ 
+       http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<coordinator-app xmlns='uri:oozie:coordinator:0.2' name='NAME'
+       frequency="1" start='2009-02-01T01:00Z' end='2009-02-03T23:59Z'
+       timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>
+       <controls>
+               <timeout>10</timeout>
+               <concurrency>2</concurrency>
+               <execution>LIFO</execution>
+       </controls>
+       <input-events>
+               <data-in name='A' dataset='a'>
+                       <dataset name='a' frequency='7' 
initial-instance='2009-01-01T01:00Z'
+                               timezone='UTC' freq_timeunit='DAY' 
end_of_duration='NONE'>
+                               
<uri-template>hcat://dummyhcat:1000/db/table/ds=${YEAR}-${DAY}
+                               </uri-template>
+                       </dataset>
+                       <instance>${coord:current(-3)}</instance>
+               </data-in>
+               <data-in name='B' dataset='b'>
+                       <dataset name='b' frequency='7' 
initial-instance='2009-01-01T01:00Z'
+                               timezone='UTC' freq_timeunit='DAY' 
end_of_duration='NONE'>
+                               
<uri-template>hcat://dummyhcat:1000/db/table/ds=${YEAR}-${DAY}
+                               </uri-template>
+                       </dataset>
+                       <start-instance>${coord:latest(-1)}</start-instance>
+                       <end-instance>${coord:latest(0)}</end-instance>
+               </data-in>
+               <data-in name='C' dataset='c'>
+                       <dataset name='c' frequency='7' 
initial-instance='2009-01-01T01:00Z'
+                               timezone='UTC' freq_timeunit='DAY' 
end_of_duration='NONE'>
+                               <uri-template>file://dummyhdfs/${YEAR}/${DAY}
+                               </uri-template>
+                       </dataset>
+                       <instance>${coord:current(0)}</instance>
+               </data-in>
+       </input-events>
+       <action>
+               <workflow>
+                       <app-path>hdfs:///tmp/workflows/</app-path>
+                       <configuration>
+                               <property>
+                                       <name>inputA</name>
+                                       <value>${coord:dataIn('A')}</value>
+                               </property>
+                       </configuration>
+               </workflow>
+       </action>
+</coordinator-app>

Added: 
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml?rev=1431622&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
 (added)
+++ 
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
 Thu Jan 10 20:37:15 2013
@@ -0,0 +1,47 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<coordinator-app xmlns='uri:oozie:coordinator:0.2' name='NAME'
+       frequency="1" start='2009-02-01T01:00Z' end='2009-02-03T23:59Z'
+       timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>
+       <controls>
+               <timeout>10</timeout>
+               <concurrency>2</concurrency>
+               <execution>LIFO</execution>
+       </controls>
+       <input-events>
+               <data-in name='A' dataset='a'>
+                       <dataset name='a' frequency='7' 
initial-instance='2009-01-01T01:00Z'
+                               timezone='UTC' freq_timeunit='DAY' 
end_of_duration='NONE'>
+                               
<uri-template>hcat://dummyhcat:1000/table/ds=${YEAR}/${DAY};region=us
+                               </uri-template>
+                       </dataset>
+                       <instance>${coord:current(-3)}</instance>
+               </data-in>
+       </input-events>
+       <action>
+               <workflow>
+                       <app-path>hdfs:///tmp/workflows/</app-path>
+                       <configuration>
+                               <property>
+                                       <name>inputA</name>
+                                       <value>${coord:dataIn('A')}</value>
+                               </property>
+                       </configuration>
+               </workflow>
+       </action>
+</coordinator-app>

Modified: oozie/branches/hcat-intre/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Thu Jan 10 20:37:15 2013
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1156 Make all the latest/future instances as pull dependences (virag)
 OOZIE-1144 OOZIE-1137 breaks the sharelib (rkanter)
 OOZIE-1035 Improve forkjoin validation to allow same errorTo transitions 
(rkanter)
 OOZIE-1137 In light of federation use actionLibPath instead of appPath (vaidya 
via rkanter)


Reply via email to