Repository: oozie Updated Branches: refs/heads/master 91df71edc -> 844b067b3
OOZIE-1849 If the underlying job finishes while a Workflow is suspended, Oozie can take a while to realize it (rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/844b067b Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/844b067b Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/844b067b Branch: refs/heads/master Commit: 844b067b34fdcc588495d37d3209ccd806949fff Parents: 91df71e Author: Robert Kanter <[email protected]> Authored: Tue May 20 10:29:28 2014 -0700 Committer: Robert Kanter <[email protected]> Committed: Tue May 20 10:29:28 2014 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/oozie/ErrorCode.java | 2 +- .../oozie/command/wf/ActionCheckXCommand.java | 2 +- .../command/wf/TestActionCheckXCommand.java | 85 ++++++++++++++++---- release-log.txt | 1 + 4 files changed, 72 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/844b067b/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 7d4af02..d3c1b03 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -170,7 +170,7 @@ public enum ErrorCode { E0815(XLog.STD, "Action pending=[{0}], status=[{1}]. Skipping ActionCheck Execution"), E0816(XLog.STD, "Action pending=[{0}], status=[{1}]. Skipping ActionStart Execution"), E0817(XLog.STD, "The wf action [{0}] has been udated recently. Ignoring ActionCheck."), - E0818(XLog.STD, "Action [{0}] status is running but WF Job [{1}] status is [{2}]. Expected status is RUNNING."), + E0818(XLog.STD, "Action [{0}] status is running but WF Job [{1}] status is [{2}]. Expected status is RUNNING or SUSPENDED."), E0819(XLog.STD, "Unable to delete the temp dir of job WF Job [{0}]."), E0820(XLog.STD, "Action user retry max [{0}] is over system defined max [{1}], re-assign to use system max."), http://git-wip-us.apache.org/repos/asf/oozie/blob/844b067b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java index 9f0390f..39c522d 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java @@ -144,7 +144,7 @@ public class ActionCheckXCommand extends ActionXCommand<Void> { if (!wfAction.isPending() || wfAction.getStatus() != WorkflowActionBean.Status.RUNNING) { throw new PreconditionException(ErrorCode.E0815, wfAction.isPending(), wfAction.getStatusStr()); } - if (wfJob.getStatus() != WorkflowJob.Status.RUNNING) { + if (wfJob.getStatus() != WorkflowJob.Status.RUNNING && wfJob.getStatus() != WorkflowJob.Status.SUSPENDED) { wfAction.setLastCheckTime(new Date()); try { WorkflowActionQueryExecutor.getInstance().executeUpdate( http://git-wip-us.apache.org/repos/asf/oozie/blob/844b067b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java index 00a6368..a7316d5 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java @@ -81,21 +81,22 @@ public class TestActionCheckXCommand extends XDataTestCase { ActionCheckXCommand checkCmd = new ActionCheckXCommand(action.getId(), 10); - Long counterVal = new Long(0); + long counterVal; try { counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + ".preconditionfailed").getValue(); } catch (NullPointerException e){ //counter might be null + counterVal = 0L; } - assertEquals(new Long(0), new Long(counterVal)); + assertEquals(0L, counterVal); checkCmd.call(); //precondition failed because of actionCheckDelay > 0 counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + ".preconditionfailed").getValue(); - assertEquals(new Long(1), new Long(counterVal)); + assertEquals(1L, counterVal); } /** @@ -111,21 +112,22 @@ public class TestActionCheckXCommand extends XDataTestCase { ActionCheckXCommand checkCmd = new ActionCheckXCommand(action.getId()); - Long counterVal = new Long(0); + long counterVal; try { counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + ".preconditionfailed").getValue(); } catch (NullPointerException e){ //counter might be null + counterVal = 0L; } - assertEquals(new Long(0), new Long(counterVal)); + assertEquals(0L, counterVal); checkCmd.call(); //precondition failed because of pending = false counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + ".preconditionfailed").getValue(); - assertEquals(new Long(1), new Long(counterVal)); + assertEquals(1L, counterVal); } /** @@ -141,25 +143,26 @@ public class TestActionCheckXCommand extends XDataTestCase { ActionCheckXCommand checkCmd = new ActionCheckXCommand(action.getId()); - Long counterVal = new Long(0); + long counterVal; try{ counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + ".preconditionfailed").getValue(); } catch (NullPointerException e){ //counter might be null + counterVal = 0L; } - assertEquals(new Long(0), new Long(counterVal)); + assertEquals(0L, counterVal); checkCmd.call(); //precondition failed because of action != RUNNING counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + ".preconditionfailed").getValue(); - assertEquals(new Long(1), new Long(counterVal)); + assertEquals(1L, counterVal); } /** - * Test : verify the PreconditionException is thrown when job != RUNNING + * Test : verify the PreconditionException is thrown when job != RUNNING && job != SUSPENDED * * @throws Exception */ @@ -167,25 +170,75 @@ public class TestActionCheckXCommand extends XDataTestCase { Instrumentation inst = Services.get().get(InstrumentationService.class).get(); WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.FAILED, WorkflowInstance.Status.FAILED); - WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.PREP); + WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.RUNNING); ActionCheckXCommand checkCmd = new ActionCheckXCommand(action.getId()); - Long counterVal = new Long(0); + long counterVal; try { counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + ".preconditionfailed").getValue(); } catch (NullPointerException e){ //counter might be null + counterVal = 0L; } - assertEquals(new Long(0), new Long(counterVal)); + assertEquals(0L, counterVal); checkCmd.call(); - //precondition failed because of job != RUNNING - counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP).get(checkCmd.getName() + ".preconditionfailed").getValue(); - assertEquals(new Long(1), new Long(counterVal)); + //precondition failed because of job != RUNNING && job != SUSPENDED + counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP) + .get(checkCmd.getName() + ".preconditionfailed").getValue(); + assertEquals(1L, counterVal); + + job = this.addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.RUNNING); + + checkCmd = new ActionCheckXCommand(action.getId()); + + checkCmd.call(); + + //precondition passed because job == RUNNING so counter shouldn't have incremented + counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP) + .get(checkCmd.getName() + ".preconditionfailed").getValue(); + assertEquals(1L, counterVal); + + job = this.addRecordToWfJobTable(WorkflowJob.Status.SUSPENDED, WorkflowInstance.Status.SUSPENDED); + action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.RUNNING); + + checkCmd = new ActionCheckXCommand(action.getId()); + + checkCmd.call(); + + //precondition passed because job == SUSPENDED so counter shouldn't have incremented + counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP) + .get(checkCmd.getName() + ".preconditionfailed").getValue(); + assertEquals(1L, counterVal); + + job = this.addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); + action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.RUNNING); + + checkCmd = new ActionCheckXCommand(action.getId()); + + checkCmd.call(); + + //precondition failed because of job != RUNNING && job != SUSPENDED + counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP) + .get(checkCmd.getName() + ".preconditionfailed").getValue(); + assertEquals(2L, counterVal); + + job = this.addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED); + action = this.addRecordToWfActionTable(job.getId(), "1", WorkflowAction.Status.RUNNING); + + checkCmd = new ActionCheckXCommand(action.getId()); + + checkCmd.call(); + + //precondition failed because of job != RUNNING && job != SUSPENDED + counterVal = inst.getCounters().get(XCommand.INSTRUMENTATION_GROUP) + .get(checkCmd.getName() + ".preconditionfailed").getValue(); + assertEquals(3L, counterVal); } public void testActionCheck() throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/844b067b/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 86642f1..8759dad 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1849 If the underlying job finishes while a Workflow is suspended, Oozie can take a while to realize it (rkanter) OOZIE-1835 NullPointerException from SLAEmailEventListener (rkanter) OOZIE-1809 offset and len options are ignored in oozie job -info for workflow (ryota) OOZIE-1826 Add thread which detects JVM pauses (rkanter)
