Repository: oozie
Updated Branches:
  refs/heads/master ca98c6793 -> b8c6b702b
OOZIE-2429 TestEventGeneration test is flakey (fdenes via rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b8c6b702 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b8c6b702 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b8c6b702 Branch: refs/heads/master Commit: b8c6b702b0e5e120f8231448dbc32ea01443a150 Parents: ca98c67 Author: Robert Kanter <[email protected]> Authored: Thu Mar 17 18:32:24 2016 -0700 Committer: Robert Kanter <[email protected]> Committed: Thu Mar 17 18:32:24 2016 -0700 ---------------------------------------------------------------------- .../coord/CoordActionUpdateXCommand.java | 43 ++++++++++++-------- .../apache/oozie/event/TestEventGeneration.java | 32 +++++++++++---- release-log.txt | 1 + 3 files changed, 51 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/b8c6b702/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java index d628a9f..ef49ea5 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordActionUpdateXCommand.java @@ -77,7 +77,9 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { @Override protected Void execute() throws CommandException { try { - LOG.debug("STARTED CoordActionUpdateXCommand for wfId=" + workflow.getId()); + LOG.debug("STARTED CoordActionUpdateXCommand for wfId=[{0}]", workflow.getId()); + final CoordinatorAction.Status formerCoordinatorStatus = coordAction.getStatus(); + final int formerCoordinatorPending = 
coordAction.getPending(); Status slaStatus = null; if (workflow.getStatus() == WorkflowJob.Status.SUCCEEDED) { coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); @@ -105,7 +107,7 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { coordAction.decrementAndGetPending(); } else { - LOG.warn("Unexpected workflow " + workflow.getId() + " STATUS " + workflow.getStatus()); + LOG.warn("Unexpected workflow [{0}] STATUS [{1}]", workflow.getId(), workflow.getStatus()); // update lastModifiedTime coordAction.setLastModifiedTime(new Date()); CoordActionQueryExecutor.getInstance().executeUpdate( @@ -119,8 +121,9 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { return null; } - LOG.info("Updating Coordintaor action id :" + coordAction.getId() + " status " - + " to " + coordAction.getStatus() + ", pending = " + coordAction.getPending()); + LOG.info("Updating Coordinator action id: [{0}] status [{1}] to [{2}] pending [{3}] to [{4}]", + coordAction.getId(), formerCoordinatorStatus, coordAction.getStatus(), + formerCoordinatorPending, coordAction.getPending()); coordAction.setLastModifiedTime(new Date()); updateList.add(new UpdateEntry<CoordActionQuery>(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, @@ -133,9 +136,9 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { LOG.info("Updating Coordinator job "+ coordJob.getId() + "pending to true"); }*/ if (slaStatus != null) { - SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), coordAction.getId(), slaStatus, - SlaAppType.COORDINATOR_ACTION, LOG); - if(slaEvent != null) { + SLAEventBean slaEvent = SLADbOperations.createStatusEvent(coordAction.getSlaXml(), + coordAction.getId(), slaStatus, SlaAppType.COORDINATOR_ACTION, LOG); + if (slaEvent != null) { insertList.add(slaEvent); } } @@ -149,10 +152,10 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { generateEvent(coordAction, 
coordJob.getUser(), coordJob.getAppName(), workflow.getStartTime()); } - LOG.debug("ENDED CoordActionUpdateXCommand for wfId=" + workflow.getId()); + LOG.debug("ENDED CoordActionUpdateXCommand for wfId= [{0}]", workflow.getId()); } catch (XException ex) { - LOG.warn("CoordActionUpdate Failed ", ex.getMessage()); + LOG.warn("CoordActionUpdate Failed [{0}]", ex.getMessage()); throw new CommandException(ex); } return null; @@ -202,18 +205,26 @@ public class CoordActionUpdateXCommand extends CoordinatorXCommand<Void> { @Override protected void verifyPrecondition() throws CommandException, PreconditionException { - - // if coord action is RUNNING and pending false and workflow is RUNNING, this doesn't need to be updated. - if (workflow.getStatus() == WorkflowJob.Status.RUNNING - && coordAction.getStatus() == CoordinatorAction.Status.RUNNING && !coordAction.isPending()) { + // if coord action status == workflow, and pending false then coord action is already updated + if (((workflow.getStatus() == WorkflowJob.Status.SUCCEEDED + && coordAction.getStatus() == CoordinatorAction.Status.SUCCEEDED) || + (workflow.getStatus() == WorkflowJob.Status.FAILED + && coordAction.getStatus() == CoordinatorAction.Status.FAILED) || + (workflow.getStatus() == WorkflowJob.Status.KILLED + && coordAction.getStatus() == CoordinatorAction.Status.KILLED) || + (workflow.getStatus() == WorkflowJob.Status.SUSPENDED + && coordAction.getStatus() == CoordinatorAction.Status.SUSPENDED) || + (workflow.getStatus() == WorkflowJob.Status.RUNNING + && coordAction.getStatus() == CoordinatorAction.Status.RUNNING) + ) && !coordAction.isPending()) { try { CoordActionQueryExecutor.getInstance().executeUpdate( CoordActionQueryExecutor.CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction); - } - catch (JPAExecutorException je) { + } catch (JPAExecutorException je) { throw new CommandException(je); } - throw new PreconditionException(ErrorCode.E1100, ", workflow is RUNNING and coordinator action is 
RUNNING and pending false"); + throw new PreconditionException(ErrorCode.E1100, ", workflow is " + workflow.getStatus() + + " and coordinator action is " + coordAction.getStatus() + " and pending false"); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/b8c6b702/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java b/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java index 14f5294..5aaa344 100644 --- a/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java +++ b/core/src/test/java/org/apache/oozie/event/TestEventGeneration.java @@ -85,7 +85,6 @@ import org.apache.oozie.service.LiteWorkflowStoreService; import org.apache.oozie.service.Services; import org.apache.oozie.service.UUIDService; import org.apache.oozie.test.XDataTestCase; -import org.apache.oozie.test.XTestCase.Predicate; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; @@ -120,6 +119,9 @@ public class TestEventGeneration extends XDataTestCase { super.setUp(); services = new Services(); Configuration conf = services.getConf(); + // The EventHandlerService manipulates the queues in the background, so the actual test results depend on the + // circumstances (like the speed of the machine, debugging etc). 
+ conf.setInt("oozie.service.EventHandlerService.worker.threads", 0); conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService"); services.init(); ehs = services.get(EventHandlerService.class); @@ -491,7 +493,7 @@ public class TestEventGeneration extends XDataTestCase { } @Test - public void testForNoDuplicates() throws Exception { + public void testForNoDuplicatesWorkflowEvents() throws Exception { // test workflow job events Reader reader = IOUtils.getResourceAsReader("wf-no-op.xml", -1); Writer writer = new FileWriter(getTestCaseDir() + "/workflow.xml"); @@ -515,7 +517,10 @@ public class TestEventGeneration extends XDataTestCase { assertEquals(EventStatus.STARTED, ((JobEvent)queue.poll()).getEventStatus()); assertEquals(EventStatus.SUCCESS, ((JobEvent)queue.poll()).getEventStatus()); queue.clear(); + } + @Test + public void testForNoDuplicatesCoordinatorActionEvents() throws Exception { // test coordinator action events (failure case) Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z"); Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z"); @@ -535,10 +540,17 @@ public class TestEventGeneration extends XDataTestCase { assertEquals(2, queue.size()); assertEquals(EventStatus.WAITING, ((JobEvent)queue.poll()).getEventStatus()); assertEquals(EventStatus.FAILURE, ((JobEvent)queue.poll()).getEventStatus()); + queue.clear(); + } + + @Test + public void testInvalidXMLCoordinatorFailsForNoDuplicates() throws Exception { + Date startTime = DateUtils.parseDateOozieTZ("2009-02-01T23:59Z"); + Date endTime = DateUtils.parseDateOozieTZ("2009-02-02T23:59Z"); // test coordinator action events (failure from ActionStartX) ehs.getAppTypes().add("workflow_action"); - coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0); + CoordinatorJobBean coord = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, startTime, endTime, false, false, 0); CoordinatorActionBean action = 
addRecordToCoordActionTable(coord.getId(), 1, CoordinatorAction.Status.RUNNING, "coord-action-sla1.xml", 0); WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING, @@ -549,14 +561,19 @@ public class TestEventGeneration extends XDataTestCase { String waId = _createWorkflowAction(wf.getId(), "wf-action"); new ActionStartXCommand(waId, action.getType()).call(); - final WorkflowJobGetJPAExecutor readCmd2 = new WorkflowJobGetJPAExecutor(jobId1); + final CoordJobGetJPAExecutor readCmd2 = new CoordJobGetJPAExecutor(coord.getId()); waitFor(1 * 100, new Predicate() { @Override public boolean evaluate() throws Exception { - return jpaService.execute(readCmd2).getStatus() == WorkflowJob.Status.KILLED; + return jpaService.execute(readCmd2).getStatus() == CoordinatorJob.Status.KILLED; } }); + assertEquals(3, queue.size()); + JobEvent coordActionEvent = (JobEvent) queue.poll(); + assertEquals(EventStatus.FAILURE, coordActionEvent.getEventStatus()); + assertEquals(action.getId(), coordActionEvent.getId()); + assertEquals(AppType.COORDINATOR_ACTION, coordActionEvent.getAppType()); JobEvent wfActionEvent = (JobEvent) queue.poll(); assertEquals(EventStatus.FAILURE, wfActionEvent.getEventStatus()); assertEquals(waId, wfActionEvent.getId()); @@ -565,10 +582,7 @@ public class TestEventGeneration extends XDataTestCase { assertEquals(EventStatus.FAILURE, wfJobEvent.getEventStatus()); assertEquals(wf.getId(), wfJobEvent.getId()); assertEquals(AppType.WORKFLOW_JOB, wfJobEvent.getAppType()); - JobEvent coordActionEvent = (JobEvent) queue.poll(); - assertEquals(EventStatus.FAILURE, coordActionEvent.getEventStatus()); - assertEquals(action.getId(), coordActionEvent.getId()); - assertEquals(AppType.COORDINATOR_ACTION, coordActionEvent.getAppType()); + queue.clear(); } private class ActionCheckXCommandForTest extends ActionCheckXCommand { http://git-wip-us.apache.org/repos/asf/oozie/blob/b8c6b702/release-log.txt 
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f766a7c..9341014 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2429 TestEventGeneration test is flakey (fdenes via rkanter)
 OOZIE-2466 Repeated failure of TestMetricsInstrumentation.testSamplers (fdenes via rkanter)
 OOZIE-2470 Remove infinite socket timeouts in the Oozie email action (harsh)
 OOZIE-2246 CoordinatorInputCheckCommand does not behave properly when har file is one of data dependency and doesn't exist (satishsaley via puru)
