Author: kamrul
Date: Thu Nov  8 20:09:12 2012
New Revision: 1407251

URL: http://svn.apache.org/viewvc?rev=1407251&view=rev
Log:
OOZIE-1039 Implement the Missing Dependency structure for HCat partitions(mona 
via mohammad) 

Added:
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
Modified:
    oozie/branches/hcat-intre/core/pom.xml
    oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
    
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
    
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
    oozie/branches/hcat-intre/pom.xml
    oozie/branches/hcat-intre/release-log.txt

Modified: oozie/branches/hcat-intre/core/pom.xml
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/pom.xml?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/pom.xml (original)
+++ oozie/branches/hcat-intre/core/pom.xml Thu Nov  8 20:09:12 2012
@@ -162,6 +162,12 @@
             <scope>compile</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+            <artifactId>concurrentlinkedhashmap-lru</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
         <!--
         Oozie web-app module must exclude it.
          -->

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java 
(original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java 
Thu Nov  8 20:09:12 2012
@@ -196,6 +196,7 @@ public enum ErrorCode {
     E1020(XLog.STD, "Could not kill coord job, this job either finished 
successfully or does not exist , [{0}]"),
     E1021(XLog.STD, "Coord Action Input Check Error: {0}"),
     E1022(XLog.STD, "Cannot delete running/completed coordinator action: 
[{0}]"),
+    E1023(XLog.STD, "Coord Action push Input Check Error: {0}"),
 
     E1100(XLog.STD, "Command precondition does not hold before execution, 
[{0}]"),
 
@@ -226,6 +227,10 @@ public enum ErrorCode {
 
     E1400(XLog.STD, "doAs (proxyuser) failure"),
 
+    E1501(XLog.STD, "Partition Dependency Manager could not add cache entry"),
+    E1502(XLog.STD, "Partition cache lookup error"),
+    E1503(XLog.STD, "Error in Metadata URI"),
+
     ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id 
[{0}]"),;
 
     private String template;

Added: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java?rev=1407251&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java
 Thu Nov  8 20:09:12 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.ErrorCode;
+
+public class MetadataServiceException extends XException {
+
+    /**
+     * Create a MetadataService exception from an XException.
+     *
+     * @param cause the XException cause.
+     */
+    public MetadataServiceException(XException cause) {
+        super(cause);
+    }
+
+    /**
+     * Create a MetadataService exception.
+     *
+     * @param errorCode error code.
+     * @param params parameters for the error code message template.
+     */
+    public MetadataServiceException(ErrorCode errorCode, Object... params) {
+        super(errorCode, params);
+    }
+
+}

Added: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1407251&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
 Thu Nov  8 20:09:12 2012
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.Service;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.apache.oozie.util.WaitingActions;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Module that functions like a caching service to maintain partition 
dependency
+ * mappings
+ */
+public class PartitionDependencyManagerService implements Service {
+
+    public static final String CONF_PREFIX = Service.CONF_PREFIX + 
"PartitionDependencyManagerService.";
+    public static final String HCAT_DEFAULT_SERVER_NAME = 
"hcat.default.server.name";
+    public static final String HCAT_DEFAULT_DB_NAME = "hcat.default.db.name";
+    public static final String MAP_MAX_WEIGHTED_CAPACITY = CONF_PREFIX + 
"map.max.weighted.capacity";
+    private static final int DEFAULT_MAP_MAX_WEIGHTED_CAPACITY = 10000;
+
+    private static XLog log;
+
+    /*
+     * Top-level map key = concatenated identifier for hcatServer + hcatDB
+     * value = table map (key = tableName string, value = PartitionsGroup)
+     */
+    private Map<String, Map<String, PartitionsGroup>> hcatInstanceMap;
+
+    /*
+     * Map denoting actions and corresponding 'available' partitions
+     * key = coordinator actionId, value = available partitions
+     */
+    private Map<String, List<PartitionWrapper>> availMap;
+
+    private static int maxCapacity;
+
+    @Override
+    public void init(Services services) throws ServiceException {
+        init(services.getConf());
+    }
+
+    public void init(Configuration conf) throws ServiceException {
+        hcatInstanceMap = new ConcurrentHashMap<String, Map<String, 
PartitionsGroup>>();
+        availMap = new ConcurrentHashMap<String, List<PartitionWrapper>>();
+        maxCapacity = conf.getInt(MAP_MAX_WEIGHTED_CAPACITY, 
DEFAULT_MAP_MAX_WEIGHTED_CAPACITY);
+        log = XLog.getLog(getClass());
+    }
+
+    @Override
+    public void destroy() {
+    }
+
+    @Override
+    public Class<? extends Service> getInterface() {
+        return PartitionDependencyManagerService.class;
+    }
+
+    /**
+     * getter for hcatInstanceMap
+     */
+    public Map<String, Map<String, PartitionsGroup>> getHCatMap() {
+        return hcatInstanceMap;
+    }
+
+    /**
+     * getter for availMap
+     */
+    public Map<String, List<PartitionWrapper>> getAvailableMap() {
+        return availMap;
+    }
+
+    /**
+     * Adding missing partition entry specified by PartitionWrapper object
+     *
+     * @param partition
+     * @param actionId
+     * @throws MetadataServiceException
+     */
+    @SuppressWarnings("unused")
+    public void addMissingPartition(PartitionWrapper partition, String 
actionId) throws MetadataServiceException {
+        String prefix = PartitionWrapper.makePrefix(partition.getServerName(), 
partition.getDbName());
+        Map<String, PartitionsGroup> tablePartitionsMap;
+        String tableName = partition.getTableName();
+        PartitionsGroup missingPartitions = null;
+        WaitingActions actionsList;
+        try {
+            if (hcatInstanceMap.containsKey(prefix)) {
+                tablePartitionsMap = hcatInstanceMap.get(prefix);
+                if (tablePartitionsMap.containsKey(tableName)) {
+                    actionsList = _getActionsForPartition(tablePartitionsMap, 
tableName, missingPartitions, partition);
+                    if(missingPartitions != null) {
+                        if(actionsList != null) {
+                            // partition exists, therefore append action
+                            actionsList.addAndUpdate(actionId);
+                        }
+                        else {
+                            // new partition entry and info
+                            actionsList = new WaitingActions(actionId);
+                            missingPartitions.addPartitionAndAction(partition, 
actionsList);
+                        }
+                    }
+                    else {
+                        log.warn("No partition entries for table [{0}]", 
tableName);
+                    }
+                }
+                else { // new table entry
+                    tablePartitionsMap = new ConcurrentHashMap<String, 
PartitionsGroup>();
+                    _createPartitionMapForTable(tablePartitionsMap, tableName, 
partition, actionId);
+                }
+            }
+            else { // new partition from different hcat server/db
+                _addNewEntry(hcatInstanceMap, prefix, tableName, partition, 
actionId);
+            }
+        }
+        catch (ClassCastException e) {
+            throw new MetadataServiceException(ErrorCode.E1501, e.getCause());
+        }
+        catch (NullPointerException e) {
+            throw new MetadataServiceException(ErrorCode.E1501, e.getCause());
+        }
+        catch (IllegalArgumentException e) {
+            throw new MetadataServiceException(ErrorCode.E1501, e.getCause());
+        }
+    }
+
+    /**
+     * Add missing partition entry specified by HCat URI
+     *
+     * @param hcatURI
+     * @param actionId
+     * @throws MetadataServiceException
+     */
+    public void addMissingPartition(String hcatURI, String actionId) throws 
MetadataServiceException {
+        HCatURI uri;
+        try {
+            uri = new HCatURI(hcatURI);
+        }
+        catch (URISyntaxException e) {
+            throw new MetadataServiceException(ErrorCode.E1503, 
e.getMessage());
+        }
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), 
uri.getDb(), uri.getTable(),
+                uri.getPartitionMap());
+        addMissingPartition(partition, actionId);
+    }
+
+    /**
+     * Remove partition entry specified by PartitionWrapper object
+     * and cascading delete indicator
+     *
+     * @param partition
+     * @param cascade
+     * @return true if partition was successfully removed
+     * false otherwise
+     */
+    public boolean removePartition(PartitionWrapper partition, boolean 
cascade) {
+        String prefix = PartitionWrapper.makePrefix(partition.getServerName(), 
partition.getDbName());
+        if (hcatInstanceMap.containsKey(prefix)) {
+            Map<String, PartitionsGroup> tableMap = 
hcatInstanceMap.get(prefix);
+            String tableName = partition.getTableName();
+            if (tableMap.containsKey(tableName)) {
+                PartitionsGroup missingPartitions = tableMap.get(tableName);
+                if (missingPartitions != null) {
+                    missingPartitions.getPartitionsMap().remove(partition);
+                    // cascading removal
+                    if (cascade) {
+                        if (missingPartitions.getPartitionsMap().size() == 0) {
+                            tableMap.remove(tableName);
+                            if (tableMap.size() == 0) {
+                                hcatInstanceMap.remove(prefix);
+                            }
+                        }
+                    }
+                    return true;
+                }
+                else {
+                    log.warn("No partition entries for table [{0}]", 
tableName);
+                }
+            }
+            else {
+                log.warn("HCat table [{0}] not found", tableName);
+            }
+        }
+        else {
+            log.warn("HCat instance entry [{0}] not found", prefix);
+        }
+        return false;
+    }
+
+    /**
+     * Remove partition entry specified by HCat URI and
+     * cascading delete indicator
+     *
+     * @param hcatURI
+     * @param cascade
+     * @return true if partition was successfully removed
+     * false otherwise
+     * @throws MetadataServiceException
+     */
+    public boolean removePartition(String hcatURI, boolean cascade) throws 
MetadataServiceException {
+        HCatURI uri;
+        try {
+            uri = new HCatURI(hcatURI);
+        }
+        catch (URISyntaxException e) {
+            throw new MetadataServiceException(ErrorCode.E1503, 
e.getMessage());
+        }
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), 
uri.getDb(), uri.getTable(),
+                uri.getPartitionMap());
+        return removePartition(partition, cascade);
+    }
+
+    /**
+     * Remove partition entry specified by HCat URI with
+     * default cascade mode - TRUE
+     *
+     * @param hcatURI
+     * @return true if partition was successfully removed
+     * false otherwise
+     * @throws MetadataServiceException
+     */
+    public boolean removePartition(String hcatURI) throws 
MetadataServiceException {
+        HCatURI uri;
+        try {
+            uri = new HCatURI(hcatURI);
+        }
+        catch (URISyntaxException e) {
+            throw new MetadataServiceException(ErrorCode.E1503, 
e.getMessage());
+        }
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), 
uri.getDb(), uri.getTable(),
+                uri.getPartitionMap());
+        return removePartition(partition, true);
+    }
+
+    /**
+     * Move partition entry specified by PartitionWrapper object
+     * from 'missing' to 'available' map
+     *
+     * @param partition
+     * @return true if partition was successfully moved to availableMap
+     * false otherwise
+     */
+    public boolean partitionAvailable(PartitionWrapper partition) {
+        String prefix = PartitionWrapper.makePrefix(partition.getServerName(), 
partition.getDbName());
+        if (hcatInstanceMap.containsKey(prefix)) {
+            Map<String, PartitionsGroup> tableMap = 
hcatInstanceMap.get(prefix);
+            String tableName = partition.getTableName();
+            PartitionsGroup missingPartitions = null;
+            if (tableMap.containsKey(tableName)) {
+                WaitingActions actions = _getActionsForPartition(tableMap, 
tableName, missingPartitions, partition);
+                if(actions != null) {
+                    List<String> actionsList = actions.getActions();
+                    Iterator<String> it = actionsList.iterator();
+                    while (it.hasNext()) { // add actions into separate entries
+                        String actionId = it.next();
+                        if (availMap.containsKey(actionId)) {
+                            // actionId exists, so append partition
+                            availMap.get(actionId).add(partition);
+                        }
+                        else { // new entry
+                            availMap.put(actionId,
+                                    new 
CopyOnWriteArrayList<PartitionWrapper>(Arrays.asList((partition))));
+                        }
+                    }
+                    removePartition(partition, true);
+                    return true;
+                }
+                else {
+                    log.warn("HCat Partition [{0}] not found", 
partition.toString());
+                }
+            }
+            else {
+                log.warn("HCat table [{0}] not found", tableName);
+            }
+        }
+        else {
+            log.warn("HCat instance [{0}] not found", prefix);
+        }
+        return false;
+    }
+
+    /**
+     * Move partition entry specified by HCat URI from 'missing' to
+     * 'available' map
+     *
+     * @param hcatURI
+     * @return true if partition was successfully moved to availableMap
+     * false otherwise
+     * @throws MetadataServiceException
+     */
+    public boolean partitionAvailable(String hcatURI) throws 
MetadataServiceException {
+        HCatURI uri;
+        try {
+            uri = new HCatURI(hcatURI);
+        }
+        catch (URISyntaxException e) {
+            throw new MetadataServiceException(ErrorCode.E1503, 
e.getMessage());
+        }
+        PartitionWrapper partition = new PartitionWrapper(uri.getServer(), 
uri.getDb(), uri.getTable(),
+                uri.getPartitionMap());
+        return partitionAvailable(partition);
+    }
+
+    private WaitingActions _getActionsForPartition(Map<String, 
PartitionsGroup> tableMap, String tableName,
+            PartitionsGroup missingPartitions, PartitionWrapper partition) {
+        WaitingActions actionsList = null;
+        missingPartitions = tableMap.get(tableName);
+        if (missingPartitions != null && 
missingPartitions.getPartitionsMap().containsKey(partition)) {
+            actionsList = missingPartitions.getPartitionsMap().get(partition);
+        }
+        else {
+            log.warn("HCat Partition [{0}] not found", partition.toString());
+        }
+        return actionsList;
+    }
+
+    private void _createPartitionMapForTable(Map<String, PartitionsGroup> 
tableMap, String tableName,
+            PartitionWrapper partition, String actionId) {
+        PartitionsGroup partitions = new PartitionsGroup(
+                new ConcurrentLinkedHashMap.Builder<PartitionWrapper, 
WaitingActions>().maximumWeightedCapacity(
+                        maxCapacity).build());
+        tableMap.put(tableName, partitions);
+        WaitingActions newActions = new WaitingActions(actionId);
+        partitions.getPartitionsMap().put(partition, newActions);
+    }
+
+    private void _addNewEntry(Map<String, Map<String, PartitionsGroup>> 
instanceMap, String prefix, String tableName,
+            PartitionWrapper partition, String actionId) {
+        Map<String, PartitionsGroup> tableMap = new ConcurrentHashMap<String, 
PartitionsGroup>();
+        instanceMap.put(prefix, tableMap);
+        _createPartitionMapForTable(tableMap, tableName, partition, actionId);
+    }
+
+}

