http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java 
b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
index 42313fd..e8638a9 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java
@@ -31,47 +31,28 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.AppType;
-import org.apache.oozie.CoordinatorActionBean;
-import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.XException;
 import org.apache.oozie.client.CoordinatorAction;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.client.event.JobEvent;
-import org.apache.oozie.client.event.SLAEvent.EventStatus;
 import org.apache.oozie.client.event.SLAEvent.SLAStatus;
 import org.apache.oozie.client.rest.JsonBean;
 import org.apache.oozie.client.rest.RestConstants;
 import org.apache.oozie.command.CommandException;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor;
 import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry;
-import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
-import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery;
 import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor;
 import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor;
 import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery;
-import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor;
-import 
org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import 
org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor;
-import org.apache.oozie.lock.LockToken;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.EventHandlerService;
 import org.apache.oozie.service.JPAService;
-import org.apache.oozie.service.JobsConcurrencyService;
-import org.apache.oozie.service.MemoryLocksService;
 import org.apache.oozie.service.SchedulerService;
 import org.apache.oozie.service.ServiceException;
 import org.apache.oozie.service.Services;
@@ -80,8 +61,8 @@ import org.apache.oozie.util.DateUtils;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.XLog;
 import org.apache.oozie.util.Pair;
-import com.google.common.annotations.VisibleForTesting;
 
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Implementation class for SLACalculator that calculates SLA related to
@@ -111,14 +92,14 @@ public class SLACalculatorMemory implements SLACalculator {
         modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7);
         loadOnRestart();
         Runnable purgeThread = new HistoryPurgeWorker();
-        // schedule runnable by default 1 day
+        // schedule runnable by default 1 hours
         Services.get()
                 .get(SchedulerService.class)
-                .schedule(purgeThread, 86400, 
Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 
86400),
+                .schedule(purgeThread, 3600, 
Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 
3600),
                         SchedulerService.Unit.SEC);
     }
 
