// Added: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java
// URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLACalculatorMemory.java?rev=1491333&view=auto
// Revision 1491333, Mon Jun 10 04:37:20 2013
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.sla;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.AppType;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.event.SLAEvent;
import org.apache.oozie.client.event.JobEvent.EventStatus;
import org.apache.oozie.client.event.SLAEvent.SLAStatus;
import org.apache.oozie.client.rest.JsonBean;
import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor;
import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor;
import org.apache.oozie.service.EventHandlerService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XDataTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link SLACalculatorMemory}: reloading state after a restart and
 * the start / duration / end SLA bookkeeping.
 * <p>
 * Several assertions check the summary's "event processed" value. Going by the
 * values asserted in these tests it is used as a bit field: 1 (001) = start
 * processed, 2 (010) = duration processed, 4 (100) = end processed, 7 (111) =
 * all three processed, and 8 (1000) = everything processed and the actual
 * times stored.
 */
public class TestSLACalculatorMemory extends XDataTestCase {

    /** One hour in milliseconds; long so that date offsets never use int arithmetic. */
    private static final long ONE_HOUR_MS = 60L * 60 * 1000;

    private JPAService jpaService;

    /**
     * Starts the Oozie services with the event handler and SLA services
     * enabled, and empties the database tables.
     */
    @Override
    @Before
    protected void setUp() throws Exception {
        super.setUp();
        Services services = new Services();
        Configuration conf = services.getConf();
        conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService,"
                + "org.apache.oozie.sla.service.SLAService");
        services.init();
        jpaService = Services.get().get(JPAService.class);
        cleanUpDBTables();
    }

    /** Shuts the services down before the base-class cleanup runs. */
    @Override
    @After
    protected void tearDown() throws Exception {
        Services.get().destroy();
        super.tearDown();
    }

    /**
     * Registers three jobs, persists their summaries with event-processed
     * values 5, 6 and 7, then creates a fresh calculator and checks that the
     * first two are back in memory with their persisted fields, while the
     * third (value 7) is not in the map but can still receive a job status.
     */
    @Test
    public void testLoadOnRestart() throws Exception {
        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
        slaCalcMemory.init(new Configuration(false));
        SLARegistrationBean slaRegBean1 = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
        String jobId1 = slaRegBean1.getId();
        SLARegistrationBean slaRegBean2 = _createSLARegistration("job-2", AppType.WORKFLOW_JOB);
        String jobId2 = slaRegBean2.getId();
        SLARegistrationBean slaRegBean3 = _createSLARegistration("job-3", AppType.WORKFLOW_JOB);
        String jobId3 = slaRegBean3.getId();

        // "MM" is month-of-year; the previous "mm" is minute-of-hour and
        // silently parsed every date below into January.
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        slaRegBean1.setAppName("app-name");
        slaRegBean1.setExpectedDuration(123);
        slaRegBean1.setExpectedEnd(sdf.parse("2012-02-07"));
        slaRegBean1.setExpectedStart(sdf.parse("2011-02-07"));
        slaRegBean1.setNominalTime(sdf.parse("2012-01-06"));
        slaRegBean1.setUser("user");
        slaRegBean1.setParentId("parentId");
        slaRegBean1.setUpstreamApps("upstreamApps");
        slaRegBean1.setNotificationMsg("notificationMsg");
        slaRegBean1.setAlertContact("[email protected]");
        slaRegBean1.setAlertEvents("MISS");
        slaRegBean1.setJobData("jobData");

        slaCalcMemory.addRegistration(jobId1, slaRegBean1);
        slaCalcMemory.addRegistration(jobId2, slaRegBean2);
        slaCalcMemory.addRegistration(jobId3, slaRegBean3);

        SLACalcStatus calc1 = slaCalcMemory.get(jobId1);
        SLACalcStatus calc2 = slaCalcMemory.get(jobId2);
        SLACalcStatus calc3 = slaCalcMemory.get(jobId3);

        calc1.setEventProcessed(5);
        calc2.setEventProcessed(6);
        calc3.setEventProcessed(7);

        calc1.setEventStatus(SLAEvent.EventStatus.END_MISS);
        calc1.setSLAStatus(SLAEvent.SLAStatus.MISS);
        calc1.setJobStatus(WorkflowJob.Status.FAILED.toString());
        // set last modified time 5 days back
        Date lastModifiedTime = new Date(System.currentTimeMillis() - 5 * 24 * ONE_HOUR_MS);
        calc1.setLastModifiedTime(lastModifiedTime);

        List<JsonBean> list = new ArrayList<JsonBean>();
        SLASummaryBean bean = new SLASummaryBean(calc1);
        bean.setActualStart(sdf.parse("2011-03-09"));
        bean.setActualEnd(sdf.parse("2011-03-10"));
        bean.setActualDuration(456);
        list.add(bean);
        list.add(new SLASummaryBean(calc2));
        list.add(new SLASummaryBean(calc3));

        jpaService.execute(new SLACalculationInsertUpdateJPAExecutor(null, list));

        // Simulate a restart: a brand new calculator has to rebuild its state
        // from what was persisted above.
        slaCalcMemory = new SLACalculatorMemory();
        slaCalcMemory.init(new Configuration(false));

        assertEquals(2, slaCalcMemory.size());

        SLACalcStatus calc = slaCalcMemory.get(jobId1);
        assertEquals("job-1", calc.getId());
        assertEquals(AppType.WORKFLOW_JOB, calc.getAppType());
        assertEquals("app-name", calc.getAppName());
        assertEquals(123, calc.getExpectedDuration());
        assertEquals(sdf.parse("2012-02-07"), calc.getExpectedEnd());
        assertEquals(sdf.parse("2011-02-07"), calc.getExpectedStart());
        assertEquals(sdf.parse("2012-01-06"), calc.getNominalTime());
        assertEquals("user", calc.getUser());
        assertEquals("parentId", calc.getParentId());
        assertEquals("upstreamApps", calc.getUpstreamApps());
        assertEquals("notificationMsg", calc.getNotificationMsg());
        assertEquals("[email protected]", calc.getAlertContact());
        assertEquals("MISS", calc.getAlertEvents());
        assertEquals("jobData", calc.getJobData());
        assertEquals(sdf.parse("2011-03-09"), calc.getActualStart());
        assertEquals(sdf.parse("2011-03-10"), calc.getActualEnd());
        assertEquals(456, calc.getActualDuration());
        // NOTE(review): the next four assertions read calc1, the object that
        // was populated by hand before the restart, so they cannot fail. They
        // presumably should read the reloaded 'calc' - confirm that the
        // reloaded values (in particular the last-modified timestamp)
        // round-trip exactly before switching them over.
        assertEquals(SLAEvent.EventStatus.END_MISS, calc1.getEventStatus());
        assertEquals(SLAEvent.SLAStatus.MISS, calc1.getSLAStatus());
        assertEquals(WorkflowJob.Status.FAILED.toString(), calc1.getJobStatus());
        assertEquals(lastModifiedTime, calc1.getLastModifiedTime());

        assertEquals(5, calc.getEventProcessed());
        assertEquals(6, slaCalcMemory.get(jobId2).getEventProcessed());
        // jobId3 should be in history set as eventprocessed is 7 (111)
        assertNull(slaCalcMemory.get(jobId3));
        slaCalcMemory.addJobStatus(jobId3, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                sdf.parse("2011-03-09"), sdf.parse("2011-04-09"));
        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId3));
        assertEquals(8, slaSummary.getEventProcessed());
        assertEquals(sdf.parse("2011-03-09"), slaSummary.getActualStart());
        assertEquals(sdf.parse("2011-04-09"), slaSummary.getActualEnd());
        assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
    }

    /**
     * A job whose expected start and expected end are both one hour in the
     * past: the first status update must record start and end misses, and the
     * later RUNNING / SUCCEEDED job events must complete the processing and
     * store the actual times.
     */
    @Test
    public void testSLAEvents() throws Exception {
        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
        slaCalcMemory.init(new Configuration(false));
        SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
        // expected start: 1 hour back
        slaRegBean.setExpectedStart(new Date(System.currentTimeMillis() - ONE_HOUR_MS));
        slaRegBean.setExpectedDuration(2 * 3600 * 1000);
        // expected end: 1 hour back
        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - ONE_HOUR_MS));
        String jobId = slaRegBean.getId();
        slaCalcMemory.addRegistration(jobId, slaRegBean);
        assertEquals(1, slaCalcMemory.size());
        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
        slaCalcMemory.updateSlaStatus(jobId);
        assertEquals(2, ehs.getEventQueue().size());
        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        // both start miss and end miss (101)
        assertEquals(5, slaSummary.getEventProcessed());
        assertEquals(SLAEvent.EventStatus.END_MISS, slaSummary.getEventStatus());
        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");

        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
                sdf.parse("2012-01-01"), null);
        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                sdf.parse("2012-01-01"), sdf.parse("2012-01-02"));

        assertEquals(3, ehs.getEventQueue().size());
        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        // All events processed and actual times stored (1000)
        assertEquals(8, slaSummary.getEventProcessed());
        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
        assertEquals(WorkflowJob.Status.SUCCEEDED.toString(), slaSummary.getJobStatus());
        assertEquals(SLAEvent.EventStatus.DURATION_MISS, slaSummary.getEventStatus());
        assertEquals(sdf.parse("2012-01-01").getTime(), slaSummary.getActualStart().getTime());
        assertEquals(sdf.parse("2012-01-02").getTime(), slaSummary.getActualEnd().getTime());
        assertEquals(sdf.parse("2012-01-02").getTime() - sdf.parse("2012-01-01").getTime(),
                slaSummary.getActualDuration());
        assertEquals(0, slaCalcMemory.size());
    }

    /**
     * Expected start one hour back, expected end one hour ahead: the start
     * miss must be recorded once, and a second status update after the job
     * starts must not record or queue it again.
     */
    @Test
    public void testDuplicateStartMiss() throws Exception {
        // test start-miss
        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
        slaCalcMemory.init(new Configuration(false));
        SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
        // 1 hour back
        Date startTime = new Date(System.currentTimeMillis() - ONE_HOUR_MS);
        slaRegBean.setExpectedStart(startTime);
        slaRegBean.setExpectedDuration(3600 * 1000);
        // 1 hour ahead
        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() + ONE_HOUR_MS));
        String jobId = slaRegBean.getId();
        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
        slaCalcMemory.updateSlaStatus(jobId);
        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        assertEquals(1, slaSummary.getEventProcessed());
        assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus());
        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
                new Date(System.currentTimeMillis()), null);
        slaCalcMemory.updateSlaStatus(jobId);
        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        // still only the start bit, and still a single queued event
        assertEquals(1, slaSummary.getEventProcessed());
        assertEquals(SLAStatus.IN_PROCESS, slaSummary.getSLAStatus());
        assertEquals(WorkflowJob.Status.RUNNING.toString(), slaSummary.getJobStatus());
        assertEquals(1, ehs.getEventQueue().size());
    }

    /**
     * Expected end one hour back, expected start one hour ahead: the end miss
     * must be recorded once; later SUCCESS and STARTED job events fill in the
     * duration and start bits until the job is fully processed.
     */
    @Test
    public void testDuplicateEndMiss() throws Exception {
        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
        slaCalcMemory.init(new Configuration(false));
        SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
        // 1 hour ahead
        Date startTime = new Date(System.currentTimeMillis() + ONE_HOUR_MS);
        slaRegBean.setExpectedStart(startTime);
        slaRegBean.setExpectedDuration(3600 * 1000);
        // 1 hour back
        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - ONE_HOUR_MS));
        String jobId = slaRegBean.getId();
        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
        slaCalcMemory.updateSlaStatus(jobId);
        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        // Only end sla should be processed (100)
        assertEquals(4, slaSummary.getEventProcessed());
        slaCalcMemory.updateSlaStatus(jobId);
        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        assertEquals(4, slaSummary.getEventProcessed());
        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + ONE_HOUR_MS));
        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        // Only Duration sla should be processed as end is already processed
        // (110)
        assertEquals(6, slaSummary.getEventProcessed());
        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
        // Receive start event
        assertTrue(slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
                new Date(System.currentTimeMillis()), new Date(System.currentTimeMillis() + ONE_HOUR_MS)));
        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        // Start event received so all bits are processed and the actual
        // times are stored (1000)
        assertEquals(8, slaSummary.getEventProcessed());
        assertEquals(SLAStatus.MISS, slaSummary.getSLAStatus());
        assertEquals(0, slaCalcMemory.size());
        assertEquals(3, ehs.getEventQueue().size());
    }

    /**
     * Once all three SLA bits are processed the job leaves the in-memory map,
     * yet a later terminal job event must still be accepted so that the
     * actual end time gets stored.
     */
    @Test
    public void testSLAHistorySet() throws Exception {
        EventHandlerService ehs = Services.get().get(EventHandlerService.class);
        SLACalculatorMemory slaCalcMemory = new SLACalculatorMemory();
        slaCalcMemory.init(new Configuration(false));
        SLARegistrationBean slaRegBean = _createSLARegistration("job-1", AppType.WORKFLOW_JOB);
        // 1 hour back
        Date startTime = new Date(System.currentTimeMillis() - ONE_HOUR_MS);
        slaRegBean.setExpectedStart(startTime);
        slaRegBean.setExpectedDuration(1000);
        slaRegBean.setExpectedEnd(new Date(System.currentTimeMillis() - ONE_HOUR_MS));
        String jobId = slaRegBean.getId();
        slaCalcMemory.addRegistration(slaRegBean.getId(), slaRegBean);
        slaCalcMemory.updateSlaStatus(jobId);
        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.RUNNING.toString(), EventStatus.STARTED,
                new Date(System.currentTimeMillis() - ONE_HOUR_MS), null);
        slaCalcMemory.updateSlaStatus(jobId);
        SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        // The actual end times are not stored, but sla's processed so (111)
        assertEquals(7, slaSummary.getEventProcessed());
        // Moved from map to history set
        assertEquals(0, slaCalcMemory.size());
        // Add terminal state event so actual end time is stored
        slaCalcMemory.addJobStatus(jobId, WorkflowJob.Status.SUCCEEDED.toString(), EventStatus.SUCCESS,
                new Date(System.currentTimeMillis() - ONE_HOUR_MS), new Date(System.currentTimeMillis()));
        slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId));
        // The actual times are stored, so event processed(1000)
        assertEquals(8, slaSummary.getEventProcessed());
        assertEquals(3, ehs.getEventQueue().size());
    }

    /**
     * Builds a registration bean carrying only the given job id and
     * application type; callers fill in the SLA fields they need.
     */
    private SLARegistrationBean _createSLARegistration(String jobId, AppType appType) {
        SLARegistrationBean bean = new SLARegistrationBean();
        bean.setJobId(jobId);
        bean.setAppType(appType);
        return bean;
    }

}
Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java?rev=1491333&r1=1491332&r2=1491333&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAEventGeneration.java Mon Jun 10 04:37:20 2013 @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.fs.Path; import org.apache.oozie.AppType; import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowJob; @@ -36,11 +37,15 @@ import org.apache.oozie.command.coord.Co import org.apache.oozie.command.coord.CoordKillXCommand; import org.apache.oozie.command.coord.CoordSubmitXCommand; import org.apache.oozie.command.wf.KillXCommand; +import org.apache.oozie.command.wf.ReRunXCommand; import org.apache.oozie.command.wf.StartXCommand; import org.apache.oozie.command.wf.SubmitXCommand; import org.apache.oozie.event.listener.JobEventListener; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionUpdateJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobUpdateJPAExecutor; +import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor; import org.apache.oozie.service.EventHandlerService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; @@ -121,6 +126,162 @@ public class TestSLAEventGeneration exte _testWorkflowJobCommands(conf, ehs, slas, true); } + /** + * Test for SLA Events generated through wf rerun + * + * @throws Exception + */ + @Test 
+ public void testWorkflowJobSLARerun() throws Exception { + EventHandlerService ehs = services.get(EventHandlerService.class); + SLAService slas = services.get(SLAService.class); + + String wfXml = IOUtils.getResourceAsString("wf-job-sla.xml", -1); + Path appPath = getFsTestCaseDir(); + writeToFile(wfXml, appPath, "workflow.xml"); + Configuration conf = new XConfiguration(); + conf.set(OozieClient.APP_PATH, appPath.toString()); + conf.set(OozieClient.USER_NAME, getTestUser()); + + cal.setTime(new Date()); + cal.add(Calendar.MINUTE, -40); // for start_miss + Date nominal = cal.getTime(); + String nominalTime = DateUtils.formatDateOozieTZ(nominal); + conf.set("nominal_time", nominalTime); + cal.setTime(nominal); + cal.add(Calendar.MINUTE, 10); // as per the sla xml + String expectedStart = DateUtils.formatDateOozieTZ(cal.getTime()); + cal.setTime(nominal); + cal.add(Calendar.MINUTE, 30); // as per the sla xml + String expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime()); + + // Call SubmitX + SubmitXCommand sc = new SubmitXCommand(conf, "UNIT_TESTING"); + String jobId = sc.call(); + SLACalcStatus slaEvent = slas.getSLACalculator().get(jobId); + assertEquals(jobId, slaEvent.getId()); + assertEquals("test-wf-job-sla", slaEvent.getAppName()); + assertEquals(AppType.WORKFLOW_JOB, slaEvent.getAppType()); + assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime())); + assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart())); + assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd())); + + slas.runSLAWorker(); + slaEvent = (SLACalcStatus) ehs.getEventQueue().poll(); + assertEquals(SLAStatus.NOT_STARTED, slaEvent.getSLAStatus()); + assertEquals(EventStatus.START_MISS, slaEvent.getEventStatus()); + slas.getSLACalculator().clear(); + + JPAService jpaService = Services.get().get(JPAService.class); + WorkflowJobBean wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); + // set job 
status to succeeded, so rerun doesn't fail + wfBean.setStatus(WorkflowJob.Status.SUCCEEDED); + jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean)); + + // change conf for rerun + cal.setTime(new Date()); + cal.add(Calendar.MINUTE, -20); // for start_miss + nominalTime = DateUtils.formatDateOozieTZ(cal.getTime()); + + conf.set("nominal_time", nominalTime); + nominal = cal.getTime(); + cal.add(Calendar.MINUTE, 10); // as per the sla xml + expectedStart = DateUtils.formatDateOozieTZ(cal.getTime()); + cal.setTime(nominal); + cal.add(Calendar.MINUTE, 30); // as per the sla xml + expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime()); + + ReRunXCommand rerun = new ReRunXCommand(jobId, conf, "UNIT_TESTING"); + rerun.call(); + slaEvent = slas.getSLACalculator().get(jobId); + // assert for new conf + assertNotNull(slaEvent); + assertEquals(jobId, slaEvent.getId()); + assertEquals("test-wf-job-sla", slaEvent.getAppName()); + assertEquals(AppType.WORKFLOW_JOB, slaEvent.getAppType()); + + // assert for new conf + assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime())); + assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart())); + assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd())); + + // assert for values in summary bean to be reset + SLASummaryBean slaSummary = jpaService.execute(new SLASummaryGetJPAExecutor(jobId)); + assertEquals( 0, slaSummary.getEventProcessed()); + assertEquals(-1, slaSummary.getActualDuration()); + assertNull(slaSummary.getActualStart()); + assertNull(slaSummary.getActualEnd()); + assertNull(slaSummary.getJobStatus()); + assertEquals(SLAStatus.NOT_STARTED, slaSummary.getSLAStatus()); + assertNull(slaEvent.getEventStatus()); + + ehs.getEventQueue().clear(); + slas.runSLAWorker(); + slaEvent = (SLACalcStatus) ehs.getEventQueue().poll(); + assertEquals(SLAStatus.NOT_STARTED, slaEvent.getSLAStatus()); + assertEquals(EventStatus.START_MISS, 
slaEvent.getEventStatus()); + + } + + /** + * Test for SLA Events generated through wf action rerun + * + * @throws Exception + */ + @Test + public void testWorkflowActionSLARerun() throws Exception { + SLAService slas = services.get(SLAService.class); + String wfXml = IOUtils.getResourceAsString("wf-action-sla.xml", -1); + Path appPath = getFsTestCaseDir(); + writeToFile(wfXml, appPath, "workflow.xml"); + Configuration conf = new XConfiguration(); + conf.set(OozieClient.APP_PATH, appPath.toString()); + conf.set(OozieClient.USER_NAME, getTestUser()); + + cal.setTime(new Date()); + cal.add(Calendar.MINUTE, -20); // for start_miss + Date nominal = cal.getTime(); + String nominalTime = DateUtils.formatDateOozieTZ(nominal); + conf.set("nominal_time", nominalTime); + + // Call SubmitX + SubmitXCommand sc = new SubmitXCommand(conf, "UNIT_TESTING"); + String jobId = sc.call(); + String actionId = jobId+"@grouper"; + + slas.getSLACalculator().clear(); + JPAService jpaService = Services.get().get(JPAService.class); + WorkflowJobBean wfBean = jpaService.execute(new WorkflowJobGetJPAExecutor(jobId)); + // set job status to succeeded, so rerun doesn't fail + wfBean.setStatus(WorkflowJob.Status.SUCCEEDED); + jpaService.execute(new WorkflowJobUpdateJPAExecutor(wfBean)); + + // change conf for rerun + cal.setTime(new Date()); + nominalTime = DateUtils.formatDateOozieTZ(cal.getTime()); + conf.set("nominal_time", nominalTime); + nominal = cal.getTime(); + cal.add(Calendar.MINUTE, 10); // as per the sla xml + String expectedStart = DateUtils.formatDateOozieTZ(cal.getTime()); + cal.setTime(nominal); + cal.add(Calendar.MINUTE, 30); // as per the sla xml + String expectedEnd = DateUtils.formatDateOozieTZ(cal.getTime()); + + ReRunXCommand rerun = new ReRunXCommand(jobId, conf, "UNIT_TESTING"); + rerun.call(); + SLACalcStatus slaEvent = slas.getSLACalculator().get(actionId); + assertNotNull(slaEvent); + // assert for action configs + assertEquals(actionId, slaEvent.getId()); + 
assertEquals("test-wf-action-sla", slaEvent.getAppName()); + assertEquals(AppType.WORKFLOW_ACTION, slaEvent.getAppType()); + // assert for new conf + assertEquals(nominalTime, DateUtils.formatDateOozieTZ(slaEvent.getNominalTime())); + assertEquals(expectedStart, DateUtils.formatDateOozieTZ(slaEvent.getExpectedStart())); + assertEquals(expectedEnd, DateUtils.formatDateOozieTZ(slaEvent.getExpectedEnd())); + + } + @Test public void testSLASchema1BackwardCompatibility() throws Exception { EventHandlerService ehs = services.get(EventHandlerService.class); @@ -192,6 +353,7 @@ public class TestSLAEventGeneration exte ehs.getEventQueue().poll(); //ignore the wf-action event generated ehs.new EventWorker().run(); Thread.sleep(300); // time for listeners to run + ehs.getEventQueue().poll(); // ignore duration event slaEvent = (SLACalcStatus) ehs.getEventQueue().poll(); assertEquals(jobId, slaEvent.getId()); assertNotNull(slaEvent.getActualEnd()); @@ -278,6 +440,7 @@ public class TestSLAEventGeneration exte // test that sla processes the Job Event from Kill command new CoordKillXCommand(jobId).call(); ehs.new EventWorker().run(); + ehs.getEventQueue().poll(); //ignore duration event slaEvent = (SLACalcStatus) ehs.getEventQueue().poll(); assertEquals(actionId, slaEvent.getId()); assertNotNull(slaEvent.getActualEnd()); Modified: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java?rev=1491333&r1=1491332&r2=1491333&view=diff ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java (original) +++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLAJobEventListener.java Mon Jun 10 04:37:20 2013 @@ -91,8 +91,8 @@ public class TestSLAJobEventListener ext SLACalcStatus serviceObj = slas.getSLACalculator().get("wf1"); // check that 
start sla has been calculated assertEquals(EventStatus.START_MISS, serviceObj.getEventStatus()); - assertEquals(0, serviceObj.getSlaProcessed()); //Job switching to running is only partially - //sla processed. so state = zero + assertEquals(1, serviceObj.getEventProcessed()); //Job switching to running is only partially + //sla processed. so state = 1 job = _createSLARegBean("wa1", AppType.WORKFLOW_ACTION); slas.addRegistrationEvent(job); @@ -114,24 +114,27 @@ public class TestSLAJobEventListener ext "coord-app-name1", actualStart, actualEnd); listener.onCoordinatorJobEvent(cje); - // Since serviceObj is removed from memory after END stage SLASummaryBean summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("cj1")); - // check that end sla has been calculated - assertEquals(2, summary.getSlaProcessed()); //Job in terminal state has finished - //sla processing. so state = 2 + // check that end and duration sla has been calculated + assertEquals(6, summary.getEventProcessed()); + assertEquals(EventStatus.END_MET, summary.getEventStatus()); job = _createSLARegBean("ca1", AppType.COORDINATOR_ACTION); actualEnd = DateUtils.parseDateUTC("2012-07-22T02:00Z"); slas.addRegistrationEvent(job); - assertEquals(3, slas.getSLACalculator().size()); - CoordinatorActionEvent cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.KILLED, "user1", + assertEquals(4, slas.getSLACalculator().size()); + CoordinatorActionEvent cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.RUNNING, "user1", + "coord-app-name1", null, actualEnd, null); + listener.onCoordinatorActionEvent(cae); + cae = new CoordinatorActionEvent("ca1", "cj1", CoordinatorAction.Status.KILLED, "user1", "coord-app-name1", null, actualEnd, null); listener.onCoordinatorActionEvent(cae); summary = Services.get().get(JPAService.class).execute(new SLASummaryGetJPAExecutor("ca1")); - // check that start sla has been calculated - assertEquals(2, 
summary.getSlaProcessed()); + // check that all events are processed + assertEquals(8, summary.getEventProcessed()); assertEquals(EventStatus.END_MISS, summary.getEventStatus()); + assertEquals(3, slas.getSLACalculator().size()); } Added: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java?rev=1491333&view=auto ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java (added) +++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLARegistrationGetRecordsOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013 @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.sla; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor; +import org.apache.oozie.executor.jpa.sla.SLARegistrationGetOnRestartJPAExecutor; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; +import org.apache.oozie.sla.SLARegistrationBean; +import org.apache.oozie.test.XDataTestCase; + +public class TestSLARegistrationGetRecordsOnRestartJPAExecutor extends XDataTestCase { + Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + cleanUpDBTables(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testSLARegistrationGetRecordsOnRestart() throws Exception { + Date current = new Date(); + final String jobId = "0000000-" + current.getTime() + "-TestSLARegGetRestartJPAExecutor-W"; + SLARegistrationBean reg = new SLARegistrationBean(); + reg.setJobId(jobId); + reg.setNotificationMsg("dummyMessage"); + reg.setUpstreamApps("upApps"); + reg.setAlertEvents("miss"); + reg.setAlertContact("[email protected]"); + reg.setJobData("jobData"); + JPAService jpaService = Services.get().get(JPAService.class); + List<JsonBean> insert = new ArrayList<JsonBean>(); + insert.add(reg); + SLACalculationInsertUpdateJPAExecutor slaInsertCmd = new SLACalculationInsertUpdateJPAExecutor(insert, null); + jpaService.execute(slaInsertCmd); + assertNotNull(jpaService); + SLARegistrationGetOnRestartJPAExecutor readCmd = new SLARegistrationGetOnRestartJPAExecutor(jobId); + SLARegistrationBean bean = jpaService.execute(readCmd); + assertEquals("dummyMessage", bean.getNotificationMsg()); + assertEquals ("upApps", bean.getUpstreamApps()); + assertEquals ("miss", bean.getAlertEvents()); + assertEquals ("[email protected]", 
bean.getAlertContact()); + assertEquals ("jobData", bean.getJobData()); + } + +} Added: oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java URL: http://svn.apache.org/viewvc/oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java?rev=1491333&view=auto ============================================================================== --- oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java (added) +++ oozie/trunk/core/src/test/java/org/apache/oozie/sla/TestSLASummaryGetOnRestartJPAExecutor.java Mon Jun 10 04:37:20 2013 @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.sla; + +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +import org.apache.oozie.AppType; +import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.sla.SLACalculationInsertUpdateJPAExecutor; +import org.apache.oozie.executor.jpa.sla.SLARegistrationGetJPAExecutor; +import org.apache.oozie.executor.jpa.sla.SLASummaryGetJPAExecutor; +import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor; +import org.apache.oozie.service.JPAService; +import org.apache.oozie.service.Services; +import org.apache.oozie.sla.SLARegistrationBean; +import org.apache.oozie.test.XDataTestCase; + +public class TestSLASummaryGetOnRestartJPAExecutor extends XDataTestCase { + Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + cleanUpDBTables(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testSLARegistrationGet() throws Exception { + JPAService jpaService = Services.get().get(JPAService.class); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); + SLASummaryBean sla1 = new SLASummaryBean(); + sla1.setJobId("jobId"); + sla1.setAppName("appName"); + sla1.setUser("user"); + sla1.setParentId("parent"); + sla1.setEventProcessed(7); + // set to 5 days back from now + sla1.setLastModifiedTime(new Date(System.currentTimeMillis() - 5*24*60*60*1000)); + + SLASummaryBean sla2 = new SLASummaryBean(); + sla2.setJobId("jobId2"); + sla2.setEventProcessed(6); + // set to long time back + sla2.setLastModifiedTime(sdf.parse("2009-06-03")); + + List<JsonBean> insert = new ArrayList<JsonBean>(); + insert.add(sla1); + insert.add(sla2); + SLACalculationInsertUpdateJPAExecutor slaInsertCmd = new 
SLACalculationInsertUpdateJPAExecutor(insert, null); + jpaService.execute(slaInsertCmd); + // get all records modified in last 7 days + SLASummaryGetRecordsOnRestartJPAExecutor slaGetOnRestart = new SLASummaryGetRecordsOnRestartJPAExecutor(7); + List<SLASummaryBean> beans = jpaService.execute(slaGetOnRestart); + assertEquals(1, beans.size()); + assertEquals("jobId", beans.get(0).getJobId()); + assertEquals("appName", beans.get(0).getAppName()); + assertEquals("user", beans.get(0).getUser()); + assertEquals("parent", beans.get(0).getParentId()); + assertEquals(7, beans.get(0).getEventProcessed()); + } + +} Modified: oozie/trunk/release-log.txt URL: http://svn.apache.org/viewvc/oozie/trunk/release-log.txt?rev=1491333&r1=1491332&r2=1491333&view=diff ============================================================================== --- oozie/trunk/release-log.txt (original) +++ oozie/trunk/release-log.txt Mon Jun 10 04:37:20 2013 @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1339 Implement SLA Bootstrap Service and fix bugs in SLACalculator (virag) OOZIE-1400 REST API to fetch SLA (rohini) OOZIE-1375 Generate Job notification events for Workflow Actions (mona) OOZIE-1357 Can't view more than 1000 actions of a coordinator and paging does not work (ryota)