Modified: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java 
(original)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java 
Thu Nov  8 20:09:12 2012
@@ -85,7 +85,7 @@ public class HCatURI {
             }
             String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR, -1);
             if (keyVal.length != 2) {
-                throw new URISyntaxException(uri.toString(), "Parition key 
value pair is not specified properly in ("
+                throw new URISyntaxException(uri.toString(), "Partition key 
value pair is not specified properly in ("
                         + part + ")");
             }
             partitions.put(keyVal[0], keyVal[1]);
@@ -138,11 +138,11 @@ public class HCatURI {
         this.partitions = partitions;
     }
 
-    public String getParitionValue(String key) {
+    public String getPartitionValue(String key) {
         return partitions.get(key);
     }
 
-    public String setParition(String key, String value) {
+    public String setPartition(String key, String value) {
         return partitions.put(key, value);
     }
 

Added: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java?rev=1407251&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
 Thu Nov  8 20:09:12 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Class to hold the partition related information required by Oozie missing
+ * dependency service
+ */
+public class PartitionWrapper {
+    private String serverName;
+    private String dbName;
+    private String tableName;
+    private Map<String, String> partition;
+    private static String CONCATENATOR = "#";
+
+    public PartitionWrapper(Map<String, String> partition) {
+        this("DEFAULT", "DEFAULT", "DEFAULT", partition);
+    }
+
+    public PartitionWrapper(String server, String db, String table, 
Map<String, String> partition) {
+        this.serverName = server;
+        this.dbName = db;
+        this.tableName = table;
+        setPartition(partition);
+    }
+
+    public PartitionWrapper(HCatURI hcatUri) {
+        this(hcatUri.getServer(), hcatUri.getDb(), hcatUri.getTable(), 
hcatUri.getPartitionMap());
+    }
+
+    /**
+     * @return the server name
+     */
+    public String getServerName() {
+        return serverName;
+    }
+
+    /**
+     * @param serverName the server name to set
+     */
+    public void setServerName(String serverName) {
+        this.serverName = serverName;
+    }
+
+    /**
+     * @return the db name
+     */
+    public String getDbName() {
+        return dbName;
+    }
+
+    /**
+     * @param dbName the dbName to set
+     */
+    public void setDbName(String dbName) {
+        this.dbName = dbName;
+    }
+
+    /**
+     * @return the table
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * @param table the table to set
+     */
+    public void setTableName(String table) {
+        this.tableName = table;
+    }
+
+    /**
+     * @return the partition
+     */
+    public Map<String, String> getPartition() {
+        return partition;
+    }
+
+    /**
+     * @param partition the partition to set
+     */
+    public void setPartition(Map<String, String> partition) {
+        this.partition = partition;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder partString = new StringBuilder("");
+        Iterator<Map.Entry<String, String>> it = 
partition.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, String> partEntry = it.next();
+            partString.append(partEntry.getKey() + "=" + partEntry.getValue() 
+ ";");
+        }
+        // adding prefix and removing the trailing ";"
+        return makePrefix(serverName, dbName) + CONCATENATOR + tableName + 
CONCATENATOR
+                + partString.substring(0, partString.length() - 1);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        PartitionWrapper pw = (PartitionWrapper) obj;
+        Iterator<Map.Entry<String, String>> it1 = 
partition.entrySet().iterator();
+        Map<String, String> p = pw.getPartition();
+        boolean equals = true;
+        if (this.serverName.equals(pw.serverName) && 
this.dbName.equals(pw.dbName)
+                && this.tableName.equals(pw.tableName)) {
+            while (it1.hasNext()) {
+                String key = it1.next().getKey();
+                if (!(p.containsKey(key) && 
p.get(key).equals(partition.get(key)))) {
+                    equals = false;
+                }
+            }
+        }
+        else {
+            equals = false;
+        }
+        return equals;
+    }
+
+    @Override
+    public int hashCode() {
+        return serverName.hashCode() + dbName.hashCode() + 
tableName.hashCode() + partition.hashCode();
+    }
+
+    public static String makePrefix(String serverName, String dbName) {
+        return serverName + CONCATENATOR + dbName;
+    }
+
+}

Added: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java?rev=1407251&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java
 Thu Nov  8 20:09:12 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.util.Map;
+
+/**
+ * Group of partitions corresponding to an HCat table in the form of a map in
+ * the missing dependency service cache
+ */
+public class PartitionsGroup {
+    private Map<PartitionWrapper, WaitingActions> partitionsMap;
+
+    public PartitionsGroup(Map<PartitionWrapper, WaitingActions> partitionMap) 
{
+        setPartitionsMap(partitionMap);
+    }
+
+    /**
+     * @return the partitionsMap
+     */
+    public Map<PartitionWrapper, WaitingActions> getPartitionsMap() {
+        return partitionsMap;
+    }
+
+    /**
+     * @param partitionsMap the partitionsMap to set
+     */
+    public void setPartitionsMap(Map<PartitionWrapper, WaitingActions> 
partitionsMap) {
+        this.partitionsMap = partitionsMap;
+    }
+
+    public void addPartitionAndAction(PartitionWrapper p, WaitingActions a) {
+        partitionsMap.put(p, a);
+    }
+}

Added: 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java?rev=1407251&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
 Thu Nov  8 20:09:12 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.util;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * The object to store the list of waiting actions and the last-modified
+ * timestamp corresponding to a particular hcat partition
+ */
+public class WaitingActions {
+    private List<String> actions;
+    private Timestamp lastModified;
+
+    /**
+     * Empty (default) constructor; starts with an empty
+     * {@link CopyOnWriteArrayList} of actions
+     */
+    public WaitingActions() {
+        this(new CopyOnWriteArrayList<String>());
+    }
+
+    /**
+     * Constructor that sets the list directly (stored by reference, not
+     * copied) and stamps the current time as last modified
+     *
+     * @param newActions the list of waiting action ids
+     */
+    WaitingActions(List<String> newActions) {
+        // Fields are assigned directly rather than through the public setters
+        // so that a subclass override cannot run on a half-constructed object.
+        this.actions = newActions;
+        this.lastModified = DateUtils.convertDateToTimestamp(new Date());
+    }
+
+    /**
+     * Constructor that sets a single action in the list
+     *
+     * @param actionId the id of the first waiting action
+     */
+    public WaitingActions(String actionId) {
+        this();
+        this.actions.add(actionId);
+    }
+
+    /**
+     * @return the actions (the backing list itself, not a copy)
+     */
+    public List<String> getActions() {
+        return actions;
+    }
+
+    /**
+     * @param actions the actions to set
+     */
+    public void setActions(List<String> actions) {
+        this.actions = actions;
+    }
+
+    /**
+     * @return the lastModified
+     */
+    public Timestamp getLastModified() {
+        return lastModified;
+    }
+
+    /**
+     * @param lastModified the lastModified to set
+     */
+    public void setLastModified(Timestamp lastModified) {
+        this.lastModified = lastModified;
+    }
+
+    /**
+     * @param lastModified the lastModified to set, converted to a
+     *        {@link Timestamp} through DateUtils
+     */
+    public void setLastModified(Date lastModified) {
+        this.lastModified = DateUtils.convertDateToTimestamp(lastModified);
+    }
+
+    /**
+     * Add actionId to list and update lastModifiedTime to the current time
+     *
+     * @param actionId the id of the waiting action to append
+     */
+    public void addAndUpdate(String actionId) {
+        getActions().add(actionId);
+        setLastModified(new Date());
+    }
+}

Added: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1407251&view=auto
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
 (added)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
 Thu Nov  8 20:09:12 2012
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.apache.oozie.util.WaitingActions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class to test the addition, removal and available operations
+ * on the partition dependencies cache structure
+ */
+public class TestPartitionDependencyManagerService extends XDataTestCase {
+
+    /** HCat dependency URI shared by all tests: table 'clicks', partition datastamp=12;region=us */
+    private static final String HCAT_DEPENDENCY =
+            "hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12;region=us";
+
+    /** Id of the action registered as waiting on {@link #HCAT_DEPENDENCY} */
+    private static final String ACTION_ID = "myAction";
+
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_SERVER_NAME, "myhcatserver");
+        setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_DB_NAME, "myhcatdb");
+        setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY, "100");
+        Services services = new Services();
+        addServiceToRun(services.getConf(), PartitionDependencyManagerService.class.getName());
+        services.init();
+    }
+
+    @After
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Builds the key under which the 'missing' cache files the tables of a
+     * server/db pair
+     *
+     * @param hcatUri the parsed dependency URI
+     * @return server and db joined by '#'
+     */
+    private String dbKey(HCatURI hcatUri) {
+        return hcatUri.getServer() + "#" + hcatUri.getDb();
+    }
+
+    /**
+     * Asserts that the 'missing' cache holds an entry for the table of the
+     * given URI and returns that entry
+     *
+     * @param pdms the service under test
+     * @param hcatUri the parsed dependency URI
+     * @return the partitions group cached for the table
+     */
+    private PartitionsGroup assertTableInMissingCache(PartitionDependencyManagerService pdms, HCatURI hcatUri) {
+        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(dbKey(hcatUri));
+        assertNotNull(tablePartitionsMap);
+        assertTrue(tablePartitionsMap.containsKey("clicks"));
+        PartitionsGroup missingPartitions = tablePartitionsMap.get(hcatUri.getTable());
+        assertNotNull(missingPartitions);
+        return missingPartitions;
+    }
+
+    /**
+     * Asserts that the 'missing' cache no longer holds an entry for the table
+     * of the given URI. The top-level cache is keyed by server#db, so the
+     * table has to be looked up one level down; the check passes whether or
+     * not the enclosing server#db entry itself was dropped.
+     *
+     * @param pdms the service under test
+     * @param hcatUri the parsed dependency URI
+     */
+    private void assertTableNotInMissingCache(PartitionDependencyManagerService pdms, HCatURI hcatUri) {
+        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(dbKey(hcatUri));
+        assertTrue(tablePartitionsMap == null || !tablePartitionsMap.containsKey(hcatUri.getTable()));
+    }
+
+    /**
+     * Test basic service startup and required structures
+     * @throws MetadataServiceException
+     */
+    @Test
+    public void testBasicService() throws MetadataServiceException {
+        Services services = Services.get();
+        PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+        assertNotNull(pdms);
+        assertNotNull(pdms.getHCatMap());
+        assertNotNull(pdms.getAvailableMap());
+    }
+
+    /**
+     * Test addition of missing partition into cache
+     *
+     * @throws MetadataServiceException
+     * @throws URISyntaxException
+     */
+    @Test
+    public void testAddMissingPartition() throws MetadataServiceException, URISyntaxException {
+        Services services = Services.get();
+        PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+        pdms.addMissingPartition(HCAT_DEPENDENCY, ACTION_ID);
+
+        HCatURI hcatUri = new HCatURI(HCAT_DEPENDENCY);
+        PartitionsGroup missingPartitions = assertTableInMissingCache(pdms, hcatUri);
+
+        PartitionWrapper expectedPartition = new PartitionWrapper(hcatUri); // datastamp=12;region=us
+        assertEquals(expectedPartition, missingPartitions.getPartitionsMap().keySet().iterator().next());
+        WaitingActions actions = missingPartitions.getPartitionsMap().get(expectedPartition);
+        assertNotNull(actions);
+        assertTrue(actions.getActions().contains(ACTION_ID));
+    }
+
+    /**
+     * Test removal of partition from cache
+     *
+     * @throws MetadataServiceException
+     * @throws URISyntaxException
+     */
+    @Test
+    public void testRemovePartition() throws MetadataServiceException, URISyntaxException {
+        Services services = Services.get();
+        PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+        pdms.addMissingPartition(HCAT_DEPENDENCY, ACTION_ID);
+
+        HCatURI hcatUri = new HCatURI(HCAT_DEPENDENCY);
+        PartitionsGroup missingPartitions = assertTableInMissingCache(pdms, hcatUri);
+
+        // remove with cascading - OFF
+        pdms.removePartition(HCAT_DEPENDENCY, false);
+        // The partitions map is keyed by PartitionWrapper; probing it with the
+        // raw partition map of the URI could never match and proved nothing.
+        assertFalse(missingPartitions.getPartitionsMap().containsKey(new PartitionWrapper(hcatUri)));
+
+        // Re-read the cache after adding again instead of re-checking the stale local reference
+        pdms.addMissingPartition(HCAT_DEPENDENCY, ACTION_ID);
+        missingPartitions = assertTableInMissingCache(pdms, hcatUri);
+        assertTrue(missingPartitions.getPartitionsMap().containsKey(new PartitionWrapper(hcatUri)));
+
+        // remove with cascading - ON
+        pdms.removePartition(HCAT_DEPENDENCY);
+        assertTableNotInMissingCache(pdms, hcatUri);
+    }
+
+    /**
+     * Test partition available function on cache
+     *
+     * @throws MetadataServiceException
+     * @throws URISyntaxException
+     */
+    @Test
+    public void testAvailablePartition() throws MetadataServiceException, URISyntaxException {
+        Services services = Services.get();
+        PartitionDependencyManagerService pdms = services.get(PartitionDependencyManagerService.class);
+        pdms.addMissingPartition(HCAT_DEPENDENCY, ACTION_ID);
+
+        HCatURI hcatUri = new HCatURI(HCAT_DEPENDENCY);
+        assertTableInMissingCache(pdms, hcatUri);
+
+        pdms.partitionAvailable(HCAT_DEPENDENCY);
+        Map<String, List<PartitionWrapper>> availMap = pdms.getAvailableMap();
+        assertNotNull(availMap);
+        assertTrue(availMap.containsKey(ACTION_ID)); // found in 'available' cache
+        assertTableNotInMissingCache(pdms, hcatUri); // removed from 'missing' cache, cascade - ON
+        assertEquals(new PartitionWrapper(hcatUri), availMap.get(ACTION_ID).get(0));
+    }
+}

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
 Thu Nov  8 20:09:12 2012