-    public class HistoryPurgeWorker implements Runnable {
+    public class HistoryPurgeWorker extends Thread {
 
         public HistoryPurgeWorker() {
         }
@@ -131,327 +112,88 @@ public class SLACalculatorMemory implements 
SLACalculator {
             Iterator<String> jobItr = historySet.iterator();
             while (jobItr.hasNext()) {
                 String jobId = jobItr.next();
-
-                if (jobId.endsWith("-W")) {
-                    WorkflowJobBean wfJob = null;
-                    try {
-                        wfJob = 
WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS,
 jobId);
-                    }
-                    catch (JPAExecutorException e) {
-                        if (e.getErrorCode().equals(ErrorCode.E0604)) {
-                            jobItr.remove();
-                        }
-                        else {
-                            LOG.info("Failed to fetch the workflow job: " + 
jobId, e);
-                        }
-                    }
-                    if (wfJob != null && wfJob.inTerminalState()) {
-                        try {
-                            updateSLASummary(wfJob.getId(), 
wfJob.getStartTime(), wfJob.getEndTime());
-                            jobItr.remove();
-                        }
-                        catch (JPAExecutorException e) {
-                            LOG.info("Failed to update SLASummaryBean when 
purging history set entry for " + jobId, e);
-                        }
-
-                    }
-                }
-                else if (jobId.contains("-W@")) {
-                    WorkflowActionBean wfAction = null;
-                    try {
-                        wfAction = 
WorkflowActionQueryExecutor.getInstance().get(
-                                WorkflowActionQuery.GET_ACTION_COMPLETED, 
jobId);
-                    }
-                    catch (JPAExecutorException e) {
-                        if (e.getErrorCode().equals(ErrorCode.E0605)) {
-                            jobItr.remove();
-                        }
-                        else {
-                            LOG.info("Failed to fetch the workflow action: " + 
jobId, e);
-                        }
-                    }
-                    if (wfAction != null && (wfAction.isComplete() || 
wfAction.isTerminalWithFailure())) {
-                        try {
-                            updateSLASummary(wfAction.getId(), 
wfAction.getStartTime(), wfAction.getEndTime());
-                            jobItr.remove();
-                        }
-                        catch (JPAExecutorException e) {
-                            LOG.info("Failed to update SLASummaryBean when 
purging history set entry for " + jobId, e);
-                        }
-                    }
-                }
-                else if (jobId.contains("-C@")) {
-                    CoordinatorActionBean cAction = null;
-                    try {
-                        cAction = 
CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, 
jobId);
-                    }
-                    catch (JPAExecutorException e) {
-                        if (e.getErrorCode().equals(ErrorCode.E0605)) {
-                            jobItr.remove();
-                        }
-                        else {
-                            LOG.info("Failed to fetch the coord action: " + 
jobId, e);
-                        }
-                    }
-                    if (cAction != null && cAction.isTerminalStatus()) {
-                        try {
-                            updateSLASummaryForCoordAction(cAction);
-                            jobItr.remove();
-                        }
-                        catch (JPAExecutorException e) {
-                            XLog.getLog(SLACalculatorMemory.class).info(
-                                    "Failed to update SLASummaryBean when 
purging history set entry for " + jobId, e);
-                        }
-
+                LOG.debug(" Running HistoryPurgeWorker for " + jobId);
+                try {
+                    boolean isDone = 
SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call();
+                    if (isDone) {
+                        LOG.debug("[{0}] job is finished and processed. 
Removing from history");
+                        jobItr.remove();
                     }
                 }
-                else if (jobId.endsWith("-C")) {
-                    CoordinatorJobBean cJob = null;
-                    try {
-                        cJob = 
CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID,
-                                jobId);
-                    }
-                    catch (JPAExecutorException e) {
-                        if (e.getErrorCode().equals(ErrorCode.E0604)) {
-                            jobItr.remove();
-                        }
-                        else {
-                            LOG.info("Failed to fetch the coord job: " + 
jobId, e);
-                        }
+                catch (CommandException e) {
+                    if (e.getErrorCode().equals(ErrorCode.E0604) || 
e.getErrorCode().equals(ErrorCode.E0605)) {
+                        LOG.warn("Job is not found in db: " + jobId, e);
+                        jobItr.remove();
                     }
-                    if (cJob != null && cJob.isTerminalStatus()) {
-                        try {
-                            updateSLASummary(cJob.getId(), 
cJob.getStartTime(), cJob.getEndTime());
-                            jobItr.remove();
-                        }
-                        catch (JPAExecutorException e) {
-                            LOG.info("Failed to update SLASummaryBean when 
purging history set entry for " + jobId, e);
-                        }
-
+                    else {
+                        LOG.error("Failed to fetch the job: " + jobId, e);
                     }
                 }
             }
         }
-
-        private void updateSLASummary(String id, Date startTime, Date endTime) 
throws JPAExecutorException {
-            SLASummaryBean sla = 
SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id);
-            if (sla != null) {
-                sla.setActualStart(startTime);
-                sla.setActualEnd(endTime);
-                if (startTime != null && endTime != null) {
-                    sla.setActualDuration(endTime.getTime() - 
startTime.getTime());
-                }
-                sla.setLastModifiedTime(new Date());
-                sla.setEventProcessed(8);
-                SLASummaryQueryExecutor.getInstance().executeUpdate(
-                        SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, 
sla);
-            }
-        }
-
-        private void updateSLASummaryForCoordAction(CoordinatorActionBean 
bean) throws JPAExecutorException {
-            String wrkflowId = bean.getExternalId();
-            if (wrkflowId != null) {
-                WorkflowJobBean wrkflow = 
WorkflowJobQueryExecutor.getInstance().get(
-                        WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, 
wrkflowId);
-                if (wrkflow != null) {
-                    updateSLASummary(bean.getId(), wrkflow.getStartTime(), 
wrkflow.getEndTime());
-                }
-            }
-        }
     }
 
     private void loadOnRestart() {
-        boolean isJobModified = false;
+        long slaPendingCount = 0;
+        long statusPendingCount = 0;
+
         try {
-            long slaPendingCount = 0;
-            long statusPendingCount = 0;
             List<SLASummaryBean> summaryBeans = jpaService.execute(new 
SLASummaryGetRecordsOnRestartJPAExecutor(
                     modifiedAfter));
             for (SLASummaryBean summaryBean : summaryBeans) {
                 String jobId = summaryBean.getId();
-                LockToken lock = null;
-                switch (summaryBean.getAppType()) {
-                    case COORDINATOR_ACTION:
-                        isJobModified = 
processSummaryBeanForCoordAction(summaryBean, jobId);
-                        break;
-                    case WORKFLOW_ACTION:
-                        isJobModified = 
processSummaryBeanForWorkflowAction(summaryBean, jobId);
-                        break;
-                    case WORKFLOW_JOB:
-                        isJobModified = 
processSummaryBeanForWorkflowJob(summaryBean, jobId);
-                        break;
-                    default:
-                        break;
-                }
-                if (isJobModified) {
-                    try {
-                        boolean update = true;
-                        if 
(Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
-                            lock = Services
-                                    .get()
-                                    .get(MemoryLocksService.class)
-                                    .getWriteLock(
-                                            SLACalcStatus.SLA_ENTITYKEY_PREFIX 
+ jobId,
-                                            Services.get().getConf()
-                                                    
.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000));
-                            if (lock == null) {
-                                update = false;
-                            }
-                        }
-                        if (update) {
-                            summaryBean.setLastModifiedTime(new Date());
-                            
SLASummaryQueryExecutor.getInstance().executeUpdate(
-                                    
SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean);
-                        }
-                    }
-                    catch (Exception e) {
-                        LOG.warn("Failed to load records for " + jobId, e);
-                    }
-                    finally {
-                        if (lock != null) {
-                            lock.release();
-                            lock = null;
-                        }
-                    }
-                }
+
+                SLARegistrationBean slaRegBean = 
SLARegistrationQueryExecutor.getInstance().get(
+                        SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
+                SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, 
slaRegBean);
+
+                // Processed missed jobs
                 try {
-                    if (summaryBean.getEventProcessed() == 7) {
-                        historySet.add(jobId);
-                        statusPendingCount++;
-                    }
-                    else if (summaryBean.getEventProcessed() <= 7) {
-                        SLARegistrationBean slaRegBean = 
SLARegistrationQueryExecutor.getInstance().get(
-                                SLARegQuery.GET_SLA_REG_ON_RESTART, jobId);
-                        SLACalcStatus slaCalcStatus = new 
SLACalcStatus(summaryBean, slaRegBean);
-                        slaMap.put(jobId, slaCalcStatus);
-                        slaPendingCount++;
-                    }
+                    
SLAXCommandFactory.getSLAEventXCommand(slaCalcStatus).call();
                 }
-                catch (Exception e) {
-                    LOG.warn("Failed to fetch/update records for " + jobId, e);
+                catch (Throwable e) {
+                    LOG.error("Error while updating job {0}", 
slaCalcStatus.getId(), e);
                 }
 
+                if (slaCalcStatus.getEventProcessed() == 7) {
+                    historySet.add(jobId);
+                    statusPendingCount++;
+                    LOG.debug("Adding job [{0}] to historySet. EventProcessed 
is [{1}]", slaCalcStatus,
+                            slaCalcStatus);
+                }
+                else if (slaCalcStatus.getEventProcessed() < 7) {
+                    slaMap.put(jobId, slaCalcStatus);
+                    slaPendingCount++;
+                    LOG.debug("Adding job [{0}] to slamap. EventProcessed is 
[{1}]", slaCalcStatus,
+                            slaCalcStatus);
+
+                }
             }
             LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", 
pendingStatusUpdate=" + statusPendingCount);
-
         }
         catch (Exception e) {
             LOG.warn("Failed to retrieve SLASummary records on restart", e);
         }
     }
 
-    private boolean processSummaryBeanForCoordAction(SLASummaryBean 
summaryBean, String jobId)
-            throws JPAExecutorException {
-        boolean isJobModified = false;
-        CoordinatorActionBean coordAction = null;
-        coordAction = jpaService.execute(new 
CoordActionGetForSLAJPAExecutor(jobId));
-        if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) {
-            LOG.trace("Coordinator action status is " + 
coordAction.getStatusStr() + " and summary bean status is "
-                    + summaryBean.getJobStatus());
-            isJobModified = true;
-            summaryBean.setJobStatus(coordAction.getStatusStr());
-            if (coordAction.isTerminalStatus()) {
-                WorkflowJobBean wfJob = jpaService.execute(new 
WorkflowJobGetForSLAJPAExecutor(coordAction
-                        .getExternalId()));
-                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), 
coordAction.getLastModifiedTime(),
-                        coordAction.getStatusStr());
-            }
-            else if (coordAction.getStatus() != 
CoordinatorAction.Status.WAITING) {
-                WorkflowJobBean wfJob = jpaService.execute(new 
WorkflowJobGetForSLAJPAExecutor(coordAction
-                        .getExternalId()));
-                setStartForSLASummaryBean(summaryBean, 
summaryBean.getEventProcessed(), wfJob.getStartTime());
-            }
-        }
-        return isJobModified;
-    }
-
-    private boolean processSummaryBeanForWorkflowAction(SLASummaryBean 
summaryBean, String jobId)
-            throws JPAExecutorException {
-        boolean isJobModified = false;
-        WorkflowActionBean wfAction = null;
-        wfAction = jpaService.execute(new 
WorkflowActionGetForSLAJPAExecutor(jobId));
-        if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) {
-            LOG.trace("Workflow action status is " + wfAction.getStatusStr() + 
"and summary bean status is "
-                    + summaryBean.getJobStatus());
-            isJobModified = true;
-            summaryBean.setJobStatus(wfAction.getStatusStr());
-            if (wfAction.inTerminalState()) {
-                setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), 
wfAction.getEndTime(), wfAction.getStatusStr());
-            }
-            else if (wfAction.getStatus() != WorkflowAction.Status.PREP) {
-                setStartForSLASummaryBean(summaryBean, 
summaryBean.getEventProcessed(), wfAction.getStartTime());
-            }
-        }
-        return isJobModified;
-    }
-
-    private boolean processSummaryBeanForWorkflowJob(SLASummaryBean 
summaryBean, String jobId)
-            throws JPAExecutorException {
-        boolean isJobModified = false;
-        WorkflowJobBean wfJob = null;
-        wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId));
-        if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) {
-            LOG.trace("Workflow job status is " + wfJob.getStatusStr() + "and 
summary bean status is "
-                    + summaryBean.getJobStatus());
-            isJobModified = true;
-            summaryBean.setJobStatus(wfJob.getStatusStr());
-            if (wfJob.inTerminalState()) {
-                setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), 
wfJob.getEndTime(), wfJob.getStatusStr());
-            }
-            else if (wfJob.getStatus() != WorkflowJob.Status.PREP) {
-                setStartForSLASummaryBean(summaryBean, 
summaryBean.getEventProcessed(), wfJob.getStartTime());
-            }
-        }
-        return isJobModified;
-    }
-
-    private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date 
startTime, Date endTime, String status) {
-        byte eventProc = summaryBean.getEventProcessed();
-        summaryBean.setEventProcessed(8);
-        summaryBean.setActualStart(startTime);
-        summaryBean.setActualEnd(endTime);
-        long actualDuration = endTime.getTime() - startTime.getTime();
-        summaryBean.setActualDuration(actualDuration);
-        if (eventProc < 4) {
-            if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || 
status.equals(WorkflowAction.Status.OK.name())
-                    || 
status.equals(CoordinatorAction.Status.SUCCEEDED.name())) {
-                if (endTime.getTime() <= 
summaryBean.getExpectedEnd().getTime()) {
-                    summaryBean.setSLAStatus(SLAStatus.MET);
-                }
-                else {
-                    summaryBean.setSLAStatus(SLAStatus.MISS);
-                }
-            }
-            else {
-                summaryBean.setSLAStatus(SLAStatus.MISS);
-            }
-        }
-
-    }
-
-    private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte 
eventProc, Date startTime) {
-        if (((eventProc & 1) == 0)) {
-            eventProc += 1;
-            summaryBean.setEventProcessed(eventProc);
-        }
-        if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
-            summaryBean.setSLAStatus(SLAStatus.IN_PROCESS);
-        }
-        summaryBean.setActualStart(startTime);
-    }
-
     @Override
     public int size() {
         return slaMap.size();
     }
 
