Author: mona
Date: Tue Mar 5 22:57:37 2013
New Revision: 1453070
URL: http://svn.apache.org/r1453070
Log:
OOZIE-1250 Coord action timeout not happening when there is an exception (rohini
via mona)
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
oozie/trunk/release-log.txt
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java?rev=1453070&r1=1453069&r2=1453070&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordActionInputCheckXCommand.java
Tue Mar 5 22:57:37 2013
@@ -42,6 +42,7 @@ import org.apache.oozie.executor.jpa.Coo
import
org.apache.oozie.executor.jpa.CoordActionUpdateForModifiedTimeJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
@@ -157,7 +158,7 @@ public class CoordActionInputCheckXComma
}
else {
if (!nonExistListStr.isEmpty() && pushDeps == null ||
pushDeps.length() == 0) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ queue(new CoordActionTimeOutXCommand(coordAction));
}
else {
// Let CoordPushDependencyCheckXCommand queue the timeout
@@ -167,7 +168,9 @@ public class CoordActionInputCheckXComma
}
catch (Exception e) {
if (isTimeout(currentTime)) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ LOG.debug("Queueing timeout command");
+ // XCommand.queue() will not work when there is a Exception
+ Services.get().get(CallableQueueService.class).queue(new
CoordActionTimeOutXCommand(coordAction));
}
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
Modified:
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java?rev=1453070&r1=1453069&r2=1453070&view=diff
==============================================================================
---
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
(original)
+++
oozie/trunk/core/src/main/java/org/apache/oozie/command/coord/CoordPushDependencyCheckXCommand.java
Tue Mar 5 22:57:37 2013
@@ -20,6 +20,7 @@ package org.apache.oozie.command.coord;
import java.io.IOException;
import java.io.StringReader;
import java.net.URI;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -32,7 +33,6 @@ import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
-import org.apache.oozie.coord.CoordELEvaluator;
import org.apache.oozie.dependency.DependencyChecker;
import org.apache.oozie.dependency.ActionDependency;
import org.apache.oozie.dependency.URIHandler;
@@ -41,16 +41,14 @@ import org.apache.oozie.executor.jpa.Coo
import
org.apache.oozie.executor.jpa.CoordActionUpdatePushInputCheckJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Service;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.LogUtils;
import org.apache.oozie.util.StatusUtils;
import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.jdom.Element;
public class CoordPushDependencyCheckXCommand extends
CoordinatorXCommand<Void> {
protected String actionId;
@@ -93,7 +91,9 @@ public class CoordPushDependencyCheckXCo
LOG.info("Nothing to check. Empty push missing dependency");
}
else {
- LOG.info("Push missing dependencies for actionID [{0}] is [{1}] ",
actionId, pushMissingDeps);
+ String[] missingDepsArray =
DependencyChecker.dependenciesAsArray(pushMissingDeps);
+ LOG.info("First Push missing dependency for actionID [{0}] is
[{1}] ", actionId, missingDepsArray[0]);
+ LOG.trace("Push missing dependencies for actionID [{0}] is [{1}]
", actionId, pushMissingDeps);
try {
Configuration actionConf = null;
@@ -104,7 +104,6 @@ public class CoordPushDependencyCheckXCo
throw new CommandException(ErrorCode.E1307,
e.getMessage(), e);
}
- String[] missingDepsArray =
DependencyChecker.dependenciesAsArray(pushMissingDeps);
// Check all dependencies during materialization to avoid
registering in the cache.
// But check only first missing one afterwards similar to
// CoordActionInputCheckXCommand for efficiency.
listPartitions is costly.
@@ -129,7 +128,7 @@ public class CoordPushDependencyCheckXCo
// Checking for timeout
timeout = isTimeout();
if (timeout) {
- queue(new CoordActionTimeOutXCommand(coordAction),
100);
+ queue(new CoordActionTimeOutXCommand(coordAction));
}
else {
queue(new
CoordPushDependencyCheckXCommand(coordAction.getId()),
@@ -150,7 +149,10 @@ public class CoordPushDependencyCheckXCo
}
catch (Exception e) {
if (isTimeout()) {
- queue(new CoordActionTimeOutXCommand(coordAction), 100);
+ LOG.debug("Queueing timeout command");
+ // XCommand.queue() will not work when there is a Exception
+ Services.get().get(CallableQueueService.class).queue(new
CoordActionTimeOutXCommand(coordAction));
+
unregisterMissingDependencies(Arrays.asList(missingDepsArray));
}
throw new CommandException(ErrorCode.E1021, e.getMessage(), e);
}
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java?rev=1453070&r1=1453069&r2=1453070&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionInputCheckXCommand.java
Tue Mar 5 22:57:37 2013
@@ -51,6 +51,7 @@ import org.apache.oozie.util.XConfigurat
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
+import org.junit.Test;
public class TestCoordActionInputCheckXCommand extends XDataTestCase {
protected Services services;
@@ -570,6 +571,46 @@ public class TestCoordActionInputCheckXC
assertEquals(action.getStatus(), CoordinatorAction.Status.READY);
}
+ @Test
+ public void testTimeout() throws Exception {
+ String missingDeps = "hdfs:///dirx/filex";
+ String actionId = addInitRecords(missingDeps, null, TZ);
+ new CoordActionInputCheckXCommand(actionId, actionId.substring(0,
actionId.indexOf("@"))).call();
+ // Timeout is 10 mins. Change action creation time to before 12 min to
make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 *
1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
+ new CoordActionInputCheckXCommand(actionId, actionId.substring(0,
actionId.indexOf("@"))).call();
+ Thread.sleep(100);
+ checkCoordAction(actionId, missingDeps,
CoordinatorAction.Status.TIMEDOUT);
+ }
+
+ @Test
+ public void testTimeoutWithException() throws Exception {
+ String missingDeps = "nofs:///dirx/filex";
+ String actionId = addInitRecords(missingDeps, null, TZ);
+ try {
+ new CoordActionInputCheckXCommand(actionId, actionId.substring(0,
actionId.indexOf("@"))).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("No FileSystem for scheme"));
+ }
+ // Timeout is 10 mins. Change action created time to before 12 min to
make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 *
1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
+ try {
+ new CoordActionInputCheckXCommand(actionId, actionId.substring(0,
actionId.indexOf("@"))).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("No FileSystem for scheme"));
+ }
+ Thread.sleep(100);
+ checkCoordAction(actionId, missingDeps,
CoordinatorAction.Status.TIMEDOUT);
+ }
+
protected CoordinatorJobBean addRecordToCoordJobTableForWaiting(String
testFileName, CoordinatorJob.Status status,
Date start, Date end, boolean pending, boolean doneMatd, int
lastActionNum) throws Exception {
@@ -747,6 +788,22 @@ public class TestCoordActionInputCheckXC
}
}
+ private CoordinatorActionBean checkCoordAction(String actionId, String
expDeps, CoordinatorAction.Status stat)
+ throws Exception {
+ try {
+ JPAService jpaService = Services.get().get(JPAService.class);
+ CoordinatorActionBean action = jpaService.execute(new
CoordActionGetJPAExecutor(actionId));
+ String missDeps = action.getMissingDependencies();
+ assertEquals(expDeps, missDeps);
+ assertEquals(stat, action.getStatus());
+
+ return action;
+ }
+ catch (JPAExecutorException se) {
+ throw new Exception("Action ID " + actionId + " was not stored
properly in db");
+ }
+ }
+
private void createDir(String dir) {
Process pr;
try {
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java?rev=1453070&r1=1453069&r2=1453070&view=diff
==============================================================================
---
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
(original)
+++
oozie/trunk/core/src/test/java/org/apache/oozie/command/coord/TestCoordPushDependencyCheckXCommand.java
Tue Mar 5 22:57:37 2013
@@ -97,7 +97,7 @@ public class TestCoordPushDependencyChec
public void testUpdateCoordTableMultipleDepsV2() throws Exception {
// Test for two dependencies : one of them is already existing in the
// hcat server. Other one is not.
- // Expected to see both action in WAITING as first one is not
available.
+ // Expected to see both action in WAITING as first one is not
available and we only check for first.
// Later make the other partition also available. action is expected to
// be READY
String db = "default";
@@ -110,6 +110,7 @@ public class TestCoordPushDependencyChec
String actionId = addInitRecords(newHCatDependency);
checkCoordAction(actionId, newHCatDependency,
CoordinatorAction.Status.WAITING);
+ // Checks only for first missing dependency
new CoordPushDependencyCheckXCommand(actionId).call();
// Checks dependencies in order. So list does not change if first one
is not available
@@ -126,6 +127,39 @@ public class TestCoordPushDependencyChec
checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
}
+ @Test
+ public void testUpdateCoordTableMultipleDepsV3() throws Exception {
+ // Test for two dependencies : one of them is already existing in the
+ // hcat server. Other one is not.
+ // Expected to see only first action in WAITING as we check for all
dependencies.
+ // Later make the other partition also available. action is expected to
+ // be READY
+ String db = "default";
+ String table = "tablename";
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 +
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+ populateTable(db, table);
+
+ String actionId = addInitRecords(newHCatDependency);
+ checkCoordAction(actionId, newHCatDependency,
CoordinatorAction.Status.WAITING);
+
+ // Checks for all missing dependencies
+ new CoordPushDependencyCheckXCommand(actionId, true).call();
+ checkCoordAction(actionId, newHCatDependency1,
CoordinatorAction.Status.WAITING);
+ PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
+ HCatAccessorService hcatService =
Services.get().get(HCatAccessorService.class);
+ assertTrue(pdms.getWaitingActions(new
HCatURI(newHCatDependency1)).contains(actionId));
+ assertTrue(hcatService.isRegisteredForNotification(new
HCatURI(newHCatDependency1)));
+ assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency2)));
+
+ // Make first dependency available
+ addPartition(db, table, "dt=20120430;country=brazil");
+ new CoordPushDependencyCheckXCommand(actionId).call();
+ checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
+ assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
+ assertFalse(hcatService.isRegisteredForNotification(new
HCatURI(newHCatDependency1)));
+ }
@Test
public void testResolveCoordConfiguration() throws Exception {
@@ -139,10 +173,11 @@ public class TestCoordPushDependencyChec
CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
CoordinatorJob.Status.RUNNING, false, true);
- CoordinatorActionBean action1 =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
- CoordinatorAction.Status.WAITING,
"coord-action-for-action-push-check.xml", newHCatDependency);
+ CoordinatorActionBean action =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ CoordinatorAction.Status.WAITING,
"coord-action-for-action-push-check.xml", null, newHCatDependency,
+ "Z");
- String actionId = action1.getId();
+ String actionId = action.getId();
checkCoordAction(actionId, newHCatDependency,
CoordinatorAction.Status.WAITING);
new CoordPushDependencyCheckXCommand(actionId).call();
@@ -162,14 +197,8 @@ public class TestCoordPushDependencyChec
}
-
@Test
- public void testUpdateCoordTableMultipleDepsV3() throws Exception {
- // Test for two dependencies : one of them is already existing in the
- // hcat server. Other one is not.
- // Expected to see the action in WAITING
- // Later make the other partition also available. action is expected to
- // be READY
+ public void testTimeOut() throws Exception {
String db = "default";
String table = "tablename";
String newHCatDependency1 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=brazil";
@@ -179,19 +208,99 @@ public class TestCoordPushDependencyChec
String actionId = addInitRecords(newHCatDependency);
checkCoordAction(actionId, newHCatDependency,
CoordinatorAction.Status.WAITING);
-
new CoordPushDependencyCheckXCommand(actionId, true).call();
checkCoordAction(actionId, newHCatDependency1,
CoordinatorAction.Status.WAITING);
PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
- assertTrue(pdms.getWaitingActions(new
HCatURI(newHCatDependency1)).contains(actionId));
HCatAccessorService hcatService =
Services.get().get(HCatAccessorService.class);
+ assertTrue(pdms.getWaitingActions(new
HCatURI(newHCatDependency1)).contains(actionId));
assertTrue(hcatService.isRegisteredForNotification(new
HCatURI(newHCatDependency1)));
- // Make first dependency available
- addPartition(db, table, "dt=20120430;country=brazil");
+ // Timeout is 10 mins. Change action created time to before 12 min to
make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 *
1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
new CoordPushDependencyCheckXCommand(actionId).call();
+ Thread.sleep(100);
+ // Check for timeout status and unregistered missing dependencies
+ checkCoordAction(actionId, newHCatDependency1,
CoordinatorAction.Status.TIMEDOUT);
+ assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
+ assertFalse(hcatService.isRegisteredForNotification(new
HCatURI(newHCatDependency1)));
+
+ }
+
+ @Test
+ public void testTimeOutWithException1() throws Exception {
+ // Test timeout when missing dependencies are from a non existing table
+ String newHCatDependency1 = "hcat://" + server +
"/nodb/notable/dt=20120430;country=brazil";
+ String newHCatDependency2 = "hcat://" + server +
"/nodb/notable/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 +
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+
+ String actionId = addInitRecords(newHCatDependency);
+ checkCoordAction(actionId, newHCatDependency,
CoordinatorAction.Status.WAITING);
+ try {
+ new CoordPushDependencyCheckXCommand(actionId, true).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("NoSuchObjectException"));
+ }
+ checkCoordAction(actionId, newHCatDependency,
CoordinatorAction.Status.WAITING);
+ PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
+ HCatAccessorService hcatService =
Services.get().get(HCatAccessorService.class);
+ assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
+ assertFalse(hcatService.isRegisteredForNotification(new
HCatURI(newHCatDependency1)));
+
+ // Timeout is 10 mins. Change action created time to before 12 min to
make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 *
1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
+ try {
+ new CoordPushDependencyCheckXCommand(actionId).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("NoSuchObjectException"));
+ }
+ Thread.sleep(100);
+ // Check for timeout status and unregistered missing dependencies
+ checkCoordAction(actionId, newHCatDependency,
CoordinatorAction.Status.TIMEDOUT);
+ }
+
+ @Test
+ public void testTimeOutWithException2() throws Exception {
+ // Test timeout when table containing missing dependencies is dropped
in between
+ String db = "default";
+ String table = "tablename";
+ String newHCatDependency1 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=brazil";
+ String newHCatDependency2 = "hcat://" + server + "/" + db + "/" +
table + "/dt=20120430;country=usa";
+ String newHCatDependency = newHCatDependency1 +
CoordELFunctions.INSTANCE_SEPARATOR + newHCatDependency2;
+ populateTable(db, table);
+
+ String actionId = addInitRecords(newHCatDependency);
+ checkCoordAction(actionId, newHCatDependency,
CoordinatorAction.Status.WAITING);
+ new CoordPushDependencyCheckXCommand(actionId, true).call();
+ checkCoordAction(actionId, newHCatDependency1,
CoordinatorAction.Status.WAITING);
+ PartitionDependencyManagerService pdms =
Services.get().get(PartitionDependencyManagerService.class);
+ HCatAccessorService hcatService =
Services.get().get(HCatAccessorService.class);
+ assertTrue(pdms.getWaitingActions(new
HCatURI(newHCatDependency1)).contains(actionId));
+ assertTrue(hcatService.isRegisteredForNotification(new
HCatURI(newHCatDependency1)));
+
+ // Timeout is 10 mins. Change action created time to before 12 min to
make the action
+ // timeout.
+ long timeOutCreationTime = System.currentTimeMillis() - (12 * 60 *
1000);
+ setCoordActionCreationTime(actionId, timeOutCreationTime);
+ dropTable(db, table, true);
+ try {
+ new CoordPushDependencyCheckXCommand(actionId).call();
+ fail();
+ }
+ catch (Exception e) {
+ assertTrue(e.getMessage().contains("NoSuchObjectException"));
+ }
+ Thread.sleep(100);
+ // Check for timeout status and unregistered missing dependencies
+ checkCoordAction(actionId, newHCatDependency1,
CoordinatorAction.Status.TIMEDOUT);
assertNull(pdms.getWaitingActions(new HCatURI(newHCatDependency1)));
- checkCoordAction(actionId, "", CoordinatorAction.Status.READY);
assertFalse(hcatService.isRegisteredForNotification(new
HCatURI(newHCatDependency1)));
}
@@ -212,7 +321,7 @@ public class TestCoordPushDependencyChec
CoordinatorActionBean action = jpaService.execute(new
CoordActionGetJPAExecutor(actionId));
String missDeps = action.getPushMissingDependencies();
assertEquals(expDeps, missDeps);
- assertEquals(action.getStatus(), stat);
+ assertEquals(stat, action.getStatus());
return action;
}
Modified:
oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
URL:
http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java?rev=1453070&r1=1453069&r2=1453070&view=diff
==============================================================================
--- oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
(original)
+++ oozie/trunk/core/src/test/java/org/apache/oozie/test/XDataTestCase.java Tue
Mar 5 22:57:37 2013
@@ -58,6 +58,7 @@ import org.apache.oozie.executor.jpa.JPA
import org.apache.oozie.executor.jpa.SLAEventInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.service.CoordinatorStoreService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.Services;
@@ -65,6 +66,7 @@ import org.apache.oozie.service.UUIDServ
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
import org.apache.oozie.service.UUIDService.ApplicationType;
+import org.apache.oozie.store.CoordinatorStore;
import org.apache.oozie.util.DateUtils;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
@@ -581,6 +583,7 @@ public abstract class XDataTestCase exte
action.setCreatedTime(new Date());
action.setStatus(status);
action.setActionXml(actionXml);
+ action.setTimeOut(10);
Configuration conf = getCoordConf(appPath);
action.setCreatedConf(XmlUtils.prettyPrint(conf).toString());
@@ -1284,17 +1287,26 @@ public abstract class XDataTestCase exte
}
protected String addInitRecords(String pushMissingDependencies) throws
Exception {
+ return addInitRecords(null, pushMissingDependencies, "Z");
+ }
+
+ protected String addInitRecords(String missingDependencies, String
pushMissingDependencies, String oozieTimeZoneMask)
+ throws Exception {
CoordinatorJobBean job =
addRecordToCoordJobTableForWaiting("coord-job-for-action-input-check.xml",
CoordinatorJob.Status.RUNNING, false, true);
- CoordinatorActionBean action1 =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
- CoordinatorAction.Status.WAITING,
"coord-action-for-action-input-check.xml", pushMissingDependencies);
- return action1.getId();
+ CoordinatorActionBean action =
addRecordToCoordActionTableForWaiting(job.getId(), 1,
+ CoordinatorAction.Status.WAITING,
"coord-action-for-action-input-check.xml", missingDependencies,
+ pushMissingDependencies, oozieTimeZoneMask);
+ return action.getId();
}
protected CoordinatorActionBean
addRecordToCoordActionTableForWaiting(String jobId, int actionNum,
- CoordinatorAction.Status status, String resourceXmlName, String
pushMissingDependencies) throws Exception {
- CoordinatorActionBean action = createCoordAction(jobId, actionNum,
status, resourceXmlName, 0);
+ CoordinatorAction.Status status, String resourceXmlName, String
missingDependencies,
+ String pushMissingDependencies, String oozieTimeZoneMask) throws
Exception {
+ CoordinatorActionBean action = createCoordAction(jobId, actionNum,
status, resourceXmlName, 0,
+ oozieTimeZoneMask);
+ action.setMissingDependencies(missingDependencies);
action.setPushMissingDependencies(pushMissingDependencies);
try {
JPAService jpaService = Services.get().get(JPAService.class);
@@ -1344,4 +1356,13 @@ public abstract class XDataTestCase exte
throw new RuntimeException(XLog.format("Could not get " +
testFileName, ioe));
}
}
+
+ protected void setCoordActionCreationTime(String actionId, long
actionCreationTime) throws Exception {
+ CoordinatorStore store =
Services.get().get(CoordinatorStoreService.class).create();
+ CoordinatorActionBean action = store.getCoordinatorAction(actionId,
false);
+ action.setCreatedTime(new Date(actionCreationTime));
+ store.beginTrx();
+ store.updateCoordinatorAction(action);
+ store.commitTrx();
+ }
}
Modified: oozie/trunk/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1453070&r1=1453069&r2=1453070&view=diff
==============================================================================
--- oozie/trunk/release-log.txt (original)
+++ oozie/trunk/release-log.txt Tue Mar 5 22:57:37 2013
@@ -4,6 +4,7 @@ OOZIE-1239 Bump up trunk to 4.1.0-SNAPSH
-- Oozie 4.0.0 (unreleased)
+OOZIE-1250 Coord action timeout not happening when there is a exception
(rohini via mona)
OOZIE-1207 Optimize current EL resolution in case of start-instance and
end-instance (rohini via mona)
OOZIE-1247 CoordActionInputCheck shouldn't queue CoordPushInputCheck (rohini
via virag)
OOZIE-1238 CoordPushCheck doesn't evaluate the configuration section which is
propogated to workflow (virag)