Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java?rev=1495272&r1=1495271&r2=1495272&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java Fri Jun 21 02:05:56 2013 @@ -24,11 +24,20 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.AppType; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.event.SLAEvent; import org.apache.oozie.client.event.JobEvent.EventStatus; import org.apache.oozie.client.event.SLAEvent.SLAStatus; import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.executor.jpa.CoordActionInsertJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor; import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor; import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor; import org.apache.oozie.service.EventHandlerService; @@ -159,8 +168,208 @@ public class TestSLACalculatorMemory ext assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus()); } + + @Test + public void testWorkflowJobSLAStatusOnRestart() throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd"); + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(new 
Configuration(false)); + SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB); + String jobId1 = slaRegBean1.getId(); + slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07")); + slaRegBean1.setExpectedStart(sdf.parse("2012-03-07")); + slaCalcMemory.addRegistration(jobId1, slaRegBean1); + SLACalcStatus calc1 = slaCalcMemory.get(jobId1); + calc1.setEventProcessed(1); + calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS); + calc1.setJobStatus(WorkflowJob.Status.RUNNING.name()); + calc1.setLastModifiedTime(new Date()); + SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1); + + List<JsonBean> list = new ArrayList<JsonBean>(); + list.add(slaSummaryBean); + jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list)); + + SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1)); + + // Simulate a lost success event + WorkflowJobBean wjb = new WorkflowJobBean(); + wjb.setId(jobId1); + wjb.setStatus(WorkflowJob.Status.SUCCEEDED); + wjb.setStartTime(sdf.parse("2012-02-07")); + wjb.setEndTime(sdf.parse("2013-02-07")); + WorkflowJobInsertJPAExecutor wfInsertCmd = new WorkflowJobInsertJPAExecutor(wjb); + jpaService.execute(wfInsertCmd); + + slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(new Configuration(false)); + + // As job succeeded, it should not be in memory + assertEquals(0, slaCalcMemory.size()); + slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1)); + assertEquals("job-1", slaSummary.getJobId()); + assertEquals(8, slaSummary.getEventProcessed()); + assertEquals(AppType.WORKFLOW_JOB, slaSummary.getAppType()); + assertEquals("SUCCEEDED", slaSummary.getJobStatus()); + assertEquals(SLAEvent.SLAStatus.MET, slaSummary.getSLAStatus()); + assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart()); + assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd()); + assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(), + 
slaSummary.getActualDuration()); + + // Simulate a lost failed event + wjb.setStatus(WorkflowJob.Status.FAILED); + jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb)); + + // Reset the summary Bean + calc1.setEventProcessed(1); + calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS); + calc1.setJobStatus(WorkflowJob.Status.RUNNING.name()); + slaSummaryBean = new SLASummaryBean(calc1); + + list = new ArrayList<JsonBean>(); + list.add(slaSummaryBean); + jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list)); + + slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(new Configuration(false)); + + assertEquals(0, slaCalcMemory.size()); + slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1)); + assertEquals("FAILED", slaSummary.getJobStatus()); + assertEquals(8, slaSummary.getEventProcessed()); + assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart()); + assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd()); + assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus()); + + // Simulate a lost RUNNING event + wjb.setStatus(WorkflowJob.Status.RUNNING); + jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb)); + + // Reset the summary Bean + calc1.setEventProcessed(0); + calc1.setSLAStatus(SLAEvent.SLAStatus.NOT_STARTED); + calc1.setJobStatus(null); + slaSummaryBean = new SLASummaryBean(calc1); + + list = new ArrayList<JsonBean>(); + list.add(slaSummaryBean); + jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list)); + + slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(new Configuration(false)); + + assertEquals(1, slaCalcMemory.size()); + SLACalcStatus calc = slaCalcMemory.get(jobId1); + assertEquals(1, calc.getEventProcessed()); + assertEquals("RUNNING", calc.getJobStatus()); + assertEquals(sdf.parse("2012-02-07"), calc.getActualStart()); + assertNull(calc.getActualEnd()); + assertEquals(-1, calc.getActualDuration()); + assertEquals(SLAEvent.SLAStatus.IN_PROCESS, 
calc.getSLAStatus()); + } + + @Test + public void testWorkflowActionSLAStatusOnRestart() throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd"); + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(new Configuration(false)); + SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_ACTION); + String jobId1 = slaRegBean1.getId(); + slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07")); + slaRegBean1.setExpectedStart(sdf.parse("2012-03-07")); + slaCalcMemory.addRegistration(jobId1, slaRegBean1); + SLACalcStatus calc1 = slaCalcMemory.get(jobId1); + calc1.setEventProcessed(1); + calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS); + calc1.setJobStatus(WorkflowAction.Status.RUNNING.name()); + calc1.setLastModifiedTime(new Date()); + SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1); + + List<JsonBean> list = new ArrayList<JsonBean>(); + list.add(slaSummaryBean); + jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list)); + + // Simulate a lost success event + WorkflowActionBean wab = new WorkflowActionBean(); + wab.setId(jobId1); + wab.setStatus(WorkflowAction.Status.OK); + wab.setStartTime(sdf.parse("2012-02-07")); + wab.setEndTime(sdf.parse("2013-02-07")); + WorkflowActionInsertJPAExecutor wfInsertCmd = new WorkflowActionInsertJPAExecutor(wab); + jpaService.execute(wfInsertCmd); + + slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(new Configuration(false)); + + // As job succeeded, it should not be in memory + assertEquals(0, slaCalcMemory.size()); + SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId1)); + assertEquals("job-1", slaSummary.getJobId()); + assertEquals(8, slaSummary.getEventProcessed()); + assertEquals(AppType.WORKFLOW_ACTION, slaSummary.getAppType()); + assertEquals("OK", slaSummary.getJobStatus()); + assertEquals(SLAEvent.SLAStatus.MET, slaSummary.getSLAStatus()); + assertEquals(sdf.parse("2012-02-07"), 
slaSummary.getActualStart()); + assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd()); + assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(), + slaSummary.getActualDuration()); + + } + + @Test + public void testCoordinatorActionSLAStatusOnRestart() throws Exception { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-mm-dd"); + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(new Configuration(false)); + SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.COORDINATOR_ACTION); + String jobId1 = slaRegBean1.getId(); + slaRegBean1.setExpectedEnd(sdf.parse("2013-03-07")); + slaRegBean1.setExpectedStart(sdf.parse("2012-03-07")); + slaCalcMemory.addRegistration(jobId1, slaRegBean1); + SLACalcStatus calc1 = slaCalcMemory.get(jobId1); + calc1.setEventProcessed(1); + calc1.setSLAStatus(SLAEvent.SLAStatus.IN_PROCESS); + calc1.setJobStatus(WorkflowAction.Status.RUNNING.name()); + calc1.setLastModifiedTime(new Date()); + SLASummaryBean slaSummaryBean = new SLASummaryBean(calc1); + + List<JsonBean> list = new ArrayList<JsonBean>(); + list.add(slaSummaryBean); + jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list)); + + // Simulate a lost failed event + CoordinatorActionBean cab = new CoordinatorActionBean(); + cab.setId(jobId1); + cab.setStatus(CoordinatorAction.Status.FAILED); + cab.setLastModifiedTime(sdf.parse("2013-02-07")); + cab.setExternalId("wf_job"); + CoordActionInsertJPAExecutor caInsertCmd = new CoordActionInsertJPAExecutor(cab); + jpaService.execute(caInsertCmd); + WorkflowJobBean wjb = new WorkflowJobBean(); + wjb.setId("wf_job"); + wjb.setStartTime(sdf.parse("2012-02-07")); + jpaService.execute(new WorkflowJobUpdateJPAExecutor(wjb)); + + slaCalcMemory = new SLACalculatorMemory(); + slaCalcMemory.init(new Configuration(false)); + + // As the action reached a terminal state (FAILED), it should not be in memory + assertEquals(0, slaCalcMemory.size()); + SLASummaryBean slaSummary 
= jpaService.execute(new SLASummaryGetJPAExecutor(jobId1)); + assertEquals("job-1", slaSummary.getJobId()); + assertEquals(8, slaSummary.getEventProcessed()); + assertEquals(AppType.COORDINATOR_ACTION, slaSummary.getAppType()); + assertEquals("FAILED", slaSummary.getJobStatus()); + assertEquals(SLAEvent.SLAStatus.MISS, slaSummary.getSLAStatus()); + assertEquals(sdf.parse("2012-02-07"), slaSummary.getActualStart()); + assertEquals(sdf.parse("2013-02-07"), slaSummary.getActualEnd()); + assertEquals(sdf.parse("2013-02-07").getTime() - sdf.parse("2012-02-07").getTime(), + slaSummary.getActualDuration()); + } + @Test - public void testSLAEvents() throws Exception { + public void testSLAEvents1() throws Exception { SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); EventHandlerService ehs = Services.get().get(EventHandlerService.class); slaCalcMemory.init(new Configuration(false)); @@ -174,6 +383,7 @@ public class TestSLACalculatorMemory ext assertEquals(1, slaCalcMemory.size()); SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId)); assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus()); + assertEquals("PREP", slaSummary.getJobStatus()); slaCalcMemory.updateSlaStatus(jobId); assertEquals(2, ehs.getEventQueue().size()); slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId)); @@ -187,6 +397,11 @@ public class TestSLACalculatorMemory ext assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus()); slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED, sdf.parse("2012-01-01"), null); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUSPENDED.toString(), EventStatus.SUSPEND, + sdf.parse("2012-01-01"), null); + slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId)); + assertEquals(WorkflowJob.Status.SUSPENDED.toString(), slaSummary.getJobStatus()); + assertEquals(5, slaSummary.getEventProcessed()); slaCalcMemory.addJobStatus(jobId, 
WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, sdf.parse("2012-01-01"), sdf.parse("2012-01-02")); @@ -205,6 +420,67 @@ public class TestSLACalculatorMemory ext } @Test + public void testSLAEvents2() throws Exception { + SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory(); + EventHandlerService ehs = Services.get().get(EventHandlerService.class); + slaCalcMemory.init(new Configuration(false)); + SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB); + slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); + slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000)); + + String jobId = slaRegBean.getId(); + slaCalcMemory.addRegistration(jobId, slaRegBean); + assertEquals(1, slaCalcMemory.size()); + slaCalcMemory.updateSlaStatus(jobId); + SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId)); + // Duration bit should be processed as expected duration is not set + assertEquals(3, slaSummary.getEventProcessed()); + // check only start event in queue + assertEquals(1, ehs.getEventQueue().size()); + ehs.getEventQueue().clear(); + + // set back to 1, to make duration event not processed + slaSummary.setEventProcessed(1); + List<JsonBean> list = new ArrayList<JsonBean>(); + list.add(slaSummary); + jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list)); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS, + sdf.parse("2012-01-01"), sdf.parse("2012-01-02")); + slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId)); + // all should be processed + assertEquals(8, slaSummary.getEventProcessed()); + // check only end event is in queue + assertEquals(1, ehs.getEventQueue().size()); + ehs.getEventQueue().clear(); + + slaSummary.setEventProcessed(1); + list = new ArrayList<JsonBean>(); + 
list.add(slaSummary); + jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list)); + + slaRegBean = _createSLARegistration("job-2", AppType.WORKFLOW_JOB); + slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000)); + slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 1 * 3600 * 1000)); + + jobId = slaRegBean.getId(); + slaCalcMemory.addRegistration(jobId, slaRegBean); + assertEquals(1, slaCalcMemory.size()); + + slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.KILLED.toString(), EventStatus.FAILURE, null, + sdf.parse("2012-01-02")); + slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId)); + // Actual start null, so all events processed + assertEquals(8, slaSummary.getEventProcessed()); + assertEquals(1, ehs.getEventQueue().size()); + assertNull(slaSummary.getActualStart()); + assertEquals(sdf.parse("2012-01-02"), slaSummary.getActualEnd()); + assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus()); + assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus()); + } + + @Test public void testDuplicateStartMiss() throws Exception { // test start-miss EventHandlerService ehs = Services.get().get(EventHandlerService.class);
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1495272&r1=1495271&r2=1495272&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Fri Jun 21 02:05:56 2013 @@ -216,7 +216,7 @@ public class TestSLAEventGeneration exte assertEquals(-1, slaSummary.getActualDuration()); assertNull(slaSummary.getActualStart()); assertNull(slaSummary.getActualEnd()); - assertNull(slaSummary.getJobStatus()); + assertEquals("PREP", slaSummary.getJobStatus()); assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus()); assertNull(slaEvent.getEventStatus()); Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java?rev=1495272&r1=1495271&r2=1495272&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAService.java Fri Jun 21 02:05:56 2013 @@ -128,19 +128,20 @@ public class TestSLAService extends XDat assertTrue(output.toString().contains(sla2.getId() + " Sla START - MISS!!!")); assertTrue(output.toString().contains(sla2.getId() + " Sla END - MISS!!!")); output.setLength(0); - + // As expected duration is not set, duration shall be processed and job removed from map + assertEquals(2, slas.getSLACalculator().size()); // test same job multiple events (start-met, end-met) through job status event sla1 = _createSLARegistration("action-1", AppType.COORDINATOR_ACTION); sla1.setExpectedStart(new 
Date(System.currentTimeMillis() + 1 * 3600 * 1000)); //1 hour ahead sla1.setExpectedEnd(new Date(System.currentTimeMillis() + 2 * 3600 * 1000)); //2 hours ahead slas.addRegistrationEvent(sla1); - assertEquals(4, slas.getSLACalculator().size()); + assertEquals(3, slas.getSLACalculator().size()); slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), new Date()); slas.addStatusEvent(sla1.getId(), CoordinatorAction.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(), new Date()); slas.runSLAWorker(); - assertEquals(3, ehs.getEventQueue().size()); + assertEquals(2, ehs.getEventQueue().size()); ehs.new EventWorker().run(); assertTrue(output.toString().contains(sla1.getId() + " Sla START - MET!!!")); assertTrue(output.toString().contains(sla1.getId() + " Sla END - MET!!!")); Modified: oozie/trunk/release-log.txt URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1495272&r1=1495271&r2=1495272&view=diff ============================================================================== --- oozie/trunk/release-log.txt (original) +++ oozie/trunk/release-log.txt Fri Jun 21 02:05:56 2013 @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1424 Improve SLA reliability on restart, fix bugs related to SLA and event generation (virag) OOZIE-1417 Exclude **/oozie/store/* **/oozie/examples/* from clover reports (dennisyv via virag) OOZIE-1426 Fix bugs in SLA UI (rohini) OOZIE-1421 UI for SLA (virag, rohini)
