Author: kamrul
Date: Tue Nov 20 01:11:09 2012
New Revision: 1411494

URL: http://svn.apache.org/viewvc?rev=1411494&view=rev
Log:
OOZIE-1043 Add logic to register to Missing Dependency Structure in coord 
action materialization (ryota via mohammad)

Added:
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java
Modified:
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
    oozie/branches/hcat-intre/release-log.txt

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
 Tue Nov 20 01:11:09 2012
@@ -20,7 +20,9 @@ package org.apache.oozie.command.coord;
 import java.io.StringReader;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.CoordinatorActionBean;
@@ -33,13 +35,16 @@ import org.apache.oozie.coord.CoordUtils
 import org.apache.oozie.coord.CoordinatorJobException;
 import org.apache.oozie.coord.SyncCoordAction;
 import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.service.PartitionDependencyManagerService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.HCatURI;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
+import org.apache.oozie.util.XLog;
 
 public class CoordCommandUtils {
     public static int CURRENT = 0;
@@ -53,6 +58,7 @@ public class CoordCommandUtils {
     /**
      * parse a function like coord:latest(n)/future() and return the 'n'.
      * <p/>
+     *
      * @param function
      * @param event
      * @param appInst
@@ -198,8 +204,8 @@ public class CoordCommandUtils {
                 if (startCal != null && endCal != null) {
                     List<Integer> expandedFreqs = 
CoordELFunctions.expandOffsetTimes(startCal, endCal, eval);
                     for (int i = expandedFreqs.size() - 1; i >= 0; i--) {
-                        String matInstance = materializeInstance(event, 
"${coord:offset(" + expandedFreqs.get(i) + ", \"MINUTE\")}",
-                                                                    appInst, 
conf, eval);
+                        String matInstance = materializeInstance(event, 
"${coord:offset(" + expandedFreqs.get(i)
+                                + ", \"MINUTE\")}", appInst, conf, eval);
                         if (matInstance == null || matInstance.length() == 0) {
                             // Earlier than dataset's initial instance
                             break;
@@ -220,7 +226,8 @@ public class CoordCommandUtils {
                 if (funcType == CURRENT) {
                     // Everything could be resolved NOW. no latest() ELs
                     for (int i = endIndex; i >= startIndex; i--) {
-                        String matInstance = materializeInstance(event, 
"${coord:current(" + i + ")}", appInst, conf, eval);
+                        String matInstance = materializeInstance(event, 
"${coord:current(" + i + ")}", appInst, conf,
+                                eval);
                         if (matInstance == null || matInstance.length() == 0) {
                             // Earlier than dataset's initial instance
                             break;
@@ -240,7 +247,8 @@ public class CoordCommandUtils {
                             
instances.append("${coord:latest(").append(startIndex).append(")}");
                         }
                         else if (funcType == FUTURE) {
-                            
instances.append("${coord:future(").append(startIndex).append(",'").append(endRestArg).append("')}");
+                            
instances.append("${coord:future(").append(startIndex).append(",'").append(endRestArg)
+                                    .append("')}");
                         }
                     }
                 }
@@ -325,7 +333,7 @@ public class CoordCommandUtils {
         String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
 
         for (int i = 0; i < instanceList.length; i++) {
-            if(instanceList[i].trim().length() == 0) {
+            if (instanceList[i].trim().length() == 0) {
                 continue;
             }
             int funcType = getFuncType(instanceList[i]);
@@ -419,7 +427,11 @@ public class CoordCommandUtils {
         
appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
         
appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
 
-        StringBuffer dependencyList = new StringBuffer();
+        HashMap<String, StringBuffer> dependencyList = new HashMap<String, 
StringBuffer>();
+        StringBuffer pushDepsList = new StringBuffer();
+        StringBuffer pullDepsList = new StringBuffer();
+        dependencyList.put("push", pushDepsList);
+        dependencyList.put("pull", pullDepsList);
 
         Element inputList = eAction.getChild("input-events", 
eAction.getNamespace());
         List<Element> dataInList = null;
@@ -432,9 +444,11 @@ public class CoordCommandUtils {
         List<Element> dataOutList = null;
         if (outputList != null) {
             dataOutList = outputList.getChildren("data-out", 
eAction.getNamespace());
-            StringBuffer tmp = new StringBuffer();
             // no dependency checks
-            materializeDataEvents(dataOutList, appInst, conf, tmp);
+            HashMap<String, StringBuffer> emptyDepsList = new HashMap<String, 
StringBuffer>();
+            emptyDepsList.put("pull", new StringBuffer());
+            emptyDepsList.put("push", new StringBuffer());
+            materializeDataEvents(dataOutList, appInst, conf, emptyDepsList);
         }
 
         eAction.removeAttribute("start");
@@ -443,8 +457,9 @@ public class CoordCommandUtils {
         eAction.setAttribute("action-nominal-time", 
DateUtils.formatDateOozieTZ(nominalTime));
         eAction.setAttribute("action-actual-time", 
DateUtils.formatDateOozieTZ(actualTime));
 
-        boolean isSla = 
CoordCommandUtils.materializeSLA(eAction.getChild("action", 
eAction.getNamespace()).getChild(
-                "info", eAction.getNamespace("sla")), nominalTime, conf);
+        boolean isSla = CoordCommandUtils.materializeSLA(
+                eAction.getChild("action", 
eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")),
+                nominalTime, conf);
 
         // Setting up action bean
         actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
@@ -455,7 +470,8 @@ public class CoordCommandUtils {
         actionBean.setLastModifiedTime(new Date());
         actionBean.setStatus(CoordinatorAction.Status.WAITING);
         actionBean.setActionNumber(instanceCount);
-        actionBean.setMissingDependencies(dependencyList.toString());
+        
actionBean.setMissingDependencies(dependencyList.get("pull").toString());
+        
actionBean.setPushMissingDependencies(dependencyList.get("push").toString());
         actionBean.setNominalTime(nominalTime);
         if (isSla == true) {
             actionBean.setSlaXml(XmlUtils.prettyPrint(
@@ -473,7 +489,8 @@ public class CoordCommandUtils {
         }
         else {
             String action = XmlUtils.prettyPrint(eAction).toString();
-            CoordActionInputCheckXCommand coordActionInput = new 
CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId());
+            CoordActionInputCheckXCommand coordActionInput = new 
CoordActionInputCheckXCommand(actionBean.getId(),
+                    actionBean.getJobId());
             StringBuilder actionXml = new StringBuilder(action);
             StringBuilder existList = new StringBuilder();
             StringBuilder nonExistList = new StringBuilder();
@@ -496,13 +513,23 @@ public class CoordCommandUtils {
      * @throws Exception
      */
     public static void materializeDataEvents(List<Element> events, 
SyncCoordAction appInst, Configuration conf,
-            StringBuffer dependencyList) throws Exception {
+            Map<String, StringBuffer> dependencyList) throws Exception {
 
         if (events == null) {
             return;
         }
-        StringBuffer unresolvedList = new StringBuffer();
+        HashMap<String, StringBuffer> unresolvedList = new HashMap<String, 
StringBuffer>();
+        unresolvedList.put("push", new StringBuffer());
+        unresolvedList.put("pull", new StringBuffer());
+
         for (Element event : events) {
+            Element uri = event.getChild("dataset", event.getNamespace())
+                    .getChild("uri-template", event.getNamespace());
+            String pullOrPush = "pull";
+            String uriTemplate = uri.getText();
+            if (uriTemplate != null && HCatURI.isHcatURI(uriTemplate)) {
+                pullOrPush = "push";
+            }
             StringBuilder instances = new StringBuilder();
             ELEvaluator eval = 
CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
             // Handle list of instance tag
@@ -510,20 +537,23 @@ public class CoordCommandUtils {
             // Handle start-instance and end-instance
             resolveInstanceRange(event, instances, appInst, conf, eval);
             // Separate out the unresolved instances
-            separateResolvedAndUnresolved(event, instances, dependencyList);
+            separateResolvedAndUnresolved(event, instances, 
dependencyList.get(pullOrPush));
             String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG, 
event.getNamespace());
             if (tmpUnresolved != null) {
-                if (unresolvedList.length() > 0) {
-                    unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
+                if (unresolvedList.get(pullOrPush).length() > 0) {
+                    
unresolvedList.get(pullOrPush).append(CoordELFunctions.INSTANCE_SEPARATOR);
                 }
-                unresolvedList.append(tmpUnresolved);
+                unresolvedList.get(pullOrPush).append(tmpUnresolved);
             }
         }
-        if (unresolvedList.length() > 0) {
-            dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR);
-            dependencyList.append(unresolvedList);
+        if (unresolvedList.get("push").length() > 0) {
+            dependencyList.get("push").append(RESOLVED_UNRESOLVED_SEPARATOR);
+            dependencyList.get("push").append(unresolvedList.get("push"));
+        }
+        if (unresolvedList.get("pull").length() > 0) {
+            dependencyList.get("pull").append(RESOLVED_UNRESOLVED_SEPARATOR);
+            dependencyList.get("pull").append(unresolvedList.get("pull"));
         }
-        return;
     }
 
     /**
@@ -548,4 +578,38 @@ public class CoordCommandUtils {
         return resolved.toString();
     }
 
+    /**
+     * Register partition to PartitionDependencyManagerService
+     *
+     * @param actionBean
+     * @throws Exception
+     */
+    public static void registerPartition(CoordinatorActionBean actionBean) 
throws Exception {
+
+        String resolved = 
getResolvedList(actionBean.getPushMissingDependencies(), new StringBuilder(),
+                new StringBuilder());
+        if (resolved.length() == 0)
+            return;
+        String[] resolvedList = 
resolved.split(CoordELFunctions.INSTANCE_SEPARATOR, -1);
+        PartitionDependencyManagerService pdms = 
Services.get().get(PartitionDependencyManagerService.class);
+
+        // always register action ID to missing partition in PDMS before
+        // asking HCat to avoid corner case where JMS notification msg
+        // arrives while asking HCat for existence of partition
+        if (resolvedList != null && resolvedList.length > 0) {
+            pdms.addMissingPartitions(resolvedList, actionBean.getId());
+            // after Hcat answers, two things need to be done
+            // 1. if partition exists, remove actionId from missing
+            // partition in PDMS
+            // --> pdms.removeActionFromMissingPartitions(uri,
+            // actionBean.getId());
+            // 2. if partition not exists, no-op
+            // we probably delegate these tasks to other separate command,
+            // so end up with queuing the new command
+        }
+        else {
+            XLog.getLog(CoordCommandUtils.class).info("no resolved push data 
dependency");
+        }
+
+    }
 }

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
 Tue Nov 20 01:11:09 2012
@@ -329,6 +329,7 @@ public class CoordMaterializeTransitionX
 
             if (!dryrun) {
                 storeToDB(actionBean, action); // Storing to table
+                CoordCommandUtils.registerPartition(actionBean); // Register 
partition to PDMS
             }
             else {
                 actionStrings.append("action for new instance");

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
 Tue Nov 20 01:11:09 2012
@@ -52,14 +52,14 @@ public class PartitionDependencyManagerS
     private static XLog log;
 
     /*
-     * Top-level map key = concatenated identifier for hcatServer + hcatDB
-     * value = table map (key = tableName string, value = PartitionsGroup
+     * Top-level map: key = concatenated identifier for hcatServer + hcatDB,
+     * value = table map (key = tableName string, value = PartitionsGroup)
      */
     private Map<String, Map<String, PartitionsGroup>> hcatInstanceMap;
 
     /*
-     * Map denoting actions and corresponding 'available' partitions
-     * key = coordinator actionId, value = available partitions
+     * Map denoting actions and corresponding 'available' partitions:
+     * key = coordinator actionId, value = available partitions
      */
     private Map<String, List<PartitionWrapper>> availMap;
 
@@ -125,7 +125,36 @@ public class PartitionDependencyManagerS
         return ret;
     }
 
- /**
+    /**
+     * Remove an action from missing partition map
+     *
+     * @param hcatURI
+     * @param actionId
+     * @return
+     * @throws MetadataServiceException
+     */
+    public boolean removeActionFromMissingPartitions(String hcatURI, String 
actionId) throws MetadataServiceException {
+        boolean ret = false;
+        HCatURI uri;
+        try {
+            uri = new HCatURI(hcatURI);
+        }
+        catch (URISyntaxException e) {
+            throw new MetadataServiceException(ErrorCode.E1503, 
e.getMessage());
+        }
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), 
uri.getDb(), uri.getTable(),
+                uri.getPartitionMap());
+        List<String> actions = _getActionsForPartition(partition);
+        if (actions != null && actions.size() != 0) {
+            ret = actions.remove(actionId);
+        }
+        else {
+            log.info("No waiting actions in the partition [{0}], no-ops", 
partition);
+        }
+        return ret;
+    }
+
+    /**
      * Adding missing partition entry specified by PartitionWrapper object
      *
      * @param partition
@@ -183,14 +212,21 @@ public class PartitionDependencyManagerS
         addMissingPartition(partition, actionId);
     }
 
+    public void addMissingPartitions(String[] hcatURIs, String actionId) 
throws MetadataServiceException {
+        for (String uri : hcatURIs) {
+            if (uri != null && uri.length() > 0) {
+                addMissingPartition(uri, actionId);
+            }
+        }
+    }
+
     /**
-     * Remove partition entry specified by PartitionWrapper object
-     * and cascading delete indicator
+     * Remove partition entry specified by PartitionWrapper object and 
cascading
+     * delete indicator
      *
      * @param partition
      * @param cascade
-     * @return true if partition was successfully removed
-     * false otherwise
+     * @return true if partition was successfully removed, false otherwise
      */
     public boolean removePartition(PartitionWrapper partition, boolean 
cascade) {
         String prefix = PartitionWrapper.makePrefix(partition.getServerName(), 
partition.getDbName());
@@ -227,13 +263,12 @@ public class PartitionDependencyManagerS
     }
 
     /**
-     * Remove partition entry specified by HCat URI and
-     * cascading delete indicator
+     * Remove partition entry specified by HCat URI and cascading delete
+     * indicator
      *
      * @param hcatURI
      * @param cascade
-     * @return true if partition was successfully removed
-     * false otherwise
+     * @return true if partition was successfully removed, false otherwise
      * @throws MetadataServiceException
      */
     public boolean removePartition(String hcatURI, boolean cascade) throws 
MetadataServiceException {
@@ -250,12 +285,11 @@ public class PartitionDependencyManagerS
     }
 
     /**
-     * Remove partition entry specified by HCat URI with
-     * default cascade mode - TRUE
+     * Remove partition entry specified by HCat URI with default cascade mode -
+     * TRUE
      *
      * @param hcatURI
-     * @return true if partition was successfully removed
-     * false otherwise
+     * @return true if partition was successfully removed, false otherwise
      * @throws MetadataServiceException
      */
     public boolean removePartition(String hcatURI) throws 
MetadataServiceException {
@@ -272,59 +306,45 @@ public class PartitionDependencyManagerS
     }
 
     /**
-     * Move partition entry specified by ParitionWrapper object
-     * from 'missing' to 'available' map
+     * Move partition entry specified by PartitionWrapper object from 'missing'
+     * to 'available' map
      *
      * @param partition
-     * @return true if partition was successfully moved to availableMap
-     * false otherwise
+     * @return true if partition was successfully moved to availableMap, false
+     *         otherwise
      */
     public boolean partitionAvailable(PartitionWrapper partition) {
-        String prefix = PartitionWrapper.makePrefix(partition.getServerName(), 
partition.getDbName());
-        if (hcatInstanceMap.containsKey(prefix)) {
-            Map<String, PartitionsGroup> tableMap = 
hcatInstanceMap.get(prefix);
-            String tableName = partition.getTableName();
-            PartitionsGroup missingPartitions = null;
-            if (tableMap.containsKey(tableName)) {
-                WaitingActions actions = _getActionsForPartition(tableMap, 
tableName, missingPartitions, partition);
-                if(actions != null) {
-                    List<String> actionsList = actions.getActions();
-                    Iterator<String> it = actionsList.iterator();
-                    while (it.hasNext()) { // add actions into separate entries
-                        String actionId = it.next();
-                        if (availMap.containsKey(actionId)) {
-                            // actionId exists, so append partition
-                            availMap.get(actionId).add(partition);
-                        }
-                        else { // new entry
-                            availMap.put(actionId,
-                                    new 
CopyOnWriteArrayList<PartitionWrapper>(Arrays.asList((partition))));
-                        }
-                    }
-                    removePartition(partition, true);
-                    return true;
+
+        List<String> actionsList = _getActionsForPartition(partition);
+        if (actionsList != null) {
+            Iterator<String> it = actionsList.iterator();
+            while (it.hasNext()) { // add actions into separate entries
+                String actionId = it.next();
+                if (availMap.containsKey(actionId)) {
+                    // actionId exists, so append partition
+                    availMap.get(actionId).add(partition);
                 }
-                else {
-                    log.warn("partitionAvailable: HCat Partition [{0}] not 
found", partition.toString());
+                else { // new entry
+                    availMap.put(actionId, new 
CopyOnWriteArrayList<PartitionWrapper>(Arrays.asList((partition))));
                 }
             }
-            else {
-                log.warn("HCat table [{0}] not found", tableName);
-            }
+            removePartition(partition, true);
+            return true;
         }
         else {
-            log.warn("HCat instance [{0}] not found", prefix);
+            log.warn("No coord actions waitings for HCat Partition [{0}]", 
partition.toString());
         }
+
         return false;
     }
 
     /**
-     * Move partition entry specified by HCat URI from 'missing' to
-     * 'available' map
+     * Move partition entry specified by HCat URI from 'missing' to 'available'
+     * map
      *
      * @param hcatURI
-     * @return true if partition was successfully moved to availableMap
-     * false otherwise
+     * @return true if partition was successfully moved to availableMap, false
+     *         otherwise
      * @throws MetadataServiceException
      */
     public boolean partitionAvailable(String hcatURI) throws 
MetadataServiceException {
@@ -363,17 +383,40 @@ public class PartitionDependencyManagerS
         }
     }
 
-    private WaitingActions _getActionsForPartition(Map<String, 
PartitionsGroup> tableMap, String tableName,
-            PartitionsGroup missingPartitions, PartitionWrapper partition) {
-        WaitingActions actionsList = null;
-        missingPartitions = tableMap.get(tableName);
-        if (missingPartitions != null && 
missingPartitions.getPartitionsMap().containsKey(partition)) {
-            actionsList = missingPartitions.getPartitionsMap().get(partition);
+    private List<String> _getActionsForPartition(PartitionWrapper partition) {
+        String prefix = PartitionWrapper.makePrefix(partition.getServerName(), 
partition.getDbName());
+        if (hcatInstanceMap.containsKey(prefix)) {
+            Map<String, PartitionsGroup> tableMap = 
hcatInstanceMap.get(prefix);
+            String tableName = partition.getTableName();
+            if (tableMap.containsKey(tableName)) {
+                PartitionsGroup missingPartitions = tableMap.get(tableName);
+                if (missingPartitions != null) {
+                    if 
(missingPartitions.getPartitionsMap().containsKey(partition)) {
+                        WaitingActions actions = 
missingPartitions.getPartitionsMap().get(partition);
+                        if (actions != null) {
+                            return actions.getActions();
+                        }
+                        else {
+                            log.warn("No coord actions waitings for HCat 
Partition [{0}]", partition.toString());
+                        }
+                    }
+                    else {
+                        log.warn("HCat Partition [{0}] not found", 
partition.toString());
+                    }
+                }
+                else {
+                    log.warn("MissingPartitions not created in HCat table 
[{0}]", tableName);
+                }
+            }
+            else {
+                log.warn("HCat table [{0}] not found", tableName);
+            }
         }
         else {
-            log.warn( " _getActionsForPartition: HCat Partition [{0}] not 
found", partition.toString());
+            log.warn("HCat instance [{0}] not found", prefix);
         }
-        return actionsList;
+
+        return null;
     }
 
     private void _createPartitionMapForTable(Map<String, PartitionsGroup> 
tableMap, String tableName,

Added: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java?rev=1411494&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java
 Tue Nov 20 01:11:09 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord;
+
+import java.util.Map;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.command.coord.CoordCommandUtils;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.apache.oozie.util.WaitingActions;
+
+public class TestCoordCommandUtils extends XDataTestCase {
+    private Services services;
+
+    @Override
+    protected void setUp() throws Exception {
+
+        super.setUp();
+        
setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_SERVER_NAME, 
"myhcatserver");
+        
setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_DB_NAME, 
"myhcatdb");
+        
setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY, 
"100");
+        Services services = new Services();
+        addServiceToRun(services.getConf(), 
PartitionDependencyManagerService.class.getName());
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    public void testRegisterPartition() throws Exception {
+
+        String hcatUriStr1 = 
"hcat://hcatserver.com:4080/mydb/mytable/?datestamp=1234&region=us";
+        String hcatUriStr2 = 
"hcat://hcatserver.com:4080/mydb/mytable/?click=1234";
+        CoordinatorActionBean action1 = new CoordinatorActionBean();
+        action1.setId("1");
+        StringBuffer st = new StringBuffer();
+        st.append(hcatUriStr1);
+        st.append(CoordELFunctions.INSTANCE_SEPARATOR);
+        st.append(hcatUriStr2);
+        st.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR);
+        st.append("${coord:latest(0)}");
+        action1.setPushMissingDependencies(st.toString());
+        CoordCommandUtils.registerPartition(action1);
+
+        HCatURI hcatUri1 = new HCatURI(hcatUriStr1);
+        HCatURI hcatUri2 = new HCatURI(hcatUriStr2);
+        PartitionDependencyManagerService pdms = 
Services.get().get(PartitionDependencyManagerService.class);
+        Map<String, Map<String, PartitionsGroup>> hcatInstanceMap = 
pdms.getHCatMap();
+        Map<String, PartitionsGroup> tablePartitionsMap = 
hcatInstanceMap.get(PartitionWrapper.makePrefix(
+                hcatUri1.getServer(), hcatUri1.getDb()));
+        // check tablePartitionMap exist for the table
+        assertTrue(tablePartitionsMap.containsKey(hcatUri1.getTable()));
+        PartitionsGroup missingPartitions = 
tablePartitionsMap.get(hcatUri1.getTable());
+        WaitingActions actions1 = missingPartitions.getPartitionsMap().get(new 
PartitionWrapper(hcatUri1));
+        WaitingActions actions2 = missingPartitions.getPartitionsMap().get(new 
PartitionWrapper(hcatUri2));
+        // check actionID is included in WaitingAction
+        assertTrue(actions1.getActions().contains(action1.getId()));
+        assertTrue(actions2.getActions().contains(action1.getId()));
+    }
+
+}

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
 Tue Nov 20 01:11:09 2012
@@ -78,7 +78,7 @@ public class TestPartitionDependencyMana
     public void testAddMissingPartition() throws MetadataServiceException, 
URISyntaxException {
         Services services = Services.get();
         PartitionDependencyManagerService pdms = 
services.get(PartitionDependencyManagerService.class);
-        String newHCatDependency = 
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12;region=us";
+        String newHCatDependency = 
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
         String actionId = "myAction";
         pdms.addMissingPartition(newHCatDependency, actionId);
 
@@ -107,7 +107,7 @@ public class TestPartitionDependencyMana
     public void testRemovePartition() throws MetadataServiceException, 
URISyntaxException {
         Services services = Services.get();
         PartitionDependencyManagerService pdms = 
services.get(PartitionDependencyManagerService.class);
-        String newHCatDependency = 
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12;region=us";
+        String newHCatDependency = 
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
         String actionId = "myAction";
         pdms.addMissingPartition(newHCatDependency, actionId);
 
@@ -141,7 +141,7 @@ public class TestPartitionDependencyMana
     public void testAvailablePartition() throws MetadataServiceException, 
URISyntaxException {
         Services services = Services.get();
         PartitionDependencyManagerService pdms = 
services.get(PartitionDependencyManagerService.class);
-        String newHCatDependency = 
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12;region=us";
+        String newHCatDependency = 
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
         String actionId = "myAction";
         pdms.addMissingPartition(newHCatDependency, actionId);
 
@@ -161,4 +161,35 @@ public class TestPartitionDependencyMana
                                                                         
//cascade - ON
         assertEquals(availMap.get(actionId).get(0), new 
PartitionWrapper(hcatUri));
     }
+
+    /**
+     * Test removal of action ID from missing partition
+     *
+     * @throws MetadataServiceException
+     * @throws URISyntaxException
+     */
+    @Test
+    public void testRemoveActionFromMissingPartition() throws 
MetadataServiceException, URISyntaxException {
+        Services services = Services.get();
+        PartitionDependencyManagerService pdms = 
services.get(PartitionDependencyManagerService.class);
+        String newHCatDependency1 = 
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12";
+        String newHCatDependency2 = 
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
+        String actionId1 = "1";
+        String actionId2 = "2";
+        pdms.addMissingPartition(newHCatDependency1, actionId1);
+        pdms.addMissingPartition(newHCatDependency2, actionId2);
+        // remove newHCatDependency2
+        pdms.removeActionFromMissingPartitions(newHCatDependency2, actionId2);
+
+        HCatURI hcatUri = new HCatURI(newHCatDependency1);
+        String prefix = PartitionWrapper.makePrefix(hcatUri.getServer(), 
hcatUri.getDb());
+        Map<String, PartitionsGroup> tablePartitionsMap = 
pdms.getHCatMap().get(prefix);
+        PartitionsGroup missingPartitions = 
tablePartitionsMap.get(hcatUri.getTable());
+        assertNotNull(missingPartitions);
+
+        WaitingActions actions = missingPartitions.getPartitionsMap().get(new 
PartitionWrapper(hcatUri));
+        assertNotNull(actions);
+        assertTrue(actions.getActions().contains(actionId1));
+        assertFalse(actions.getActions().contains(actionId2));
+    }
 }

Modified: oozie/branches/hcat-intre/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Tue Nov 20 01:11:09 2012
@@ -1,5 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
 
+OOZIE-1043 Add logic to register to Missing Dependency Structure in coord 
action materialization (ryota via mohammad)
 OOZIE-1061 Add new EL functions to retrieve HCatalog server, DB and table 
name(mohammad)
 OOZIE-1056 Command to update push-based dependency (mohammad)
 OOZIE-1059 Add static method to create URI String in HCatURI(ryota via 
mohammad)


Reply via email to