Author: virag
Date: Thu Jan 3 17:23:45 2013
New Revision: 1428491
URL: http://svn.apache.org/viewvc?rev=1428491&view=rev
Log:
OOZIE-1145 Modify Recovery Service to handle push missing dependencies (virag)
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/branches/hcat-intre/release-log.txt
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java
Thu Jan 3 17:23:45 2013
@@ -126,7 +126,7 @@ import org.apache.openjpa.persistence.jd
@NamedQuery(name = "GET_RUNNING_ACTIONS_OLDER_THAN", query = "select
a.id from CoordinatorActionBean a where a.status = 'RUNNING' AND
a.lastModifiedTimestamp <= :lastModifiedTime"),
- @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN",
query = "select a.id, a.jobId, a.status, a.externalId from
CoordinatorActionBean a where (a.status = 'WAITING' OR a.status = 'SUBMITTED')
AND a.lastModifiedTimestamp <= :lastModifiedTime"),
+ @NamedQuery(name = "GET_COORD_ACTIONS_WAITING_SUBMITTED_OLDER_THAN",
query = "select a.id, a.jobId, a.status, a.externalId,
a.pushMissingDependencies from CoordinatorActionBean a where (a.status =
'WAITING' OR a.status = 'SUBMITTED') AND a.lastModifiedTimestamp <=
:lastModifiedTime"),
@NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query
= "select a.id, a.jobId, a.status, a.externalId from CoordinatorActionBean a
where a.pending > 0 AND (a.status = 'SUSPENDED' OR a.status = 'KILLED' OR
a.status = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"),
// Select query used by rerun, requires almost all columns so select *
is used
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
Thu Jan 3 17:23:45 2013
@@ -143,18 +143,18 @@ public class CoordActionInputCheckXComma
// pass jobID to the CoordActionReadyXCommand
queue(new CoordActionReadyXCommand(coordAction.getJobId()),
100);
}
- else {
- long waitingTime = (currentTime.getTime() -
Math.max(coordAction.getNominalTime().getTime(), coordAction
- .getCreatedTime().getTime()))
- / (60 * 1000);
- int timeOut = coordAction.getTimeOut();
- if ((timeOut >= 0) && (waitingTime > timeOut)) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ else if (!isTimeout(currentTime)) {
+ if (status == false) {
+ queue(new
CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
+ getCoordInputCheckRequeueInterval());
}
else {
- queue(new
CoordActionInputCheckXCommand(coordAction.getId(), coordAction.getJobId()),
getCoordInputCheckRequeueInterval());
+ queue(new
CoordPushDependencyCheckXCommand(coordAction.getId()));
}
}
+ else {
+ queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ }
}
catch (Exception e) {
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
@@ -174,6 +174,18 @@ public class CoordActionInputCheckXComma
return null;
}
+
+ private boolean isTimeout(Date currentTime) {
+ long waitingTime = (currentTime.getTime() -
Math.max(coordAction.getNominalTime().getTime(), coordAction
+ .getCreatedTime().getTime()))
+ / (60 * 1000);
+ int timeOut = coordAction.getTimeOut();
+ if ((timeOut >= 0) && (waitingTime > timeOut)) {
+ return true;
+ }
+ return false;
+ }
+
/**
* This function reads the value of re-queue interval for coordinator input
* check command from the Oozie configuration provided by Configuration
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
Thu Jan 3 17:23:45 2013
@@ -40,6 +40,7 @@ import org.apache.oozie.executor.jpa.Coo
import
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
+import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIAccessorException;
import org.apache.oozie.service.URIHandlerService;
@@ -52,6 +53,18 @@ public class CoordPushDependencyCheckXCo
private JPAService jpaService = null;
private CoordinatorActionBean coordAction = null;
+ /**
+ * Property name of command re-queue interval for coordinator push check in
+ * milliseconds.
+ */
+ public static final String CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL =
Service.CONF_PREFIX
+ + "coord.push.check.requeue.interval";
+ /**
+ * Default re-queue interval in ms. It is applied when no value defined in
+ * the oozie configuration.
+ */
+ private final int DEFAULT_COMMAND_REQUEUE_INTERVAL = 600000;
+
public CoordPushDependencyCheckXCommand(String actionId) {
super("coord_push_dep_check", "coord_push_dep_check", 0);
this.actionId = actionId;
@@ -79,7 +92,12 @@ public class CoordPushDependencyCheckXCo
pushDeps = StringUtils.join(missingDeps,
CoordELFunctions.INSTANCE_SEPARATOR);
coordAction.setPushMissingDependencies(pushDeps);
// Checking for timeout
- handleTimeout();
+ if (!isTimeout()) {
+ queue(new
CoordPushDependencyCheckXCommand(coordAction.getId()),
getCoordPushCheckRequeueInterval());
+ }
+ else {
+ queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ }
}
else { // All push-based dependencies are available
coordAction.setPushMissingDependencies("");
@@ -93,6 +111,16 @@ public class CoordPushDependencyCheckXCo
return null;
}
+ /**
+ * Returns the re-queue interval for the coordinator push dependency check.
+ * @return the configured interval in milliseconds, or the default when none is configured
+ */
+ public long getCoordPushCheckRequeueInterval() {
+ long requeueInterval =
Services.get().getConf().getLong(CONF_COORD_PUSH_CHECK_REQUEUE_INTERVAL,
+ DEFAULT_COMMAND_REQUEUE_INTERVAL);
+ return requeueInterval;
+ }
+
private List<String> getMissingDependencies(String[] dependencies,
Configuration conf, String user)
throws CommandException {
List<String> missingDeps = new ArrayList<String>();
@@ -127,14 +155,16 @@ public class CoordPushDependencyCheckXCo
}
}
- private void handleTimeout() {
+ // returns true if the action has waited longer than its timeout (the caller queues the timeout command)
+ private boolean isTimeout() {
long waitingTime = (new Date().getTime() -
Math.max(coordAction.getNominalTime().getTime(), coordAction
.getCreatedTime().getTime()))
/ (60 * 1000);
int timeOut = coordAction.getTimeOut();
if ((timeOut >= 0) && (waitingTime > timeOut)) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ return true;
}
+ return false;
}
/*
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionsGetForRecoveryJPAExecutor.java
Thu Jan 3 17:23:45 2013
@@ -60,7 +60,7 @@ public class CoordActionsGetForRecoveryJ
q.setParameter("lastModifiedTime", ts);
List<Object[]> objectArrList = q.getResultList();
for (Object[] arr : objectArrList) {
- CoordinatorActionBean caa =
getBeanForCoordinatorActionFromArray(arr);
+ CoordinatorActionBean caa =
getBeanForCoordinatorActionFromArrayForRecovery(arr);
allActions.add(caa);
}
@@ -68,7 +68,7 @@ public class CoordActionsGetForRecoveryJ
q.setParameter("lastModifiedTime", ts);
objectArrList = q.getResultList();
for (Object[] arr : objectArrList) {
- CoordinatorActionBean caa =
getBeanForCoordinatorActionFromArray(arr);
+ CoordinatorActionBean caa =
getBeanForCoordinatorActionFromArrayForWaiting(arr);
allActions.add(caa);
}
@@ -79,7 +79,7 @@ public class CoordActionsGetForRecoveryJ
}
}
- private CoordinatorActionBean
getBeanForCoordinatorActionFromArray(Object[] arr) {
+ private CoordinatorActionBean
getBeanForCoordinatorActionFromArrayForRecovery(Object[] arr) {
CoordinatorActionBean bean = new CoordinatorActionBean();
if (arr[0] != null) {
bean.setId((String) arr[0]);
@@ -96,4 +96,25 @@ public class CoordActionsGetForRecoveryJ
return bean;
}
+
+ private CoordinatorActionBean
getBeanForCoordinatorActionFromArrayForWaiting(Object[] arr){
+ CoordinatorActionBean bean = new CoordinatorActionBean();
+ if (arr[0] != null) {
+ bean.setId((String) arr[0]);
+ }
+ if (arr[1] != null){
+ bean.setJobId((String) arr[1]);
+ }
+ if (arr[2] != null) {
+ bean.setStatus(CoordinatorAction.Status.valueOf((String) arr[2]));
+ }
+ if (arr[3] != null) {
+ bean.setExternalId((String) arr[3]);
+ }
+ if (arr[4] != null) {
+ bean.setPushMissingDependencies((String) arr[4]);
+ }
+ return bean;
+ }
+
}
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/JMSAccessorService.java
Thu Jan 3 17:23:45 2013
@@ -321,7 +321,7 @@ public class JMSAccessorService implemen
});
}
- catch (Exception e1){
+ catch (Exception e1) {
LOG.error(e1.getMessage(), e1);
if (conn != null) {
try {
@@ -331,6 +331,7 @@ public class JMSAccessorService implemen
LOG.error(e2.getMessage(), e2);
}
}
+ throw new ServiceException(ErrorCode.E0100, getClass().getName(),
e1.getMessage(), e1);
}
return conn;
}
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/PartitionDependencyManagerService.java
Thu Jan 3 17:23:45 2013
@@ -146,6 +146,40 @@ public class PartitionDependencyManagerS
}
/**
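+ * Checks whether the table named by the given HCatalog URI exists in the map.
+ * @param hcatURI URI string from which the server, database and table are parsed
+ * @return true if the table has an entry in the map, false otherwise
+ * @throws MetadataServiceException if hcatURI is not a valid URI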
+ */
+ public boolean containsTable(String hcatURI) throws
MetadataServiceException {
+ HCatURI uri;
+ try {
+ uri = new HCatURI(hcatURI);
+ }
+ catch (URISyntaxException e) {
+ throw new MetadataServiceException(ErrorCode.E1025,
e.getMessage());
+ }
+ PartitionWrapper partition = new PartitionWrapper(uri.getServer(),
uri.getDb(), uri.getTable(),
+ uri.getPartitionMap());
+ return containsTable(partition);
+
+ }
+
+ private boolean containsTable(PartitionWrapper partition) {
+ String prefix = PartitionWrapper.makePrefix(partition.getServerName(),
partition.getDbName());
+ Map<String, PartitionsGroup> tablePartitionsMap;
+ String tableName = partition.getTableName();
+ if (hcatInstanceMap.containsKey(prefix)) {
+ tablePartitionsMap = hcatInstanceMap.get(prefix);
+ if (tablePartitionsMap.containsKey(tableName)) {
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ /**
* Adding missing partition entry specified by PartitionWrapper object
*
* @param partition
@@ -445,8 +479,7 @@ public class PartitionDependencyManagerS
return containsPartition(partition);
}
-
- /**
+ /**
* Determine if a partition entry exists in cache
*
* @param partition
@@ -474,7 +507,8 @@ public class PartitionDependencyManagerS
PartitionsGroup missingPartitions = tableMap.get(tableName);
if (missingPartitions != null &&
missingPartitions.getPartitionsMap().containsKey(partition)) {
actionsList = missingPartitions.getPartitionsMap().get(partition);
- if (actionsList != null) {
+ // TODO - check whether set will be better than list
+ if (actionsList != null &&
!actionsList.getActions().contains(actionId)) {
// partition exists, therefore append action
actionsList.addAndUpdate(actionId);
}
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/service/RecoveryService.java
Thu Jan 3 17:23:45 2013
@@ -37,6 +37,7 @@ import org.apache.oozie.command.coord.Co
import org.apache.oozie.command.coord.CoordActionReadyXCommand;
import org.apache.oozie.command.coord.CoordActionStartXCommand;
import org.apache.oozie.command.coord.CoordKillXCommand;
+import org.apache.oozie.command.coord.CoordPushDependencyCheckXCommand;
import org.apache.oozie.command.coord.CoordResumeXCommand;
import org.apache.oozie.command.coord.CoordSubmitXCommand;
import org.apache.oozie.command.coord.CoordSuspendXCommand;
@@ -46,6 +47,7 @@ import org.apache.oozie.command.wf.KillX
import org.apache.oozie.command.wf.ResumeXCommand;
import org.apache.oozie.command.wf.SignalXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
+import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.BundleActionsGetWaitingOlderJPAExecutor;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionsGetForRecoveryJPAExecutor;
@@ -80,6 +82,12 @@ public class RecoveryService implements
* The number of callables to be queued in a batch.
*/
public static final String CONF_CALLABLE_BATCH_SIZE = CONF_PREFIX +
"callable.batch.size";
+
+ /**
+ * Delay for the push missing dependencies in milliseconds.
+ */
+ public static final String CONF_PUSH_DEPENDENCY_DELAY = CONF_PREFIX +
"push.dependency.delay";
+
/**
* Age of actions to queue, in seconds.
*/
@@ -113,6 +121,10 @@ public class RecoveryService implements
private List<XCallable<?>> delayedCallables;
private StringBuilder msg = null;
private JPAService jpaService = null;
+ URIHandlerService uriService =
Services.get().get(URIHandlerService.class);
+ JMSAccessorService jmsService =
Services.get().get(JMSAccessorService.class);
+ PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
+
public RecoveryRunnable(long olderThan, long coordOlderThan,long
bundleOlderThan) {
this.olderThan = olderThan;
@@ -210,6 +222,31 @@ public class RecoveryService implements
}
}
+ }
+
+ private void registerPartitions(CoordinatorActionBean actionBean)
throws URIAccessorException,
+ MetadataServiceException {
+ String pushDeps = actionBean.getPushMissingDependencies();
+ String[] pushDepsArr =
pushDeps.split(CoordELFunctions.INSTANCE_SEPARATOR, -1);
+
+ String firstURI = pushDepsArr[0];
+ String uriWithSchemeAuthority =
uriService.getAuthorityWithScheme(firstURI).toString();
+ if (jmsService.isExistsConnection(uriWithSchemeAuthority)) {
+ if (pdms.containsTable(firstURI)) {
+ return; // assuming that all partitions are registered as
+ // connection and table exists
+ }
+ else {
+ for (String uri : pushDepsArr) {
+ pdms.addMissingPartition(uri, actionBean.getId());
+ }
+ }
+ }
+ else if (jmsService.getOrCreateConnection(uriWithSchemeAuthority))
{
+ for (String uri : pushDepsArr) {
+ pdms.addMissingPartition(uri, actionBean.getId());
+ }
+ }
}
@@ -219,6 +256,7 @@ public class RecoveryService implements
private void runCoordActionRecovery() {
XLog.Info.get().clear();
XLog log = XLog.getLog(getClass());
+ long pushMissingDepDelay =
Services.get().getConf().getLong(CONF_PUSH_DEPENDENCY_DELAY, 60000);
List<CoordinatorActionBean> cactions = null;
try {
cactions = jpaService.execute(new
CoordActionsGetForRecoveryJPAExecutor(coordOlderThan));
@@ -234,8 +272,11 @@ public class RecoveryService implements
.incr(INSTRUMENTATION_GROUP,
INSTR_RECOVERED_COORD_ACTIONS_COUNTER, 1);
if (caction.getStatus() ==
CoordinatorActionBean.Status.WAITING) {
queueCallable(new
CoordActionInputCheckXCommand(caction.getId(), caction.getJobId()));
-
- log.info("Recover a WAITTING coord action and resubmit
CoordActionInputCheckXCommand :"
+ log.info("Recover a WAITING coord action and resubmit
CoordActionInputCheckXCommand :"
+ + caction.getId());
+ queueCallable(new
CoordPushDependencyCheckXCommand(caction.getId()), pushMissingDepDelay);
+ registerPartitions(caction);
+ log.info("Recover a WAITING coord action and resubmit
CoordPushDependencyCheckX :"
+ caction.getId());
}
else if (caction.getStatus() ==
CoordinatorActionBean.Status.SUBMITTED) {
Modified:
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
(original)
+++
oozie/branches/hcat-intre/core/src/main/java/org/apache/oozie/util/WaitingActions.java
Thu Jan 3 17:23:45 2013
@@ -34,6 +34,7 @@ public class WaitingActions {
* Empty (default) constructor
*/
public WaitingActions() {
+ // TODO - as Writes are frequent, check whether this is correct or not?
this(new CopyOnWriteArrayList<String>());
}
Modified: oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml
(original)
+++ oozie/branches/hcat-intre/core/src/main/resources/oozie-default.xml Thu Jan
3 17:23:45 2013
@@ -262,6 +262,15 @@
</property>
<property>
+ <name>oozie.service.RecoveryService.push.dependency.delay</name>
+ <value>60000</value>
+ <description>
+ This value determines the delay (in milliseconds) for the push missing dependency command
+ in Recovery Service
+ </description>
+ </property>
+
+ <property>
<name>oozie.service.RecoveryService.interval</name>
<value>60</value>
<description>
@@ -396,6 +405,14 @@
</property>
<property>
+ <name>oozie.service.coord.push.check.requeue.interval
+ </name>
+ <value>600000</value>
+ <description>Command re-queue interval for push dependencies
(in milliseconds).
+ </description>
+ </property>
+
+ <property>
<name>oozie.service.coord.default.concurrency
</name>
<value>1</value>
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionUpdatePushMissingDependency.java
Thu Jan 3 17:23:45 2013
@@ -17,35 +17,24 @@
*/
package org.apache.oozie.command.coord;
-import java.io.IOException;
-import java.io.Reader;
-import java.util.Date;
import java.util.List;
import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JMSAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.PartitionDependencyManagerService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.HCatURI;
-import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.PartitionWrapper;
-import org.apache.oozie.util.XLog;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestCoordActionUpdatePushMissingDependency extends XDataTestCase {
- private String TZ;
private Services services;
@Before
@@ -56,8 +45,6 @@ public class TestCoordActionUpdatePushMi
setSystemProperty(PartitionDependencyManagerService.MAP_MAX_WEIGHTED_CAPACITY,
"100");
services = super.setupServicesForHCatalog();
services.init();
- TZ =
(getProcessingTZ().equals(DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT)) ? "Z" :
getProcessingTZ()
- .substring(3);
}
@After
@@ -157,71 +144,4 @@ public class TestCoordActionUpdatePushMi
}
}
- private String addInitRecords(String pushMissingDependencies) throws
Exception {
- Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59" + TZ);
- Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59" + TZ);
- CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
- CoordinatorJob.Status.RUNNING, startTime, endTime, false,
true, 3);
-
- CoordinatorActionBean action1 =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
- CoordinatorAction.Status.WAITING,
"coord-action-for-action-input-check.xml", pushMissingDependencies);
- return action1.getId();
- }
-
- protected CoordinatorActionBean
addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
- CoordinatorAction.Status status, String resourceXmlName, String
pushMissingDependencies) throws Exception {
- CoordinatorActionBean action = createCoordAction(jobId, actionNum,
status, resourceXmlName, 0, TZ);
- action.setPushMissingDependencies(pushMissingDependencies);
- try {
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- CoordActionInsertJPAExecutor coordActionInsertCmd = new
CoordActionInsertJPAExecutor(action);
- jpaService.execute(coordActionInsertCmd);
- }
- catch (JPAExecutorException je) {
- je.printStackTrace();
- fail("Unable to insert the test coord action record to table");
- throw je;
- }
- return action;
- }
-
- protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String
testFileName, CoordinatorJob.Status status,
- Date start, Date end, boolean pending, boolean doneMatd, int
lastActionNum) throws Exception {
-
- String testDir = getTestCaseDir();
- CoordinatorJobBean coordJob = createCoordJob(testFileName, status,
start, end, pending, doneMatd, lastActionNum);
- String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
- coordJob.setJobXml(appXml);
-
- try {
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- CoordJobInsertJPAExecutor coordInsertCmd = new
CoordJobInsertJPAExecutor(coordJob);
- jpaService.execute(coordInsertCmd);
- }
- catch (JPAExecutorException je) {
- je.printStackTrace();
- fail("Unable to insert the test coord job record to table");
- throw je;
- }
-
- return coordJob;
- }
-
- protected String getCoordJobXmlForWaiting(String testFileName, String
testDir) {
- try {
- Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
- String appXml = IOUtils.getReaderAsString(reader, -1);
- appXml = appXml.replaceAll("#testDir", testDir);
- return appXml;
- }
- catch (IOException ioe) {
- throw new RuntimeException(XLog.format("Could not get " +
testFileName, ioe));
- }
- }
-
- protected String getProcessingTZ() {
- return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
- }
}
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
Thu Jan 3 17:23:45 2013
@@ -17,34 +17,23 @@
*/
package org.apache.oozie.command.coord;
-import java.io.IOException;
-import java.io.Reader;
-import java.util.Date;
import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.client.CoordinatorAction;
-import org.apache.oozie.client.CoordinatorJob;
import org.apache.oozie.coord.CoordELFunctions;
import org.apache.oozie.dependency.FSURIHandler;
import org.apache.oozie.dependency.HCatURIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.util.DateUtils;
-import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.PartitionWrapper;
-import org.apache.oozie.util.XLog;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestCoordPushDependencyCheckXCommand extends XDataTestCase {
- private String TZ;
private String server;
private Services services = null;
@@ -56,8 +45,6 @@ public class TestCoordPushDependencyChec
FSURIHandler.class.getName() + "," +
HCatURIHandler.class.getName());
services.init();
server = getMetastoreAuthority();
- TZ =
(getProcessingTZ().equals(DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT)) ? "Z" :
getProcessingTZ()
- .substring(3);
}
@After
@@ -162,72 +149,4 @@ public class TestCoordPushDependencyChec
}
}
- private String addInitRecords(String pushMissingDependencies) throws
Exception {
- Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59" + TZ);
- Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59" + TZ);
- CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
- CoordinatorJob.Status.RUNNING, startTime, endTime, false,
true, 3);
-
- CoordinatorActionBean action1 =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
- CoordinatorAction.Status.WAITING,
"coord-action-for-action-input-check.xml", pushMissingDependencies);
- return action1.getId();
- }
-
- protected CoordinatorActionBean
addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
- CoordinatorAction.Status status, String resourceXmlName, String
pushMissingDependencies) throws Exception {
- CoordinatorActionBean action = createCoordAction(jobId, actionNum,
status, resourceXmlName, 0, TZ);
- action.setPushMissingDependencies(pushMissingDependencies);
- try {
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- CoordActionInsertJPAExecutor coordActionInsertCmd = new
CoordActionInsertJPAExecutor(action);
- jpaService.execute(coordActionInsertCmd);
- }
- catch (JPAExecutorException je) {
- je.printStackTrace();
- fail("Unable to insert the test coord action record to table");
- throw je;
- }
- return action;
- }
-
- protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String
testFileName, CoordinatorJob.Status status,
- Date start, Date end, boolean pending, boolean doneMatd, int
lastActionNum) throws Exception {
-
- String testDir = getTestCaseDir();
- CoordinatorJobBean coordJob = createCoordJob(testFileName, status,
start, end, pending, doneMatd, lastActionNum);
- String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
- coordJob.setJobXml(appXml);
-
- try {
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- CoordJobInsertJPAExecutor coordInsertCmd = new
CoordJobInsertJPAExecutor(coordJob);
- jpaService.execute(coordInsertCmd);
- }
- catch (JPAExecutorException je) {
- je.printStackTrace();
- fail("Unable to insert the test coord job record to table");
- throw je;
- }
-
- return coordJob;
- }
-
- protected String getCoordJobXmlForWaiting(String testFileName, String
testDir) {
- try {
- Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
- String appXml = IOUtils.getReaderAsString(reader, -1);
- appXml = appXml.replaceAll("#testDir", testDir);
- return appXml;
- }
- catch (IOException ioe) {
- throw new RuntimeException(XLog.format("Could not get " +
testFileName, ioe));
- }
- }
-
- protected String getProcessingTZ() {
- return DateUtils.OOZIE_PROCESSING_TIMEZONE_DEFAULT;
- }
-
}
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestPartitionDependencyManagerService.java
Thu Jan 3 17:23:45 2013
@@ -199,8 +199,6 @@ public class TestPartitionDependencyMana
assertFalse(actions.getActions().contains(actionId2));
}
-
-
/**
* Test removal of partitions from Available map
*/
@@ -248,5 +246,26 @@ public class TestPartitionDependencyMana
}
}
+ /**
+ * Test table available from the Map
+ * @throws URISyntaxException
+ * @throws MetadataServiceException
+ */
+ @Test
+ public void testMapContainsTable() throws URISyntaxException,
MetadataServiceException{
+ PartitionDependencyManagerService pdms =
services.get(PartitionDependencyManagerService.class);
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+ jmsService.getOrCreateConnection("hcat://hcat.server.com:5080");
+ String newHCatDependency1 =
"hcat://hcat.server.com:5080/mydb/clicks/datastamp=12";
+
+ // +ve test
+ pdms.addMissingPartition(newHCatDependency1, "1");
+ assertTrue(pdms.containsTable(newHCatDependency1));
+ // -ve test
+ pdms.removePartition(newHCatDependency1, true);
+ assertFalse(pdms.containsTable(newHCatDependency1));
+
+ }
+
}
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
Thu Jan 3 17:23:45 2013
@@ -25,8 +25,13 @@ import java.io.PrintWriter;
import java.io.Reader;
import java.io.StringReader;
import java.io.Writer;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Date;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -52,6 +57,9 @@ import org.apache.oozie.client.WorkflowJ
import org.apache.oozie.client.CoordinatorJob.Execution;
import org.apache.oozie.command.wf.ActionXCommand;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.dependency.FSURIHandler;
+import org.apache.oozie.dependency.HCatURIHandler;
import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobInsertJPAExecutor;
@@ -66,6 +74,7 @@ import org.apache.oozie.store.WorkflowSt
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PartitionWrapper;
import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
@@ -73,15 +82,18 @@ import org.apache.oozie.workflow.Workflo
public class TestRecoveryService extends XDataTestCase {
private Services services;
+ private String server;
@Override
protected void setUp() throws Exception {
super.setUp();
+ server = getMetastoreAuthority();
setSystemProperty(SchemaService.WF_CONF_EXT_SCHEMAS,
"wf-ext-schema.xsd");
services = new Services();
services.init();
cleanUpDBTables();
services.get(ActionService.class).register(ForTestingActionExecutor.class);
+
}
@Override
@@ -186,9 +198,9 @@ public class TestRecoveryService extends
store3.commitTrx();
store3.closeTrx();
}
-
+
/**
- * Tests functionality of the Recovery Service Runnable command. </p> Starts an action with USER_RETRY status.
+ * Tests functionality of the Recovery Service Runnable command. </p> Starts an action with USER_RETRY status.
* Runs the recovery runnable, and ensures the state changes to OK and the job completes successfully.
*
* @throws Exception
@@ -197,11 +209,11 @@ public class TestRecoveryService extends
final JPAService jpaService = Services.get().get(JPAService.class);
WorkflowJobBean job =
this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(),
"1", WorkflowAction.Status.USER_RETRY);
-
+
Runnable recoveryRunnable = new RecoveryRunnable(0, 60, 60);
recoveryRunnable.run();
sleep(3000);
-
+
final WorkflowActionGetJPAExecutor wfActionGetCmd = new
WorkflowActionGetJPAExecutor(action.getId());
waitFor(5000, new Predicate() {
@@ -233,7 +245,7 @@ public class TestRecoveryService extends
assertTrue(launcherJob.isSuccessful());
assertTrue(LauncherMapper.hasIdSwap(launcherJob));
}
-
+
/**
* Tests functionality of the Recovery Service Runnable command. </p> Insert a coordinator job with RUNNING and
@@ -294,11 +306,8 @@ public class TestRecoveryService extends
*/
public void testCoordActionRecoveryServiceForWaiting() throws Exception {
- String currentDatePlusMonth =
XDataTestCase.getCurrentDateafterIncrementingInMonths(1);
- Date startTime = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
- Date endTime = DateUtils.parseDateOozieTZ(currentDatePlusMonth);
CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
- CoordinatorJob.Status.RUNNING, startTime, endTime, false,
true, 0);
+ CoordinatorJob.Status.RUNNING, false, true);
CoordinatorActionBean action =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
CoordinatorAction.Status.WAITING,
"coord-action-for-action-input-check.xml");
@@ -332,6 +341,109 @@ public class TestRecoveryService extends
}
}
+
+ public void testCoordActionRecoveryServiceForWaitingRegisterPartition()
throws Exception {
+ services.destroy();
+ services = super.setupServicesForHCatalog();
+ services.getConf().set(URIHandlerService.URI_HANDLERS,
+ FSURIHandler.class.getName() + "," +
HCatURIHandler.class.getName());
+ services.init();
+
+ String db = "default";
+ String table = "tablename";
+
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120412;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 +
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+
+ String actionId = addInitRecords(newHCatDependency);
+
+ CoordinatorAction ca = checkCoordActionDependencies(actionId,
newHCatDependency, 0);
+ assertEquals(CoordinatorAction.Status.WAITING, ca.getStatus());
+
+ sleep(2000);
+
+ Runnable recoveryRunnable = new RecoveryRunnable(0, 1, 1);
+ recoveryRunnable.run();
+
+ sleep(2000);
+
+ PartitionDependencyManagerService pdms =
services.get(PartitionDependencyManagerService.class);
+ JMSAccessorService jmsService = services.get(JMSAccessorService.class);
+
+ // Recovery service should have created the partitions and JMS Connection should exist
+ assertTrue(jmsService.isExistsConnection("hcat://" + server));
+ assertTrue(pdms.containsPartition(newHCatDependency1));
+ assertTrue(pdms.containsPartition(newHCatDependency2));
+
+ }
+
+ public void testCoordActionRecoveryServiceForWaitingPushMissingDeps()
throws Exception {
+ services.destroy();
+ services = new Services();
+ Configuration conf = services.getConf();
+ conf.set(URIHandlerService.URI_HANDLERS,
+ FSURIHandler.class.getName() + "," +
HCatURIHandler.class.getName());
+ conf.setLong(RecoveryService.CONF_PUSH_DEPENDENCY_DELAY, 10);
+
+ services.init();
+
+ String db = "default";
+ String table = "tablename";
+
+
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120412;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 +
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+
+ populateTable(db, table);
+ String actionId = addInitRecords(newHCatDependency);
+
+ CoordinatorAction ca = checkCoordActionDependencies(actionId,
newHCatDependency, 0);
+ assertEquals(CoordinatorAction.Status.WAITING, ca.getStatus());
+
+ sleep(3000);
+
+ Runnable recoveryRunnable = new RecoveryRunnable(0, 1, 1);
+ recoveryRunnable.run();
+
+ sleep(3000);
+
+ ca = checkCoordActionDependencies(actionId, "", 0);
+ assertFalse(ca.getStatus().equals(CoordinatorAction.Status.WAITING));
+ }
+
+ private void populateTable(String db, String table) throws Exception {
+ dropTable(db, table, true);
+ dropDatabase(db, true);
+ createDatabase(db);
+ createTable(db, table, "dt,country");
+ addPartition(db, table, "dt=20120430;country=usa");
+ addPartition(db, table, "dt=20120412;country=brazil");
+ addPartition(db, table, "dt=20120413;country=brazil");
+ }
+
+
+ private CoordinatorActionBean checkCoordActionDependencies(String
actionId, String expDeps,
+ int type) throws Exception {
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService.execute(new
CoordActionGetJPAExecutor(actionId));
+ String missDeps = action.getPushMissingDependencies();
+ if (type != 0) {
+ assertEquals(new PartitionWrapper(missDeps), new
PartitionWrapper(expDeps));
+ }
+ else {
+ assertEquals(missDeps, expDeps);
+ }
+
+ return action;
+ }
+ catch (JPAExecutorException se) {
+ throw new Exception("Action ID " + actionId + " was not stored properly in db");
+ }
+ }
+
/**
* Tests functionality of the Recovery Service Runnable command. </p> Insert a coordinator job with SUSPENDED and
* action with SUSPENDED and workflow with RUNNING. Then, runs the recovery runnable and ensures the workflow status changes to SUSPENDED.
@@ -441,29 +553,6 @@ public class TestRecoveryService extends
assertEquals(WorkflowJob.Status.RUNNING, ret.getStatus());
}
- protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String
testFileName, CoordinatorJob.Status status, Date start, Date end,
- boolean pending, boolean doneMatd, int lastActionNum) throws
Exception {
-
- String testDir = getTestCaseDir();
- CoordinatorJobBean coordJob = createCoordJob(testFileName, status,
start, end, pending, doneMatd, lastActionNum);
- String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
- coordJob.setJobXml(appXml);
-
- try {
- JPAService jpaService = Services.get().get(JPAService.class);
- assertNotNull(jpaService);
- CoordJobInsertJPAExecutor coordInsertCmd = new
CoordJobInsertJPAExecutor(coordJob);
- jpaService.execute(coordInsertCmd);
- }
- catch (JPAExecutorException je) {
- je.printStackTrace();
- fail("Unable to insert the test coord job record to table");
- throw je;
- }
-
- return coordJob;
- }
-
protected CoordinatorActionBean
addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
CoordinatorAction.Status status, String resourceXmlName) throws
Exception {
CoordinatorActionBean action = createCoordAction(jobId, actionNum,
status, resourceXmlName, 0);
@@ -500,18 +589,6 @@ public class TestRecoveryService extends
}
}
- protected String getCoordJobXmlForWaiting(String testFileName, String
testDir) {
- try {
- Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
- String appXml = IOUtils.getReaderAsString(reader, -1);
- appXml = appXml.replaceAll("#testDir", testDir);
- return appXml;
- }
- catch (IOException ioe) {
- throw new RuntimeException(XLog.format("Could not get "+
testFileName, ioe));
- }
- }
-
private void addRecordToActionTable(String jobId, int actionNum, String
actionId, CoordinatorStore store, String baseDir) throws StoreException,
IOException {
CoordinatorActionBean action = new CoordinatorActionBean();
action.setJobId(jobId);
@@ -699,7 +776,7 @@ public class TestRecoveryService extends
throw se;
}
}
-
+
@Override
protected WorkflowActionBean addRecordToWfActionTable(String wfId, String
actionName, WorkflowAction.Status status)
throws Exception {
Modified:
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
---
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
(original)
+++
oozie/branches/hcat-intre/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
Thu Jan 3 17:23:45 2013
@@ -1269,4 +1269,65 @@ public abstract class XDataTestCase exte
return DateUtils.formatDateOozieTZ(currentDate);
}
+ protected String addInitRecords(String pushMissingDependencies) throws
Exception {
+ CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
+ CoordinatorJob.Status.RUNNING, false, true);
+
+ CoordinatorActionBean action1 =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ CoordinatorAction.Status.WAITING,
"coord-action-for-action-input-check.xml", pushMissingDependencies);
+ return action1.getId();
+ }
+
+ protected CoordinatorActionBean
addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
+ CoordinatorAction.Status status, String resourceXmlName, String
pushMissingDependencies) throws Exception {
+ CoordinatorActionBean action = createCoordAction(jobId, actionNum,
status, resourceXmlName, 0);
+ action.setPushMissingDependencies(pushMissingDependencies);
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordActionInsertJPAExecutor coordActionInsertCmd = new
CoordActionInsertJPAExecutor(action);
+ jpaService.execute(coordActionInsertCmd);
+ }
+ catch (JPAExecutorException je) {
+ je.printStackTrace();
+ fail("Unable to insert the test coord action record to table");
+ throw je;
+ }
+ return action;
+ }
+
+ protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String
testFileName, CoordinatorJob.Status status,
+ boolean pending, boolean doneMatd) throws Exception {
+
+ String testDir = getTestCaseDir();
+ CoordinatorJobBean coordJob = createCoordJob(status, pending,
doneMatd);
+ String appXml = getCoordJobXmlForWaiting(testFileName, testDir);
+ coordJob.setJobXml(appXml);
+
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ assertNotNull(jpaService);
+ CoordJobInsertJPAExecutor coordInsertCmd = new
CoordJobInsertJPAExecutor(coordJob);
+ jpaService.execute(coordInsertCmd);
+ }
+ catch (JPAExecutorException je) {
+ je.printStackTrace();
+ fail("Unable to insert the test coord job record to table");
+ throw je;
+ }
+
+ return coordJob;
+ }
+
+ protected String getCoordJobXmlForWaiting(String testFileName, String
testDir) {
+ try {
+ Reader reader = IOUtils.getResourceAsReader(testFileName, -1);
+ String appXml = IOUtils.getReaderAsString(reader, -1);
+ appXml = appXml.replaceAll("#testDir", testDir);
+ return appXml;
+ }
+ catch (IOException ioe) {
+ throw new RuntimeException(XLog.format("Could not get " +
testFileName, ioe));
+ }
+ }
}
Modified: oozie/branches/hcat-intre/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/hcat-intre/release-log.txt?rev=1428491&r1=1428490&r2=1428491&view=diff
==============================================================================
--- oozie/branches/hcat-intre/release-log.txt (original)
+++ oozie/branches/hcat-intre/release-log.txt Thu Jan 3 17:23:45 2013
@@ -1,5 +1,6 @@
-- Oozie 3.4.0 release (trunk - unreleased)
+OOZIE-1145 Modify Recovery Service to handle push missing dependencies (virag)
OOZIE-1135 Display missing partition dependencies via job -info command on CLI
(mona)
OOZIE-1125 Prepare actions for hcat (rohini via virag)
OOZIE-1123 EL Functions for hcatalog (mona)