+    @VisibleForTesting
+    public Set<String> getHistorySet(){
+        return historySet;
+    }
+
     @Override
     public SLACalcStatus get(String jobId) throws JPAExecutorException {
         SLACalcStatus memObj;
         memObj = slaMap.get(jobId);
         if (memObj == null && historySet.contains(jobId)) {
-            memObj = new 
SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY,
 jobId),
-                    
SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART,
 jobId));
+            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
+                    .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), 
SLARegistrationQueryExecutor.getInstance().get(
+                    SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
         }
         return memObj;
     }
@@ -460,13 +202,13 @@ public class SLACalculatorMemory implements SLACalculator 
{
         SLACalcStatus memObj;
         memObj = slaMap.get(jobId);
         if (memObj == null) {
-            memObj = new 
SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY,
 jobId),
-                    
SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART,
 jobId));
+            memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance()
+                    .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), 
SLARegistrationQueryExecutor.getInstance().get(
+                    SLARegQuery.GET_SLA_REG_ON_RESTART, jobId));
         }
         return memObj;
     }
 
-
     @Override
     public Iterator<String> iterator() {
         return slaMap.keySet().iterator();
@@ -488,12 +230,17 @@ public class SLACalculatorMemory implements SLACalculator 
{
      */
     protected void updateJobSla(String jobId) throws Exception {
         SLACalcStatus slaCalc = slaMap.get(jobId);
+
+        if (slaCalc == null) {
+            // job might be processed and removed from map by addJobStatus
+            return;
+        }
         synchronized (slaCalc) {
-            boolean change = false;
             // get eventProcessed on DB for validation in HA
             SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) 
SLASummaryQueryExecutor.getInstance()).get(
                     
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
             byte eventProc = summaryBean.getEventProcessed();
+            slaCalc.setEventProcessed(eventProc);
             if (eventProc >= 7) {
                 if (eventProc == 7) {
                     historySet.add(jobId);
@@ -503,127 +250,69 @@ public class SLACalculatorMemory implements 
SLACalculator {
             }
             else {
                 if 
(!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
-                    //Update last modified time.
+                    // Update last modified time.
                     
slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
                     reloadExpectedTimeAndConfig(slaCalc);
                     LOG.debug("Last modified time has changed for job " + 
jobId + " reloading config from DB");
                 }
-                slaCalc.setEventProcessed(eventProc);
-                SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
-                // calculation w.r.t current time and status
-                if ((eventProc & 1) == 0) { // first bit (start-processed) 
unset
-                    if (reg.getExpectedStart() != null) {
-                        if (reg.getExpectedStart().getTime() + jobEventLatency 
< System.currentTimeMillis()) {
-                            confirmWithDB(slaCalc);
-                            eventProc = slaCalc.getEventProcessed();
-                            if (eventProc != 8 && (eventProc & 1) == 0) {
-                                // Some DB exception
-                                slaCalc.setEventStatus(EventStatus.START_MISS);
-                                if (shouldAlert(slaCalc)) {
-                                    eventHandler.queueEvent(new 
SLACalcStatus(slaCalc));
-                                }
-                                eventProc++;
-                            }
-                            change = true;
-                        }
-                    }
-                    else {
-                        eventProc++; // disable further processing for 
optional start sla condition
-                        change = true;
-                    }
-                }
-                // check if second bit (duration-processed) is unset
-                if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
-                    if (reg.getExpectedDuration() == -1) {
-                        eventProc += 2;
-                        change = true;
-                    }
-                    else if (slaCalc.getActualStart() != null) {
-                        if ((reg.getExpectedDuration() + jobEventLatency) < 
(System.currentTimeMillis() - slaCalc
-                                .getActualStart().getTime())) {
-                            slaCalc.setEventProcessed(eventProc);
-                            confirmWithDB(slaCalc);
-                            eventProc = slaCalc.getEventProcessed();
-                            if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) 
{
-                                // Some DB exception
-                                
slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-                                if (shouldAlert(slaCalc)) {
-                                    eventHandler.queueEvent(new 
SLACalcStatus(slaCalc));
-                                }
-                                eventProc += 2;
-                            }
-                            change = true;
-                        }
-                    }
-                }
-                if (eventProc < 4) {
-                    if (reg.getExpectedEnd().getTime() + jobEventLatency < 
System.currentTimeMillis()) {
-                        slaCalc.setEventProcessed(eventProc);
-                        confirmWithDB(slaCalc);
-                        eventProc = slaCalc.getEventProcessed();
-                        change = true;
-                    }
-                }
-                if (change) {
+                if (isChanged(slaCalc)) {
+                    LOG.debug("{0} job has SLA event change. EventProc = {1}, 
status = {2}", slaCalc.getId(),
+                            slaCalc.getEventProcessed(), 
slaCalc.getJobStatus());
                     try {
-                        boolean locked = true;
-                        slaCalc.acquireLock();
-                        locked = slaCalc.isLocked();
-                        if (locked) {
-                            // no more processing, no transfer to history set
-                            if (slaCalc.getEventProcessed() >= 8) {
-                                eventProc = 8;
-                                // Should not be > 8. But to handle any corner 
cases
-                                slaCalc.setEventProcessed(8);
-                                slaMap.remove(jobId);
-                                LOG.trace("Removed Job [{0}] from map after 
Event-processed=8", jobId);
-                            }
-                            else {
-                                slaCalc.setEventProcessed(eventProc);
-                            }
-                            writetoDB(slaCalc, eventProc);
-                            if (eventProc == 7) {
-                                historySet.add(jobId);
-                                slaMap.remove(jobId);
-                                LOG.trace("Removed Job [{0}] from map after 
Event-processed=7", jobId);
-                            }
-                        }
-                    }
-                    catch (InterruptedException e) {
-                        throw new XException(ErrorCode.E0606, slaCalc.getId(), 
slaCalc.getLockTimeOut());
+                        SLAXCommandFactory.getSLAEventXCommand(slaCalc).call();
+                        checkEventProc(slaCalc);
                     }
-                    finally {
-                        slaCalc.releaseLock();
+                    catch (XException e) {
+                        if (e.getErrorCode().equals(ErrorCode.E0604) || 
e.getErrorCode().equals(ErrorCode.E0605)) {
+                            LOG.debug("job [{0}] is is not in DB, removing 
from Memory", slaCalc.getId());
+                            slaMap.remove(jobId);
+                        }
                     }
                 }
+
             }
         }
     }
 
-    private void writetoDB(SLACalcStatus slaCalc, byte eventProc) throws 
JPAExecutorException {
-        SLASummaryBean slaSummaryBean = new SLASummaryBean();
-        slaSummaryBean.setId(slaCalc.getId());
-        slaSummaryBean.setEventProcessed(eventProc);
-        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
-        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
-        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
-        slaSummaryBean.setActualStart(slaCalc.getActualStart());
-        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
-        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
-        slaSummaryBean.setLastModifiedTime(new Date());
+    private boolean isChanged(SLACalcStatus slaCalc) {
+        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
+        byte eventProc = slaCalc.getEventProcessed();
 
-        
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
-                slaSummaryBean);
-        LOG.trace("Stored SLA SummaryBean Job [{0}] with 
Event-processed=[{1}]", slaCalc.getId(),
-                slaSummaryBean.getEventProcessed());
+        if ((eventProc & 1) == 0) { // first bit (start-processed) unset
+            if (reg.getExpectedStart() != null) {
+                if (reg.getExpectedStart().getTime() + jobEventLatency < 
System.currentTimeMillis()) {
+                    return true;
+                }
+            }
+            else {
+                return true;
+            }
+        }
+        if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) {
+            if (reg.getExpectedDuration() == -1) {
+                return true;
+            }
+            else if (slaCalc.getActualStart() != null) {
+                if ((reg.getExpectedDuration() + jobEventLatency) < 
(System.currentTimeMillis() - slaCalc
+                        .getActualStart().getTime())) {
+                    return true;
+                }
+            }
+        }
+        if (eventProc < 4) {
+            if (reg.getExpectedEnd().getTime() + jobEventLatency < 
System.currentTimeMillis()) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @SuppressWarnings("rawtypes")
-    private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> 
updateList)
-            throws JPAExecutorException {
+    private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> 
updateList) throws JPAExecutorException {
         updateList.add(new 
UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, 
slaCalc.getSLARegistrationBean()));
         slaCalc.setLastModifiedTime(new Date());
-        updateList.add(new 
UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
 new SLASummaryBean(slaCalc)));
+        updateList.add(new 
UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME,
+                new SLASummaryBean(slaCalc)));
     }
 
     @SuppressWarnings("rawtypes")
@@ -641,7 +330,6 @@ public class SLACalculatorMemory implements SLACalculator {
         BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, 
updateList, null);
     }
 
-
     /**
      * Periodically run by the SLAService worker threads to update SLA status 
by
      * iterating through all the jobs in the map
@@ -770,410 +458,62 @@ public class SLACalculatorMemory implements 
SLACalculator {
     @Override
     public boolean addJobStatus(String jobId, String jobStatus, 
JobEvent.EventStatus jobEventStatus, Date startTime,
             Date endTime) throws JPAExecutorException, ServiceException {
+        LOG.debug(
+                "Received addJobStatus request for job  [{0}] jobStatus = 
[{1}], jobEventStatus = [{2}], startTime = [{3}], "
+                        + "endTime = [{4}] ", jobId, jobStatus, 
jobEventStatus, startTime, endTime);
         SLACalcStatus slaCalc = slaMap.get(jobId);
-        SLASummaryBean slaInfo = null;
-        boolean hasSla = false;
-        if (slaCalc == null) {
-            if (historySet.contains(jobId)) {
-                slaInfo = 
SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, 
jobId);
-                if (slaInfo == null) {
-                    throw new JPAExecutorException(ErrorCode.E0604, jobId);
-                }
-                slaInfo.setJobStatus(jobStatus);
-                slaInfo.setActualStart(startTime);
-                slaInfo.setActualEnd(endTime);
-                if (endTime != null) {
-                    slaInfo.setActualDuration(endTime.getTime() - 
startTime.getTime());
-                }
-                slaInfo.setEventProcessed(8);
-                historySet.remove(jobId);
-                slaInfo.setLastModifiedTime(new Date());
-                SLASummaryQueryExecutor.getInstance().executeUpdate(
-                        
SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
-                hasSla = true;
-            }
-            else if 
(Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) {
-                // jobid might not exist in slaMap in HA Setting
-                SLARegistrationBean slaRegBean = 
SLARegistrationQueryExecutor.getInstance().get(
-                        SLARegQuery.GET_SLA_REG_ALL, jobId);
-                if (slaRegBean != null) { // filter out jobs picked by SLA job 
event listener
-                                          // but not actually configured for 
SLA
-                    SLASummaryBean slaSummaryBean = 
SLASummaryQueryExecutor.getInstance().get(
-                            SLASummaryQuery.GET_SLA_SUMMARY, jobId);
-                    if (slaSummaryBean.getEventProcessed() < 7) {
-                        slaCalc = new SLACalcStatus(slaSummaryBean, 
slaRegBean);
-                        slaMap.put(jobId, slaCalc);
-                    }
-                }
-            }
-        }
-        if (slaCalc != null) {
-            synchronized (slaCalc) {
-                try {
-                    // only get ZK lock when multiple servers running
-                    boolean locked = true;
-                    slaCalc.acquireLock();
-                    locked = slaCalc.isLocked();
-                    if (locked) {
-                        // get eventProcessed on DB for validation in HA
-                        SLASummaryBean summaryBean = 
((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get(
-                                
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
-                        byte eventProc = summaryBean.getEventProcessed();
-
-                        if 
(!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
-                            //Update last modified time.
-                            
slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
-                            reloadExpectedTimeAndConfig(slaCalc);
-                            LOG.debug("Last modified time has changed for job 
" + jobId + " reloading config from DB");
-                        }
 
-                        slaCalc.setEventProcessed(eventProc);
-                        slaCalc.setJobStatus(jobStatus);
-                        switch (jobEventStatus) {
-                            case STARTED:
-                                slaInfo = processJobStartSLA(slaCalc, 
startTime);
-                                break;
-                            case SUCCESS:
-                                slaInfo = processJobEndSuccessSLA(slaCalc, 
startTime, endTime);
-                                break;
-                            case FAILURE:
-                                slaInfo = processJobEndFailureSLA(slaCalc, 
startTime, endTime);
-                                break;
-                            default:
-                                LOG.debug("Unknown Job Status for SLA 
purpose[{0}]", jobEventStatus);
-                                slaInfo = getSLASummaryBean(slaCalc);
-                        }
-                        if (slaCalc.getEventProcessed() == 7) {
-                            slaInfo.setEventProcessed(8);
-                            slaMap.remove(jobId);
-                        }
-                        slaInfo.setLastModifiedTime(new Date());
-                        SLASummaryQueryExecutor.getInstance().executeUpdate(
-                                
SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo);
-                        hasSla = true;
-                    }
-                }
-                catch (InterruptedException e) {
-                    throw new ServiceException(ErrorCode.E0606, 
slaCalc.getEntityKey(), slaCalc.getLockTimeOut());
-                }
-                finally {
-                    slaCalc.releaseLock();
-                }
+        if (slaCalc == null) {
+            SLARegistrationBean slaRegBean = 
SLARegistrationQueryExecutor.getInstance().get(
+                    SLARegQuery.GET_SLA_REG_ALL, jobId);
+            if (slaRegBean != null) { // filter out jobs picked by SLA job 
event listener
+                                      // but not actually configured for SLA
+                SLASummaryBean slaSummaryBean = 
SLASummaryQueryExecutor.getInstance().get(
+                        SLASummaryQuery.GET_SLA_SUMMARY, jobId);
+                slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean);
+                slaMap.put(jobId, slaCalc);
             }
-            LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + 
slaCalc.getSLAStatus());
         }
+        else {
+            SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) 
SLASummaryQueryExecutor.getInstance()).get(
+                    
SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId);
+            byte eventProc = summaryBean.getEventProcessed();
+            if 
(!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) {
+                // Update last modified time.
+                slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime());
+                reloadExpectedTimeAndConfig(slaCalc);
+                LOG.debug("Last modified time has changed for job " + jobId + 
" reloading config from DB");
 
-        return hasSla;
-    }
-
-    /**
-     * Process SLA for jobs that started running. Also update actual-start time
-     *
-     * @param slaCalc
-     * @param actualStart
-     * @return SLASummaryBean
-     */
-    private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date 
actualStart) {
-        slaCalc.setActualStart(actualStart);
-        if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) {
-            slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
-        }
-        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
-        Date expecStart = reg.getExpectedStart();
-        byte eventProc = slaCalc.getEventProcessed();
-        // set event proc here
-        if (((eventProc & 1) == 0)) {
-            if (expecStart != null) {
-                if (actualStart.getTime() > expecStart.getTime()) {
-                    slaCalc.setEventStatus(EventStatus.START_MISS);
-                }
-                else {
-                    slaCalc.setEventStatus(EventStatus.START_MET);
-                }
-                if (shouldAlert(slaCalc)) {
-                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                }
             }
-            eventProc += 1;
-            slaCalc.setEventProcessed(eventProc);
-        }
-        return getSLASummaryBean(slaCalc);
-    }
-
-    /**
-     * Process SLA for jobs that ended successfully. Also update actual-start
-     * and end time
-     *
-     * @param slaCalc
-     * @param actualStart
-     * @param actualEnd
-     * @return SLASummaryBean
-     * @throws JPAExecutorException
-     */
-    private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date 
actualStart, Date actualEnd) throws JPAExecutorException {
-        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
-        slaCalc.setActualStart(actualStart);
-        slaCalc.setActualEnd(actualEnd);
-        long expectedDuration = reg.getExpectedDuration();
-        long actualDuration = actualEnd.getTime() - actualStart.getTime();
-        slaCalc.setActualDuration(actualDuration);
-        //check event proc
-        byte eventProc = slaCalc.getEventProcessed();
-        if (((eventProc >> 1) & 1) == 0) {
-            processDurationSLA(expectedDuration, actualDuration, slaCalc);
-            eventProc += 2;
             slaCalc.setEventProcessed(eventProc);
         }
-
-        if (eventProc < 4) {
-            Date expectedEnd = reg.getExpectedEnd();
-            if (actualEnd.getTime() > expectedEnd.getTime()) {
-                slaCalc.setEventStatus(EventStatus.END_MISS);
-                slaCalc.setSLAStatus(SLAStatus.MISS);
-            }
-            else {
-                slaCalc.setEventStatus(EventStatus.END_MET);
-                slaCalc.setSLAStatus(SLAStatus.MET);
-            }
-            eventProc += 4;
-            slaCalc.setEventProcessed(eventProc);
-            if (shouldAlert(slaCalc)) {
-                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
+        if (slaCalc != null) {
+            try {
+                SLAXCommandFactory.getSLAEventXCommand(slaCalc,
+                        
ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 
1000)).call();
+                checkEventProc(slaCalc);
             }
-        }
-        return getSLASummaryBean(slaCalc);
-    }
-
-    /**
-     * Process SLA for jobs that ended in failure. Also update actual-start and
-     * end time
-     *
-     * @param slaCalc
-     * @param actualStart
-     * @param actualEnd
-     * @return SLASummaryBean
-     * @throws JPAExecutorException
-     */
-    private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date 
actualStart, Date actualEnd) throws JPAExecutorException {
-        slaCalc.setActualStart(actualStart);
-        slaCalc.setActualEnd(actualEnd);
-        if (actualStart == null) { // job failed before starting
-            if (slaCalc.getEventProcessed() < 4) {
-                slaCalc.setEventStatus(EventStatus.END_MISS);
-                slaCalc.setSLAStatus(SLAStatus.MISS);
-                if (shouldAlert(slaCalc)) {
-                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                }
-                slaCalc.setEventProcessed(7);
-                return getSLASummaryBean(slaCalc);
+            catch (XException e) {
+                LOG.error(e);
+                throw new ServiceException(e);
             }
+            return true;
         }
-        SLARegistrationBean reg = slaCalc.getSLARegistrationBean();
-        long expectedDuration = reg.getExpectedDuration();
-        long actualDuration = actualEnd.getTime() - actualStart.getTime();
-        slaCalc.setActualDuration(actualDuration);
-
-        byte eventProc = slaCalc.getEventProcessed();
-        if (((eventProc >> 1) & 1) == 0) {
-            if (expectedDuration != -1) {
-                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-                if (shouldAlert(slaCalc)) {
-                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                }
-            }
-            eventProc += 2;
-            slaCalc.setEventProcessed(eventProc);
-        }
-        if (eventProc < 4) {
-            slaCalc.setEventStatus(EventStatus.END_MISS);
-            slaCalc.setSLAStatus(SLAStatus.MISS);
-            eventProc += 4;
-            slaCalc.setEventProcessed(eventProc);
-            if (shouldAlert(slaCalc)) {
-                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-            }
-        }
-        return getSLASummaryBean(slaCalc);
-    }
-
-    private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) {
-        SLASummaryBean slaSummaryBean = new SLASummaryBean();
-        slaSummaryBean.setActualStart(slaCalc.getActualStart());
-        slaSummaryBean.setActualEnd(slaCalc.getActualEnd());
-        slaSummaryBean.setActualDuration(slaCalc.getActualDuration());
-        slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus());
-        slaSummaryBean.setEventStatus(slaCalc.getEventStatus());
-        slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed());
-        slaSummaryBean.setId(slaCalc.getId());
-        slaSummaryBean.setJobStatus(slaCalc.getJobStatus());
-        return slaSummaryBean;
-    }
-
-    private void processDurationSLA(long expected, long actual, SLACalcStatus 
slaCalc) {
-        if (expected != -1) {
-            if (actual > expected) {
-                slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-            }
-            else if (actual <= expected) {
-                slaCalc.setEventStatus(EventStatus.DURATION_MET);
-            }
-            if (shouldAlert(slaCalc)) {
-                eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-            }
+        else {
+            return false;
         }
     }
 
-    /*
-     * Confirm alerts against source of truth - DB. Also required in case of 
High Availability
-     */
-    private void confirmWithDB(SLACalcStatus slaCalc) {
-        boolean ended = false, isEndMiss = false;
-        try {
-            switch (slaCalc.getAppType()) {
-                case WORKFLOW_JOB:
-                    WorkflowJobBean wf = jpaService.execute(new 
WorkflowJobGetForSLAJPAExecutor(slaCalc.getId()));
-                    if (wf.getEndTime() != null) {
-                        ended = true;
-                        if (wf.getStatus() == WorkflowJob.Status.KILLED || 
wf.getStatus() == WorkflowJob.Status.FAILED
-                                || wf.getEndTime().getTime() > 
slaCalc.getExpectedEnd().getTime()) {
-                            isEndMiss = true;
-                        }
-                    }
-                    slaCalc.setActualStart(wf.getStartTime());
-                    slaCalc.setActualEnd(wf.getEndTime());
-                    slaCalc.setJobStatus(wf.getStatusStr());
-                    break;
-                case WORKFLOW_ACTION:
-                    WorkflowActionBean wa = jpaService.execute(new 
WorkflowActionGetForSLAJPAExecutor(slaCalc.getId()));
-                    if (wa.getEndTime() != null) {
-                        ended = true;
-                        if (wa.isTerminalWithFailure()
-                                || wa.getEndTime().getTime() > 
slaCalc.getExpectedEnd().getTime()) {
-                            isEndMiss = true;
-                        }
-                    }
-                    slaCalc.setActualStart(wa.getStartTime());
-                    slaCalc.setActualEnd(wa.getEndTime());
-                    slaCalc.setJobStatus(wa.getStatusStr());
-                    break;
-                case COORDINATOR_ACTION:
-                    CoordinatorActionBean ca = jpaService.execute(new 
CoordActionGetForSLAJPAExecutor(slaCalc.getId()));
-                    if (ca.isTerminalWithFailure()) {
-                        isEndMiss = ended = true;
-                        slaCalc.setActualEnd(ca.getLastModifiedTime());
-                    }
-                    if (ca.getExternalId() != null) {
-                        wf = jpaService.execute(new 
WorkflowJobGetForSLAJPAExecutor(ca.getExternalId()));
-                        if (wf.getEndTime() != null) {
-                            ended = true;
-                            if (wf.getEndTime().getTime() > 
slaCalc.getExpectedEnd().getTime()) {
-                                isEndMiss = true;
-                            }
-                        }
-                        slaCalc.setActualEnd(wf.getEndTime());
-                        slaCalc.setActualStart(wf.getStartTime());
-                    }
-                    slaCalc.setJobStatus(ca.getStatusStr());
-                    break;
-                default:
-                    LOG.debug("Unsupported App-type for SLA - " + 
slaCalc.getAppType());
-            }
-
-            byte eventProc = slaCalc.getEventProcessed();
-            if (ended) {
-                if (isEndMiss) {
-                    slaCalc.setSLAStatus(SLAStatus.MISS);
-                }
-                else {
-                    slaCalc.setSLAStatus(SLAStatus.MET);
-                }
-                if (slaCalc.getActualStart() != null) {
-                    if ((eventProc & 1) == 0) {
-                        if (slaCalc.getExpectedStart().getTime() < 
slaCalc.getActualStart().getTime()) {
-                            slaCalc.setEventStatus(EventStatus.START_MISS);
-                        }
-                        else {
-                            slaCalc.setEventStatus(EventStatus.START_MET);
-                        }
-                        if (shouldAlert(slaCalc)) {
-                            eventHandler.queueEvent(new 
SLACalcStatus(slaCalc));
-                        }
-                    }
-                    slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() 
- slaCalc.getActualStart().getTime());
-                    if (((eventProc >> 1) & 1) == 0) {
-                        processDurationSLA(slaCalc.getExpectedDuration(), 
slaCalc.getActualDuration(), slaCalc);
-                    }
-                }
-                if (eventProc < 4) {
-                    if (isEndMiss) {
-                        slaCalc.setEventStatus(EventStatus.END_MISS);
-                    }
-                    else {
-                        slaCalc.setEventStatus(EventStatus.END_MET);
-                    }
-                    if (shouldAlert(slaCalc)) {
-                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                    }
-                }
-                slaCalc.setEventProcessed(8);
-            }
-            else {
-                if (slaCalc.getActualStart() != null) {
-                    slaCalc.setSLAStatus(SLAStatus.IN_PROCESS);
-                }
-                if ((eventProc & 1) == 0) {
-                    if (slaCalc.getActualStart() != null) {
-                        if (slaCalc.getExpectedStart().getTime() < 
slaCalc.getActualStart().getTime()) {
-                            slaCalc.setEventStatus(EventStatus.START_MISS);
-                        }
-                        else {
-                            slaCalc.setEventStatus(EventStatus.START_MET);
-                        }
-                        if (shouldAlert(slaCalc)) {
-                            eventHandler.queueEvent(new 
SLACalcStatus(slaCalc));
-                        }
-                        eventProc++;
-                    }
-                    else if (slaCalc.getExpectedStart().getTime() < 
System.currentTimeMillis()) {
-                        slaCalc.setEventStatus(EventStatus.START_MISS);
-                        if (shouldAlert(slaCalc)) {
-                            eventHandler.queueEvent(new 
SLACalcStatus(slaCalc));
-                        }
-                        eventProc++;
-                    }
-                }
-                if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != 
null
-                        && slaCalc.getExpectedDuration() != -1) {
-                    if (System.currentTimeMillis() - 
slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) {
-                        slaCalc.setEventStatus(EventStatus.DURATION_MISS);
-                        if (shouldAlert(slaCalc)) {
-                            eventHandler.queueEvent(new 
SLACalcStatus(slaCalc));
-                        }
-                        eventProc += 2;
-                    }
-                }
-                if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < 
System.currentTimeMillis()) {
-                    slaCalc.setEventStatus(EventStatus.END_MISS);
-                    slaCalc.setSLAStatus(SLAStatus.MISS);
-                    if (shouldAlert(slaCalc)) {
-                        eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                    }
-                    eventProc += 4;
-                }
-                slaCalc.setEventProcessed(eventProc);
-            }
+    private void checkEventProc(SLACalcStatus slaCalc){
+        byte eventProc = slaCalc.getEventProcessed();
+        if (slaCalc.getEventProcessed() >= 8) {
+            slaMap.remove(slaCalc.getId());
+            LOG.debug("Removed Job [{0}] from map after Event-processed=8", 
slaCalc.getId());
         }
-        catch (Exception e) {
-            LOG.warn("Error while confirming SLA against DB for jobid= " + 
slaCalc.getId() + ". Exception is "
-                    + e.getClass().getName() + ": " + e.getMessage());
-            if (slaCalc.getEventProcessed() < 4 && 
slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) {
-                slaCalc.setEventStatus(EventStatus.END_MISS);
-                slaCalc.setSLAStatus(SLAStatus.MISS);
-                if (shouldAlert(slaCalc)) {
-                    eventHandler.queueEvent(new SLACalcStatus(slaCalc));
-                }
-                slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4);
-            }
+        if (eventProc == 7) {
+            historySet.add(slaCalc.getId());
+            slaMap.remove(slaCalc.getId());
+            LOG.debug("Removed Job [{0}] from map after Event-processed=7", 
slaCalc.getId());
         }
     }
 
