Author: virag
Date: Thu Jan 10 20:37:15 2013
New Revision: 1431622
URL: http://svn.apache.org/viewvc?rev=1431622&view=rev
Log:
OOZIE-1156 Make all the latest/future instances as pull dependences (virag)
Added:
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/release-log.txt
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdatePushMissingDependency.java
Thu Jan 10 20:37:15 2013
@@ -6,14 +6,17 @@ import java.util.Date;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.MetadataServiceException;
@@ -21,12 +24,14 @@ import org.apache.oozie.service.Partitio
import org.apache.oozie.service.Services;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.PartitionWrapper;
+import org.apache.oozie.util.StatusUtils;
public class CoordActionUpdatePushMissingDependency extends
CoordinatorXCommand<Void> {
private String actionId;
private JPAService jpaService = null;
private CoordinatorActionBean coordAction = null;
+ private CoordinatorJobBean coordJob = null;
public CoordActionUpdatePushMissingDependency(String actionId) {
super("coord_action_push_md", "coord_action_push_md", 0);
@@ -158,12 +163,16 @@ public class CoordActionUpdatePushMissin
return getName() + "_" + actionId;
}
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#eagerLoadState()
+ */
@Override
- protected void loadState() throws CommandException {
+ protected void eagerLoadState() throws CommandException {
try {
jpaService = Services.get().get(JPAService.class);
if (jpaService != null) {
coordAction = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(actionId));
+ coordJob = jpaService.execute(new
CoordJobGetJPAExecutor(coordAction.getJobId()));
LogUtils.setLogInfo(coordAction, logInfo);
}
else {
@@ -176,14 +185,47 @@ public class CoordActionUpdatePushMissin
}
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
+ */
@Override
- protected void verifyPrecondition() throws CommandException,
PreconditionException {
+ protected void eagerVerifyPrecondition() throws CommandException,
PreconditionException {
if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
throw new PreconditionException(ErrorCode.E1100, "[" + actionId
- + "]::CoordActionInputCheck:: Ignoring action. Should be
in WAITING state, but state="
+ + "]::CoordActionUpdatePushDependency:: Ignoring action.
Should be in WAITING state, but state="
+ coordAction.getStatus());
}
- // TODO: check the parent coordinator job?
+ // if eligible to do action input check when running with backward
+ // support is true
+ if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
+ return;
+ }
+
+ if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus()
!= Job.Status.RUNNINGWITHERROR
+ && coordJob.getStatus() != Job.Status.PAUSED &&
coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
+ throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+ + "]::CoordActionUpdatePushDependency:: Ignoring action."
+ + " Coordinator job is not in
RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
+ + coordJob.getStatus());
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.command.XCommand#loadState()
+ */
+ @Override
+ protected void loadState() throws CommandException {
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.command.XCommand#verifyPrecondition()
+ */
+ @Override
+ protected void verifyPrecondition() throws CommandException,
PreconditionException {
}
}
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordCommandUtils.java
Thu Jan 10 20:37:15 2013
@@ -286,11 +286,11 @@ public class CoordCommandUtils {
* @param instances
* @throws Exception
*/
- public static StringBuffer separateResolvedAndUnresolved(Element event,
StringBuilder instances)
+ public static StringBuilder separateResolvedAndUnresolved(Element event,
StringBuilder instances)
throws Exception {
StringBuilder unresolvedInstances = new StringBuilder();
StringBuilder urisWithDoneFlag = new StringBuilder();
- StringBuffer depList = new StringBuffer();
+ StringBuilder depList = new StringBuilder();
String uris = createEarlyURIs(event, instances.toString(),
unresolvedInstances, urisWithDoneFlag);
if (uris.length() > 0) {
Element uriInstance = new Element("uris", event.getNamespace());
@@ -426,28 +426,20 @@ public class CoordCommandUtils {
appInst.setTimeZone(DateUtils.getTimeZone(eAction.getAttributeValue("timezone")));
appInst.setEndOfDuration(TimeUnit.valueOf(eAction.getAttributeValue("end_of_duration")));
- HashMap<String, StringBuffer> dependencyList = new HashMap<String,
StringBuffer>();
- StringBuffer pushDepsList = new StringBuffer();
- StringBuffer pullDepsList = new StringBuffer();
- dependencyList.put("push", pushDepsList);
- dependencyList.put("pull", pullDepsList);
+ Map<String, StringBuilder> dependencyMap = new HashMap<String,
StringBuilder>();
Element inputList = eAction.getChild("input-events",
eAction.getNamespace());
List<Element> dataInList = null;
if (inputList != null) {
dataInList = inputList.getChildren("data-in",
eAction.getNamespace());
- materializeDataEvents(dataInList, appInst, conf, dependencyList);
+ dependencyMap = materializeDataEvents(dataInList, appInst, conf);
}
Element outputList = eAction.getChild("output-events",
eAction.getNamespace());
List<Element> dataOutList = null;
if (outputList != null) {
dataOutList = outputList.getChildren("data-out",
eAction.getNamespace());
- // no dependency checks
- HashMap<String, StringBuffer> emptyDepsList = new HashMap<String,
StringBuffer>();
- emptyDepsList.put("pull", new StringBuffer());
- emptyDepsList.put("push", new StringBuffer());
- materializeDataEvents(dataOutList, appInst, conf, emptyDepsList);
+ materializeDataEvents(dataOutList, appInst, conf);
}
eAction.removeAttribute("start");
@@ -469,8 +461,14 @@ public class CoordCommandUtils {
actionBean.setLastModifiedTime(new Date());
actionBean.setStatus(CoordinatorAction.Status.WAITING);
actionBean.setActionNumber(instanceCount);
-
actionBean.setMissingDependencies(dependencyList.get("pull").toString());
-
actionBean.setPushMissingDependencies(dependencyList.get("push").toString());
+ StringBuilder sbPull = dependencyMap.get("pull");
+ if (sbPull != null) {
+ actionBean.setMissingDependencies(sbPull.toString());
+ }
+ StringBuilder sbPush = dependencyMap.get("push");
+ if (sbPush != null) {
+ actionBean.setPushMissingDependencies(sbPush.toString());
+ }
actionBean.setNominalTime(nominalTime);
if (isSla == true) {
actionBean.setSlaXml(XmlUtils.prettyPrint(
@@ -511,16 +509,14 @@ public class CoordCommandUtils {
* @param conf
* @throws Exception
*/
- public static void materializeDataEvents(List<Element> events,
SyncCoordAction appInst, Configuration conf,
- Map<String, StringBuffer> dependencyList) throws Exception {
+ public static Map<String, StringBuilder>
materializeDataEvents(List<Element> events, SyncCoordAction appInst,
Configuration conf
+ ) throws Exception {
if (events == null) {
- return;
+ return null;
}
- HashMap<String, StringBuffer> unresolvedList = new HashMap<String,
StringBuffer>();
- unresolvedList.put("push", new StringBuffer());
- unresolvedList.put("pull", new StringBuffer());
-
+ StringBuilder unresolvedList = new StringBuilder();
+ Map<String, StringBuilder> dependencyMap = new HashMap<String,
StringBuilder>();
URIHandlerService uriService =
Services.get().get(URIHandlerService.class);
for (Element event : events) {
@@ -535,29 +531,31 @@ public class CoordCommandUtils {
// Handle start-instance and end-instance
resolveInstanceRange(event, instances, appInst, conf, eval);
// Separate out the unresolved instances
- StringBuffer depList = separateResolvedAndUnresolved(event,
instances);
+ StringBuilder depList = separateResolvedAndUnresolved(event,
instances);
URI baseURI = uriService.getAuthorityWithScheme(uriTemplate);
URIHandler handler = uriService.getURIHandler(baseURI);
if
(handler.getDependencyType(baseURI).equals(DependencyType.PUSH)) {
pullOrPush = "push";
}
- dependencyList.put(pullOrPush, depList);
+ dependencyMap.put(pullOrPush, depList);
String tmpUnresolved = event.getChildTextTrim(UNRESOLVED_INST_TAG,
event.getNamespace());
if (tmpUnresolved != null) {
- if (unresolvedList.get(pullOrPush).length() > 0) {
-
unresolvedList.get(pullOrPush).append(CoordELFunctions.INSTANCE_SEPARATOR);
+ if (unresolvedList.length() > 0) {
+ unresolvedList.append(CoordELFunctions.INSTANCE_SEPARATOR);
}
- unresolvedList.get(pullOrPush).append(tmpUnresolved);
+ unresolvedList.append(tmpUnresolved);
}
}
- if (unresolvedList.get("push").length() > 0) {
- dependencyList.get("push").append(RESOLVED_UNRESOLVED_SEPARATOR);
- dependencyList.get("push").append(unresolvedList.get("push"));
- }
- if (unresolvedList.get("pull").length() > 0) {
- dependencyList.get("pull").append(RESOLVED_UNRESOLVED_SEPARATOR);
- dependencyList.get("pull").append(unresolvedList.get("pull"));
+ if (unresolvedList.length() > 0) {
+ StringBuilder sb = dependencyMap.get("pull");
+ if (sb == null) {
+ dependencyMap.put("pull", new
StringBuilder(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList));
+ }
+ else {
+
sb.append(RESOLVED_UNRESOLVED_SEPARATOR).append(unresolvedList);
+ }
}
+ return dependencyMap;
}
/**
@@ -589,19 +587,14 @@ public class CoordCommandUtils {
* @throws Exception
*/
public static void registerPartition(CoordinatorActionBean actionBean)
throws MetadataServiceException {
-
- String resolved =
getResolvedList(actionBean.getPushMissingDependencies(), new StringBuilder(),
- new StringBuilder());
- if (resolved.length() == 0)
- return;
- String[] resolvedList =
resolved.split(CoordELFunctions.INSTANCE_SEPARATOR, -1);
+ String missDeps = actionBean.getPushMissingDependencies();
+ String[] missDepsList =
missDeps.split(CoordELFunctions.INSTANCE_SEPARATOR, -1);
PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
// always register action ID to missing partition in PDMS before
// asking HCat to avoid corner case where JMS notification msg
// arrives while asking HCat for existence of partition
- if (resolvedList != null && resolvedList.length > 0) {
- pdms.addMissingPartitions(resolvedList, actionBean.getId());
+ pdms.addMissingPartitions(missDepsList, actionBean.getId());
// after Hcat answers, two things need to be done
// 1. if partition exists, remove actionId from missing
// partition in PDMS
@@ -610,10 +603,5 @@ public class CoordCommandUtils {
// 2. if partition not exists, no-op
// we probably delegate these tasks to other separate command,
// so end up with queuing the new command
- }
- else {
- XLog.getLog(CoordCommandUtils.class).info("no resolved push data
dependency");
- }
-
}
}
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java
Thu Jan 10 20:37:15 2013
@@ -112,8 +112,10 @@ public class CoordMaterializeTransitionX
for (JsonBean actionBean : insertList) {
if (actionBean instanceof CoordinatorActionBean) {
CoordinatorActionBean coordAction =
(CoordinatorActionBean) actionBean;
- CoordCommandUtils.registerPartition(coordAction);
- queue(new
CoordPushDependencyCheckXCommand(coordAction.getId()));
+ if (coordAction.getPushMissingDependencies() != null) {
+ CoordCommandUtils.registerPartition(coordAction);
+ queue(new
CoordPushDependencyCheckXCommand(coordAction.getId()));
+ }
}
}
}
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
Thu Jan 10 20:37:15 2013
@@ -22,15 +22,18 @@ import java.io.StringReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.XException;
import org.apache.oozie.client.CoordinatorAction;
+import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
@@ -38,20 +41,25 @@ import org.apache.oozie.coord.CoordELFun
import org.apache.oozie.dependency.URIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetForInputCheckJPAExecutor;
import
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
+import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.MetadataServiceException;
+import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIAccessorException;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.ParamChecker;
+import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
public class CoordPushDependencyCheckXCommand extends
CoordinatorXCommand<Void> {
private String actionId;
private JPAService jpaService = null;
private CoordinatorActionBean coordAction = null;
+ private CoordinatorJobBean coordJob = null;
/**
* Property name of command re-queue interval for coordinator push check in
@@ -88,12 +96,16 @@ public class CoordPushDependencyCheckXCo
}
String user =
ParamChecker.notEmpty(actionConf.get(OozieClient.USER_NAME),
OozieClient.USER_NAME);
List<String> missingDeps = getMissingDependencies(pushDepList,
actionConf, user);
+ List<String> availableDepList = new
ArrayList<String>(Arrays.asList(pushDepList));
+ availableDepList.removeAll(missingDeps);
+
if (missingDeps.size() > 0) {
pushDeps = StringUtils.join(missingDeps,
CoordELFunctions.INSTANCE_SEPARATOR);
coordAction.setPushMissingDependencies(pushDeps);
// Checking for timeout
if (!isTimeout()) {
- queue(new
CoordPushDependencyCheckXCommand(coordAction.getId()),
getCoordPushCheckRequeueInterval());
+ queue(new
CoordPushDependencyCheckXCommand(coordAction.getId()),
+ getCoordPushCheckRequeueInterval());
}
else {
queue(new CoordActionTimeOutXCommand(coordAction), 100);
@@ -107,7 +119,8 @@ public class CoordPushDependencyCheckXCo
queue(new CoordActionReadyXCommand(coordAction.getJobId()),
100);
}
}
- updateCoordAction(coordAction);
+
+ updateCoordAction(coordAction, availableDepList);
return null;
}
@@ -143,15 +156,26 @@ public class CoordPushDependencyCheckXCo
return missingDeps;
}
- private void updateCoordAction(CoordinatorActionBean coordAction2) throws
CommandException {
+ private void updateCoordAction(CoordinatorActionBean coordAction2,
List<String> availPartitionList) throws CommandException {
coordAction.setLastModifiedTime(new Date());
if (jpaService != null) {
try {
jpaService.execute(new
CoordActionUpdatePushInputCheckJPAExecutor(coordAction));
+ PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
+ if (pdms.removeAvailablePartitions(
+
PartitionDependencyManagerService.createPartitionWrappers(availPartitionList),
actionId)) {
+ LOG.debug("Succesfully removed partitions for actionId:
[{0}] from available Map ", actionId);
+ }
+ else {
+ LOG.warn("Unable to remove partitions for actionId: [{0}]
from available Map ", actionId);
+ }
}
catch (JPAExecutorException jex) {
throw new CommandException(ErrorCode.E1023, jex.getMessage(),
jex);
}
+ catch (MetadataServiceException e) {
+ throw new CommandException(ErrorCode.E0902, e.getMessage(), e);
+ }
}
}
@@ -173,9 +197,8 @@ public class CoordPushDependencyCheckXCo
* @see org.apache.oozie.command.XCommand#getEntityKey()
*/
@Override
- // TODO - Check whether the entityKey should be JobId or actionId
public String getEntityKey() {
- return actionId;
+ return coordAction.getJobId();
}
/* (non-Javadoc)
@@ -196,18 +219,17 @@ public class CoordPushDependencyCheckXCo
return true;
}
- /*
- * (non-Javadoc)
- *
- * @see org.apache.oozie.command.XCommand#loadState()
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#eagerLoadState()
*/
@Override
- protected void loadState() throws CommandException {
+ protected void eagerLoadState() throws CommandException {
try {
jpaService = Services.get().get(JPAService.class);
if (jpaService != null) {
coordAction = jpaService.execute(new
CoordActionGetForInputCheckJPAExecutor(actionId));
+ coordJob = jpaService.execute(new
CoordJobGetJPAExecutor(coordAction.getJobId()));
LogUtils.setLogInfo(coordAction, logInfo);
}
else {
@@ -219,6 +241,41 @@ public class CoordPushDependencyCheckXCo
}
}
+ /* (non-Javadoc)
+ * @see org.apache.oozie.command.XCommand#eagerVerifyPrecondition()
+ */
+ @Override
+ protected void eagerVerifyPrecondition() throws CommandException,
PreconditionException {
+ if (coordAction.getStatus() != CoordinatorActionBean.Status.WAITING) {
+ throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+ + "]::CoordPushDependencyCheck:: Ignoring action. Should
be in WAITING state, but state="
+ + coordAction.getStatus());
+ }
+
+ // if eligible to do action input check when running with backward
+ // support is true
+ if (StatusUtils.getStatusForCoordActionInputCheck(coordJob)) {
+ return;
+ }
+
+ if (coordJob.getStatus() != Job.Status.RUNNING && coordJob.getStatus()
!= Job.Status.RUNNINGWITHERROR
+ && coordJob.getStatus() != Job.Status.PAUSED &&
coordJob.getStatus() != Job.Status.PAUSEDWITHERROR) {
+ throw new PreconditionException(ErrorCode.E1100, "[" + actionId
+ + "]::CoordPushDependencyCheck:: Ignoring action."
+ + " Coordinator job is not in
RUNNING/RUNNINGWITHERROR/PAUSED/PAUSEDWITHERROR state, but state="
+ + coordJob.getStatus());
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.oozie.command.XCommand#loadState()
+ */
+ @Override
+ protected void loadState() throws CommandException {
+ }
+
/*
* (non-Javadoc)
*
@@ -226,9 +283,6 @@ public class CoordPushDependencyCheckXCo
*/
@Override
protected void verifyPrecondition() throws CommandException,
PreconditionException {
- if (coordAction.getStatus().equals(CoordinatorAction.Status.WAITING)
== false) {
- throw new PreconditionException(ErrorCode.E1100);
- }
}
}
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
Thu Jan 10 20:37:15 2013
@@ -18,6 +18,7 @@
package org.apache.oozie.service;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
@@ -31,6 +32,7 @@ import com.googlecode.concurrentlinkedha
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.coord.CoordActionUpdatePushMissingDependency;
import org.apache.oozie.jms.HCatMessageHandler;
import org.apache.oozie.jms.MessageReceiver;
@@ -589,4 +591,17 @@ public class PartitionDependencyManagerS
}
}
+ public static List<PartitionWrapper> createPartitionWrappers(List<String>
partitions) throws CommandException {
+ List<PartitionWrapper> ret = new ArrayList<PartitionWrapper>();
+ for (String partURI : partitions) {
+ try {
+ ret.add(new PartitionWrapper(partURI));
+ }
+ catch (URISyntaxException e) {
+ throw new CommandException(ErrorCode.E1025, e);
+ }
+ }
+ return ret;
+ }
+
}
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordMaterializeTransitionXCommand.java
Thu Jan 10 20:37:15 2013
@@ -22,9 +22,13 @@ import java.util.Date;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
import org.apache.oozie.SLAEventBean;
import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.client.CoordinatorJob.Timeunit;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.dependency.FSURIHandler;
+import org.apache.oozie.dependency.HCatURIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
@@ -35,6 +39,7 @@ import org.apache.oozie.executor.jpa.SLA
import org.apache.oozie.local.LocalOozie;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
@@ -65,6 +70,48 @@ public class TestCoordMaterializeTransit
checkCoordAction(job.getId() + "@1");
}
+ public void testActionMaterForHcatalog() throws Exception {
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ services.getConf().set(URIHandlerService.URI_HANDLERS,
+ FSURIHandler.class.getName() + "," +
HCatURIHandler.class.getName());
+ services.init();
+ Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-matd-hcat.xml",
+ CoordinatorJob.Status.RUNNING, startTime, endTime, false,
false, 0);
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ CoordinatorActionBean actionBean = getCoordAction(job.getId() + "@1");
+ assertEquals("file://dummyhdfs/2009/05/_SUCCESS" +
CoordCommandUtils.RESOLVED_UNRESOLVED_SEPARATOR
+ + "${coord:latestRange(-1,0)}",
actionBean.getMissingDependencies());
+
+ assertEquals("hcat://dummyhcat:1000/db/table/ds=2009-12",
actionBean.getPushMissingDependencies());
+ }
+
+
+ public void testActionMaterForHcatalogIncorrectURI() throws Exception {
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ services.getConf().set(URIHandlerService.URI_HANDLERS,
+ FSURIHandler.class.getName() + "," +
HCatURIHandler.class.getName());
+ services.init();
+ Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T010:00Z");
+ Date endTime = DateUtils.parseDateOozieTZ("2009-03-11T10:00Z");
+ CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-matd-neg-hcat.xml",
+ CoordinatorJob.Status.RUNNING, startTime, endTime, false,
false, 0);
+ try {
+ new CoordMaterializeTransitionXCommand(job.getId(), 3600).call();
+ fail("Expected Command exception but didn't catch any");
+ }
+ catch (CommandException e) {
+ e.printStackTrace();
+ assertEquals(ErrorCode.E1001, e.getErrorCode());
+ }
+ catch (Exception e) {
+ fail("Unexpected exception " + e.getMessage());
+ }
+ }
+
public void testActionMaterWithPauseTime1() throws Exception {
Date startTime = DateUtils.parseDateOozieTZ("2009-03-06T10:00Z");
Date endTime = DateUtils.parseDateOozieTZ("2009-03-06T10:14Z");
@@ -240,6 +287,13 @@ public class TestCoordMaterializeTransit
return actionBean;
}
+ private CoordinatorActionBean getCoordAction(String actionId) throws
JPAExecutorException {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean actionBean;
+ actionBean = jpaService.execute(new
CoordActionGetJPAExecutor(actionId));
+ return actionBean;
+ }
+
private CoordinatorJob.Status getStatus(String jobId){
CoordinatorJob job = null;
try {
@@ -284,4 +338,27 @@ public class TestCoordMaterializeTransit
fail("Action ID " + actionId + " was not stored properly in db");
}
}
+
+ private CoordinatorJobBean addRecordToCoordJobTableForWaiting(String
testFileName, CoordinatorJob.Status status,
+ Date start, Date end, boolean pending, boolean doneMatd, int
lastActionNum) throws Exception {
+
+ String testDir = getTestCaseDir();
+ CoordinatorJobBean coordJob = createCoordJob(testFileName, status,
start, end, pending, doneMatd, lastActionNum);
+ String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
+ coordJob.setJobXml(appXml);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordJobInsertJPAExecutor coordInsertCmd = new
CoordJobInsertJPAExecutor(coordJob);
+ jpaService.execute(coordInsertCmd);
+ }
+ catch (JPAExecutorException je) {
+ je.printStackTrace();
+ fail("Unable to insert the test coord job record to table");
+ throw je;
+ }
+
+ return coordJob;
+ }
}
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
Thu Jan 10 20:37:15 2013
@@ -29,6 +29,7 @@ import org.apache.oozie.service.Services
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.PartitionWrapper;
+import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -41,8 +42,10 @@ public class TestCoordPushDependencyChec
protected void setUp() throws Exception {
super.setUp();
services = new Services();
- services.getConf().set(URIHandlerService.URI_HANDLERS,
+ Configuration conf = services.getConf();
+ conf.set(URIHandlerService.URI_HANDLERS,
FSURIHandler.class.getName() + "," +
HCatURIHandler.class.getName());
+ conf.set(Services.CONF_SERVICE_EXT_CLASSES,
"org.apache.oozie.service.PartitionDependencyManagerService");
services.init();
server = getMetastoreAuthority();
}
@@ -138,7 +141,7 @@ public class TestCoordPushDependencyChec
assertEquals(new PartitionWrapper(missDeps), new
PartitionWrapper(expDeps));
}
else {
- assertEquals(missDeps, expDeps);
+ assertEquals(expDeps, missDeps);
}
assertEquals(action.getStatus(), stat);
Added:
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml?rev=1431622&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml
(added)
+++
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-hcat.xml
Thu Jan 10 20:37:15 2013
@@ -0,0 +1,64 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<coordinator-app xmlns='uri:oozie:coordinator:0.2' name='NAME'
+ frequency="1" start='2009-02-01T01:00Z' end='2009-02-03T23:59Z'
+ timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>
+ <controls>
+ <timeout>10</timeout>
+ <concurrency>2</concurrency>
+ <execution>LIFO</execution>
+ </controls>
+ <input-events>
+ <data-in name='A' dataset='a'>
+ <dataset name='a' frequency='7'
initial-instance='2009-01-01T01:00Z'
+ timezone='UTC' freq_timeunit='DAY'
end_of_duration='NONE'>
+
<uri-template>hcat://dummyhcat:1000/db/table/ds=${YEAR}-${DAY}
+ </uri-template>
+ </dataset>
+ <instance>${coord:current(-3)}</instance>
+ </data-in>
+ <data-in name='B' dataset='b'>
+ <dataset name='b' frequency='7'
initial-instance='2009-01-01T01:00Z'
+ timezone='UTC' freq_timeunit='DAY'
end_of_duration='NONE'>
+
<uri-template>hcat://dummyhcat:1000/db/table/ds=${YEAR}-${DAY}
+ </uri-template>
+ </dataset>
+ <start-instance>${coord:latest(-1)}</start-instance>
+ <end-instance>${coord:latest(0)}</end-instance>
+ </data-in>
+ <data-in name='C' dataset='c'>
+ <dataset name='c' frequency='7'
initial-instance='2009-01-01T01:00Z'
+ timezone='UTC' freq_timeunit='DAY'
end_of_duration='NONE'>
+ <uri-template>file://dummyhdfs/${YEAR}/${DAY}
+ </uri-template>
+ </dataset>
+ <instance>${coord:current(0)}</instance>
+ </data-in>
+ </input-events>
+ <action>
+ <workflow>
+ <app-path>hdfs:///tmp/workflows/</app-path>
+ <configuration>
+ <property>
+ <name>inputA</name>
+ <value>${coord:dataIn('A')}</value>
+ </property>
+ </configuration>
+ </workflow>
+ </action>
+</coordinator-app>
Added:
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml?rev=1431622&view=auto
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
(added)
+++
oozie/branches/hcat-intre/core/src/test/resources/coord-job-for-matd-neg-hcat.xml
Thu Jan 10 20:37:15 2013
@@ -0,0 +1,47 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<coordinator-app xmlns='uri:oozie:coordinator:0.2' name='NAME'
+ frequency="1" start='2009-02-01T01:00Z' end='2009-02-03T23:59Z'
+ timezone='UTC' freq_timeunit='DAY' end_of_duration='NONE'>
+ <controls>
+ <timeout>10</timeout>
+ <concurrency>2</concurrency>
+ <execution>LIFO</execution>
+ </controls>
+ <input-events>
+ <data-in name='A' dataset='a'>
+ <dataset name='a' frequency='7'
initial-instance='2009-01-01T01:00Z'
+ timezone='UTC' freq_timeunit='DAY'
end_of_duration='NONE'>
+
<uri-template>hcat://dummyhcat:1000/table/ds=${YEAR}/${DAY};region=us
+ </uri-template>
+ </dataset>
+ <instance>${coord:current(-3)}</instance>
+ </data-in>
+ </input-events>
+ <action>
+ <workflow>
+ <app-path>hdfs:///tmp/workflows/</app-path>
+ <configuration>
+ <property>
+ <name>inputA</name>
+ <value>${coord:dataIn('A')}</value>
+ </property>
+ </configuration>
+ </workflow>
+ </action>
+</coordinator-app>
Modified: oozie/branches/hcat-intre/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1431622&r1=1431621&r2=1431622&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Thu Jan 10 20:37:15 2013
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1156 Make all the latest/future instances as pull dependences (virag)
OOZIE-1144 OOZIE-1137 breaks the sharelib (rkanter)
OOZIE-1035 Improve forkjoin validation to allow same errorTo transitions
(rkanter)
OOZIE-1137 In light of federation use actionLibPath instead of appPath (vaidya
via rkanter)