Repository: oozie Updated Branches: refs/heads/master bdc41b258 -> d5b13db2b
OOZIE-1911 SLA calculation in HA mode does wrong bit comparison for 'start' and 'duration' (mona) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d5b13db2 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d5b13db2 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d5b13db2 Branch: refs/heads/master Commit: d5b13db2b17578a13afcab7de267ce48000410cd Parents: bdc41b2 Author: mona <[email protected]> Authored: Fri Jul 11 13:02:22 2014 -0700 Committer: mona <[email protected]> Committed: Fri Jul 11 13:02:22 2014 -0700 ---------------------------------------------------------------------- .../oozie/service/EventHandlerService.java | 27 ++- .../org/apache/oozie/sla/SLACalcStatus.java | 22 ++- .../apache/oozie/sla/SLACalculatorMemory.java | 176 ++++++++++--------- .../java/org/apache/oozie/util/LogUtils.java | 6 + .../apache/oozie/service/TestHASLAService.java | 93 +++++++--- release-log.txt | 1 + 6 files changed, 208 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/service/EventHandlerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java index 4207a07..6c075ab 100644 --- a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java +++ b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java @@ -33,6 +33,7 @@ import org.apache.oozie.event.WorkflowJobEvent; import org.apache.oozie.event.listener.JobEventListener; import org.apache.oozie.sla.listener.SLAEventListener; import org.apache.oozie.client.event.SLAEvent; +import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.XLog; import java.util.ArrayList; @@ -71,6 +72,7 @@ public class 
EventHandlerService implements Service { try { Configuration conf = services.getConf(); LOG = XLog.getLog(getClass()); + LOG = XLog.resetPrefix(LOG); Class<? extends EventQueue> queueImpl = (Class<? extends EventQueue>) conf.getClass(CONF_EVENT_QUEUE, null); eventQueue = queueImpl == null ? new MemoryEventQueue() : (EventQueue) queueImpl.newInstance(); eventQueue.init(conf); @@ -206,7 +208,8 @@ public class EventHandlerService implements Service { return listenerMap.toString(); } - public void queueEvent(Event event) { + public synchronized void queueEvent(Event event) { + setLogPrefix(LOG, event); LOG.debug("Queueing event : {0}", event); LOG.trace("Stack trace while queueing event : {0}", event, new Throwable()); eventQueue.add(event); @@ -216,7 +219,24 @@ public class EventHandlerService implements Service { return eventQueue; } + private void setLogPrefix(XLog logObj, Event event) { + logObj = XLog.resetPrefix(logObj); + if (event instanceof JobEvent) { + JobEvent je = (JobEvent) event; + LogUtils.setLogPrefix(je.getId(), je.getAppName(), new XLog.Info()); + } + else if (event instanceof SLAEvent) { + SLAEvent se = (SLAEvent) event; + LogUtils.setLogPrefix(se.getId(), se.getAppName(), new XLog.Info()); + } + } + public class EventWorker implements Runnable { + private XLog workerLog; + + public EventWorker() { + workerLog = XLog.getLog(getClass()); + } @Override public void run() { @@ -227,7 +247,10 @@ public class EventHandlerService implements Service { if (!eventQueue.isEmpty()) { List<Event> work = eventQueue.pollBatch(); for (Event event : work) { - LOG.debug("Processing event : {0}", event); + synchronized (workerLog) { + setLogPrefix(workerLog, event); + LOG.debug("Processing event : {0}", event); + } MessageType msgType = event.getMsgType(); List<?> listeners = listenerMap.get(msgType); if (listeners != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java index f148db3..5349b33 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java @@ -21,16 +21,13 @@ package org.apache.oozie.sla; import java.util.Date; import org.apache.oozie.AppType; -import org.apache.oozie.ErrorCode; import org.apache.oozie.client.event.SLAEvent; -import org.apache.oozie.command.CommandException; import org.apache.oozie.lock.LockToken; -import org.apache.oozie.service.InstrumentationService; import org.apache.oozie.service.JobsConcurrencyService; import org.apache.oozie.service.MemoryLocksService; import org.apache.oozie.service.Services; import org.apache.oozie.sla.service.SLAService; -import org.apache.oozie.util.Instrumentation; +import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.XLog; /** @@ -51,9 +48,12 @@ public class SLACalcStatus extends SLAEvent { private byte eventProcessed; private LockToken lock; + private XLog LOG; + public SLACalcStatus(SLARegistrationBean reg) { this(); setSLARegistrationBean(reg); + setLogPrefix(); } public SLACalcStatus(SLASummaryBean summary, SLARegistrationBean regBean) { @@ -82,6 +82,7 @@ public class SLACalcStatus extends SLAEvent { setEventStatus(summary.getEventStatus()); setLastModifiedTime(summary.getLastModifiedTime()); setEventProcessed(summary.getEventProcessed()); + setLogPrefix(); } /** @@ -98,11 +99,13 @@ public class SLACalcStatus extends SLAEvent { setActualEnd(a.getActualEnd()); setActualDuration(a.getActualDuration()); setEventProcessed(a.getEventProcessed()); + setLogPrefix(); } public SLACalcStatus() { setMsgType(MessageType.SLA); setLastModifiedTime(new Date()); + LOG = XLog.getLog(getClass()); } public SLARegistrationBean getSLARegistrationBean() { @@ -292,10 +295,10 @@ public class 
SLACalcStatus extends SLAEvent { if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) { lock = Services.get().get(MemoryLocksService.class).getWriteLock(getEntityKey(), getLockTimeOut()); if (lock == null) { - XLog.getLog(getClass()).debug("Could not aquire lock for [{0}]", getEntityKey()); + LOG.debug("Could not aquire lock for [{0}]", getEntityKey()); } else { - XLog.getLog(getClass()).debug("Acquired lock for [{0}]", getEntityKey()); + LOG.debug("Acquired lock for [{0}]", getEntityKey()); } } else { @@ -321,11 +324,16 @@ public class SLACalcStatus extends SLAEvent { if (lock != null) { lock.release(); lock = null; - XLog.getLog(getClass()).debug("Released lock for [{0}]", getEntityKey()); + LOG.debug("Released lock for [{0}]", getEntityKey()); } } public long getLockTimeOut() { return Services.get().getConf().getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000); } + + private void setLogPrefix() { + LOG = XLog.resetPrefix(LOG); + LogUtils.setLogPrefix(this.getId(), this.getAppName(), new XLog.Info()); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java index 47c723d..5b30fc0 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java @@ -86,7 +86,7 @@ public class SLACalculatorMemory implements SLACalculator { private static final XLog LOG = XLog.getLog(SLACalculatorMemory.class); // TODO optimization priority based insertion/processing/bumping up-down protected Map<String, SLACalcStatus> slaMap; - protected static Set<String> historySet; + protected Set<String> historySet; private static int capacity; private static JPAService jpaService; protected 
EventHandlerService eventHandler; @@ -477,105 +477,108 @@ public class SLACalculatorMemory implements SLACalculator { Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue( SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId); byte eventProc = ((Byte) eventProcObj).byteValue(); - slaCalc.setEventProcessed(eventProc); - SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); - // calculation w.r.t current time and status - if ((eventProc & 1) == 0) { // first bit (start-processed) unset - if (reg.getExpectedStart() != null) { - if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) { - confirmWithDB(slaCalc); - eventProc = slaCalc.getEventProcessed(); - if (eventProc != 8 && (eventProc & 1) == 0) { - // Some DB exception - slaCalc.setEventStatus(EventStatus.START_MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - eventProc++; + if (eventProc >= 7) { + if (eventProc == 7) { + historySet.add(jobId); + } + slaMap.remove(jobId); + LOG.trace("Removed Job [{0}] from map as SLA processed", jobId); + } + else { + slaCalc.setEventProcessed(eventProc); + SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); + // calculation w.r.t current time and status + if ((eventProc & 1) == 0) { // first bit (start-processed) unset + if (reg.getExpectedStart() != null) { + if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) { + confirmWithDB(slaCalc); + eventProc = slaCalc.getEventProcessed(); + if (eventProc != 8 && (eventProc & 1) == 0) { + // Some DB exception + slaCalc.setEventStatus(EventStatus.START_MISS); + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + eventProc++; + } + change = true; } + } + else { + eventProc++; // disable further processing for optional start sla condition change = true; } } - else { - eventProc++; // disable further processing for optional - // start sla condition - change = true; - } - } - // check if second bit 
(duration-processed) is unset - if (((eventProc >> 1) & 1) == 0 && eventProc != 8) { - if (reg.getExpectedDuration() == -1) { - eventProc += 2; - change = true; + // check if second bit (duration-processed) is unset + if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { + if (reg.getExpectedDuration() == -1) { + eventProc += 2; + change = true; + } + else if (slaCalc.getActualStart() != null) { + if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc + .getActualStart().getTime())) { + slaCalc.setEventProcessed(eventProc); + confirmWithDB(slaCalc); + eventProc = slaCalc.getEventProcessed(); + if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { + // Some DB exception + slaCalc.setEventStatus(EventStatus.DURATION_MISS); + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + eventProc += 2; + } + change = true; + } + } } - else if (slaCalc.getActualStart() != null) { - if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc - .getActualStart().getTime())) { + if (eventProc < 4) { + if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) { slaCalc.setEventProcessed(eventProc); confirmWithDB(slaCalc); eventProc = slaCalc.getEventProcessed(); - if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { - // Some DB exception - slaCalc.setEventStatus(EventStatus.DURATION_MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - eventProc += 2; - } change = true; } } - } - if (eventProc < 4) { - if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) { - slaCalc.setEventProcessed(eventProc); - confirmWithDB(slaCalc); - eventProc = slaCalc.getEventProcessed(); - change = true; - } - } - if (change) { - try { - boolean locked = true; - slaCalc.acquireLock(); - locked = slaCalc.isLocked(); - if (locked) { - // no more processing, no transfer to history set - if (slaCalc.getEventProcessed() >= 8) { - eventProc = 8; - // Should not be > 8. 
But to handle any corner cases - slaCalc.setEventProcessed(8); - slaMap.remove(jobId); - } - else { - slaCalc.setEventProcessed(eventProc); - } - SLASummaryBean slaSummaryBean = new SLASummaryBean(); - slaSummaryBean.setId(slaCalc.getId()); - slaSummaryBean.setEventProcessed(eventProc); - slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus()); - slaSummaryBean.setEventStatus(slaCalc.getEventStatus()); - slaSummaryBean.setActualEnd(slaCalc.getActualEnd()); - slaSummaryBean.setActualStart(slaCalc.getActualStart()); - slaSummaryBean.setActualDuration(slaCalc.getActualDuration()); - slaSummaryBean.setJobStatus(slaCalc.getJobStatus()); - slaSummaryBean.setLastModifiedTime(new Date()); - SLASummaryQueryExecutor.getInstance().executeUpdate( - SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean); - if (eventProc == 7) { - historySet.add(jobId); - slaMap.remove(jobId); - LOG.trace("Removed Job [{0}] from map after End-processed", jobId); + if (change) { + try { + boolean locked = true; + slaCalc.acquireLock(); + locked = slaCalc.isLocked(); + if (locked) { + // no more processing, no transfer to history set + if (slaCalc.getEventProcessed() >= 8) { + eventProc = 8; + // Should not be > 8. 
But to handle any corner cases + slaCalc.setEventProcessed(8); + slaMap.remove(jobId); + } + else { + slaCalc.setEventProcessed(eventProc); + } + SLASummaryBean slaSummaryBean = new SLASummaryBean(); + slaSummaryBean.setId(slaCalc.getId()); + slaSummaryBean.setEventProcessed(eventProc); + slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus()); + slaSummaryBean.setEventStatus(slaCalc.getEventStatus()); + slaSummaryBean.setActualEnd(slaCalc.getActualEnd()); + slaSummaryBean.setActualStart(slaCalc.getActualStart()); + slaSummaryBean.setActualDuration(slaCalc.getActualDuration()); + slaSummaryBean.setJobStatus(slaCalc.getJobStatus()); + slaSummaryBean.setLastModifiedTime(new Date()); + SLASummaryQueryExecutor.getInstance().executeUpdate( + SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean); + if (eventProc == 7) { + historySet.add(jobId); + slaMap.remove(jobId); + LOG.trace("Removed Job [{0}] from map after End-processed", jobId); + } } } + catch (InterruptedException e) { + throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut()); + } + finally { + slaCalc.releaseLock(); + } } - catch (InterruptedException e) { - throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut()); - } - finally { - slaCalc.releaseLock(); - } - } - else if (eventProc >= 7) { - historySet.add(jobId); - slaMap.remove(jobId); - LOG.trace("Removed Job [{0}] from map after End-processed", jobId); } } } @@ -965,7 +968,6 @@ public class SLACalculatorMemory implements SLACalculator { CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId())); if (ca.isTerminalWithFailure()) { isEndMiss = ended = true; - slaCalc.setActualStart(null); slaCalc.setActualEnd(ca.getLastModifiedTime()); } if (ca.getExternalId() != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/main/java/org/apache/oozie/util/LogUtils.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/LogUtils.java b/core/src/main/java/org/apache/oozie/util/LogUtils.java index fd5b5b6..723ac36 100644 --- a/core/src/main/java/org/apache/oozie/util/LogUtils.java +++ b/core/src/main/java/org/apache/oozie/util/LogUtils.java @@ -125,4 +125,10 @@ public class LogUtils { XLog.Info.get().setParameters(logInfo); } + public static void setLogPrefix(String jobId, String appName, XLog.Info logInfo) { + logInfo.setParameter(DagXLogInfoService.JOB, jobId); + logInfo.setParameter(DagXLogInfoService.APP, appName); + XLog.Info.get().setParameters(logInfo); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/core/src/test/java/org/apache/oozie/service/TestHASLAService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java index eec5369..419e98b 100644 --- a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java +++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java @@ -30,14 +30,15 @@ import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.WorkflowJob; -import org.apache.oozie.client.event.SLAEvent; import org.apache.oozie.client.event.JobEvent.EventStatus; +import org.apache.oozie.client.event.SLAEvent; import org.apache.oozie.client.event.SLAEvent.SLAStatus; import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.event.EventQueue; import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; -import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import 
org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; @@ -164,27 +165,26 @@ public class TestHASLAService extends ZKXTestCase { public void testSLAUpdateWithHA() throws Exception { - String id1 = "0000000-130521183438837-oozie-test-C@1"; - String id2 = "0000001-130521183438837-oozie-test-C@1"; - String id3 = "0000002-130521183438837-oozie-test-C@1"; - String id4 = "0000003-130521183438837-oozie-test-C@1"; - String id5 = "0000004-130521183438837-oozie-test-C@1"; - String id6 = "0000005-130521183438837-oozie-test-C@1"; - Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); + String id1 = "0000001-130521183438837-oozie-test-C@1"; + String id2 = "0000002-130521183438837-oozie-test-C@1"; + String id3 = "0000003-130521183438837-oozie-test-C@1"; + String id4 = "0000004-130521183438837-oozie-test-C@1"; + String id5 = "0000005-130521183438837-oozie-test-C@1"; + String id6 = "0000006-130521183438837-oozie-test-C@1"; + Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // 2 hrs passed Date expectedEndTS1 = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // 1 hour ahead Date expectedEndTS2 = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // 1 hour passed - // Coord Action 1-4 not started yet + // Coord Action of jobs 1-4 not started yet createDBEntry(id1, expectedStartTS, expectedEndTS1); createDBEntry(id2, expectedStartTS, expectedEndTS1); createDBEntry(id3, expectedStartTS, expectedEndTS1); createDBEntry(id4, expectedStartTS, expectedEndTS1); - // Coord Action 5-6 already started and currently running (to test history set) + // Coord Action of jobs 5-6 already started and currently running (to test history set) createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2); createDBEntryForStarted(id6, 
expectedStartTS, expectedEndTS2); SLAService slas = Services.get().get(SLAService.class); SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator(); - EventHandlerService ehs = Services.get().get(EventHandlerService.class); slaCalcMem.init(Services.get().getConf()); List<String> slaMapKeys = new ArrayList<String>(); Iterator<String> itr = slaCalcMem.iterator(); @@ -209,14 +209,14 @@ public class TestHASLAService extends ZKXTestCase { } assertEquals(6, slaMapKeys.size()); - // Coord Action 1,3 run and update status on non-dummy server + // Coord Action 1,3 run and update status on *non-dummy* server updateCoordAction(id1, "RUNNING"); slaCalcMem .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null); updateCoordAction(id3, "FAILED"); slaCalcMem.addJobStatus(id3, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, null, new Date()); - // Coord Action 2,4 run and update status on dummy server + // Coord Action 2,4 run and update status on *dummy* server updateCoordAction(id2, "RUNNING"); dummySlaCalcMem.addJobStatus(id2, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null); @@ -244,10 +244,10 @@ public class TestHASLAService extends ZKXTestCase { Byte eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id3); - assertEquals(8, eventProc.intValue()); + assertEquals(8, eventProc.byteValue()); eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id4); - assertEquals(8, eventProc.intValue()); + assertEquals(8, eventProc.byteValue()); // Action 5 was processed as END_MISS in updateAllSlaStatus, put into history set assertTrue(slaCalcMem.isJobIdInHistorySet(id5)); @@ -258,10 +258,10 @@ public class TestHASLAService extends ZKXTestCase { eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( 
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id5); - assertEquals(7, eventProc.intValue()); + assertEquals(7, eventProc.byteValue()); eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id6); - assertEquals(7, eventProc.intValue()); + assertEquals(7, eventProc.byteValue()); // Action 1 Succeeded on non-dummy server updateCoordAction(id1, "SUCCEEDED"); @@ -284,10 +284,10 @@ public class TestHASLAService extends ZKXTestCase { assertNull(dummySlaCalcMem.get(id2)); eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id1); - assertEquals(8, eventProc.intValue()); + assertEquals(8, eventProc.byteValue()); eventProc = (Byte) SLASummaryQueryExecutor.getInstance().getSingleValue( SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, id2); - assertEquals(8, eventProc.intValue()); + assertEquals(8, eventProc.byteValue()); // Test HistoryPurgeWorker purges Action 5,6 from history set updateCoordAction(id5, "SUCCEEDED"); @@ -295,8 +295,59 @@ public class TestHASLAService extends ZKXTestCase { assertFalse(slaCalcMem.isJobIdInHistorySet(id5)); updateCoordAction(id6, "SUCCEEDED"); dummySlaCalcMem.new HistoryPurgeWorker().run(); - assertFalse(slaCalcMem.isJobIdInHistorySet(id6)); + assertFalse(dummySlaCalcMem.isJobIdInHistorySet(id6)); + + } + finally { + if (dummyOozie_1 != null) { + dummyOozie_1.teardown(); + } + } + } + + public void testNoDuplicateEventsInHA() throws Exception { + String id1 = "0000001-130521183438837-oozie-test-C@1"; + Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // get MISS + Date expectedEndTS = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // get MISS + createDBEntry(id1, expectedStartTS, expectedEndTS); + + SLAService slas = Services.get().get(SLAService.class); + SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator(); + 
slaCalcMem.init(Services.get().getConf()); // loads the job in sla map + + EventHandlerService ehs = Services.get().get(EventHandlerService.class); + EventQueue ehs_q = ehs.getEventQueue(); + DummyZKOozie dummyOozie_1 = null; + try { + // start another dummy oozie instance (dummy sla and event handler services) + dummyOozie_1 = new DummyZKOozie("a", "http://blah"); + DummySLACalculatorMemory dummySlaCalcMem = new DummySLACalculatorMemory(); + dummySlaCalcMem.init(Services.get().getConf()); + EventHandlerService dummyEhs = new EventHandlerService(); + dummySlaCalcMem.setEventHandlerService(dummyEhs); + dummyEhs.init(Services.get()); + EventQueue dummyEhs_q = dummyEhs.getEventQueue(); + + // Action started on Server 1 + updateCoordAction(id1, "RUNNING"); + slaCalcMem + .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null); + SLACalcStatus s1 = (SLACalcStatus) ehs_q.poll(); + assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus()); + + // Action ended on Server 2 + updateCoordAction(id1, "FAILED"); + dummySlaCalcMem.addJobStatus(id1, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, new Date( + System.currentTimeMillis() - 1800 * 1000), + new Date()); + dummyEhs_q.poll(); // getting rid of the duration_miss event + SLACalcStatus s2 = (SLACalcStatus) dummyEhs_q.poll(); + assertEquals(SLAStatus.MISS, s2.getSLAStatus()); + + slaCalcMem.updateAllSlaStatus(); + dummySlaCalcMem.updateAllSlaStatus(); + assertEquals(0, ehs_q.size()); // no dupe event should be created again by Server 1 } finally { if (dummyOozie_1 != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/d5b13db2/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index a4b456f..81559eb 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.1.0 release (trunk - unreleased) +OOZIE-1911 SLA calculation in HA mode does wrong bit comparison for 'start' 
and 'duration' (mona) OOZIE-1926 make gz blob compression as default (ryota) OOZIE-1916 Use Curator leader latch instead of checking the order of Oozie servers (rkanter) OOZIE-1886 Queue operation talking longer time (shwethags via rohini)