@@ -1235,21 +575,21 @@ public class SLACalculatorMemory implements 
SLACalculator {
         return enableAlert(getSLAJobsforParents(parentJobIds));
     }
 
-
     @Override
     public boolean disableAlert(List<String> jobIds) throws 
JPAExecutorException, ServiceException {
         boolean isJobFound = false;
         @SuppressWarnings("rawtypes")
         List<UpdateEntry> updateList = new 
ArrayList<BatchQueryExecutor.UpdateEntry>();
 
-            for (String jobId : jobIds) {
-                SLACalcStatus slaCalc = getSLACalcStatus(jobId);
-                if (slaCalc != null) {
-                    
slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
 Boolean.toString(true));
-                    updateDBSlaConfig(slaCalc, updateList);
-                    isJobFound = true;
-                }
+        for (String jobId : jobIds) {
+            SLACalcStatus slaCalc = getSLACalcStatus(jobId);
+            if (slaCalc != null) {
+                
slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT,
+                        Boolean.toString(true));
+                updateDBSlaConfig(slaCalc, updateList);
+                isJobFound = true;
             }
+        }
         executeBatchQuery(updateList);
         return isJobFound;
     }
@@ -1260,19 +600,19 @@ public class SLACalculatorMemory implements 
SLACalculator {
     }
 
     @Override
