Author: kamrul
Date: Thu Nov 8 20:09:12 2012
New Revision: 1407251
URL: http://svn.apache.org/viewvc?rev=1407251&view=rev
Log:
OOZIE-1039 Implement the Missing Dependency structure for HCat partitions(mona
via mohammad)
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
Modified:
oozie/branches/hcat-intre/core/pom.xml
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
oozie/branches/hcat-intre/pom.xml
oozie/branches/hcat-intre/release-log.txt
Modified: oozie/branches/hcat-intre/core/pom.xml
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/pom.xml?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/pom.xml (original)
+++ oozie/branches/hcat-intre/core/pom.xml Thu Nov 8 20:09:12 2012
@@ -162,6 +162,12 @@
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+ <artifactId>concurrentlinkedhashmap-lru</artifactId>
+ <scope>compile</scope>
+ </dependency>
+
<!--
Oozie web-app module must exclude it.
-->
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/ErrorCode.java
Thu Nov 8 20:09:12 2012
@@ -196,6 +196,7 @@ public enum ErrorCode {
E1020(XLog.STD, "Could not kill coord job, this job either finished
successfully or does not exist , [{0}]"),
E1021(XLog.STD, "Coord Action Input Check Error: {0}"),
E1022(XLog.STD, "Cannot delete running/completed coordinator action:
[{0}]"),
+ E1023(XLog.STD, "Coord Action push Input Check Error: {0}"),
E1100(XLog.STD, "Command precondition does not hold before execution,
[{0}]"),
@@ -226,6 +227,10 @@ public enum ErrorCode {
E1400(XLog.STD, "doAs (proxyuser) failure"),
+ E1501(XLog.STD, "Partition Dependency Manager could not add cache entry"),
+ E1502(XLog.STD, "Partition cache lookup error"),
+ E1503(XLog.STD, "Error in Metadata URI"),
+
ETEST(XLog.STD, "THIS SHOULD HAPPEN ONLY IN TESTING, invalid job id
[{0}]"),;
private String template;
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java?rev=1407251&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java
(added)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/MetadataServiceException.java
Thu Nov 8 20:09:12 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import org.apache.oozie.XException;
+import org.apache.oozie.ErrorCode;
+
+/**
+ * Exception raised by metadata-related services such as
+ * PartitionDependencyManagerService. Both constructors delegate directly
+ * to the corresponding XException constructors.
+ */
+public class MetadataServiceException extends XException {
+
+ /**
+ * Create a MetadataService exception from an XException.
+ *
+ * @param cause the XException cause.
+ */
+ public MetadataServiceException(XException cause) {
+ super(cause);
+ }
+
+ /**
+ * Create a MetadataService exception.
+ *
+ * @param errorCode error code.
+ * @param params parameters for the error code message template.
+ */
+ public MetadataServiceException(ErrorCode errorCode, Object... params) {
+ super(errorCode, params);
+ }
+
+}
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1407251&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
(added)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
Thu Nov 8 20:09:12 2012
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.service.Service;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.apache.oozie.util.WaitingActions;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.XLog;
+
+/**
+ * Caching service that maintains partition dependency mappings.
+ * <p>
+ * Missing partitions are tracked in a nested map (hcat server + db, then
+ * table, then partition) whose leaves hold the coordinator actions waiting
+ * on that partition. When a partition is reported available, every waiting
+ * action gets the partition appended to its entry in the 'available' map and
+ * the partition is dropped from the 'missing' structure.
+ */
+public class PartitionDependencyManagerService implements Service {
+
+    public static final String CONF_PREFIX = Service.CONF_PREFIX + "PartitionDependencyManagerService.";
+    public static final String HCAT_DEFAULT_SERVER_NAME = "hcat.default.server.name";
+    public static final String HCAT_DEFAULT_DB_NAME = "hcat.default.db.name";
+    public static final String MAP_MAX_WEIGHTED_CAPACITY = CONF_PREFIX + "map.max.weighted.capacity";
+    private static final int DEFAULT_MAP_MAX_WEIGHTED_CAPACITY = 10000;
+
+    private static XLog log;
+
+    /*
+     * Top-level map key = concatenated identifier for hcatServer + hcatDB
+     * value = table map (key = tableName string, value = PartitionsGroup)
+     */
+    private Map<String, Map<String, PartitionsGroup>> hcatInstanceMap;
+
+    /*
+     * Map denoting actions and corresponding 'available' partitions
+     * key = coordinator actionId, value = available partitions
+     */
+    private Map<String, List<PartitionWrapper>> availMap;
+
+    // Capacity given to each per-table partition map; read from configuration in init()
+    private static int maxCapacity;
+
+    @Override
+    public void init(Services services) throws ServiceException {
+        init(services.getConf());
+    }
+
+    /**
+     * Initialize the (empty) caches and read the per-table map capacity.
+     *
+     * @param conf configuration providing {@link #MAP_MAX_WEIGHTED_CAPACITY}
+     * @throws ServiceException declared for callers; not thrown by this implementation
+     */
+    public void init(Configuration conf) throws ServiceException {
+        hcatInstanceMap = new ConcurrentHashMap<String, Map<String, PartitionsGroup>>();
+        availMap = new ConcurrentHashMap<String, List<PartitionWrapper>>();
+        maxCapacity = conf.getInt(MAP_MAX_WEIGHTED_CAPACITY, DEFAULT_MAP_MAX_WEIGHTED_CAPACITY);
+        log = XLog.getLog(getClass());
+    }
+
+    @Override
+    public void destroy() {
+    }
+
+    @Override
+    public Class<? extends Service> getInterface() {
+        return PartitionDependencyManagerService.class;
+    }
+
+    /**
+     * getter for hcatInstanceMap
+     *
+     * @return the live 'missing partitions' map (not a copy)
+     */
+    public Map<String, Map<String, PartitionsGroup>> getHCatMap() {
+        return hcatInstanceMap;
+    }
+
+    /**
+     * getter for availMap
+     *
+     * @return the live 'available partitions' map (not a copy)
+     */
+    public Map<String, List<PartitionWrapper>> getAvailableMap() {
+        return availMap;
+    }
+
+    /**
+     * Adding missing partition entry specified by PartitionWrapper object.
+     * <p>
+     * Creates the server/db entry, the table entry and the partition entry
+     * as needed; if the partition is already tracked, the action is appended
+     * to its list of waiting actions.
+     *
+     * @param partition the missing partition
+     * @param actionId coordinator action waiting on the partition
+     * @throws MetadataServiceException (E1501) if the cache entry could not be added
+     */
+    public void addMissingPartition(PartitionWrapper partition, String actionId) throws MetadataServiceException {
+        String prefix = PartitionWrapper.makePrefix(partition.getServerName(), partition.getDbName());
+        String tableName = partition.getTableName();
+        try {
+            Map<String, PartitionsGroup> tablePartitionsMap = hcatInstanceMap.get(prefix);
+            if (tablePartitionsMap == null) {
+                // new partition from different hcat server/db
+                _addNewEntry(hcatInstanceMap, prefix, tableName, partition, actionId);
+                return;
+            }
+            // Look the group up here: Java passes references by value, so a
+            // helper cannot hand it back through one of its parameters.
+            PartitionsGroup missingPartitions = tablePartitionsMap.get(tableName);
+            if (missingPartitions == null) {
+                // new table entry, registered in the map that hcatInstanceMap already holds
+                _createPartitionMapForTable(tablePartitionsMap, tableName, partition, actionId);
+                return;
+            }
+            WaitingActions actionsList = missingPartitions.getPartitionsMap().get(partition);
+            if (actionsList != null) {
+                // partition exists, therefore append action
+                actionsList.addAndUpdate(actionId);
+            }
+            else {
+                // new partition entry and info
+                missingPartitions.addPartitionAndAction(partition, new WaitingActions(actionId));
+            }
+        }
+        // NOTE(review): the exception itself is passed (e.getCause() is normally
+        // null here); assumes XException treats a trailing Throwable parameter
+        // as the cause - confirm.
+        catch (ClassCastException e) {
+            throw new MetadataServiceException(ErrorCode.E1501, e);
+        }
+        catch (NullPointerException e) {
+            throw new MetadataServiceException(ErrorCode.E1501, e);
+        }
+        catch (IllegalArgumentException e) {
+            throw new MetadataServiceException(ErrorCode.E1501, e);
+        }
+    }
+
+    /**
+     * Add missing partition entry specified by HCat URI
+     *
+     * @param hcatURI URI identifying server, db, table and partition
+     * @param actionId coordinator action waiting on the partition
+     * @throws MetadataServiceException (E1503) if the URI cannot be parsed,
+     *         (E1501) if the cache entry could not be added
+     */
+    public void addMissingPartition(String hcatURI, String actionId) throws MetadataServiceException {
+        addMissingPartition(_toPartition(hcatURI), actionId);
+    }
+
+    /**
+     * Remove partition entry specified by PartitionWrapper object
+     * and cascading delete indicator
+     *
+     * @param partition the partition to remove
+     * @param cascade if true, a table entry left without partitions is
+     *        removed, and then a server/db entry left without tables
+     * @return true if an entry for the partition's table was found (the
+     *         partition is removed from it if present), false otherwise
+     */
+    public boolean removePartition(PartitionWrapper partition, boolean cascade) {
+        String prefix = PartitionWrapper.makePrefix(partition.getServerName(), partition.getDbName());
+        Map<String, PartitionsGroup> tableMap = hcatInstanceMap.get(prefix);
+        if (tableMap == null) {
+            log.warn("HCat instance entry [{0}] not found", prefix);
+            return false;
+        }
+        String tableName = partition.getTableName();
+        PartitionsGroup missingPartitions = tableMap.get(tableName);
+        if (missingPartitions == null) {
+            log.warn("HCat table [{0}] not found", tableName);
+            return false;
+        }
+        missingPartitions.getPartitionsMap().remove(partition);
+        // cascading removal
+        if (cascade && missingPartitions.getPartitionsMap().size() == 0) {
+            tableMap.remove(tableName);
+            if (tableMap.size() == 0) {
+                hcatInstanceMap.remove(prefix);
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Remove partition entry specified by HCat URI and
+     * cascading delete indicator
+     *
+     * @param hcatURI URI identifying the partition
+     * @param cascade see {@link #removePartition(PartitionWrapper, boolean)}
+     * @return see {@link #removePartition(PartitionWrapper, boolean)}
+     * @throws MetadataServiceException (E1503) if the URI cannot be parsed
+     */
+    public boolean removePartition(String hcatURI, boolean cascade) throws MetadataServiceException {
+        return removePartition(_toPartition(hcatURI), cascade);
+    }
+
+    /**
+     * Remove partition entry specified by HCat URI with
+     * default cascade mode - TRUE
+     *
+     * @param hcatURI URI identifying the partition
+     * @return see {@link #removePartition(PartitionWrapper, boolean)}
+     * @throws MetadataServiceException (E1503) if the URI cannot be parsed
+     */
+    public boolean removePartition(String hcatURI) throws MetadataServiceException {
+        return removePartition(hcatURI, true);
+    }
+
+    /**
+     * Move partition entry specified by PartitionWrapper object
+     * from 'missing' to 'available' map
+     *
+     * @param partition the partition that became available
+     * @return true if partition was successfully moved to availableMap
+     *         false otherwise
+     */
+    public boolean partitionAvailable(PartitionWrapper partition) {
+        String prefix = PartitionWrapper.makePrefix(partition.getServerName(), partition.getDbName());
+        Map<String, PartitionsGroup> tableMap = hcatInstanceMap.get(prefix);
+        if (tableMap == null) {
+            log.warn("HCat instance [{0}] not found", prefix);
+            return false;
+        }
+        String tableName = partition.getTableName();
+        if (!tableMap.containsKey(tableName)) {
+            log.warn("HCat table [{0}] not found", tableName);
+            return false;
+        }
+        WaitingActions actions = _getActionsForPartition(tableMap, tableName, partition);
+        if (actions == null) {
+            // the helper has already logged that the partition was not found
+            return false;
+        }
+        // add actions into separate entries
+        for (String actionId : actions.getActions()) {
+            List<PartitionWrapper> available = availMap.get(actionId);
+            if (available != null) {
+                // actionId exists, so append partition
+                available.add(partition);
+            }
+            else { // new entry
+                availMap.put(actionId, new CopyOnWriteArrayList<PartitionWrapper>(Arrays.asList(partition)));
+            }
+        }
+        removePartition(partition, true);
+        return true;
+    }
+
+    /**
+     * Move partition entry specified by HCat URI from 'missing' to
+     * 'available' map
+     *
+     * @param hcatURI URI identifying the partition
+     * @return true if partition was successfully moved to availableMap
+     *         false otherwise
+     * @throws MetadataServiceException (E1503) if the URI cannot be parsed
+     */
+    public boolean partitionAvailable(String hcatURI) throws MetadataServiceException {
+        return partitionAvailable(_toPartition(hcatURI));
+    }
+
+    /**
+     * Parse an HCat URI string into the PartitionWrapper it identifies.
+     */
+    private PartitionWrapper _toPartition(String hcatURI) throws MetadataServiceException {
+        HCatURI uri;
+        try {
+            uri = new HCatURI(hcatURI);
+        }
+        catch (URISyntaxException e) {
+            // NOTE(review): 'e' is appended on the assumption that XException
+            // picks up a trailing Throwable parameter as the cause - confirm.
+            throw new MetadataServiceException(ErrorCode.E1503, e.getMessage(), e);
+        }
+        return new PartitionWrapper(uri.getServer(), uri.getDb(), uri.getTable(), uri.getPartitionMap());
+    }
+
+    /**
+     * Look up the actions waiting on a partition of the given table;
+     * returns null (and logs a warning) if the partition is not tracked.
+     */
+    private WaitingActions _getActionsForPartition(Map<String, PartitionsGroup> tableMap, String tableName,
+            PartitionWrapper partition) {
+        PartitionsGroup missingPartitions = tableMap.get(tableName);
+        WaitingActions actionsList = null;
+        if (missingPartitions != null) {
+            actionsList = missingPartitions.getPartitionsMap().get(partition);
+        }
+        if (actionsList == null) {
+            log.warn("HCat Partition [{0}] not found", partition.toString());
+        }
+        return actionsList;
+    }
+
+    /**
+     * Register a new capacity-bounded partition map for the table, seeded
+     * with the given partition and waiting action.
+     */
+    private void _createPartitionMapForTable(Map<String, PartitionsGroup> tableMap, String tableName,
+            PartitionWrapper partition, String actionId) {
+        PartitionsGroup partitions = new PartitionsGroup(
+                new ConcurrentLinkedHashMap.Builder<PartitionWrapper, WaitingActions>().maximumWeightedCapacity(
+                        maxCapacity).build());
+        tableMap.put(tableName, partitions);
+        WaitingActions newActions = new WaitingActions(actionId);
+        partitions.getPartitionsMap().put(partition, newActions);
+    }
+
+    /**
+     * Register a new server/db entry and create its first table entry.
+     */
+    private void _addNewEntry(Map<String, Map<String, PartitionsGroup>> instanceMap, String prefix, String tableName,
+            PartitionWrapper partition, String actionId) {
+        Map<String, PartitionsGroup> tableMap = new ConcurrentHashMap<String, PartitionsGroup>();
+        instanceMap.put(prefix, tableMap);
+        _createPartitionMapForTable(tableMap, tableName, partition, actionId);
+    }
+
+}
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/HCatURI.java
Thu Nov 8 20:09:12 2012
@@ -85,7 +85,7 @@ public class HCatURI {
}
String[] keyVal = part.split(PARTITION_KEYVAL_SEPARATOR, -1);
if (keyVal.length != 2) {
- throw new URISyntaxException(uri.toString(), "Parition key
value pair is not specified properly in ("
+ throw new URISyntaxException(uri.toString(), "Partition key
value pair is not specified properly in ("
+ part + ")");
}
partitions.put(keyVal[0], keyVal[1]);
@@ -138,11 +138,11 @@ public class HCatURI {
this.partitions = partitions;
}
- public String getParitionValue(String key) {
+ public String getPartitionValue(String key) {
return partitions.get(key);
}
- public String setParition(String key, String value) {
+ public String setPartition(String key, String value) {
return partitions.put(key, value);
}
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java?rev=1407251&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
(added)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionWrapper.java
Thu Nov 8 20:09:12 2012
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Class to hold the partition related information required by Oozie missing
+ * dependency service.
+ * <p>
+ * Instances are used as map keys, so {@link #equals(Object)} and
+ * {@link #hashCode()} are both based on server, db, table and the full
+ * partition key/value map. Those four fields are expected to be non-null
+ * when the object is compared, hashed or printed.
+ */
+public class PartitionWrapper {
+    private String serverName;
+    private String dbName;
+    private String tableName;
+    private Map<String, String> partition;
+    private static final String CONCATENATOR = "#";
+
+    /**
+     * Create a wrapper with server, db and table all set to "DEFAULT".
+     *
+     * @param partition partition key/value pairs
+     */
+    public PartitionWrapper(Map<String, String> partition) {
+        this("DEFAULT", "DEFAULT", "DEFAULT", partition);
+    }
+
+    /**
+     * @param server hcat server name
+     * @param db database name
+     * @param table table name
+     * @param partition partition key/value pairs (stored by reference, not copied)
+     */
+    public PartitionWrapper(String server, String db, String table, Map<String, String> partition) {
+        this.serverName = server;
+        this.dbName = db;
+        this.tableName = table;
+        // assigned directly so construction does not depend on an overridable setter
+        this.partition = partition;
+    }
+
+    /**
+     * Create a wrapper from the server, db, table and partition map of an HCat URI.
+     *
+     * @param hcatUri the parsed HCat URI
+     */
+    public PartitionWrapper(HCatURI hcatUri) {
+        this(hcatUri.getServer(), hcatUri.getDb(), hcatUri.getTable(), hcatUri.getPartitionMap());
+    }
+
+    /**
+     * @return the server name
+     */
+    public String getServerName() {
+        return serverName;
+    }
+
+    /**
+     * @param serverName the server name to set
+     */
+    public void setServerName(String serverName) {
+        this.serverName = serverName;
+    }
+
+    /**
+     * @return the db name
+     */
+    public String getDbName() {
+        return dbName;
+    }
+
+    /**
+     * @param dbName the dbName to set
+     */
+    public void setDbName(String dbName) {
+        this.dbName = dbName;
+    }
+
+    /**
+     * @return the table
+     */
+    public String getTableName() {
+        return tableName;
+    }
+
+    /**
+     * @param table the table to set
+     */
+    public void setTableName(String table) {
+        this.tableName = table;
+    }
+
+    /**
+     * @return the partition
+     */
+    public Map<String, String> getPartition() {
+        return partition;
+    }
+
+    /**
+     * @param partition the partition to set
+     */
+    public void setPartition(Map<String, String> partition) {
+        this.partition = partition;
+    }
+
+    /**
+     * @return server#db#table#k1=v1;k2=v2 (nothing follows the last '#' when
+     *         the partition map is empty)
+     */
+    @Override
+    public String toString() {
+        StringBuilder partString = new StringBuilder();
+        for (Map.Entry<String, String> partEntry : partition.entrySet()) {
+            // separator goes between entries, so an empty map needs no trimming
+            if (partString.length() > 0) {
+                partString.append(";");
+            }
+            partString.append(partEntry.getKey()).append("=").append(partEntry.getValue());
+        }
+        return makePrefix(serverName, dbName) + CONCATENATOR + tableName + CONCATENATOR + partString;
+    }
+
+    /**
+     * Two wrappers are equal when server, db, table and the complete
+     * partition map are equal. Comparing whole maps keeps the relation
+     * symmetric and consistent with {@link #hashCode()}.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof PartitionWrapper)) {
+            // also covers null
+            return false;
+        }
+        PartitionWrapper pw = (PartitionWrapper) obj;
+        return this.serverName.equals(pw.serverName) && this.dbName.equals(pw.dbName)
+                && this.tableName.equals(pw.tableName) && this.partition.equals(pw.partition);
+    }
+
+    @Override
+    public int hashCode() {
+        return serverName.hashCode() + dbName.hashCode() + tableName.hashCode() + partition.hashCode();
+    }
+
+    /**
+     * Build the server/db identifier used as the top-level cache key.
+     *
+     * @param serverName hcat server name
+     * @param dbName database name
+     * @return serverName and dbName joined by '#'
+     */
+    public static String makePrefix(String serverName, String dbName) {
+        return serverName + CONCATENATOR + dbName;
+    }
+
+}
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java?rev=1407251&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java
(added)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/PartitionsGroup.java
Thu Nov 8 20:09:12 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import java.util.Map;
+
+/**
+ * Group of partitions corresponding to an HCat table in the form of a map in
+ * the missing dependency service cache. The map is held by reference: callers
+ * of {@link #getPartitionsMap()} operate on the same map instance.
+ */
+public class PartitionsGroup {
+    private Map<PartitionWrapper, WaitingActions> partitionsMap;
+
+    /**
+     * @param partitionMap map from partition to the actions waiting on it
+     */
+    public PartitionsGroup(Map<PartitionWrapper, WaitingActions> partitionMap) {
+        // assigned directly: a constructor should not call an overridable method
+        this.partitionsMap = partitionMap;
+    }
+
+    /**
+     * @return the partitionsMap
+     */
+    public Map<PartitionWrapper, WaitingActions> getPartitionsMap() {
+        return partitionsMap;
+    }
+
+    /**
+     * @param partitionsMap the partitionsMap to set
+     */
+    public void setPartitionsMap(Map<PartitionWrapper, WaitingActions> partitionsMap) {
+        this.partitionsMap = partitionsMap;
+    }
+
+    /**
+     * Put (or replace) the waiting actions recorded for a partition.
+     *
+     * @param p the partition
+     * @param a the actions waiting on the partition
+     */
+    public void addPartitionAndAction(PartitionWrapper p, WaitingActions a) {
+        partitionsMap.put(p, a);
+    }
+}
Added:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java?rev=1407251&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
(added)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
Thu Nov 8 20:09:12 2012
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.util;
+
+import java.sql.Timestamp;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * The object to store list of waiting actions and timestamp corresponding to a
+ * particular hcat partition
+ */
+public class WaitingActions {
+ // ids of the actions waiting on the partition
+ private List<String> actions;
+ // set on construction and refreshed by addAndUpdate
+ private Timestamp lastModified;
+
+ /**
+ * Empty (default) constructor; starts with an empty CopyOnWriteArrayList
+ */
+ public WaitingActions() {
+ this(new CopyOnWriteArrayList<String>());
+ }
+
+ /**
+ * Constructor that sets list directly (the list is stored, not copied)
+ * and stamps lastModified with the current date
+ *
+ * @param newActions list of waiting action ids
+ */
+ WaitingActions(List<String> newActions) {
+ this.setActions(newActions);
+ this.setLastModified(DateUtils.convertDateToTimestamp(new Date()));
+ }
+
+ /**
+ * constructor that sets single action in list
+ *
+ * @param actionId id of the first waiting action
+ */
+ public WaitingActions(String actionId) {
+ this();
+ this.getActions().add(actionId);
+ }
+
+ /**
+ * @return the actions (the internal list itself, not a copy)
+ */
+ public List<String> getActions() {
+ return actions;
+ }
+
+ /**
+ * @param actions the actions to set
+ */
+ public void setActions(List<String> actions) {
+ this.actions = actions;
+ }
+
+ /**
+ * @return the lastModified
+ */
+ public Timestamp getLastModified() {
+ return lastModified;
+ }
+
+ /**
+ * @param lastModified the lastModified to set
+ */
+ public void setLastModified(Timestamp lastModified) {
+ this.lastModified = lastModified;
+ }
+
+ /**
+ * @param lastModified the lastModified to set, converted to a Timestamp
+ * through DateUtils.convertDateToTimestamp
+ */
+ public void setLastModified(Date lastModified) {
+ this.lastModified = DateUtils.convertDateToTimestamp(lastModified);
+ }
+
+ /**
+ * Add actionId to list and update lastModifiedTime
+ *
+ * @param actionId id of the action to append
+ */
+ public void addAndUpdate(String actionId) {
+ getActions().add(actionId);
+ setLastModified(new Date());
+ }
+}
Added:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1407251&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
(added)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
Thu Nov 8 20:09:12 2012
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.service;
+
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.apache.oozie.util.WaitingActions;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test class to test the addition, removal and available operations
+ * on the partition dependencies cache structure
+ */
+public class TestPartitionDependencyManagerService extends XDataTestCase {
+
+    /** HCat dependency URI shared by all tests: table 'clicks', partition datastamp=12;region=us */
+    private static final String HCAT_DEPENDENCY =
+            "hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12;region=us";
+
+    /** Id of the action registered as waiting on the dependency */
+    private static final String ACTION_ID = "myAction";
+
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_SERVER_NAME, "myhcatserver");
+        setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_DB_NAME, "myhcatdb");
+        setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY, "100");
+        Services services = new Services();
+        addServiceToRun(services.getConf(), PartitionDependencyManagerService.class.getName());
+        services.init();
+    }
+
+    @After
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    /**
+     * Test basic service startup and required structures
+     *
+     * @throws MetadataServiceException
+     */
+    @Test
+    public void testBasicService() throws MetadataServiceException {
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        assertNotNull(pdms);
+        assertNotNull(pdms.getHCatMap());
+        assertNotNull(pdms.getAvailableMap());
+    }
+
+    /**
+     * Test addition of missing partition into cache
+     *
+     * @throws MetadataServiceException
+     * @throws URISyntaxException
+     */
+    @Test
+    public void testAddMissingPartition() throws MetadataServiceException, URISyntaxException {
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        HCatURI hcatUri = new HCatURI(HCAT_DEPENDENCY);
+        PartitionWrapper partition = new PartitionWrapper(hcatUri); // datastamp=12;region=us
+
+        pdms.addMissingPartition(HCAT_DEPENDENCY, ACTION_ID);
+        PartitionsGroup missingPartitions = lookupMissingGroup(pdms, hcatUri);
+
+        // expected value first, so a failure message reads correctly
+        assertEquals(partition, missingPartitions.getPartitionsMap().keySet().iterator().next());
+        WaitingActions actions = missingPartitions.getPartitionsMap().get(partition);
+        assertNotNull(actions);
+        assertTrue(actions.getActions().contains(ACTION_ID));
+    }
+
+    /**
+     * Test removal of partition from cache
+     *
+     * @throws MetadataServiceException
+     * @throws URISyntaxException
+     */
+    @Test
+    public void testRemovePartition() throws MetadataServiceException, URISyntaxException {
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        HCatURI hcatUri = new HCatURI(HCAT_DEPENDENCY);
+        PartitionWrapper partition = new PartitionWrapper(hcatUri);
+
+        pdms.addMissingPartition(HCAT_DEPENDENCY, ACTION_ID);
+        PartitionsGroup missingPartitions = lookupMissingGroup(pdms, hcatUri);
+
+        // remove with cascading - OFF
+        pdms.removePartition(HCAT_DEPENDENCY, false);
+        // The partitions map is keyed by PartitionWrapper; probing it with the raw
+        // partition map of the URI could never match and made the check vacuous.
+        assertFalse(missingPartitions.getPartitionsMap().containsKey(partition));
+
+        // re-add and verify through the cache itself rather than a stale local reference
+        pdms.addMissingPartition(HCAT_DEPENDENCY, ACTION_ID);
+        assertTrue(lookupMissingGroup(pdms, hcatUri).getPartitionsMap().containsKey(partition));
+
+        // remove with cascading - ON
+        pdms.removePartition(HCAT_DEPENDENCY);
+        assertTableNotMissing(pdms, hcatUri);
+    }
+
+    /**
+     * Test partition available function on cache
+     *
+     * @throws MetadataServiceException
+     * @throws URISyntaxException
+     */
+    @Test
+    public void testAvailablePartition() throws MetadataServiceException, URISyntaxException {
+        PartitionDependencyManagerService pdms = Services.get().get(PartitionDependencyManagerService.class);
+        HCatURI hcatUri = new HCatURI(HCAT_DEPENDENCY);
+
+        pdms.addMissingPartition(HCAT_DEPENDENCY, ACTION_ID);
+        lookupMissingGroup(pdms, hcatUri);
+
+        pdms.partitionAvailable(HCAT_DEPENDENCY);
+        Map<String, List<PartitionWrapper>> availMap = pdms.getAvailableMap();
+        assertNotNull(availMap);
+        assertTrue(availMap.containsKey(ACTION_ID)); // found in 'available' cache
+        assertTableNotMissing(pdms, hcatUri); // removed from 'missing' cache, cascade - ON
+        assertEquals(new PartitionWrapper(hcatUri), availMap.get(ACTION_ID).get(0));
+    }
+
+    /**
+     * Key of the top-level 'missing' cache as used by these tests: server and db joined by '#'
+     */
+    private String instanceKey(HCatURI hcatUri) {
+        return hcatUri.getServer() + "#" + hcatUri.getDb();
+    }
+
+    /**
+     * Looks up the group of missing partitions for the table of the given URI,
+     * asserting that every level of the cache is present on the way.
+     */
+    private PartitionsGroup lookupMissingGroup(PartitionDependencyManagerService pdms, HCatURI hcatUri)
+            throws MetadataServiceException {
+        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(instanceKey(hcatUri));
+        assertNotNull(tablePartitionsMap);
+        assertTrue(tablePartitionsMap.containsKey(hcatUri.getTable())); // clicks
+        PartitionsGroup missingPartitions = tablePartitionsMap.get(hcatUri.getTable());
+        assertNotNull(missingPartitions);
+        return missingPartitions;
+    }
+
+    /**
+     * Asserts that the table of the given URI has no entry left in the 'missing' cache.
+     * The top-level map is keyed by server#db, so the table has to be looked up one
+     * level down; the check passes whether or not the server#db entry itself was dropped.
+     */
+    private void assertTableNotMissing(PartitionDependencyManagerService pdms, HCatURI hcatUri)
+            throws MetadataServiceException {
+        Map<String, PartitionsGroup> tablePartitionsMap = pdms.getHCatMap().get(instanceKey(hcatUri));
+        assertTrue(tablePartitionsMap == null || !tablePartitionsMap.containsKey(hcatUri.getTable()));
+    }
+}
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
Thu Nov 8 20:09:12 2012
@@ -1185,6 +1185,18 @@ public abstract class XDataTestCase exte
conf.set(Services.CONF_SERVICE_CLASSES, new String(builder));
}
+ /**
+  * Add a particular service class to be run in addition to the ones already
+  * configured under {@link Services#CONF_SERVICE_CLASSES}.
+  *
+  * @param conf configuration whose service class list is extended in place
+  * @param serviceName class name of the service to append
+  */
+ protected void addServiceToRun(Configuration conf, String serviceName) {
+     String classes = conf.get(Services.CONF_SERVICE_CLASSES);
+     String updated;
+     if (classes == null || classes.trim().length() == 0) {
+         // nothing configured yet: avoid a NullPointerException and a leading comma
+         updated = serviceName;
+     }
+     else {
+         updated = classes + "," + serviceName;
+     }
+     conf.set(Services.CONF_SERVICE_CLASSES, updated);
+ }
+
/**
* Adds the db records for the Bulk Monitor tests
*/
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/util/TestHCatURI.java
Thu Nov 8 20:09:12 2012
@@ -37,8 +37,8 @@ public class TestHCatURI {
assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
assertEquals(uri.getDb(),"mydb");
assertEquals(uri.getTable(),"clicks");
- assertEquals(uri.getParitionValue("datastamp"),"12");
- assertEquals(uri.getParitionValue("region"),"us");
+ assertEquals(uri.getPartitionValue("datastamp"),"12");
+ assertEquals(uri.getPartitionValue("region"),"us");
}
@@ -60,8 +60,8 @@ public class TestHCatURI {
assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
assertEquals(uri.getDb(),"mydb");
assertEquals(uri.getTable(),"clicks");
- assertEquals(uri.getParitionValue("datastamp"),"12");
- assertEquals(uri.getParitionValue("region"),"us");
+ assertEquals(uri.getPartitionValue("datastamp"),"12");
+ assertEquals(uri.getPartitionValue("region"),"us");
}
@Test
@@ -82,8 +82,8 @@ public class TestHCatURI {
assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
assertEquals(uri.getDb(),"mydb");
assertEquals(uri.getTable(),"clicks");
- assertEquals(uri.getParitionValue("datastamp"),"12");
- assertEquals(uri.getParitionValue("region"),"us");
+ assertEquals(uri.getPartitionValue("datastamp"),"12");
+ assertEquals(uri.getPartitionValue("region"),"us");
}
@Test
@@ -104,8 +104,8 @@ public class TestHCatURI {
assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
assertEquals(uri.getDb(),"mydb");
assertEquals(uri.getTable(),"clicks");
- assertEquals(uri.getParitionValue("datastamp"),"12");
- assertEquals(uri.getParitionValue("region"),"us");
+ assertEquals(uri.getPartitionValue("datastamp"),"12");
+ assertEquals(uri.getPartitionValue("region"),"us");
}
@@ -127,8 +127,8 @@ public class TestHCatURI {
assertEquals(uri.getServer(),"hcat.yahoo.com:5080");
assertEquals(uri.getDb(),"mydb");
assertEquals(uri.getTable(),"clicks");
- assertEquals(uri.getParitionValue("datastamp"),"12");
- assertEquals(uri.getParitionValue("region"),"us");
+ assertEquals(uri.getPartitionValue("datastamp"),"12");
+ assertEquals(uri.getPartitionValue("region"),"us");
}
@Test(expected = URISyntaxException.class)
Modified: oozie/branches/hcat-intre/pom.xml
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/pom.xml?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- oozie/branches/hcat-intre/pom.xml (original)
+++ oozie/branches/hcat-intre/pom.xml Thu Nov 8 20:09:12 2012
@@ -638,6 +638,12 @@
<version>1.8.5</version>
</dependency>
+ <dependency>
+ <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+ <artifactId>concurrentlinkedhashmap-lru</artifactId>
+ <version>1.3.1</version>
+ </dependency>
+
<!-- examples -->
<dependency>
<groupId>commons-httpclient</groupId>
Modified: oozie/branches/hcat-intre/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1407251&r1=1407250&r2=1407251&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Thu Nov 8 20:09:12 2012
@@ -1,4 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+
+OOZIE-1039 Implement the Missing Dependency structure for HCat partitions
(mona via mohammad)
OOZIE-1028 Add EL function to allow date ranges to be used for dataset ranges
(rkanter via tucu)
OOZIE-1045 Parameterize <unresolved-instances> tag currently hardcoded
(egashira via mona)
OOZIE-1029 MiniMRCluster fails to start when used against YARN (rkanter via
tucu)