@@ -1185,6 +1185,18 @@ public abstract class XDataTestCase exte
        conf.set(Services.CONF_SERVICE_CLASSES, new String(builder));
    }
 
+   /**
+    * Add a particular service class to be run in addition to the ones already
+    * listed under {@link Services#CONF_SERVICE_CLASSES}
+    *
+    * @param conf configuration whose service class list is extended in place
+    * @param serviceName class name of the service to append to the list
+    */
+   protected void addServiceToRun(Configuration conf, String serviceName) {
+       String classes = conf.get(Services.CONF_SERVICE_CLASSES);
+       // With no list configured, new StringBuilder(null) would throw and a
+       // blind append would leave a leading comma, so start the list instead.
+       if (classes == null || classes.trim().isEmpty()) {
+           conf.set(Services.CONF_SERVICE_CLASSES, serviceName);
+           return;
+       }
+       conf.set(Services.CONF_SERVICE_CLASSES, classes + "," + serviceName);
+   }
+
     /**
      * Adds the db records for the Bulk Monitor tests
      */

Modified: 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
 (original)
+++ 
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
 Thu Nov  8 20:09:12 2012
@@ -37,8 +37,8 @@ public class TestHCatURI {
         assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
         assertEquals(uri.getDb(),"mydb");
         assertEquals(uri.getTable(),"clicks");
-        assertEquals(uri.getParitionValue("datastamp"),"12");
-        assertEquals(uri.getParitionValue("region"),"us");
+        assertEquals(uri.getPartitionValue("datastamp"),"12");
+        assertEquals(uri.getPartitionValue("region"),"us");
 
     }
 