-    public boolean changeDefinition(List<Pair<String, Map<String,String>>> 
jobIdsSLAPair ) throws JPAExecutorException,
-            ServiceException{
+    public boolean changeDefinition(List<Pair<String, Map<String, String>>> 
jobIdsSLAPair) throws JPAExecutorException,
+            ServiceException {
         boolean isJobFound = false;
         @SuppressWarnings("rawtypes")
         List<UpdateEntry> updateList = new 
ArrayList<BatchQueryExecutor.UpdateEntry>();
-            for (Pair<String, Map<String,String>> jobIdSLAPair : 
jobIdsSLAPair) {
-                SLACalcStatus slaCalc = 
getSLACalcStatus(jobIdSLAPair.getFist());
-                if (slaCalc != null) {
-                    updateParams(slaCalc, jobIdSLAPair.getSecond());
-                    updateDBSlaExpectedValues(slaCalc, updateList);
-                    isJobFound = true;
-                }
+        for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) {
+            SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist());
+            if (slaCalc != null) {
+                updateParams(slaCalc, jobIdSLAPair.getSecond());
+                updateDBSlaExpectedValues(slaCalc, updateList);
+                isJobFound = true;
             }
+        }
         executeBatchQuery(updateList);
         return isJobFound;
     }
@@ -1292,11 +632,7 @@ public class SLACalculatorMemory implements SLACalculator 
{
         }
     }
 
