Repository: oozie Updated Branches: refs/heads/master 97b21af4d -> 993a8e542
OOZIE-2090 wf:lastErrorNode does not take into account transient errors with retries (rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/993a8e54 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/993a8e54 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/993a8e54 Branch: refs/heads/master Commit: 993a8e542300317dba7bd1e61f5a74165c80152d Parents: 97b21af Author: Robert Kanter <[email protected]> Authored: Tue Feb 10 10:33:07 2015 -0800 Committer: Robert Kanter <[email protected]> Committed: Tue Feb 10 10:33:07 2015 -0800 ---------------------------------------------------------------------- .../oozie/command/wf/ActionEndXCommand.java | 7 +- .../org/apache/oozie/TestDagELFunctions.java | 100 ++++++++++++++++++- release-log.txt | 1 + 3 files changed, 104 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/993a8e54/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java index f060e5a..4006441 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java @@ -182,9 +182,6 @@ public class ActionEndXCommand extends ActionXCommand<Void> { cron.stop(); addActionCron(wfAction.getType(), cron); - WorkflowInstance wfInstance = wfJob.getWorkflowInstance(); - DagELFunctions.setActionInfo(wfInstance, wfAction); - wfJob.setWorkflowInstance(wfInstance); incrActionCounter(wfAction.getType(), 1); if (!context.isEnded()) { @@ -226,6 +223,10 @@ public class ActionEndXCommand extends ActionXCommand<Void> { } } } + WorkflowInstance wfInstance = wfJob.getWorkflowInstance(); + 
DagELFunctions.setActionInfo(wfInstance, wfAction); + wfJob.setWorkflowInstance(wfInstance); + updateList.add(new UpdateEntry<WorkflowActionQuery>(WorkflowActionQuery.UPDATE_ACTION_END,wfAction)); wfJob.setLastModifiedTime(new Date()); updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MODIFIED, wfJob)); http://git-wip-us.apache.org/repos/asf/oozie/blob/993a8e54/core/src/test/java/org/apache/oozie/TestDagELFunctions.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/TestDagELFunctions.java b/core/src/test/java/org/apache/oozie/TestDagELFunctions.java index 2d32bdf..3fb2029 100644 --- a/core/src/test/java/org/apache/oozie/TestDagELFunctions.java +++ b/core/src/test/java/org/apache/oozie/TestDagELFunctions.java @@ -20,7 +20,13 @@ package org.apache.oozie; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.wf.ActionEndXCommand; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.service.LiteWorkflowStoreService; +import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.XmlUtils; import org.apache.oozie.workflow.lite.EndNodeDef; import org.apache.oozie.workflow.lite.LiteWorkflowApp; @@ -36,7 +42,7 @@ import org.apache.oozie.DagELFunctions; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; -public class TestDagELFunctions extends XTestCase { +public class TestDagELFunctions extends XDataTestCase { protected void setUp() throws Exception { super.setUp(); @@ -120,4 +126,96 @@ public class TestDagELFunctions extends XTestCase { assertEquals("externalStatus", eval.evaluate("${wf:actionExternalStatus('actionName')}", String.class)); } + // 
This test simulates an action that gets retried because of an Error and succeeds on one of the retries. The lastErrorNode + // EL function should never be set to this node. + public void testLastErrorNodeWithRetrySucceed() throws Exception { + WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.END_RETRY, true); + action.setType("java"); + action.setExternalStatus("FAILED"); + action.setErrorInfo("JA018", "some error occurred"); + WorkflowActionQueryExecutor.getInstance().executeUpdate( + WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action); + + new ActionEndXCommandIgnoreSignalException(action.getId(), action.getType()).call(); + + ELEvaluator eval = Services.get().get(ELService.class).createEvaluator("workflow"); + job = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, job.getId()); + action = WorkflowActionQueryExecutor.getInstance().get( + WorkflowActionQueryExecutor.WorkflowActionQuery.GET_ACTION, action.getId()); + DagELFunctions.configureEvaluator(eval, job, action); + assertEquals("", eval.evaluate("${wf:lastErrorNode()}", String.class)); + + action.setExternalStatus("SUCCEEDED"); + action.setStatus(WorkflowAction.Status.DONE); + WorkflowActionQueryExecutor.getInstance().executeUpdate( + WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action); + + new ActionEndXCommandIgnoreSignalException(action.getId(), action.getType()).call(); + + job = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, job.getId()); + action = WorkflowActionQueryExecutor.getInstance().get( + WorkflowActionQueryExecutor.WorkflowActionQuery.GET_ACTION, action.getId()); + assertEquals(WorkflowAction.Status.OK, action.getStatus()); + DagELFunctions.configureEvaluator(eval, job, action); + assertEquals("", 
eval.evaluate("${wf:lastErrorNode()}", String.class)); + } + + // This test simulates an action that gets retried because of an Error and never succeeds on later retries. The lastErrorNode + // EL function should be set to this node, but only after the last retry. + public void testLastErrorNodeWithRetryFail() throws Exception { + WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.END_RETRY, true); + action.setType("java"); + action.setExternalStatus("FAILED"); + action.setErrorInfo("JA018", "some error occurred"); + WorkflowActionQueryExecutor.getInstance().executeUpdate( + WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action); + + new ActionEndXCommandIgnoreSignalException(action.getId(), action.getType()).call(); + + ELEvaluator eval = Services.get().get(ELService.class).createEvaluator("workflow"); + job = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, job.getId()); + action = WorkflowActionQueryExecutor.getInstance().get( + WorkflowActionQueryExecutor.WorkflowActionQuery.GET_ACTION, action.getId()); + DagELFunctions.configureEvaluator(eval, job, action); + assertEquals("", eval.evaluate("${wf:lastErrorNode()}", String.class)); + + action.setExternalStatus("FAILED"); + action.setErrorInfo("JA018", "some error occurred"); + action.setStatus(WorkflowAction.Status.END_RETRY); + action.setUserRetryCount(action.getUserRetryMax()); // make it the last retry + WorkflowActionQueryExecutor.getInstance().executeUpdate( + WorkflowActionQueryExecutor.WorkflowActionQuery.UPDATE_ACTION, action); + + new ActionEndXCommandIgnoreSignalException(action.getId(), action.getType()).call(); + + job = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQueryExecutor.WorkflowJobQuery.GET_WORKFLOW, job.getId()); + action = 
WorkflowActionQueryExecutor.getInstance().get( + WorkflowActionQueryExecutor.WorkflowActionQuery.GET_ACTION, action.getId()); + assertEquals(WorkflowAction.Status.ERROR, action.getStatus()); + DagELFunctions.configureEvaluator(eval, job, action); + assertEquals(action.getName(), eval.evaluate("${wf:lastErrorNode()}", String.class)); + } + + private class ActionEndXCommandIgnoreSignalException extends ActionEndXCommand { + + public ActionEndXCommandIgnoreSignalException(String actionId, String type) { + super(actionId, type); + } + + @Override + protected Void execute() throws CommandException { + try { + return super.execute(); + } catch (CommandException ce) { + // ActionEndXCommand will trigger a SignalXComamnd, which will complain about executionPath being empty -- ignore it + if (!(ce.getCause() instanceof IllegalArgumentException) + || !ce.getMessage().equals("E0607: Other error in operation [signal], executionPath cannot be empty")) { + throw ce; + } + } + return null; + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/993a8e54/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index f592299..2cec842 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2090 wf:lastErrorNode does not take into account transient errors with retries (rkanter) OOZIE-2133 Support getting ATS delegation tokens for tez jobs (rohini) OOZIE-2131 Add flag to sqoop action to skip hbase delegation token generation (abec via rkanter) OOZIE-2127 Add created time to RecoveryService WF queries (puru)