@@ -60,8 +60,8 @@ public class TestHCatURI {
         assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
         assertEquals(uri.getDb(),"mydb");
         assertEquals(uri.getTable(),"clicks");
-        assertEquals(uri.getParitionValue("datastamp"),"12");
-        assertEquals(uri.getParitionValue("region"),"us");
+        assertEquals(uri.getPartitionValue("datastamp"),"12");
+        assertEquals(uri.getPartitionValue("region"),"us");
     }
 
     @Test
@@ -82,8 +82,8 @@ public class TestHCatURI {
         assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
         assertEquals(uri.getDb(),"mydb");
         assertEquals(uri.getTable(),"clicks");
-        assertEquals(uri.getParitionValue("datastamp"),"12");
-        assertEquals(uri.getParitionValue("region"),"us");
+        assertEquals(uri.getPartitionValue("datastamp"),"12");
+        assertEquals(uri.getPartitionValue("region"),"us");
     }
 
     @Test
@@ -104,8 +104,8 @@ public class TestHCatURI {
         assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
         assertEquals(uri.getDb(),"mydb");
         assertEquals(uri.getTable(),"clicks");
-        assertEquals(uri.getParitionValue("datastamp"),"12");
-        assertEquals(uri.getParitionValue("region"),"us");
+        assertEquals(uri.getPartitionValue("datastamp"),"12");
+        assertEquals(uri.getPartitionValue("region"),"us");
     }
 
 