-    private boolean shouldAlert(SLACalcStatus slaObj) {
-        return 
!slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT);
-    }
-
-    private List<String> getSLAJobsforParents(List<String> parentJobIds) 
throws JPAExecutorException{
+    private List<String> getSLAJobsforParents(List<String> parentJobIds) 
throws JPAExecutorException {
         List<String> childJobIds = new ArrayList<String>();
         for (String jobId : parentJobIds) {
             List<SLARegistrationBean> registrationBeanList = 
SLARegistrationQueryExecutor.getInstance().getList(
@@ -1307,4 +643,5 @@ public class SLACalculatorMemory implements SLACalculator {
         }
         return childJobIds;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java 
b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
index ef1ea98..3b2cebd 100644
--- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
+++ b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java
@@ -52,8 +52,6 @@ import org.json.simple.JSONObject;
 
  @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = 
"update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = 
:eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, 
w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, 
w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId 
= :jobId"),
 
- @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES", query = "update 
SLASummaryBean w set w.eventProcessed = :eventProcessed, w.actualStartTS = 
:actualStartTS, w.actualEndTS = :actualEndTS, w.actualEndTS = :actualEndTS, 
w.actualDuration = :actualDuration, w.lastModifiedTS = :lastModifiedTS where 
w.jobId = :jobId"),
-
  @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES", query = "update 
SLASummaryBean w set w.nominalTimeTS = :nominalTime, w.expectedStartTS = 
:expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = 
:expectedDuration , w.lastModifiedTS = :lastModTime where w.jobId = :jobId"),
 
  @NamedQuery(name = "UPDATE_SLA_SUMMARY_EVENTPROCESSED", query = "update 
SLASummaryBean w set w.eventProcessed = :eventProcessed where w.jobId = 
:jobId"),

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java 
b/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java
new file mode 100644
index 0000000..e6298f5
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.sla;
+
+import org.apache.oozie.command.sla.SLACoordActionJobEventXCommand;
+import org.apache.oozie.command.sla.SLACoordActionJobHistoryXCommand;
+import org.apache.oozie.command.sla.SLAJobEventXCommand;
+import org.apache.oozie.command.sla.SLAJobHistoryXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowActionJobEventXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowActionJobHistoryXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowJobEventXCommand;
+import org.apache.oozie.command.sla.SLAWorkflowJobHistoryXCommand;
+
+/**
+ * A factory for creating SLACommand objects.
+ */
+public class SLAXCommandFactory {
+
+    /**
+     * Gets the SLA job history  command.
+     *
+     * @param jobId the job id
+     * @return the SLA job history x command
+     */
+    public static SLAJobHistoryXCommand getSLAJobHistoryXCommand(String jobId) 
{
+        if (jobId.endsWith("-W")) {
+            return new SLAWorkflowJobHistoryXCommand(jobId);
+        }
+        else if (jobId.contains("-W@")) {
+            return new SLAWorkflowActionJobHistoryXCommand(jobId);
+
+        }
+        else if (jobId.contains("-C@")) {
+            return new SLACoordActionJobHistoryXCommand(jobId);
+        }
+
+        else {
+            return null;
+        }
+    }
+
+    /**
+     * Gets the SLAevent  command.
+     *
+     * @param slaCalc the sla calc
+     * @return the SLA event x command
+     */
+    public static SLAJobEventXCommand getSLAEventXCommand(SLACalcStatus 
slaCalc) {
+        return getSLAEventXCommand(slaCalc, 0);
+    }
+
+    /**
+     * Gets the SLA event x command.
+     *
+     * @param slaCalc the sla calc
+     * @param lockTimeOut the lock time out
+     * @return the SLA event x command
+     */
+    public static SLAJobEventXCommand getSLAEventXCommand(SLACalcStatus 
slaCalc, long lockTimeOut) {
+        if (slaCalc.getId().endsWith("-W")) {
+            return new SLAWorkflowJobEventXCommand(slaCalc, lockTimeOut);
+        }
+        else if (slaCalc.getId().contains("-W@")) {
+            return new SLAWorkflowActionJobEventXCommand(slaCalc, lockTimeOut);
+
+        }
+        else if (slaCalc.getId().contains("-C@")) {
+            return new SLACoordActionJobEventXCommand(slaCalc, lockTimeOut);
+        }
+
+        else {
+            return null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
index 52560e6..e239084 100644
--- 
a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
+++ 
b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.command.coord;
 
 import java.util.Date;
+
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.WorkflowJobBean;
@@ -28,9 +29,10 @@ import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
 import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
-import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
+import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
@@ -84,7 +86,9 @@ public class TestCoordActionsKillXCommand extends 
XDataTestCase {
         assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
 
         sleep(100);
-        WorkflowJobBean wf = jpaService.execute(new 
WorkflowJobGetForSLAJPAExecutor(ids[3]));
+        WorkflowJobBean wf = 
WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+                ids[3]);
+
         assertEquals(WorkflowJob.Status.KILLED, wf.getStatus());
 
         CoordinatorJobBean job = jpaService.execute(new 
CoordJobGetJPAExecutor(ids[0]));
@@ -118,7 +122,8 @@ public class TestCoordActionsKillXCommand extends 
XDataTestCase {
         assertEquals(CoordinatorAction.Status.KILLED, action.getStatus());
 
         sleep(100);
-        WorkflowJobBean wf = jpaService.execute(new 
WorkflowJobGetForSLAJPAExecutor(ids[3]));
+        WorkflowJobBean wf = 
WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA,
+                ids[3]);
         assertEquals(WorkflowJob.Status.KILLED, wf.getStatus());
 
         CoordinatorJobBean job = jpaService.execute(new 
CoordJobGetJPAExecutor(ids[0]));

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
index 5914b3b..1daecdc 100644
--- 
a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java
@@ -131,7 +131,7 @@ public class TestSLASummaryQueryExecutor extends 
XDataTestCase {
         bean.setActualDuration(endTime.getTime() - startTime.getTime());
         bean.setLastModifiedTime(new Date());
         bean.setEventProcessed(8);
-        
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES,
 bean);
+        
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES,
 bean);
         retBean = 
SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, 
bean.getId());
         assertEquals(bean.getActualStartTimestamp(), 
retBean.getActualStartTimestamp());
         assertEquals(bean.getActualEndTimestamp(), 
retBean.getActualEndTimestamp());

http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java 
b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
index 795db37..3af263e 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java
@@ -101,17 +101,18 @@ public class TestHASLAService extends ZKXTestCase {
 
             // Case 1 workflow job submitted to dummy server,
             // but before start running, the dummy server is down
-            createWorkflow("job-1");
-            SLARegistrationBean sla1 = 
TestSLAService._createSLARegistration("job-1", AppType.WORKFLOW_JOB);
+            WorkflowJobBean wfJob1 = createWorkflow("job-1-W");
+            SLARegistrationBean sla1 = 
TestSLAService._createSLARegistration("job-1-W", AppType.WORKFLOW_JOB);
             sla1.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 
3600 * 1000)); // 2 hr before
             sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 
* 1000)); // 1 hr before
             sla1.setExpectedDuration(10 * 60 * 1000); // 10 mins
             dummyCalc.addRegistration(sla1.getId(), sla1);
+            dummyCalc.updateAllSlaStatus();
 
             // Case 2. workflow job submitted to dummy server, start running,
             // then the dummy server is down
-            createWorkflow("job-2");
-            SLARegistrationBean sla2 = 
TestSLAService._createSLARegistration("job-2", AppType.WORKFLOW_JOB);
+            WorkflowJobBean wfJob2 = createWorkflow("job-2-W");
+            SLARegistrationBean sla2 = 
TestSLAService._createSLARegistration("job-2-W", AppType.WORKFLOW_JOB);
             sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 
3600 * 1000)); // 2hr before
             sla2.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 
* 1000)); // 1hr ahead
             sla2.setExpectedDuration(10 * 60 * 1000); // 10 mins
