Repository: oozie Updated Branches: refs/heads/master 5fbd3eb3f -> ba7a7b85e
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java index 432efef..559e2b3 100644 --- a/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java +++ b/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java @@ -108,11 +108,11 @@ public class TestSLACalculatorMemory extends XDataTestCase { public void testLoadOnRestart() throws Exception { SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB); + SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB); String jobId1 = slaRegBean1.getId(); - SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2", AppType.WORKFLOW_JOB); + SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2-W", AppType.WORKFLOW_JOB); String jobId2 = slaRegBean2.getId(); - SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3", AppType.WORKFLOW_JOB); + SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3-W", AppType.WORKFLOW_JOB); String jobId3 = slaRegBean3.getId(); List<String> idList = new ArrayList<String>(); idList.add(slaRegBean1.getId()); @@ -134,6 +134,12 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaRegBean1.setAlertEvents("MISS"); slaRegBean1.setJobData("jobData"); + Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back + Date endTime = new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000); // 1 hour back + + slaRegBean3.setExpectedStart(startTime); + slaRegBean3.setExpectedEnd(endTime); + slaCalcMemory.addRegistration(jobId1, slaRegBean1); slaCalcMemory.addRegistration(jobId2, slaRegBean2); slaCalcMemory.addRegistration(jobId3, slaRegBean3); @@ -142,9 +148,6 @@ public class TestSLACalculatorMemory extends XDataTestCase { SLACalcStatus calc2 = slaCalcMemory.get(jobId2); SLACalcStatus calc3 = slaCalcMemory.get(jobId3); - calc1.setEventProcessed(5); - calc2.setEventProcessed(6); - calc3.setEventProcessed(7); calc1.setEventStatus(SLAEvent.EventStatus.END_MISS); calc1.setSLAStatus(SLAEvent.SLAStatus.MISS); @@ -154,11 +157,30 @@ public class TestSLACalculatorMemory extends XDataTestCase { calc1.setLastModifiedTime(lastModifiedTime); List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); - SLASummaryBean bean = new SLASummaryBean(calc1); - bean.setActualStart(sdf.parse("2011-03-09")); - bean.setActualEnd(sdf.parse("2011-03-10")); - bean.setActualDuration(456); - updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, bean)); + WorkflowJobBean wf1 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId1); + wf1.setId(jobId1); + wf1.setStatus(WorkflowJob.Status.SUCCEEDED); + wf1.setStartTime(sdf.parse("2011-03-09")); + wf1.setEndTime(sdf.parse("2011-03-10")); + wf1.setLastModifiedTime(new Date()); + + WorkflowJobBean wf2 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId2); + wf2.setId(jobId2); + wf2.setStatus(WorkflowJob.Status.RUNNING); + wf2.setStartTime(sdf.parse("2011-03-09")); + wf2.setEndTime(null); + wf2.setLastModifiedTime(new Date()); + + WorkflowJobBean wf3 = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId3); + wf3.setId(jobId3); + wf3.setStatus(WorkflowJob.Status.RUNNING); + wf3.setStartTime(startTime); + wf3.setEndTime(null); + wf3.setLastModifiedTime(new Date()); + + updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf1)); + updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf2)); + updateList.add(new UpdateEntry<WorkflowJobQuery>(WorkflowJobQuery.UPDATE_WORKFLOW, wf3)); updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, new SLASummaryBean(calc2))); updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, @@ -169,10 +191,11 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - assertEquals(2, slaCalcMemory.size()); - SLACalcStatus calc = slaCalcMemory.get(jobId1); - assertEquals("job-1", calc.getId()); + SLACalcStatus calc = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get( + SLASummaryQuery.GET_SLA_SUMMARY, jobId1), SLARegistrationQueryExecutor.getInstance().get( + SLARegQuery.GET_SLA_REG_ON_RESTART, jobId1)); + assertEquals("job-1-W", calc.getId()); assertEquals(AppType.WORKFLOW_JOB, calc.getAppType()); assertEquals("app-name", calc.getAppName()); assertEquals(123, calc.getExpectedDuration()); @@ -188,22 +211,35 @@ public class TestSLACalculatorMemory extends XDataTestCase { assertEquals("jobData", calc.getJobData()); assertEquals(sdf.parse("2011-03-09"), calc.getActualStart()); assertEquals(sdf.parse("2011-03-10"), calc.getActualEnd()); - assertEquals(456, calc.getActualDuration()); assertEquals(SLAEvent.EventStatus.END_MISS, calc1.getEventStatus()); assertEquals(SLAEvent.SLAStatus.MISS, calc1.getSLAStatus()); assertEquals(WorkflowJob.Status.FAILED.toString(), calc1.getJobStatus()); assertEquals(lastModifiedTime, calc1.getLastModifiedTime()); - assertEquals(5, calc.getEventProcessed()); - assertEquals(6, slaCalcMemory.get(jobId2).getEventProcessed()); - // jobId3 should be in history set as eventprocessed is 7 (111) - assertEquals(2, slaCalcMemory.size()); // 2 out of 3 jobs in map + calc2 = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get( + SLASummaryQuery.GET_SLA_SUMMARY, jobId2), SLARegistrationQueryExecutor.getInstance().get( + SLARegQuery.GET_SLA_REG_ON_RESTART, jobId2)); + + + assertEquals(8, calc.getEventProcessed()); + assertEquals(7, calc2.getEventProcessed()); + // jobId2 should be in history set as eventprocessed is 7 (111) + //job3 will be in slamap + assertEquals(1, slaCalcMemory.size()); // 1 out of 3 jobs in map + WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW, jobId3); + wf.setId(jobId3); + wf.setStatus(WorkflowJob.Status.SUCCEEDED); + wf.setEndTime(endTime); + wf.setStartTime(startTime); + WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wf); + slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, - sdf.parse("2011-03-09"), sdf.parse("2011-04-09")); + startTime, endTime); + SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId3); assertEquals(8, slaSummary.getEventProcessed()); - assertEquals(sdf.parse("2011-03-09"), slaSummary.getActualStart()); - assertEquals(sdf.parse("2011-04-09"), slaSummary.getActualEnd()); + assertEquals(startTime, slaSummary.getActualStart()); + assertEquals(endTime, slaSummary.getActualEnd()); assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus()); } @@ -213,7 +249,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd"); SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB); + SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1-W", AppType.WORKFLOW_JOB); String jobId1 = slaRegBean1.getId(); slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07")); slaRegBean1.setExpectedStart(sdf.parse("2012-03-07")); @@ -240,11 +276,8 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - - // As job succeeded, it should not be in memory - assertEquals(0, slaCalcMemory.size()); slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1); - assertEquals("job-1", slaSummary.getId()); + assertEquals("job-1-W", slaSummary.getId()); assertEquals(8, slaSummary.getEventProcessed()); assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType()); assertEquals("SUCCEEDED", slaSummary.getJobStatus()); @@ -270,7 +303,6 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - assertEquals(0, slaCalcMemory.size()); slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1); assertEquals("FAILED", slaSummary.getJobStatus()); assertEquals(8, slaSummary.getEventProcessed()); @@ -293,15 +325,14 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - - assertEquals(1, slaCalcMemory.size()); - SLACalcStatus calc = slaCalcMemory.get(jobId1); - assertEquals(1, calc.getEventProcessed()); - assertEquals("RUNNING", calc.getJobStatus()); - assertEquals(sdf.parse("2012-02-07"), calc.getActualStart()); - assertNull(calc.getActualEnd()); - assertEquals(-1, calc.getActualDuration()); - assertEquals(SLAEvent.SLAStatus.IN_PROCESS, calc.getSLAStatus()); + slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, slaSummaryBean.getId()); + //since job is already running and it's a old job + assertEquals(7, slaSummary.getEventProcessed()); + assertEquals("RUNNING", slaSummary.getJobStatus()); + assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart()); + assertNull(slaSummary.getActualEnd()); + assertEquals(-1, slaSummary.getActualDuration()); + assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus()); } @Test @@ -309,7 +340,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd"); SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - SLARegistrationBean slaRegBean1 = _createSLARegistration("job@1", AppType.WORKFLOW_ACTION); + SLARegistrationBean slaRegBean1 = _createSLARegistration("job-W@1", AppType.WORKFLOW_ACTION); String jobId1 = slaRegBean1.getId(); slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07")); slaRegBean1.setExpectedStart(sdf.parse("2012-03-07")); @@ -334,11 +365,10 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - - // As job succeeded, it should not be in memory + slaCalcMemory.updateAllSlaStatus(); assertEquals(0, slaCalcMemory.size()); SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1); - assertEquals("job@1", slaSummary.getId()); + assertEquals("job-W@1", slaSummary.getId()); assertEquals(8, slaSummary.getEventProcessed()); assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType()); assertEquals("OK", slaSummary.getJobStatus()); @@ -355,7 +385,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd"); SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - SLARegistrationBean slaRegBean1 = _createSLARegistration("job@1", AppType.COORDINATOR_ACTION); + SLARegistrationBean slaRegBean1 = _createSLARegistration("job-C@1", AppType.COORDINATOR_ACTION); String jobId1 = slaRegBean1.getId(); slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07")); slaRegBean1.setExpectedStart(sdf.parse("2012-03-07")); @@ -374,23 +404,23 @@ public class TestSLACalculatorMemory extends XDataTestCase { cab.setId(jobId1); cab.setStatus(CoordinatorAction.Status.FAILED); cab.setLastModifiedTime(sdf.parse("2013-02-07")); - cab.setExternalId("wf_job"); + cab.setExternalId("wf_job-W"); CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(cab); jpaService.execute(caInsertCmd); WorkflowJobBean wjb = new WorkflowJobBean(); - wjb.setId("wf_job"); + wjb.setId("wf_job-W"); wjb.setStartTime(sdf.parse("2012-02-07")); wjb.setLastModifiedTime(new Date()); WorkflowJobQueryExecutor.getInstance().insert(wjb); slaCalcMemory = new SLACalculatorMemory(); slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); - + slaCalcMemory.updateAllSlaStatus(); // As job succeeded, it should not be in memory assertEquals(0, slaCalcMemory.size()); SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1); - assertEquals("job@1", slaSummary.getId()); + assertEquals("job-C@1", slaSummary.getId()); assertEquals(8, slaSummary.getEventProcessed()); assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType()); assertEquals("FAILED", slaSummary.getJobStatus()); @@ -402,6 +432,68 @@ public class TestSLACalculatorMemory extends XDataTestCase { } @Test + public void testEventMissOnRestart() throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd"); + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + CoordinatorActionBean coordAction = new CoordinatorActionBean(); + coordAction.setId("coordActionId-C@1"); + coordAction.setStatus(CoordinatorAction.Status.RUNNING); + coordAction.setLastModifiedTime(sdf.parse("2013-02-07")); + CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(coordAction); + jpaService.execute(caInsertCmd); + + CoordinatorActionBean coordAction2 = new CoordinatorActionBean(); + coordAction2.setId("coordActionId-C@2"); + coordAction2.setStatus(CoordinatorAction.Status.RUNNING); + coordAction2.setLastModifiedTime(sdf.parse("2013-02-07")); + caInsertCmd = new CoordActionInsertJPAExecutor(coordAction2); + jpaService.execute(caInsertCmd); + + SLARegistrationBean slaRegBean1 = _createSLARegistration("coordActionId-C@1", AppType.COORDINATOR_ACTION); + String jobId1 = slaRegBean1.getId(); + slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07")); + slaRegBean1.setExpectedStart(sdf.parse("2012-03-07")); + slaRegBean1.setExpectedDuration(100000); // long duration; + slaCalcMemory.addRegistration(jobId1, slaRegBean1); + slaCalcMemory.updateAllSlaStatus(); + + SLARegistrationBean slaRegBean2 = _createSLARegistration("coordActionId-C@2", AppType.COORDINATOR_ACTION); + String jobId2 = slaRegBean2.getId(); + slaRegBean2.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); // 1 hour + slaRegBean2.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000)); // 2 hour + slaRegBean2.setExpectedDuration(100000); // long duration; + slaCalcMemory.addRegistration(jobId2, slaRegBean2); + slaCalcMemory.updateAllSlaStatus(); + assertEquals(2, slaCalcMemory.size()); + + SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1); + SLASummaryBean slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2); + + assertEquals("coordActionId-C@1", slaSummary.getId()); + assertEquals(5, slaSummary.getEventProcessed()); + assertEquals(-1, slaSummary.getActualDuration()); + + assertEquals("coordActionId-C@2", slaSummary2.getId()); + assertEquals(0, slaSummary2.getEventProcessed()); + assertEquals(-1, slaSummary2.getActualDuration()); + + coordAction.setStatusStr("FAILED"); + coordAction2.setStatusStr("FAILED"); + CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction); + CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_FOR_START, coordAction2); + + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId1); + slaSummary2 = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId2); + + assertEquals("coordActionId-C@1", slaSummary.getId()); + assertEquals(8, slaSummary.getEventProcessed()); + assertEquals("coordActionId-C@2", slaSummary2.getId()); + assertEquals(8, slaSummary2.getEventProcessed()); + + } + @Test public void testSLAEvents1() throws Exception { SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); EventHandlerService ehs = Services.get().get(EventHandlerService.class); @@ -428,13 +520,24 @@ public class TestSLACalculatorMemory extends XDataTestCase { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus()); - slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, - sdf.parse("2012-01-01"), null); + + job1.setStatusStr(WorkflowJob.Status.SUSPENDED.toString()); + job1.setLastModifiedTime(new Date()); + WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, job1); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUSPENDED.toString(), EventStatus.SUSPEND, sdf.parse("2012-01-01"), null); slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); assertEquals(WorkflowJob.Status.SUSPENDED.toString(), slaSummary.getJobStatus()); + assertEquals(5, slaSummary.getEventProcessed()); + job1.setStatusStr(WorkflowJob.Status.SUCCEEDED.toString()); + job1.setLastModifiedTime(new Date()); + job1.setStartTime(sdf.parse("2012-01-01")); + job1.setEndTime(sdf.parse("2012-01-02")); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, sdf.parse("2012-01-01"), sdf.parse("2012-01-02")); @@ -442,7 +545,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); // All events processed and actual times stored (1000) assertEquals(8, slaSummary.getEventProcessed()); - assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus()); + assertEquals(SLAStatus.MET, slaSummary.getSLAStatus()); assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus()); assertEquals(SLAEvent.EventStatus.DURATION_MISS, slaSummary.getEventStatus()); assertEquals(sdf.parse("2012-01-01").getTime(), slaSummary.getActualStart().getTime()); @@ -479,6 +582,12 @@ public class TestSLACalculatorMemory extends XDataTestCase { SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummary); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + job1.setStatus(WorkflowJob.Status.SUCCEEDED); + job1.setStartTime(sdf.parse("2012-01-01")); + job1.setEndTime(sdf.parse("2012-01-02")); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, sdf.parse("2012-01-01"), sdf.parse("2012-01-02")); slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); @@ -490,16 +599,21 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaSummary.setEventProcessed(1); SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummary); + WorkflowJobBean job2 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP); - slaRegBean = _createSLARegistration("job-2", AppType.WORKFLOW_JOB); + slaRegBean = _createSLARegistration(job2.getId(), AppType.WORKFLOW_JOB); slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000)); jobId = slaRegBean.getId(); slaCalcMemory.addRegistration(jobId, slaRegBean); assertEquals(1, slaCalcMemory.size()); + job2.setStatus(WorkflowJob.Status.KILLED); + job2.setEndTime(sdf.parse("2012-01-02")); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job2); - slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null, + slaCalcMemory.addJobStatus(job2.getId(), WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null, sdf.parse("2012-01-02")); slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); // Actual start null, so all events processed @@ -521,7 +635,7 @@ public class TestSLACalculatorMemory extends XDataTestCase { SLARegistrationBean slaRegBean = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB); Date startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back slaRegBean.setExpectedStart(startTime); - slaRegBean.setExpectedDuration(3600 * 1000); + slaRegBean.setExpectedDuration(2* 3600 * 1000); //to avoid duration miss slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); // 1 hour ahead String jobId = slaRegBean.getId(); slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean); @@ -530,11 +644,15 @@ public class TestSLACalculatorMemory extends XDataTestCase { assertEquals(1, slaSummary.getEventProcessed()); assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus()); + job1.setStatus(WorkflowJob.Status.RUNNING); + job1.setStartTime(startTime); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date(System.currentTimeMillis()), null); - slaCalcMemory.updateJobSla(jobId); - slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); + slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); assertEquals(1, slaSummary.getEventProcessed()); assertEquals(SLAStatus.IN_PROCESS, slaSummary.getSLAStatus()); assertEquals(WorkflowJob.Status.RUNNING.toString(), slaSummary.getJobStatus()); @@ -565,13 +683,21 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); assertEquals(4, slaSummary.getEventProcessed()); assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus()); + + job1.setId(job1.getId()); + job1.setStatus(WorkflowJob.Status.SUCCEEDED); + job1.setStartTime(new Date(System.currentTimeMillis())); + job1.setEndTime(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); // Only Duration sla should be processed as end is already processed // (110) - assertEquals(6, slaSummary.getEventProcessed()); + assertEquals(8, slaSummary.getEventProcessed()); assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus()); // Recieve start event assertTrue(slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, @@ -598,15 +724,32 @@ public class TestSLACalculatorMemory extends XDataTestCase { String jobId = slaRegBean.getId(); slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean); slaCalcMemory.updateJobSla(jobId); + + job1.setId(job1.getId()); + job1.setStatus(WorkflowJob.Status.RUNNING); + job1.setStartTime(new Date(System.currentTimeMillis() - 3600 * 1000)); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date( System.currentTimeMillis() - 3600 * 1000), null); - slaCalcMemory.updateJobSla(jobId); SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); // The actual end times are not stored, but sla's processed so (111) assertEquals(7, slaSummary.getEventProcessed()); // Moved from map to history set assertEquals(0, slaCalcMemory.size()); // Add terminal state event so actual end time is stored + + job1.setId(job1.getId()); + job1.setStatus(WorkflowJob.Status.SUCCEEDED); + job1.setEndTime(new Date(System.currentTimeMillis() - 3600 * 1000)); + job1.setStartTime(new Date(System.currentTimeMillis())); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1); + + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, new Date( System.currentTimeMillis() - 3600 * 1000), new Date(System.currentTimeMillis())); slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); @@ -634,16 +777,21 @@ public class TestSLACalculatorMemory extends XDataTestCase { String jobId = slaRegBean.getId(); slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean); slaCalcMemory.updateJobSla(jobId); + job1.setStatusStr("RUNNING"); + job1.setLastModifiedTime(new Date()); + job1.setStartTime(startTime); + WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, new Date( System.currentTimeMillis() - 3600 * 1000), null); - slaCalcMemory.updateJobSla(jobId); SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); // The actual end times are not stored, but sla's processed so (111) assertEquals(7, slaSummary.getEventProcessed()); assertTrue(slaCalcMemory.isJobIdInHistorySet(job1.getId())); job1.setStatusStr("SUCCEEDED"); job1.setLastModifiedTime(new Date()); - WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_MODTIME, job1); + job1.setStartTime(startTime); + WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job1); slaCalcMemory.new HistoryPurgeWorker().run(); assertFalse(slaCalcMemory.isJobIdInHistorySet(job1.getId())); } @@ -742,5 +890,85 @@ public class TestSLACalculatorMemory extends XDataTestCase { slaCalculator.addRegistration(slaRegBean.getId(), slaRegBean); return action.getId(); } + @Test + public void testEventOutOfOrder() throws Exception { + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + SLARegistrationBean slaRegBean = _createSLARegistration(wfJob.getId(), AppType.WORKFLOW_JOB); + Date startTime = new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000); // 1 hour ahead + slaRegBean.setExpectedStart(startTime); + slaRegBean.setExpectedDuration(3600 * 1000); + slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); // 1 hour back + String jobId = slaRegBean.getId(); + slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean); + slaCalcMemory.updateJobSla(jobId); + SLASummaryBean slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); + slaRegBean = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId); + assertEquals(slaSummary.getJobStatus(), WorkflowInstance.Status.RUNNING.toString()); + + wfJob.setStatus(WorkflowJob.Status.SUCCEEDED); + wfJob.setEndTime(new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); + WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_END, wfJob); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, + new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); + slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); + assertEquals(slaSummary.getJobStatus(), WorkflowInstance.Status.SUCCEEDED.toString()); + + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.SUCCESS, + new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000)); + + slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); + assertEquals(slaSummary.getJobStatus(), WorkflowInstance.Status.SUCCEEDED.toString()); + } + + public void testWFEndNotCoord() throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd"); + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(Services.get().get(ConfigurationService.class).getConf()); + SLARegistrationBean slaRegBean = _createSLARegistration("job-C@1", AppType.COORDINATOR_ACTION); + String coordActionId = slaRegBean.getId(); + slaRegBean.setExpectedEnd(sdf.parse("2013-03-07")); + slaRegBean.setExpectedStart(sdf.parse("2012-03-07")); + slaCalcMemory.addRegistration(coordActionId, slaRegBean); + SLACalcStatus calc1 = slaCalcMemory.get(coordActionId); + calc1.setEventProcessed(1); + calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS); + calc1.setJobStatus(WorkflowAction.Status.RUNNING.name()); + calc1.setLastModifiedTime(new Date()); + SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1); + + SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, slaSummaryBean); + + // Simulate a lost failed event + CoordinatorActionBean coordAction = new CoordinatorActionBean(); + coordAction.setId(coordActionId); + coordAction.setStatus(CoordinatorAction.Status.RUNNING); + coordAction.setLastModifiedTime(sdf.parse("2013-02-07")); + coordAction.setExternalId("wf_job-W"); + CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(coordAction); + jpaService.execute(caInsertCmd); + WorkflowJobBean wjb = new WorkflowJobBean(); + wjb.setId("wf_job-W"); + wjb.setStartTime(sdf.parse("2012-02-07")); + wjb.setLastModifiedTime(new Date()); + wjb.setStatus(WorkflowJob.Status.SUCCEEDED); + WorkflowJobQueryExecutor.getInstance().insert(wjb); + + calc1 = slaCalcMemory.get(coordActionId); + slaCalcMemory.updateJobSla(coordActionId); + slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, coordActionId); + //cord action is running and wf job is completed + assertEquals(slaSummaryBean.getJobStatus(), WorkflowInstance.Status.RUNNING.name()); + + coordAction.setStatus(CoordinatorAction.Status.SUCCEEDED); + CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction); + + slaCalcMemory.addJobStatus(coordActionId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, + sdf.parse("2012-02-07"), sdf.parse("2012-03-07")); + + slaSummaryBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, coordActionId); + assertEquals(slaSummaryBean.getJobStatus(), WorkflowInstance.Status.SUCCEEDED.toString()); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java index 7a710c2..06f54f2 100644 --- a/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java +++ b/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java @@ -513,6 +513,7 @@ public class TestSLAEventGeneration extends XDataTestCase { wf.setId(action.getExternalId()); wf.setStatus(WorkflowJob.Status.KILLED); wf.setParentId(action.getId()); + wf.setEndTime(new Date()); jpa.execute(new WorkflowJobInsertJPAExecutor(wf)); new CoordActionUpdateXCommand(wf).call(); assertEquals(1, ehs.getEventQueue().size()); http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java index ebb12f7..7d40e31 100644 --- a/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java +++ b/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java @@ -18,22 +18,32 @@ package org.apache.oozie.sla; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.AppType; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.event.SLAEvent.EventStatus; +import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.event.CoordinatorActionEvent; -import org.apache.oozie.event.CoordinatorJobEvent; import org.apache.oozie.event.WorkflowActionEvent; import org.apache.oozie.event.WorkflowJobEvent; import org.apache.oozie.event.listener.JobEventListener; +import org.apache.oozie.executor.jpa.BatchQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.service.EventHandlerService; import org.apache.oozie.service.Services; import org.apache.oozie.sla.listener.SLAJobEventListener; @@ -81,61 +91,79 @@ public class TestSLAJobEventListener extends XTestCase { SLAJobEventListener listener = new SLAJobEventListener(); listener.init(services.getConf()); // add dummy registration events to the SLAService map - SLARegistrationBean job = _createSLARegBean("wf1", AppType.WORKFLOW_JOB); + SLARegistrationBean job = _createSLARegBean("wf1-W", AppType.WORKFLOW_JOB); job.setExpectedStart(DateUtils.parseDateUTC("2012-07-22T00:00Z")); + job.setExpectedEnd(DateUtils.parseDateUTC("2012-07-23T00:00Z")); slas.addRegistrationEvent(job); assertEquals(1, slas.getSLACalculator().size()); Date actualStart = DateUtils.parseDateUTC("2012-07-22T01:00Z"); - WorkflowJobEvent wfe = new WorkflowJobEvent("wf1", "caId1", WorkflowJob.Status.RUNNING, "user1", + + createWorkflow("wf1-W", actualStart); + WorkflowJobEvent wfe = new WorkflowJobEvent("wf1-W", "caId1", WorkflowJob.Status.RUNNING, "user1", "wf-app-name1", actualStart, null); listener.onWorkflowJobEvent(wfe); - SLACalcStatus serviceObj = slas.getSLACalculator().get("wf1"); + SLACalcStatus serviceObj = slas.getSLACalculator().get("wf1-W"); + + // job will be checked against DB.. since it's old job. all event will get evaluted and job will move to history set. // check that start sla has been calculated - assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus()); - assertEquals(1, serviceObj.getEventProcessed()); //Job switching to running is only partially - //sla processed. so state = 1 + assertEquals(EventStatus.END_MISS, serviceObj.getEventStatus()); + assertEquals(7, serviceObj.getEventProcessed()); //Job switching to running is only partially + assertEquals(0, slas.getSLACalculator().size()); + + + createWorkflowAction("wfId1-W@wa1", "wf1-W"); + job = _createSLARegBean("wfId1-W@wa1", AppType.WORKFLOW_ACTION); + job.setExpectedEnd(DateUtils.parseDateUTC("2012-07-22T01:00Z")); - job = _createSLARegBean("wfId1@wa1", AppType.WORKFLOW_ACTION); slas.addRegistrationEvent(job); - assertEquals(2, slas.getSLACalculator().size()); + assertEquals(1, slas.getSLACalculator().size()); job.setExpectedStart(DateUtils.parseDateUTC("2012-07-22T00:00Z")); - WorkflowActionEvent wae = new WorkflowActionEvent("wfId1@wa1", "wfId1", WorkflowAction.Status.RUNNING, "user1", + WorkflowActionEvent wae = new WorkflowActionEvent("wfId1-W@wa1", "wf1-W", WorkflowAction.Status.RUNNING, "user1", "wf-app-name1", actualStart, null); listener.onWorkflowActionEvent(wae); - serviceObj = slas.getSLACalculator().get("wfId1@wa1"); + serviceObj = slas.getSLACalculator().get("wfId1-W@wa1"); // check that start sla has been calculated - assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus()); + assertEquals(EventStatus.END_MISS, serviceObj.getEventStatus()); + createCoord("cj1-C"); - job = _createSLARegBean("cj1", AppType.COORDINATOR_JOB); + CoordinatorActionBean coordAction= createCoordAction("cj1-C@ca1", "cj1-C"); + job = _createSLARegBean("cj1-C@ca1", AppType.COORDINATOR_ACTION); job.setExpectedEnd(DateUtils.parseDateUTC("2012-07-22T01:00Z")); + Date actualEnd = DateUtils.parseDateUTC("2012-07-22T02:00Z"); slas.addRegistrationEvent(job); - assertEquals(3, slas.getSLACalculator().size()); - Date actualEnd = DateUtils.parseDateUTC("2012-07-22T00:00Z"); - CoordinatorJobEvent cje = new CoordinatorJobEvent("cj1", "bj1", CoordinatorJob.Status.SUCCEEDED, "user1", - "coord-app-name1", actualStart, actualEnd); - listener.onCoordinatorJobEvent(cje); - - SLASummaryBean summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1"); - // check that end and duration sla has been calculated - assertEquals(6, summary.getEventProcessed()); - - assertEquals(EventStatus.END_MET, summary.getEventStatus()); - - job = _createSLARegBean("cj1@ca1", AppType.COORDINATOR_ACTION); - actualEnd = DateUtils.parseDateUTC("2012-07-22T02:00Z"); - slas.addRegistrationEvent(job); - assertEquals(4, slas.getSLACalculator().size()); - CoordinatorActionEvent cae = new CoordinatorActionEvent("cj1@ca1", "cj1", CoordinatorAction.Status.RUNNING, "user1", + assertEquals(1, slas.getSLACalculator().size()); + CoordinatorActionEvent cae = new CoordinatorActionEvent("cj1-C@ca1", "cj1-C", CoordinatorAction.Status.RUNNING, "user1", "coord-app-name1", null, actualEnd, null); listener.onCoordinatorActionEvent(cae); - cae = new CoordinatorActionEvent("cj1@ca1", "cj1", CoordinatorAction.Status.KILLED, "user1", + coordAction.setStatus(CoordinatorAction.Status.KILLED); + coordAction.setLastModifiedTime(new Date()); + CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, coordAction); + + cae = new CoordinatorActionEvent("cj1-C@ca1", "cj1-C", CoordinatorAction.Status.KILLED, "user1", "coord-app-name1", null, actualEnd, null); listener.onCoordinatorActionEvent(cae); - summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1@ca1"); + SLASummaryBean summary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, "cj1-C@ca1"); // check that all events are processed assertEquals(8, summary.getEventProcessed()); assertEquals(EventStatus.END_MISS, summary.getEventStatus()); - assertEquals(3, slas.getSLACalculator().size()); + //all jobs are processed + assertEquals(0, slas.getSLACalculator().size()); + + job = _createSLARegBean("wf2-W", AppType.WORKFLOW_JOB); + job.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); //2 hour before + job.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hours after + slas.addRegistrationEvent(job); + assertEquals(1, slas.getSLACalculator().size()); + + createWorkflow("wf2-W", new Date()); + wfe = new WorkflowJobEvent("wf2-W", "caId2", WorkflowJob.Status.RUNNING, "user1", + "wf-app-name1", null, null); + listener.onWorkflowJobEvent(wfe); + serviceObj = slas.getSLACalculator().get("wf2-W"); + + assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus()); + assertEquals(3, serviceObj.getEventProcessed()); //Only duration and start are processed. Duration = -1 + assertEquals(1, slas.getSLACalculator().size()); } @@ -145,4 +173,44 @@ public class TestSLAJobEventListener extends XTestCase { reg.setAppType(appType); return reg; } + private WorkflowJobBean createWorkflow(String id, Date actualStart) throws Exception { + List<JsonBean> insertList = new ArrayList<JsonBean>(); + WorkflowJobBean workflow = new WorkflowJobBean(); + workflow.setId(id); + workflow.setStatusStr("PREP"); + workflow.setStartTime(actualStart); + workflow.setSlaXml("<sla></sla>"); + insertList.add(workflow); + BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); + return workflow; + } + + private WorkflowActionBean createWorkflowAction(String id, String parentId) throws Exception { + List<JsonBean> insertList = new ArrayList<JsonBean>(); + WorkflowActionBean action = new WorkflowActionBean(); + action.setId(id); + action.setJobId(parentId); + insertList.add(action); + BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); + return action; + } + + private CoordinatorActionBean createCoordAction(String id, String parentId) throws Exception { + List<JsonBean> insertList = new ArrayList<JsonBean>(); + CoordinatorActionBean action = new CoordinatorActionBean(); + action.setId(id); + action.setJobId(parentId); + insertList.add(action); + BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); + return action; + } + + private CoordinatorJobBean createCoord(String id) throws Exception { + List<JsonBean> insertList = new ArrayList<JsonBean>(); + CoordinatorJobBean job = new CoordinatorJobBean(); + job.setId(id); + insertList.add(job); + BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); + return job; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/sla/TestSLAService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java index c3bc110..1e19923 100644 --- a/core/src/test/java/org/apache/oozie/sla/TestSLAService.java +++ b/core/src/test/java/org/apache/oozie/sla/TestSLAService.java @@ -25,16 +25,17 @@ import org.apache.oozie.AppType; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.event.JobEvent.EventStatus; import org.apache.oozie.client.event.SLAEvent; import org.apache.oozie.client.event.SLAEvent.SLAStatus; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; @@ -100,7 +101,9 @@ public class TestSLAService extends XDataTestCase { EventHandlerService ehs = Services.get().get(EventHandlerService.class); // test start-miss - SLARegistrationBean sla1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB); + WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + + SLARegistrationBean sla1 = _createSLARegistration(wfJob.getId(), AppType.WORKFLOW_JOB); sla1.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); //1 hour back sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); //1 hour back sla1.setExpectedDuration(10 * 60 * 1000); //10 mins @@ -113,22 +116,48 @@ public class TestSLAService extends XDataTestCase { output.setLength(0); // test different jobs and events start-met and end-miss - sla1 = _createSLARegistration("job-2", AppType.WORKFLOW_JOB); + WorkflowJobBean wfJob2 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + + sla1 = _createSLARegistration(wfJob2.getId(), AppType.WORKFLOW_JOB); sla1.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 3600 * 1000)); //2 hours ahead slas.addRegistrationEvent(sla1); + + wfJob2.setStatusStr("RUNNING"); + wfJob2.setLastModifiedTime(new Date()); + wfJob2.setStartTime(new Date()); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob2); + slas.addStatusEvent(sla1.getId(), WorkflowJob.Status.RUNNING.name(), EventStatus.STARTED, new Date(), new Date()); - SLARegistrationBean sla2 = _createSLARegistration("job-3", AppType.COORDINATOR_JOB); + CoordinatorActionBean action = addRecordToCoordActionTable("coord_id-C", 1, + CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0); + + SLARegistrationBean sla2 = _createSLARegistration(action.getId(), AppType.COORDINATOR_ACTION); sla2.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead only for testing sla2.setExpectedEnd(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); //2 hours back sla2.setExpectedDuration(10); //to process duration too slas.addRegistrationEvent(sla2); assertEquals(3, slas.getSLACalculator().size()); + Date startTime = new Date(); - slas.addStatusEvent(sla2.getId(), CoordinatorJob.Status.RUNNING.name(), EventStatus.STARTED, startTime, - null); - slas.addStatusEvent(sla2.getId(), CoordinatorJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, startTime, + WorkflowJobBean wfJob3 = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + wfJob3.setStatusStr("SUCCEEDED"); + wfJob3.setLastModifiedTime(new Date()); + wfJob3.setStartTime(startTime); + wfJob3.setEndTime(startTime); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob3); + + action.setCreatedTime(startTime); + action.setStatus(CoordinatorAction.Status.SUCCEEDED); + action.setLastModifiedTime(new Date()); + action.setExternalId(wfJob3.getId()); + CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action); + slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), + new Date()); + slas.addStatusEvent(sla2.getId(), CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, startTime, new Date()); slas.runSLAWorker(); ehs.new EventWorker().run(); @@ -139,6 +168,11 @@ public class TestSLAService extends XDataTestCase { // test same job multiple events (start-miss, end-miss) through regular check WorkflowJobBean job4 = addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED); + job4.setLastModifiedTime(new Date()); + job4.setEndTime(new Date()); + job4.setStartTime(new Date()); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job4); sla2 = _createSLARegistration(job4.getId(), AppType.WORKFLOW_JOB); sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); //2 hours back sla2.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); //1 hour back @@ -151,14 +185,13 @@ public class TestSLAService extends XDataTestCase { output.setLength(0); // As expected duration is not set, duration shall be processed and job removed from map assertEquals(2, slas.getSLACalculator().size()); + // test same job multiple events (start-met, end-met) through job status event - sla1 = _createSLARegistration("action@1", AppType.COORDINATOR_ACTION); + sla1 = _createCoordActionSLARegistration(CoordinatorAction.Status.SUCCEEDED.name()); sla1.setExpectedStart(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 3600 * 1000)); //2 hours ahead slas.addRegistrationEvent(sla1); assertEquals(3, slas.getSLACalculator().size()); - slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), - new Date()); slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(), new Date()); slas.runSLAWorker(); @@ -174,39 +207,40 @@ public class TestSLAService extends XDataTestCase { EventHandlerService ehs = Services.get().get(EventHandlerService.class); JPAService jpaService = Services.get().get(JPAService.class); + Date date = new Date(); // CASE 1: positive test WF job WorkflowJobBean job1 = addRecordToWfJobTable(WorkflowJob.Status.PREP, WorkflowInstance.Status.PREP); SLARegistrationBean sla = _createSLARegistration(job1.getId(), AppType.WORKFLOW_JOB); - sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1800 * 1000)); // half hour back + sla.setExpectedEnd(new Date(date.getTime() - 1 * 1800 * 1000)); // half hour back slas.addRegistrationEvent(sla); // CASE 2: negative test WF job WorkflowJobBean job2 = addRecordToWfJobTable(WorkflowJob.Status.SUCCEEDED, WorkflowInstance.Status.SUCCEEDED); - job2.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 * 1000)); - job2.setStartTime(new Date(System.currentTimeMillis() - 1 * 2000 * 1000)); + job2.setEndTime(new Date(date.getTime() - 1 * 1800 * 1000)); + job2.setStartTime(new Date(date.getTime() - 1 * 2000 * 1000)); job2.setLastModifiedTime(new Date()); WorkflowJobQueryExecutor.getInstance().executeUpdate(WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, job2); sla = _createSLARegistration(job2.getId(), AppType.WORKFLOW_JOB); - sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 1500 * 1000)); // in past but > actual end + sla.setExpectedEnd(new Date(date.getTime() - 1 * 1500 * 1000)); // in past but > actual end sla.setExpectedDuration(100); //unreasonable to cause MISS slas.addRegistrationEvent(sla); + slas.runSLAWorker(); + // CASE 3: positive test Coord action - CoordinatorActionBean action1 = addRecordToCoordActionTable("coord-action-1", 1, + CoordinatorActionBean action1 = addRecordToCoordActionTable("coord-action-C@1", 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); - WorkflowJobBean extWf = new WorkflowJobBean(); - extWf.setId(action1.getExternalId()); - extWf.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 * 1000)); - extWf.setStartTime(new Date(System.currentTimeMillis() - 1 * 2100 * 1000)); - jpaService.execute(new WorkflowJobInsertJPAExecutor(extWf)); + action1.setExternalId(null); + CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action1); + sla = _createSLARegistration(action1.getId(), AppType.COORDINATOR_ACTION); sla.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 2000 * 1000)); // past slas.addRegistrationEvent(sla); // CASE 4: positive test coord action - CoordinatorActionBean action2 = addRecordToCoordActionTable("coord-action-2", 1, + CoordinatorActionBean action2 = addRecordToCoordActionTable("coord-action-C@2", 1, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0); - extWf = new WorkflowJobBean(); + WorkflowJobBean extWf = new WorkflowJobBean(); extWf.setId(action2.getExternalId()); // actual end before expected. but action is failed extWf.setEndTime(new Date(System.currentTimeMillis() - 1 * 1800 * 1000)); @@ -217,7 +251,7 @@ public class TestSLAService extends XDataTestCase { slas.addRegistrationEvent(sla); // CASE 5: negative test coord action - CoordinatorActionBean action3 = addRecordToCoordActionTable("coord-action-3", 1, + CoordinatorActionBean action3 = addRecordToCoordActionTable("coord-action-C@3", 1, CoordinatorAction.Status.SUCCEEDED, "coord-action-get.xml", 0); extWf = new WorkflowJobBean(); extWf.setId(action3.getExternalId()); @@ -271,16 +305,25 @@ public class TestSLAService extends XDataTestCase { assertNull(slas.getSLACalculator().get(action2.getId())); //removed from memory slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action1.getId()); - extWf = jpaService.execute(new WorkflowJobGetJPAExecutor(action1.getExternalId())); - assertEquals(extWf.getStartTime(), slaSummary.getActualStart()); - assertEquals(extWf.getEndTime(), slaSummary.getActualEnd()); - assertEquals(extWf.getEndTime().getTime() - extWf.getStartTime().getTime(), slaSummary.getActualDuration()); + assertNull(slaSummary.getActualStart()); + assertNull(slaSummary.getActualEnd()); assertEquals(action1.getStatusStr(), slaSummary.getJobStatus()); assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus()); assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus()); - assertEquals(8, slaSummary.getEventProcessed()); - assertNull(slas.getSLACalculator().get(action1.getId())); //removed from memory + assertEquals(7, slaSummary.getEventProcessed()); + assertNotNull(slas.getSLACalculator().get(action1.getId())); + //From waiting to TIMEOUT with wf jobid + action1.setStatus(CoordinatorAction.Status.TIMEDOUT); + CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action1); + slas.getSLACalculator().addJobStatus(action1.getId(), null, null, null, null); + slaSummary = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, action1.getId()); + assertNull(slaSummary.getActualStart()); + assertNotNull(slaSummary.getActualEnd()); + assertEquals("TIMEDOUT", slaSummary.getJobStatus()); + assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus()); + assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus()); + assertEquals(8, slaSummary.getEventProcessed()); } /** @@ -307,6 +350,28 @@ public class TestSLAService extends XDataTestCase { return bean; } + public SLARegistrationBean _createCoordActionSLARegistration(String status) throws Exception { + WorkflowJobBean wfJob = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING); + wfJob.setLastModifiedTime(new Date()); + wfJob.setStartTime(new Date()); + wfJob.setEndTime(new Date()); + wfJob.setStatusStr(status); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob); + + CoordinatorActionBean action = addRecordToCoordActionTable(new Date().getTime() + "-C", 1, + CoordinatorAction.Status.TIMEDOUT, "coord-action-get.xml", 0); + action.setExternalId(wfJob.getId()); + action.setStatusStr(status); + CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_RERUN, action); + + SLARegistrationBean bean = new SLARegistrationBean(); + bean.setId(action.getId()); + bean.setAppType(AppType.COORDINATOR_ACTION); + return bean; + } + + public static void assertEventNoDuplicates(String outputStr, String eventMsg) { int index = outputStr.indexOf(eventMsg); assertTrue(index != -1); http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 412128c..8ea3c4a 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2509 SLA job status can stuck in running state (puru) OOZIE-2529 Support adding secret keys to Credentials of Launcher (satishsaley via rohini) OOZIE-1402 Increase retry interval for non-progressing coordinator action with fix value (satishsaley via puru) OOZIE-2512 ShareLibservice returns incorrect path for jar (satishsaley via puru)
