Author: kamrul
Date: Tue Nov 20 01:11:09 2012
New Revision: 1411494
URL: http://svn.apache.org/viewvc?rev=1411494&view=rev
Log:
OOZIE-1043 Add logic to register to Missing Dependency Structure in coord
action materialization (ryota via mohammad)
Added:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
oozie/branches/hcat-intre/release-log.txt
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
Tue Nov 20 01:11:09 2012
@@ -20,7 +20,9 @@ package org.apache.oozie.command.coord;
import java.io.StringReader;
import java.util.Calendar;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
@@ -33,13 +35,16 @@ import org.apache.oozie.coord.CoordUtils
import org.apache.oozie.coord.CoordinatorJobException;
import org.apache.oozie.coord.SyncCoordAction;
import org.apache.oozie.coord.TimeUnit;
+import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.HCatURI;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
+import org.apache.oozie.util.XLog;
public class CoordCommandUtils {
public static int CURRENT = 0;
@@ -53,6 +58,7 @@ public class CoordCommandUtils {
/**
* parse a function like coord:latest(n)/future() and return the 'n'.
* <p/>
+ *
* @param function
* @param event
* @param appInst
@@ -198,8 +204,8 @@ public class CoordCommandUtils {
if (startCal != null && endCal != null) {
List<Integer> expandedFreqs =
CoordELFunctions.expandOffsetTimes(startCal, endCal, eval);
for (int i = expandedFreqs.size() - 1; i >= 0; i--) {
- String matInstance = materializeInstance(event,
"${coord:offset(" + expandedFreqs.get(i) + ", \"MINUTE\")}",
- appInst,
conf, eval);
+ String matInstance = materializeInstance(event,
"${coord:offset(" + expandedFreqs.get(i)
+ + ", \"MINUTE\")}", appInst, conf, eval);
if (matInstance == null || matInstance.length() == 0) {
// Earlier than dataset's initial instance
break;
@@ -220,7 +226,8 @@ public class CoordCommandUtils {
if (funcType == CURRENT) {
// Everything could be resolved NOW. no latest() ELs
for (int i = endIndex; i >= startIndex; i--) {
- String matInstance = materializeInstance(event,
"${coord:current(" + i + ")}", appInst, conf, eval);
+ String matInstance = materializeInstance(event,
"${coord:current(" + i + ")}", appInst, conf,
+ eval);
if (matInstance == null || matInstance.length() == 0) {
// Earlier than dataset's initial instance
break;
@@ -240,7 +247,8 @@ public class CoordCommandUtils {
instances.append("${coord:latest(").append(startIndex).append(")}");
}
else if (funcType == FUTURE) {
-
instances.append("${coord:future(").append(startIndex).append(",'").append(endRestArg).append("')}");
+
instances.append("${coord:future(").append(startIndex).append(",'").append(endRestArg)
+ .append("')}");
}
}
}
@@ -325,7 +333,7 @@ public class CoordCommandUtils {
String doneFlag = CoordUtils.getDoneFlag(doneFlagElement);
for (int i = 0; i < instanceList.length; i++) {
- if(instanceList[i].trim().length() == 0) {
+ if (instanceList[i].trim().length() == 0) {
continue;
}
int funcType = getFuncType(instanceList[i]);
@@ -419,7 +427,11 @@ public class CoordCommandUtils {
appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
- StringBuffer dependencyList = new StringBuffer();
+ HashMap<String, StringBuffer> dependencyList = new HashMap<String,
StringBuffer>();
+ StringBuffer pushDepsList = new StringBuffer();
+ StringBuffer pullDepsList = new StringBuffer();
+ dependencyList.put("push", pushDepsList);
+ dependencyList.put("pull", pullDepsList);
Element inputList = eAction.getChild("input-events",
eAction.getNamespace());
List<Element> dataInList = null;
@@ -432,9 +444,11 @@ public class CoordCommandUtils {
List<Element> dataOutList = null;
if (outputList != null) {
dataOutList = outputList.getChildren("data-out",
eAction.getNamespace());
- StringBuffer tmp = new StringBuffer();
// no dependency checks
- materializeDataEvents(dataOutList, appInst, conf, tmp);
+ HashMap<String, StringBuffer> emptyDepsList = new HashMap<String,
StringBuffer>();
+ emptyDepsList.put("pull", new StringBuffer());
+ emptyDepsList.put("push", new StringBuffer());
+ materializeDataEvents(dataOutList, appInst, conf, emptyDepsList);
}
eAction.removeAttribute("start");
@@ -443,8 +457,9 @@ public class CoordCommandUtils {
eAction.setAttribute("action-nominal-time",
DateUtils.formatDateOozieTZ(nominalTime));
eAction.setAttribute("action-actual-time",
DateUtils.formatDateOozieTZ(actualTime));
- boolean isSla =
CoordCommandUtils.materializeSLA(eAction.getChild("action",
eAction.getNamespace()).getChild(
- "info", eAction.getNamespace("sla")), nominalTime, conf);
+ boolean isSla = CoordCommandUtils.materializeSLA(
+ eAction.getChild("action",
eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")),
+ nominalTime, conf);
// Setting up action bean
actionBean.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
@@ -455,7 +470,8 @@ public class CoordCommandUtils {
actionBean.setLastModifiedTime(new Date());
actionBean.setStatus(CoordinatorAction.Status.WAITING);
actionBean.setActionNumber(instanceCount);
- actionBean.setMissingDependencies(dependencyList.toString());
+
actionBean.setMissingDependencies(dependencyList.get("pull").toString());
+
actionBean.setPushMissingDependencies(dependencyList.get("push").toString());
actionBean.setNominalTime(nominalTime);
if (isSla == true) {
actionBean.setSlaXml(XmlUtils.prettyPrint(
@@ -473,7 +489,8 @@ public class CoordCommandUtils {
}
else {
String action = XmlUtils.prettyPrint(eAction).toString();
- CoordActionInputCheckXCommand coordActionInput = new
CoordActionInputCheckXCommand(actionBean.getId(), actionBean.getJobId());
+ CoordActionInputCheckXCommand coordActionInput = new
CoordActionInputCheckXCommand(actionBean.getId(),
+ actionBean.getJobId());
StringBuilder actionXml = new StringBuilder(action);
StringBuilder existList = new StringBuilder();
StringBuilder nonExistList = new StringBuilder();
@@ -496,13 +513,23 @@ public class CoordCommandUtils {
* @throws Exception
*/
public static void materializeDataEvents(List<Element> events,
SyncCoordAction appInst, Configuration conf,
- StringBuffer dependencyList) throws Exception {
+ Map<String, StringBuffer> dependencyList) throws Exception {
if (events == null) {
return;
}
- StringBuffer unresolvedList = new StringBuffer();
+ HashMap<String, StringBuffer> unresolvedList = new HashMap<String,
StringBuffer>();
+ unresolvedList.put("push", new StringBuffer());
+ unresolvedList.put("pull", new StringBuffer());
+
for (Element event : events) {
+ Element uri = event.getChild("dataset", event.getNamespace())
+ .getChild("uri-template", event.getNamespace());
+ String pullOrPush = "pull";
+ String uriTemplate = uri.getText();
+ if (uriTemplate != null && HCatURI.isHcatURI(uriTemplate)) {
+ pullOrPush = "push";
+ }
StringBuilder instances = new StringBuilder();
ELEvaluator eval =
CoordELEvaluator.createInstancesELEvaluator(event, appInst, conf);
// Handle list of instance tag
@@ -510,20 +537,23 @@ public class CoordCommandUtils {
// Handle start-instance and end-instance
resolveInstanceRange(event, instances, appInst, conf, eval);
// Separate out the unresolved instances
- separateResolvedAndUnresolved(event, instances, dependencyList);
+ separateResolvedAndUnresolved(event, instances,
dependencyList.get(pullOrPush));
String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG,
event.getNamespace());
if (tmpUnresolved != null) {
- if (unresolvedList.length() > 0) {
- unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
+ if (unresolvedList.get(pullOrPush).length() > 0) {
+
unresolvedList.get(pullOrPush).append(CoordELFunctions.INSTANCE_SEPARATOR);
}
- unresolvedList.append(tmpUnresolved);
+ unresolvedList.get(pullOrPush).append(tmpUnresolved);
}
}
- if (unresolvedList.length() > 0) {
- dependencyList.append(RESOLVED_UNRESOLVED_SEPARATOR);
- dependencyList.append(unresolvedList);
+ if (unresolvedList.get("push").length() > 0) {
+ dependencyList.get("push").append(RESOLVED_UNRESOLVED_SEPARATOR);
+ dependencyList.get("push").append(unresolvedList.get("push"));
+ }
+ if (unresolvedList.get("pull").length() > 0) {
+ dependencyList.get("pull").append(RESOLVED_UNRESOLVED_SEPARATOR);
+ dependencyList.get("pull").append(unresolvedList.get("pull"));
}
- return;
}
/**
@@ -548,4 +578,38 @@ public class CoordCommandUtils {
return resolved.toString();
}
+ /**
+ * Register partition to PartitionDependencyManagerService
+ *
+ * @param actionBean
+ * @throws Exception
+ */
+ public static void registerPartition(CoordinatorActionBean actionBean)
throws Exception {
+
+ String resolved =
getResolvedList(actionBean.getPushMissingDependencies(), new StringBuilder(),
+ new StringBuilder());
+ if (resolved.length() == 0)
+ return;
+ String[] resolvedList =
resolved.split(CoordELFunctions.INSTANCE_SEPARATOR, -1);
+ PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
+
+ // always register action ID to missing partition in PDMS before
+ // asking HCat to avoid corner case where JMS notification msg
+ // arrives while asking HCat for existence of partition
+ if (resolvedList != null && resolvedList.length > 0) {
+ pdms.addMissingPartitions(resolvedList, actionBean.getId());
+ // after Hcat answers, two things need to be done
+ // 1. if partition exists, remove actionId from missing
+ // partition in PDMS
+ // --> pdms.removeActionFromMissingPartitions(uri,
+ // actionBean.getId());
+ // 2. if partition not exists, no-op
+ // we probably delegate these tasks to other separate command,
+ // so end up with queuing the new command
+ }
+ else {
+ XLog.getLog(CoordCommandUtils.class).info("no resolved push data
dependency");
+ }
+
+ }
}
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
Tue Nov 20 01:11:09 2012
@@ -329,6 +329,7 @@ public class CoordMaterializeTransitionX
if (!dryrun) {
storeToDB(actionBean, action); // Storing to table
+ CoordCommandUtils.registerPartition(actionBean); // Register
partition to PDMS
}
else {
actionStrings.append("action for new instance");
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
Tue Nov 20 01:11:09 2012
@@ -52,14 +52,14 @@ public class PartitionDependencyManagerS
private static XLog log;
/*
- * Top-level map key = concatenated identifier for hcatServer + hcatDB
- * value = table map (key = tableName string, value = PartitionsGroup
+ * Top-level map key = concatenated identifier for hcatServer + hcatDB
value
+ * = table map (key = tableName string, value = PartitionsGroup
*/
private Map<String, Map<String, PartitionsGroup>> hcatInstanceMap;
/*
- * Map denoting actions and corresponding 'available' partitions
- * key = coordinator actionId, value = available partitions
+ * Map denoting actions and corresponding 'available' partitions key =
+ * coordinator actionId, value = available partitions
*/
private Map<String, List<PartitionWrapper>> availMap;
@@ -125,7 +125,36 @@ public class PartitionDependencyManagerS
return ret;
}
- /**
+ /**
+ * Remove an action from missing partition map
+ *
+ * @param hcatURI
+ * @param actionId
+ * @return
+ * @throws MetadataServiceException
+ */
+ public boolean removeActionFromMissingPartitions(String hcatURI, String
actionId) throws MetadataServiceException {
+ boolean ret = false;
+ HCatURI uri;
+ try {
+ uri = new HCatURI(hcatURI);
+ }
+ catch (URISyntaxException e) {
+ throw new MetadataServiceException(ErrorCode.E1503,
e.getMessage());
+ }
+ PartitionWrapper partition = new PartitionWrapper(uri.getServer(),
uri.getDb(), uri.getTable(),
+ uri.getPartitionMap());
+ List<String> actions = _getActionsForPartition(partition);
+ if (actions != null && actions.size() != 0) {
+ ret = actions.remove(actionId);
+ }
+ else {
+ log.info("No waiting actions in the partition [{0}], no-ops",
partition);
+ }
+ return ret;
+ }
+
+ /**
* Adding missing partition entry specified by PartitionWrapper object
*
* @param partition
@@ -183,14 +212,21 @@ public class PartitionDependencyManagerS
addMissingPartition(partition, actionId);
}
+ public void addMissingPartitions(String[] hcatURIs, String actionId)
throws MetadataServiceException {
+ for (String uri : hcatURIs) {
+ if (uri != null && uri.length() > 0) {
+ addMissingPartition(uri, actionId);
+ }
+ }
+ }
+
/**
- * Remove partition entry specified by PartitionWrapper object
- * and cascading delete indicator
+ * Remove partition entry specified by PartitionWrapper object and
cascading
+ * delete indicator
*
* @param partition
* @param cascade
- * @return true if partition was successfully removed
- * false otherwise
+ * @return true if partition was successfully removed false otherwise
*/
public boolean removePartition(PartitionWrapper partition, boolean
cascade) {
String prefix = PartitionWrapper.makePrefix(partition.getServerName(),
partition.getDbName());
@@ -227,13 +263,12 @@ public class PartitionDependencyManagerS
}
/**
- * Remove partition entry specified by HCat URI and
- * cascading delete indicator
+ * Remove partition entry specified by HCat URI and cascading delete
+ * indicator
*
* @param hcatURI
* @param cascade
- * @return true if partition was successfully removed
- * false otherwise
+ * @return true if partition was successfully removed false otherwise
* @throws MetadataServiceException
*/
public boolean removePartition(String hcatURI, boolean cascade) throws
MetadataServiceException {
@@ -250,12 +285,11 @@ public class PartitionDependencyManagerS
}
/**
- * Remove partition entry specified by HCat URI with
- * default cascade mode - TRUE
+ * Remove partition entry specified by HCat URI with default cascade mode -
+ * TRUE
*
* @param hcatURI
- * @return true if partition was successfully removed
- * false otherwise
+ * @return true if partition was successfully removed false otherwise
* @throws MetadataServiceException
*/
public boolean removePartition(String hcatURI) throws
MetadataServiceException {
@@ -272,59 +306,45 @@ public class PartitionDependencyManagerS
}
/**
- * Move partition entry specified by ParitionWrapper object
- * from 'missing' to 'available' map
+ * Move partition entry specified by ParitionWrapper object from 'missing'
+ * to 'available' map
*
* @param partition
- * @return true if partition was successfully moved to availableMap
- * false otherwise
+ * @return true if partition was successfully moved to availableMap false
+ * otherwise
*/
public boolean partitionAvailable(PartitionWrapper partition) {
- String prefix = PartitionWrapper.makePrefix(partition.getServerName(),
partition.getDbName());
- if (hcatInstanceMap.containsKey(prefix)) {
- Map<String, PartitionsGroup> tableMap =
hcatInstanceMap.get(prefix);
- String tableName = partition.getTableName();
- PartitionsGroup missingPartitions = null;
- if (tableMap.containsKey(tableName)) {
- WaitingActions actions = _getActionsForPartition(tableMap,
tableName, missingPartitions, partition);
- if(actions != null) {
- List<String> actionsList = actions.getActions();
- Iterator<String> it = actionsList.iterator();
- while (it.hasNext()) { // add actions into separate entries
- String actionId = it.next();
- if (availMap.containsKey(actionId)) {
- // actionId exists, so append partition
- availMap.get(actionId).add(partition);
- }
- else { // new entry
- availMap.put(actionId,
- new
CopyOnWriteArrayList<PartitionWrapper>(Arrays.asList((partition))));
- }
- }
- removePartition(partition, true);
- return true;
+
+ List<String> actionsList = _getActionsForPartition(partition);
+ if (actionsList != null) {
+ Iterator<String> it = actionsList.iterator();
+ while (it.hasNext()) { // add actions into separate entries
+ String actionId = it.next();
+ if (availMap.containsKey(actionId)) {
+ // actionId exists, so append partition
+ availMap.get(actionId).add(partition);
}
- else {
- log.warn("partitionAvailable: HCat Partition [{0}] not
found", partition.toString());
+ else { // new entry
+ availMap.put(actionId, new
CopyOnWriteArrayList<PartitionWrapper>(Arrays.asList((partition))));
}
}
- else {
- log.warn("HCat table [{0}] not found", tableName);
- }
+ removePartition(partition, true);
+ return true;
}
else {
- log.warn("HCat instance [{0}] not found", prefix);
+ log.warn("No coord actions waitings for HCat Partition [{0}]",
partition.toString());
}
+
return false;
}
/**
- * Move partition entry specified by HCat URI from 'missing' to
- * 'available' map
+ * Move partition entry specified by HCat URI from 'missing' to 'available'
+ * map
*
* @param hcatURI
- * @return true if partition was successfully moved to availableMap
- * false otherwise
+ * @return true if partition was successfully moved to availableMap false
+ * otherwise
* @throws MetadataServiceException
*/
public boolean partitionAvailable(String hcatURI) throws
MetadataServiceException {
@@ -363,17 +383,40 @@ public class PartitionDependencyManagerS
}
}
- private WaitingActions _getActionsForPartition(Map<String,
PartitionsGroup> tableMap, String tableName,
- PartitionsGroup missingPartitions, PartitionWrapper partition) {
- WaitingActions actionsList = null;
- missingPartitions = tableMap.get(tableName);
- if (missingPartitions != null &&
missingPartitions.getPartitionsMap().containsKey(partition)) {
- actionsList = missingPartitions.getPartitionsMap().get(partition);
+ private List<String> _getActionsForPartition(PartitionWrapper partition) {
+ String prefix = PartitionWrapper.makePrefix(partition.getServerName(),
partition.getDbName());
+ if (hcatInstanceMap.containsKey(prefix)) {
+ Map<String, PartitionsGroup> tableMap =
hcatInstanceMap.get(prefix);
+ String tableName = partition.getTableName();
+ if (tableMap.containsKey(tableName)) {
+ PartitionsGroup missingPartitions = tableMap.get(tableName);
+ if (missingPartitions != null) {
+ if
(missingPartitions.getPartitionsMap().containsKey(partition)) {
+ WaitingActions actions =
missingPartitions.getPartitionsMap().get(partition);
+ if (actions != null) {
+ return actions.getActions();
+ }
+ else {
+ log.warn("No coord actions waitings for HCat
Partition [{0}]", partition.toString());
+ }
+ }
+ else {
+ log.warn("HCat Partition [{0}] not found",
partition.toString());
+ }
+ }
+ else {
+ log.warn("MissingPartitions not created in HCat table
[{0}]", tableName);
+ }
+ }
+ else {
+ log.warn("HCat table [{0}] not found", tableName);
+ }
}
else {
- log.warn( " _getActionsForPartition: HCat Partition [{0}] not
found", partition.toString());
+ log.warn("HCat instance [{0}] not found", prefix);
}
- return actionsList;
+
+ return null;
}
private void _createPartitionMapForTable(Map<String, PartitionsGroup>
tableMap, String tableName,
Added:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java?rev=1411494&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java
(added)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/coord/TestCoordCommandUtils.java
Tue Nov 20 01:11:09 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.coord;
+
+import java.util.Map;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.command.coord.CoordCommandUtils;
+import org.apache.oozie.service.PartitionDependencyManagerService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XDataTestCase;
+import org.apache.oozie.util.HCatURI;
+import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.PartitionsGroup;
+import org.apache.oozie.util.WaitingActions;
+
+public class TestCoordCommandUtils extends XDataTestCase {
+ private Services services;
+
+ @Override
+ protected void setUp() throws Exception {
+
+ super.setUp();
+
setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_SERVER_NAME,
"myhcatserver");
+
setSystemProperty(PartitionDependencyManagerService.HCAT_DEFAULT_DB_NAME,
"myhcatdb");
+
setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY,
"100");
+ Services services = new Services();
+ addServiceToRun(services.getConf(),
PartitionDependencyManagerService.class.getName());
+ services.init();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ Services.get().destroy();
+ super.tearDown();
+ }
+
+ public void testRegisterPartition() throws Exception {
+
+ String hcatUriStr1 =
"hcat://hcatserver.com:4080/mydb/mytable/?datestamp=1234&region=us";
+ String hcatUriStr2 =
"hcat://hcatserver.com:4080/mydb/mytable/?click=1234";
+ CoordinatorActionBean action1 = new CoordinatorActionBean();
+ action1.setId("1");
+ StringBuffer st = new StringBuffer();
+ st.append(hcatUriStr1);
+ st.append(CoordELFunctions.INSTANCE_SEPARATOR);
+ st.append(hcatUriStr2);
+ st.append(CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR);
+ st.append("${coord:latest(0)}");
+ action1.setPushMissingDependencies(st.toString());
+ CoordCommandUtils.registerPartition(action1);
+
+ HCatURI hcatUri1 = new HCatURI(hcatUriStr1);
+ HCatURI hcatUri2 = new HCatURI(hcatUriStr2);
+ PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
+ Map<String, Map<String, PartitionsGroup>> hcatInstanceMap =
pdms.getHCatMap();
+ Map<String, PartitionsGroup> tablePartitionsMap =
hcatInstanceMap.get(PartitionWrapper.makePrefix(
+ hcatUri1.getServer(), hcatUri1.getDb()));
+ // check tablePartitionMap exist for the table
+ assertTrue(tablePartitionsMap.containsKey(hcatUri1.getTable()));
+ PartitionsGroup missingPartitions =
tablePartitionsMap.get(hcatUri1.getTable());
+ WaitingActions actions1 = missingPartitions.getPartitionsMap().get(new
PartitionWrapper(hcatUri1));
+ WaitingActions actions2 = missingPartitions.getPartitionsMap().get(new
PartitionWrapper(hcatUri2));
+ // check actionID is included in WaitingAction
+ assertTrue(actions1.getActions().contains(action1.getId()));
+ assertTrue(actions2.getActions().contains(action1.getId()));
+ }
+
+}
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
Tue Nov 20 01:11:09 2012
@@ -78,7 +78,7 @@ public class TestPartitionDependencyMana
public void testAddMissingPartition() throws MetadataServiceException,
URISyntaxException {
Services services = Services.get();
PartitionDependencyManagerService pdms =
services.get(PartitionDependencyManagerService.class);
- String newHCatDependency =
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12;region=us";
+ String newHCatDependency =
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
String actionId = "myAction";
pdms.addMissingPartition(newHCatDependency, actionId);
@@ -107,7 +107,7 @@ public class TestPartitionDependencyMana
public void testRemovePartition() throws MetadataServiceException,
URISyntaxException {
Services services = Services.get();
PartitionDependencyManagerService pdms =
services.get(PartitionDependencyManagerService.class);
- String newHCatDependency =
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12;region=us";
+ String newHCatDependency =
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
String actionId = "myAction";
pdms.addMissingPartition(newHCatDependency, actionId);
@@ -141,7 +141,7 @@ public class TestPartitionDependencyMana
public void testAvailablePartition() throws MetadataServiceException,
URISyntaxException {
Services services = Services.get();
PartitionDependencyManagerService pdms =
services.get(PartitionDependencyManagerService.class);
- String newHCatDependency =
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12;region=us";
+ String newHCatDependency =
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
String actionId = "myAction";
pdms.addMissingPartition(newHCatDependency, actionId);
@@ -161,4 +161,35 @@ public class TestPartitionDependencyMana
//cascade - ON
assertEquals(availMap.get(actionId).get(0), new
PartitionWrapper(hcatUri));
}
+
+ /**
+ * Test removal of action ID from missing partition
+ *
+ * @throws MetadataServiceException
+ * @throws URISyntaxException
+ */
+ @Test
+ public void testRemoveActionFromMissingPartition() throws
MetadataServiceException, URISyntaxException {
+ Services services = Services.get();
+ PartitionDependencyManagerService pdms =
services.get(PartitionDependencyManagerService.class);
+ String newHCatDependency1 =
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12";
+ String newHCatDependency2 =
"hcat://hcat.yahoo.com:5080/mydb/clicks/?datastamp=12&region=us";
+ String actionId1 = "1";
+ String actionId2 = "2";
+ pdms.addMissingPartition(newHCatDependency1, actionId1);
+ pdms.addMissingPartition(newHCatDependency2, actionId2);
+ // remove newHCatDependency2
+ pdms.removeActionFromMissingPartitions(newHCatDependency2, actionId2);
+
+ HCatURI hcatUri = new HCatURI(newHCatDependency1);
+ String prefix = PartitionWrapper.makePrefix(hcatUri.getServer(),
hcatUri.getDb());
+ Map<String, PartitionsGroup> tablePartitionsMap =
pdms.getHCatMap().get(prefix);
+ PartitionsGroup missingPartitions =
tablePartitionsMap.get(hcatUri.getTable());
+ assertNotNull(missingPartitions);
+
+ WaitingActions actions = missingPartitions.getPartitionsMap().get(new
PartitionWrapper(hcatUri));
+ assertNotNull(actions);
+ assertTrue(actions.getActions().contains(actionId1));
+ assertFalse(actions.getActions().contains(actionId2));
+ }
}
Modified: oozie/branches/hcat-intre/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1411494&r1=1411493&r2=1411494&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Tue Nov 20 01:11:09 2012
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1043 Add logic to register to Missing Dependency Structure in coord
action materialization (ryota via mohammad)
OOZIE-1061 Add new EL functions to retrieve HCatalog server, DB and table
name(mohammad)
OOZIE-1056 Command to update push-based dependency (mohammad)
OOZIE-1059 Add static method to create URI String in HCatURI(ryota via
mohammad)