@@ -127,8 +127,8 @@ public class TestHCatURI {
         assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
         assertEquals(uri.getDb(),"mydb");
         assertEquals(uri.getTable(),"clicks");
-        assertEquals(uri.getParitionValue("datastamp"),"12");
-        assertEquals(uri.getParitionValue("region"),"us");
+        assertEquals(uri.getPartitionValue("datastamp"),"12");
+        assertEquals(uri.getPartitionValue("region"),"us");
     }
 
     @Test(expected = URISyntaxException.class)

Modified: oozie/branches/hcat-intre/pom.xml
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/pom.xml?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- oozie/branches/hcat-intre/pom.xml (original)
+++ oozie/branches/hcat-intre/pom.xml Thu Nov  8 20:09:12 2012
@@ -638,6 +638,12 @@
                 <version>1.8.5</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+                <artifactId>concurrentlinkedhashmap-lru</artifactId>
+                <version>1.3.1</version>
+            </dependency>
+
             <!-- examples -->
             <dependency>
                 <groupId>commons-httpclient</groupId>

Modified: oozie/branches/hcat-intre/release-log.txt
URL: 
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Thu Nov  8 20:09:12 2012
@@ -1,4 +1,6 @@
 -- Oozie 3.4.0 release (trunk - unreleased)
+
+OOZIE-1039 Implement the Missing Dependency structure for HCat partitions 
(mona via mohammad)
 OOZIE-1028 Add EL function to allow date ranges to be used for dataset ranges 
(rkanter via tucu)
 OOZIE-1045 Parameterize <unresolved-instances> tag currently hardcoded 
(egashira via mona)
 OOZIE-1029 MiniMRCluster fails to start when used against YARN (rkanter via 
tucu)


Reply via email to