Author: kamrul
Date: Tue Nov 20 20:55:34 2012
New Revision: 1411860
URL: http://svn.apache.org/viewvc?rev=1411860&view=rev
Log:
OOZIE-994 ActionCheckXCommand does not handle failures properly (rkanter via
mohammad)
Modified:
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/test/XTestCase.java
oozie/branches/branch-3.3/release-log.txt
Modified:
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/action/ActionExecutor.java?rev=1411860&r1=1411859&r2=1411860&view=diff
==============================================================================
---
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
(original)
+++
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
Tue Nov 20 20:55:34 2012
@@ -59,15 +59,17 @@ public abstract class ActionExecutor {
private static class ErrorInfo {
ActionExecutorException.ErrorType errorType;
String errorCode;
+ Class<?> errorClass;
- private ErrorInfo(ActionExecutorException.ErrorType errorType, String
errorCode) {
+ private ErrorInfo(ActionExecutorException.ErrorType errorType, String
errorCode, Class<?> errorClass) {
this.errorType = errorType;
this.errorCode = errorCode;
+ this.errorClass = errorClass;
}
}
private static boolean initMode = false;
- private static Map<String, Map<Class, ErrorInfo>> ERROR_INFOS = new
HashMap<String, Map<Class, ErrorInfo>>();
+ private static Map<String, Map<String, ErrorInfo>> ERROR_INFOS = new
HashMap<String, Map<String, ErrorInfo>>();
/**
* Context information passed to the ActionExecutor methods.
@@ -266,7 +268,7 @@ public abstract class ActionExecutor {
*/
public void initActionType() {
XLog.getLog(getClass()).trace(" Init Action Type : [{0}]", getType());
- ERROR_INFOS.put(getType(), new LinkedHashMap<Class, ErrorInfo>());
+ ERROR_INFOS.put(getType(), new LinkedHashMap<String, ErrorInfo>());
}
/**
@@ -310,9 +312,9 @@ public abstract class ActionExecutor {
throw new IllegalStateException("Error, action type info locked");
}
try {
- Class klass =
Thread.currentThread().getContextClassLoader().loadClass(exClass);
- Map<Class, ErrorInfo> executorErrorInfo =
ERROR_INFOS.get(getType());
- executorErrorInfo.put(klass, new ErrorInfo(errorType, errorCode));
+ Class errorClass =
Thread.currentThread().getContextClassLoader().loadClass(exClass);
+ Map<String, ErrorInfo> executorErrorInfo =
ERROR_INFOS.get(getType());
+ executorErrorInfo.put(exClass, new ErrorInfo(errorType, errorCode,
errorClass));
}
catch (ClassNotFoundException cnfe) {
XLog.getLog(getClass()).warn(
@@ -383,16 +385,41 @@ public abstract class ActionExecutor {
if (ex instanceof ActionExecutorException) {
return (ActionExecutorException) ex;
}
- for (Map.Entry<Class, ErrorInfo> errorInfo :
ERROR_INFOS.get(getType()).entrySet()) {
- if (errorInfo.getKey().isInstance(ex)) {
- return new
ActionExecutorException(errorInfo.getValue().errorType,
errorInfo.getValue().errorCode,
- "{0}", ex.getMessage(), ex);
+
+ ActionExecutorException aee = null;
+ // Check the cause of the exception first
+ if (ex.getCause() != null) {
+ aee = convertExceptionHelper(ex.getCause());
+ }
+ // If the cause isn't registered or doesn't exist, check the exception
itself
+ if (aee == null) {
+ aee = convertExceptionHelper(ex);
+ // If the cause isn't registered either, then just create a new
ActionExecutorException
+ if (aee == null) {
+ String exClass = ex.getClass().getName();
+ String errorCode = exClass.substring(exClass.lastIndexOf(".")
+ 1);
+ aee = new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode,
"{0}", ex.getMessage(), ex);
+ }
+ }
+ return aee;
+ }
+
+ private ActionExecutorException convertExceptionHelper(Throwable ex) {
+ Map<String, ErrorInfo> executorErrorInfo = ERROR_INFOS.get(getType());
+ // Check if we have registered ex
+ ErrorInfo classErrorInfo =
executorErrorInfo.get(ex.getClass().getName());
+ if (classErrorInfo != null) {
+ return new ActionExecutorException(classErrorInfo.errorType,
classErrorInfo.errorCode, "{0}", ex.getMessage(), ex);
+ }
+ // Else, check if a parent class of ex is registered
+ else {
+ for (ErrorInfo errorInfo : executorErrorInfo.values()) {
+ if (errorInfo.errorClass.isInstance(ex)) {
+ return new ActionExecutorException(errorInfo.errorType,
errorInfo.errorCode, "{0}", ex.getMessage(), ex);
+ }
}
}
- String errorCode = ex.getClass().getName();
- errorCode = errorCode.substring(errorCode.lastIndexOf(".") + 1);
- return new
ActionExecutorException(ActionExecutorException.ErrorType.ERROR, errorCode,
"{0}", ex.getMessage(),
- ex);
+ return null;
}
/**
Modified:
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java?rev=1411860&r1=1411859&r2=1411860&view=diff
==============================================================================
---
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
(original)
+++
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
Tue Nov 20 20:55:34 2012
@@ -28,6 +28,7 @@ import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
+import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowAction.Status;
import org.apache.oozie.client.rest.JsonBean;
@@ -158,6 +159,9 @@ public class ActionCheckXCommand extends
ActionExecutorContext context = null;
try {
boolean isRetry = false;
+ if (wfAction.getRetries() > 0) {
+ isRetry = true;
+ }
boolean isUserRetry = false;
context = new ActionXCommand.ActionExecutorContext(wfJob,
wfAction, isRetry, isUserRetry);
incrActionCounter(wfAction.getType(), 1);
@@ -198,6 +202,14 @@ public class ActionCheckXCommand extends
case ERROR:
handleUserRetry(wfAction);
break;
+ case TRANSIENT: // retry N times, then suspend
workflow
+ if (!handleTransient(context, executor,
WorkflowAction.Status.RUNNING)) {
+ handleNonTransient(context, executor,
WorkflowAction.Status.START_MANUAL);
+ wfAction.setPendingAge(new Date());
+ wfAction.setRetries(0);
+ wfAction.setStartTime(null);
+ }
+ break;
}
wfAction.setLastCheckTime(new Date());
updateList = new ArrayList<JsonBean>();
Modified:
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java?rev=1411860&r1=1411859&r2=1411860&view=diff
==============================================================================
---
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
(original)
+++
oozie/branches/branch-3.3/core/src/main/java/org/apache/oozie/command/wf/ResumeXCommand.java
Tue Nov 20 20:55:34 2012
@@ -17,6 +17,8 @@
*/
package org.apache.oozie.command.wf;
+import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -24,15 +26,22 @@ import java.util.List;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.control.EndActionExecutor;
+import org.apache.oozie.action.control.ForkActionExecutor;
+import org.apache.oozie.action.control.JoinActionExecutor;
+import org.apache.oozie.action.control.KillActionExecutor;
+import org.apache.oozie.action.control.StartActionExecutor;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.command.CommandException;
import org.apache.oozie.command.PreconditionException;
import org.apache.oozie.command.coord.CoordActionUpdateXCommand;
+import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
import org.apache.oozie.executor.jpa.BulkUpdateInsertJPAExecutor;
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowJobGetActionsJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
+import org.apache.oozie.service.HadoopAccessorException;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.util.InstrumentUtils;
@@ -79,6 +88,19 @@ public class ResumeXCommand extends Work
if (action.isPending()) {
if (action.getStatus() ==
WorkflowActionBean.Status.PREP
|| action.getStatus() ==
WorkflowActionBean.Status.START_MANUAL) {
+ // When resuming a workflow that was
programmatically suspended (via ActionCheckXCommand) because of
+ // a repeated transient error, we have to clean up
the action dir
+ if
(!action.getType().equals(StartActionExecutor.TYPE) && // The control
actions have invalid
+
!action.getType().equals(ForkActionExecutor.TYPE) && // action dir paths
because they
+
!action.getType().equals(JoinActionExecutor.TYPE) && // contain ":"
(colons)
+
!action.getType().equals(KillActionExecutor.TYPE) &&
+
!action.getType().equals(EndActionExecutor.TYPE)) {
+ ActionExecutorContext context =
+ new
ActionXCommand.ActionExecutorContext(workflow, action, false, false);
+ if
(context.getAppFileSystem().exists(context.getActionDir())) {
+
context.getAppFileSystem().delete(context.getActionDir(), true);
+ }
+ }
queue(new ActionStartXCommand(action.getId(),
action.getType()));
}
else {
@@ -118,6 +140,15 @@ public class ResumeXCommand extends Work
catch (JPAExecutorException e) {
throw new CommandException(e);
}
+ catch (HadoopAccessorException e) {
+ throw new CommandException(e);
+ }
+ catch (IOException e) {
+ throw new CommandException(ErrorCode.E0902, e);
+ }
+ catch (URISyntaxException e) {
+ throw new CommandException(ErrorCode.E0902, e);
+ }
finally {
// update coordinator action
new CoordActionUpdateXCommand(workflow).call();
Modified:
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java?rev=1411860&r1=1411859&r2=1411860&view=diff
==============================================================================
---
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
(original)
+++
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/action/TestActionExecutor.java
Tue Nov 20 20:55:34 2012
@@ -17,6 +17,7 @@
*/
package org.apache.oozie.action;
+import java.io.EOFException;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.client.WorkflowAction;
@@ -146,5 +147,73 @@ public class TestActionExecutor extends
fail();
}
+ cause = new EOFException(); // not registered, but subclass of
IOException
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(cause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.TRANSIENT,
ex.getErrorType());
+ assertEquals("IO", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
+
+ Exception rootCause = new RemoteException();
+ cause = new RuntimeException(rootCause);
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(rootCause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.NON_TRANSIENT,
ex.getErrorType());
+ assertEquals("RMI", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
+
+ rootCause = new RemoteException();
+ cause = new IOException(rootCause);
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(rootCause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.NON_TRANSIENT,
ex.getErrorType());
+ assertEquals("RMI", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
+
+ rootCause = new IOException();
+ cause = new RemoteException("x", rootCause);
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(rootCause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.TRANSIENT,
ex.getErrorType());
+ assertEquals("IO", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
+
+ rootCause = new EOFException(); // not registered, but subclass of
IOException
+ cause = new RemoteException("x", rootCause);
+ try {
+ throw ae.convertException(cause);
+ }
+ catch (ActionExecutorException ex) {
+ assertEquals(rootCause, ex.getCause());
+ assertEquals(ActionExecutorException.ErrorType.TRANSIENT,
ex.getErrorType());
+ assertEquals("IO", ex.getErrorCode());
+ }
+ catch (Exception ex) {
+ fail();
+ }
}
}
Modified:
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java?rev=1411860&r1=1411859&r2=1411860&view=diff
==============================================================================
---
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
(original)
+++
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
Tue Nov 20 20:55:34 2012
@@ -39,6 +39,7 @@ import org.apache.oozie.command.wf.Actio
import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.JPAService;
@@ -237,6 +238,225 @@ public class TestActionCheckXCommand ext
}
+ public void testActionCheckTransientDuringLauncher() throws Exception {
+ services.destroy();
+ // Make the ActionCheckXCommand run more frequently so the test won't
take as long
+
setSystemProperty("oozie.service.ActionCheckerService.action.check.interval",
"10");
+
setSystemProperty("oozie.service.ActionCheckerService.action.check.delay",
"20");
+ // Make the max number of retries lower so the test won't take as long
+ final int maxRetries = 2;
+ setSystemProperty("oozie.action.retries.max",
Integer.toString(maxRetries));
+ services = new Services();
+ services.init();
+
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ WorkflowJobBean job0 =
this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ final String jobId = job0.getId();
+ WorkflowActionBean action0 = this.addRecordToWfActionTable(jobId, "1",
WorkflowAction.Status.PREP);
+ final String actionId = action0.getId();
+ final WorkflowActionGetJPAExecutor wfActionGetCmd = new
WorkflowActionGetJPAExecutor(actionId);
+
+ new ActionStartXCommand(actionId, "map-reduce").call();
+ final WorkflowActionBean action1 = jpaService.execute(wfActionGetCmd);
+ String originalLauncherId = action1.getExternalId();
+
+ // At this point, the launcher job has started (but not finished)
+ // Now, shutdown the job tracker to pretend it has gone down during
the launcher job
+ executeWhileJobTrackerIsShutdown(new ShutdownJobTrackerExecutable() {
+ @Override
+ public void execute() throws Exception {
+ assertEquals(0, action1.getRetries());
+ new ActionCheckXCommand(actionId).call();
+
+ waitFor(180 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ WorkflowActionBean action1a =
jpaService.execute(wfActionGetCmd);
+ return (action1a.getRetries() == maxRetries + 1);
+ }
+ });
+ WorkflowActionBean action1b =
jpaService.execute(wfActionGetCmd);
+ assertEquals(0, action1b.getRetries());
+ assertEquals("START_MANUAL", action1b.getStatusStr());
+
+ WorkflowJobBean job1 = jpaService.execute(new
WorkflowJobGetJPAExecutor(jobId));
+ assertEquals("SUSPENDED", job1.getStatusStr());
+
+ // At this point, the action has gotten a transient error,
even after maxRetries tries so the workflow has been
+ // SUSPENDED
+ }
+ });
+ // Now, let's bring the job tracker back up and resume the workflow
(which will restart the current action)
+ // It should now continue and finish with SUCCEEDED
+ new ResumeXCommand(jobId).call();
+ WorkflowJobBean job2 = jpaService.execute(new
WorkflowJobGetJPAExecutor(jobId));
+ assertEquals("RUNNING", job2.getStatusStr());
+
+ ActionExecutorContext context = new
ActionXCommand.ActionExecutorContext(job2, action1, false, false);
+ WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
+ MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
+ JobConf conf = actionExecutor.createBaseHadoopConf(context,
XmlUtils.parseXml(action2.getConf()));
+ String user = conf.get("user.name");
+ JobClient jobClient =
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action3 = jpaService.execute(wfActionGetCmd);
+ String launcherId = action3.getExternalId();
+ assertFalse(originalLauncherId.equals(launcherId));
+
+ final RunningJob launcherJob =
jobClient.getJob(JobID.forName(launcherId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return launcherJob.isComplete();
+ }
+ });
+ assertTrue(launcherJob.isSuccessful());
+ assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
+ String mapperId = action4.getExternalId();
+
+ assertFalse(launcherId.equals(mapperId));
+
+ final RunningJob mrJob = jobClient.getJob(JobID.forName(mapperId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return mrJob.isComplete();
+ }
+ });
+ assertTrue(mrJob.isSuccessful());
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action5 = jpaService.execute(wfActionGetCmd);
+
+ assertEquals("SUCCEEDED", action5.getExternalStatus());
+ }
+
+ public void testActionCheckTransientDuringMRAction() throws Exception {
+ services.destroy();
+ // Make the ActionCheckXCommand run more frequently so the test won't
take as long
+
setSystemProperty("oozie.service.ActionCheckerService.action.check.interval",
"10");
+
setSystemProperty("oozie.service.ActionCheckerService.action.check.delay",
"20");
+ // Make the max number of retries lower so the test won't take as long
+ final int maxRetries = 2;
+ setSystemProperty("oozie.action.retries.max",
Integer.toString(maxRetries));
+ services = new Services();
+ services.init();
+
+ final JPAService jpaService = Services.get().get(JPAService.class);
+ WorkflowJobBean job0 =
this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING,
WorkflowInstance.Status.RUNNING);
+ final String jobId = job0.getId();
+ WorkflowActionBean action0 = this.addRecordToWfActionTable(jobId, "1",
WorkflowAction.Status.PREP);
+ final String actionId = action0.getId();
+ final WorkflowActionGetJPAExecutor wfActionGetCmd = new
WorkflowActionGetJPAExecutor(actionId);
+
+ new ActionStartXCommand(actionId, "map-reduce").call();
+ final WorkflowActionBean action1 = jpaService.execute(wfActionGetCmd);
+ String originalLauncherId = action1.getExternalId();
+
+ ActionExecutorContext context = new
ActionXCommand.ActionExecutorContext(job0, action1, false, false);
+ MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
+ JobConf conf = actionExecutor.createBaseHadoopConf(context,
XmlUtils.parseXml(action1.getConf()));
+ String user = conf.get("user.name");
+ JobClient jobClient =
Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+
+ final RunningJob launcherJob =
jobClient.getJob(JobID.forName(originalLauncherId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return launcherJob.isComplete();
+ }
+ });
+ assertTrue(launcherJob.isSuccessful());
+ assertTrue(LauncherMapper.hasIdSwap(launcherJob));
+
+ new ActionCheckXCommand(action1.getId()).call();
+ WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
+ String originalMapperId = action2.getExternalId();
+
+ assertFalse(originalLauncherId.equals(originalMapperId));
+
+ // At this point, the launcher job has finished and the map-reduce
action has started (but not finished)
+ // Now, shutdown the job tracker to pretend it has gone down during
the map-reduce job
+ executeWhileJobTrackerIsShutdown(new ShutdownJobTrackerExecutable() {
+ @Override
+ public void execute() throws Exception {
+ assertEquals(0, action1.getRetries());
+ new ActionCheckXCommand(actionId).call();
+
+ waitFor(180 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ WorkflowActionBean action1a =
jpaService.execute(wfActionGetCmd);
+ return (action1a.getRetries() == maxRetries + 1);
+ }
+ });
+ WorkflowActionBean action1b =
jpaService.execute(wfActionGetCmd);
+ assertEquals(0, action1b.getRetries());
+ assertEquals("START_MANUAL", action1b.getStatusStr());
+
+ WorkflowJobBean job1 = jpaService.execute(new
WorkflowJobGetJPAExecutor(jobId));
+ assertEquals("SUSPENDED", job1.getStatusStr());
+
+ // At this point, the action has gotten a transient error,
even after maxRetries tries so the workflow has been
+ // SUSPENDED
+ }
+ });
+ // Now, let's bring the job tracker back up and resume the workflow
(which will restart the current action)
+ // It should now continue and finish with SUCCEEDED
+ new ResumeXCommand(jobId).call();
+ WorkflowJobBean job2 = jpaService.execute(new
WorkflowJobGetJPAExecutor(jobId));
+ assertEquals("RUNNING", job2.getStatusStr());
+
+ sleep(500);
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action3 = jpaService.execute(wfActionGetCmd);
+ String launcherId = action3.getExternalId();
+
+ assertFalse(originalLauncherId.equals(launcherId));
+ assertFalse(originalMapperId.equals(launcherId));
+
+ final RunningJob launcherJob2 =
jobClient.getJob(JobID.forName(launcherId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return launcherJob2.isComplete();
+ }
+ });
+ assertTrue(launcherJob2.isSuccessful());
+ assertTrue(LauncherMapper.hasIdSwap(launcherJob2));
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
+ String mapperId = action4.getExternalId();
+ assertFalse(originalMapperId.equals(mapperId));
+
+ assertFalse(launcherId.equals(mapperId));
+
+ final RunningJob mrJob = jobClient.getJob(JobID.forName(mapperId));
+
+ waitFor(120 * 1000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return mrJob.isComplete();
+ }
+ });
+ assertTrue(mrJob.isSuccessful());
+
+ new ActionCheckXCommand(actionId).call();
+ WorkflowActionBean action5 = jpaService.execute(wfActionGetCmd);
+
+ assertEquals("SUCCEEDED", action5.getExternalStatus());
+ }
+
@Override
protected WorkflowActionBean addRecordToWfActionTable(String wfId, String
actionName, WorkflowAction.Status status) throws Exception {
WorkflowActionBean action = createWorkflowActionSetPending(wfId,
status);
@@ -281,6 +501,7 @@ public class TestActionCheckXCommand ext
String actionXml = "<map-reduce>" +
"<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
"<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<prepare><delete path=\"" + outputDir.toString() + "\"/></prepare>" +
"<configuration>" +
"<property><name>mapred.mapper.class</name><value>" +
MapperReducerForTest.class.getName() +
"</value></property>" +
Modified:
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/test/XTestCase.java
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/test/XTestCase.java?rev=1411860&r1=1411859&r2=1411860&view=diff
==============================================================================
---
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/test/XTestCase.java
(original)
+++
oozie/branches/branch-3.3/core/src/test/java/org/apache/oozie/test/XTestCase.java
Tue Nov 20 20:55:34 2012
@@ -805,5 +805,38 @@ public abstract class XTestCase extends
return jobConf;
}
+ /**
+ * A 'closure' used by {@link XTestCase#executeWhileJobTrackerIsShutdown}
method.
+ */
+ public static interface ShutdownJobTrackerExecutable {
+
+ /**
+ * Execute some code
+ *
+ * @throws Exception thrown if the executed code throws an exception.
+ */
+ public void execute() throws Exception;
+ }
+
+ /**
+ * Execute some code, expressed via a {@link
ShutdownJobTrackerExecutable}, while the JobTracker is shutdown. Once the code
has
+ * finished, the JobTracker is restarted (even if an exception occurs).
+ *
+ * @param executable The ShutdownJobTrackerExecutable to execute while the
JobTracker is shutdown
+ */
+ protected void
executeWhileJobTrackerIsShutdown(ShutdownJobTrackerExecutable executable) {
+ mrCluster.stopJobTracker();
+ Exception ex = null;
+ try {
+ executable.execute();
+ } catch (Exception e) {
+ ex = e;
+ } finally {
+ mrCluster.startJobTracker();
+ }
+ if (ex != null) {
+ throw new RuntimeException(ex);
+ }
+ }
}
Modified: oozie/branches/branch-3.3/release-log.txt
URL:
http://svn.apache.org/viewvc/oozie/branches/branch-3.3/release-log.txt?rev=1411860&r1=1411859&r2=1411860&view=diff
==============================================================================
--- oozie/branches/branch-3.3/release-log.txt (original)
+++ oozie/branches/branch-3.3/release-log.txt Tue Nov 20 20:55:34 2012
@@ -1,5 +1,6 @@
-- Oozie 3.3.0 release
+OOZIE-994 ActionCheckXCommand does not handle failures properly (rkanter via
mohammad)
OOZIE-1058 ACL modify-job should not be hardcoded to group name(mona via
mohammad)
OOZIE-1052 HadoopAccessorService.createFileSystem throws exception in
map-reduce action, failing workflow.(ryota via mohammad).
OOZIE-1060 bump hadoop 2.X version to 2.0.2-alpha (rvs via tucu)