@@ -138,6 +139,12 @@ public class TestHASLAService extends ZKXTestCase {
             ehs.new EventWorker().run();
             assertTrue(output.toString().contains(sla1.getId() + " Sla START - 
MISS!!!"));
 
+            wfJob1.setStatus(WorkflowJob.Status.SUCCEEDED);
+            wfJob1.setEndTime(new Date());
+            wfJob1.setStartTime(new Date());
+            WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                    
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob1);
+
             // Job 1 succeeded on the living server --> duration met and end 
miss
             slaCalcMem.addJobStatus(sla1.getId(), 
WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
                     new Date());
@@ -145,6 +152,12 @@ public class TestHASLAService extends ZKXTestCase {
             assertTrue(output.toString().contains(sla1.getId() + " Sla 
DURATION - MET!!!"));
             assertTrue(output.toString().contains(sla1.getId() + " Sla END - 
MISS!!!"));
 
+            wfJob2.setStatus(WorkflowJob.Status.SUCCEEDED);
+            wfJob2.setEndTime(new Date());
+            wfJob2.setStartTime(new Date());
+            WorkflowJobQueryExecutor.getInstance().executeUpdate(
+                    
WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob2);
+
             // Job 2 succeeded on the living server --> duration met and end 
met
             slaCalcMem.addJobStatus(sla2.getId(), 
WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(),
                     new Date());
@@ -161,13 +174,14 @@ public class TestHASLAService extends ZKXTestCase {
         }
     }
 
-    public void updateCoordAction(String id, String status) throws 
JPAExecutorException {
+    public CoordinatorActionBean updateCoordAction(String id, String status) 
throws JPAExecutorException {
         CoordinatorActionBean action = new CoordinatorActionBean();
         action.setId(id);
         action.setStatusStr(status);
         action.setLastModifiedTime(new Date());
         
CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME,
                 action);
+        return action;
     }
 
     public void testSLAUpdateWithHA() throws Exception {
@@ -187,8 +201,8 @@ public class TestHASLAService extends ZKXTestCase {
         createDBEntry(id3, expectedStartTS, expectedEndTS1);
         createDBEntry(id4, expectedStartTS, expectedEndTS1);
         // Coord Action of jobs 5-6 already started and currently running (to 
test history set)
-        createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2);
-        createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2);
+        createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2, 1);
+        createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2, 1);
 
         SLAService slas = Services.get().get(SLAService.class);
         SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) 
slas.getSLACalculator();
@@ -198,7 +212,10 @@ public class TestHASLAService extends ZKXTestCase {
         while (itr.hasNext()) {
             slaMapKeys.add(itr.next());
         }
-        assertEquals(6, slaMapKeys.size());
+        //4 jobs expected end is not yet reached
+        //2 jobs has end miss, waiting for job to complete
+        assertEquals(4, slaMapKeys.size());
+        assertEquals(2, slaCalcMem.getHistorySet().size());
 
         DummyZKOozie dummyOozie_1 = null;
         try {
@@ -214,8 +231,8 @@ public class TestHASLAService extends ZKXTestCase {
             while (itr.hasNext()) {
                 slaMapKeys.add(itr.next());
             }
-            assertEquals(6, slaMapKeys.size());
-
+            assertEquals(4, slaMapKeys.size());
+            assertEquals(2, dummySlaCalcMem.getHistorySet().size());
             // Coord Action 1,3 run and update status on *non-dummy* server
             updateCoordAction(id1, "RUNNING");
             slaCalcMem
@@ -314,14 +331,12 @@ public class TestHASLAService extends ZKXTestCase {
 
     public void testNoDuplicateEventsInHA() throws Exception {
         String id1 = "0000001-130521183438837-oozie-test-C@1";
-        Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 
* 1000); // get MISS
-        Date expectedEndTS = new Date(System.currentTimeMillis() - 1 * 3600 * 
1000); // get MISS
-        createDBEntry(id1, expectedStartTS, expectedEndTS);
 
         SLAService slas = Services.get().get(SLAService.class);
         SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) 
slas.getSLACalculator();
         slaCalcMem.init(Services.get().getConf()); // loads the job in sla map
 
+
         EventHandlerService ehs = 
Services.get().get(EventHandlerService.class);
         EventQueue ehs_q = ehs.getEventQueue();
 
@@ -336,20 +351,30 @@ public class TestHASLAService extends ZKXTestCase {
             dummyEhs.init(Services.get());
             EventQueue dummyEhs_q = dummyEhs.getEventQueue();
 
+            Date expectedStartTS = new Date(System.currentTimeMillis() + 2 * 
3600 * 1000); // get MISS
+            Date expectedEndTS = new Date(System.currentTimeMillis() + 1 * 
3600 * 1000); // get MISS
+            SLASummaryBean sla = createDBEntryForStarted(id1, expectedStartTS, 
expectedEndTS, 0);
+            sla.setExpectedDuration(-1);
+            sla.setLastModifiedTime(new Date());
+            
SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES,
+                    sla);
+
             // Action started on Server 1
             updateCoordAction(id1, "RUNNING");
+
             slaCalcMem
                     .addJobStatus(id1, 
CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null);
+            assertEquals(1, ehs_q.size());
             SLACalcStatus s1 = (SLACalcStatus) ehs_q.poll();
-            assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus());
 
+            assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus());
             // Action ended on Server 2
             updateCoordAction(id1, "FAILED");
             dummySlaCalcMem.addJobStatus(id1, 
CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, new Date(
                     System.currentTimeMillis() - 1800 * 1000),
                     new Date());
-            dummyEhs_q.poll(); // getting rid of the duration_miss event
             SLACalcStatus s2 = (SLACalcStatus) dummyEhs_q.poll();
+
             assertEquals(SLAStatus.MISS, s2.getSLAStatus());
 
             slaCalcMem.updateAllSlaStatus();
@@ -464,7 +489,8 @@ public class TestHASLAService extends ZKXTestCase {
         
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, 
null, null);
     }
 
-    private void createDBEntryForStarted(String actionId, Date 
expectedStartTS, Date expectedEndTS) throws Exception {
+    private SLASummaryBean createDBEntryForStarted(String actionId, Date 
expectedStartTS, Date expectedEndTS,
+            int eventProcessed) throws Exception {
         ArrayList<JsonBean> insertList = new ArrayList<JsonBean>();
         Date modTime = new Date();
         WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, 
WorkflowInstance.Status.RUNNING,
@@ -494,7 +520,7 @@ public class TestHASLAService extends ZKXTestCase {
         sla.setAppType(AppType.COORDINATOR_ACTION);
         sla.setJobStatus("RUNNING");
         sla.setSLAStatus(SLAStatus.IN_PROCESS);
-        sla.setEventProcessed(1);
+        sla.setEventProcessed(eventProcessed);
         sla.setLastModifiedTime(modTime);
         sla.setExpectedStart(expectedStartTS);
         sla.setActualStart(expectedStartTS);
@@ -508,9 +534,10 @@ public class TestHASLAService extends ZKXTestCase {
         insertList.add(sla);
         insertList.add(reg);
         
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, 
null, null);
+        return sla;
     }
 
-    private void createWorkflow(String id) throws Exception {
+    private WorkflowJobBean createWorkflow(String id) throws Exception {
         List<JsonBean> insertList = new ArrayList<JsonBean>();
         WorkflowJobBean workflow = new WorkflowJobBean();
         workflow.setId(id);
@@ -519,6 +546,7 @@ public class TestHASLAService extends ZKXTestCase {
         workflow.setSlaXml("<sla></sla>");
         insertList.add(workflow);
         
BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, 
null, null);
+        return workflow;
     }
 
     public static class DummySLAEventListener extends SLAEventListener {

Reply via email to