http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java index 42313fd..e8638a9 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java @@ -31,47 +31,28 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.AppType; -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.XException; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.event.JobEvent; -import org.apache.oozie.client.event.SLAEvent.EventStatus; import org.apache.oozie.client.event.SLAEvent.SLAStatus; import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; -import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor; -import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; -import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; -import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; -import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; -import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor; -import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; -import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; -import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor; -import org.apache.oozie.lock.LockToken; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.EventHandlerService; import org.apache.oozie.service.JPAService; -import org.apache.oozie.service.JobsConcurrencyService; -import org.apache.oozie.service.MemoryLocksService; import org.apache.oozie.service.SchedulerService; import org.apache.oozie.service.ServiceException; import org.apache.oozie.service.Services; @@ -80,8 +61,8 @@ import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.XLog; import org.apache.oozie.util.Pair; -import com.google.common.annotations.VisibleForTesting; +import com.google.common.annotations.VisibleForTesting; /** * Implementation class for SLACalculator that calculates SLA related to @@ -111,14 +92,14 @@ public class SLACalculatorMemory implements SLACalculator { modifiedAfter = conf.getInt(SLAService.CONF_EVENTS_MODIFIED_AFTER, 7); loadOnRestart(); Runnable purgeThread = new HistoryPurgeWorker(); - // schedule runnable by default 1 day + // schedule runnable by default 1 hours Services.get() .get(SchedulerService.class) - .schedule(purgeThread, 86400, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 86400), + .schedule(purgeThread, 3600, Services.get().getConf().getInt(SLAService.CONF_SLA_HISTORY_PURGE_INTERVAL, 3600), SchedulerService.Unit.SEC); } - public class HistoryPurgeWorker implements Runnable { + public class HistoryPurgeWorker extends Thread { public HistoryPurgeWorker() { } @@ -131,327 +112,88 @@ public class SLACalculatorMemory implements SLACalculator { Iterator<String> jobItr = historySet.iterator(); while (jobItr.hasNext()) { String jobId = jobItr.next(); - - if (jobId.endsWith("-W")) { - WorkflowJobBean wfJob = null; - try { - wfJob = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_STATUS, jobId); - } - catch (JPAExecutorException e) { - if (e.getErrorCode().equals(ErrorCode.E0604)) { - jobItr.remove(); - } - else { - LOG.info("Failed to fetch the workflow job: " + jobId, e); - } - } - if (wfJob != null && wfJob.inTerminalState()) { - try { - updateSLASummary(wfJob.getId(), wfJob.getStartTime(), wfJob.getEndTime()); - jobItr.remove(); - } - catch (JPAExecutorException e) { - LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e); - } - - } - } - else if (jobId.contains("-W@")) { - WorkflowActionBean wfAction = null; - try { - wfAction = WorkflowActionQueryExecutor.getInstance().get( - WorkflowActionQuery.GET_ACTION_COMPLETED, jobId); - } - catch (JPAExecutorException e) { - if (e.getErrorCode().equals(ErrorCode.E0605)) { - jobItr.remove(); - } - else { - LOG.info("Failed to fetch the workflow action: " + jobId, e); - } - } - if (wfAction != null && (wfAction.isComplete() || wfAction.isTerminalWithFailure())) { - try { - updateSLASummary(wfAction.getId(), wfAction.getStartTime(), wfAction.getEndTime()); - jobItr.remove(); - } - catch (JPAExecutorException e) { - LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e); - } - } - } - else if (jobId.contains("-C@")) { - CoordinatorActionBean cAction = null; - try { - cAction = CoordActionQueryExecutor.getInstance().get(CoordActionQuery.GET_COORD_ACTION, jobId); - } - catch (JPAExecutorException e) { - if (e.getErrorCode().equals(ErrorCode.E0605)) { - jobItr.remove(); - } - else { - LOG.info("Failed to fetch the coord action: " + jobId, e); - } - } - if (cAction != null && cAction.isTerminalStatus()) { - try { - updateSLASummaryForCoordAction(cAction); - jobItr.remove(); - } - catch (JPAExecutorException e) { - XLog.getLog(SLACalculatorMemory.class).info( - "Failed to update SLASummaryBean when purging history set entry for " + jobId, e); - } - + LOG.debug(" Running HistoryPurgeWorker for " + jobId); + try { + boolean isDone = SLAXCommandFactory.getSLAJobHistoryXCommand(jobId).call(); + if (isDone) { + LOG.debug("[{0}] job is finished and processed. Removing from history"); + jobItr.remove(); } } - else if (jobId.endsWith("-C")) { - CoordinatorJobBean cJob = null; - try { - cJob = CoordJobQueryExecutor.getInstance().get(CoordJobQuery.GET_COORD_JOB_STATUS_PARENTID, - jobId); - } - catch (JPAExecutorException e) { - if (e.getErrorCode().equals(ErrorCode.E0604)) { - jobItr.remove(); - } - else { - LOG.info("Failed to fetch the coord job: " + jobId, e); - } + catch (CommandException e) { + if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { + LOG.warn("Job is not found in db: " + jobId, e); + jobItr.remove(); } - if (cJob != null && cJob.isTerminalStatus()) { - try { - updateSLASummary(cJob.getId(), cJob.getStartTime(), cJob.getEndTime()); - jobItr.remove(); - } - catch (JPAExecutorException e) { - LOG.info("Failed to update SLASummaryBean when purging history set entry for " + jobId, e); - } - + else { + LOG.error("Failed to fetch the job: " + jobId, e); } } } } - - private void updateSLASummary(String id, Date startTime, Date endTime) throws JPAExecutorException { - SLASummaryBean sla = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, id); - if (sla != null) { - sla.setActualStart(startTime); - sla.setActualEnd(endTime); - if (startTime != null && endTime != null) { - sla.setActualDuration(endTime.getTime() - startTime.getTime()); - } - sla.setLastModifiedTime(new Date()); - sla.setEventProcessed(8); - SLASummaryQueryExecutor.getInstance().executeUpdate( - SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, sla); - } - } - - private void updateSLASummaryForCoordAction(CoordinatorActionBean bean) throws JPAExecutorException { - String wrkflowId = bean.getExternalId(); - if (wrkflowId != null) { - WorkflowJobBean wrkflow = WorkflowJobQueryExecutor.getInstance().get( - WorkflowJobQuery.GET_WORKFLOW_START_END_TIME, wrkflowId); - if (wrkflow != null) { - updateSLASummary(bean.getId(), wrkflow.getStartTime(), wrkflow.getEndTime()); - } - } - } } private void loadOnRestart() { - boolean isJobModified = false; + long slaPendingCount = 0; + long statusPendingCount = 0; + try { - long slaPendingCount = 0; - long statusPendingCount = 0; List<SLASummaryBean> summaryBeans = jpaService.execute(new SLASummaryGetRecordsOnRestartJPAExecutor( modifiedAfter)); for (SLASummaryBean summaryBean : summaryBeans) { String jobId = summaryBean.getId(); - LockToken lock = null; - switch (summaryBean.getAppType()) { - case COORDINATOR_ACTION: - isJobModified = processSummaryBeanForCoordAction(summaryBean, jobId); - break; - case WORKFLOW_ACTION: - isJobModified = processSummaryBeanForWorkflowAction(summaryBean, jobId); - break; - case WORKFLOW_JOB: - isJobModified = processSummaryBeanForWorkflowJob(summaryBean, jobId); - break; - default: - break; - } - if (isJobModified) { - try { - boolean update = true; - if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) { - lock = Services - .get() - .get(MemoryLocksService.class) - .getWriteLock( - SLACalcStatus.SLA_ENTITYKEY_PREFIX + jobId, - Services.get().getConf() - .getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 5 * 1000)); - if (lock == null) { - update = false; - } - } - if (update) { - summaryBean.setLastModifiedTime(new Date()); - SLASummaryQueryExecutor.getInstance().executeUpdate( - SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, summaryBean); - } - } - catch (Exception e) { - LOG.warn("Failed to load records for " + jobId, e); - } - finally { - if (lock != null) { - lock.release(); - lock = null; - } - } - } + + SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( + SLARegQuery.GET_SLA_REG_ON_RESTART, jobId); + SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean); + + // Processed missed jobs try { - if (summaryBean.getEventProcessed() == 7) { - historySet.add(jobId); - statusPendingCount++; - } - else if (summaryBean.getEventProcessed() <= 7) { - SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( - SLARegQuery.GET_SLA_REG_ON_RESTART, jobId); - SLACalcStatus slaCalcStatus = new SLACalcStatus(summaryBean, slaRegBean); - slaMap.put(jobId, slaCalcStatus); - slaPendingCount++; - } + SLAXCommandFactory.getSLAEventXCommand(slaCalcStatus).call(); } - catch (Exception e) { - LOG.warn("Failed to fetch/update records for " + jobId, e); + catch (Throwable e) { + LOG.error("Error while updating job {0}", slaCalcStatus.getId(), e); } + if (slaCalcStatus.getEventProcessed() == 7) { + historySet.add(jobId); + statusPendingCount++; + LOG.debug("Adding job [{0}] to historySet. EventProcessed is [{1}]", slaCalcStatus, + slaCalcStatus); + } + else if (slaCalcStatus.getEventProcessed() < 7) { + slaMap.put(jobId, slaCalcStatus); + slaPendingCount++; + LOG.debug("Adding job [{0}] to slamap. EventProcessed is [{1}]", slaCalcStatus, + slaCalcStatus); + + } } LOG.info("Loaded SLASummary pendingSLA=" + slaPendingCount + ", pendingStatusUpdate=" + statusPendingCount); - } catch (Exception e) { LOG.warn("Failed to retrieve SLASummary records on restart", e); } } - private boolean processSummaryBeanForCoordAction(SLASummaryBean summaryBean, String jobId) - throws JPAExecutorException { - boolean isJobModified = false; - CoordinatorActionBean coordAction = null; - coordAction = jpaService.execute(new CoordActionGetForSLAJPAExecutor(jobId)); - if (!coordAction.getStatusStr().equals(summaryBean.getJobStatus())) { - LOG.trace("Coordinator action status is " + coordAction.getStatusStr() + " and summary bean status is " - + summaryBean.getJobStatus()); - isJobModified = true; - summaryBean.setJobStatus(coordAction.getStatusStr()); - if (coordAction.isTerminalStatus()) { - WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction - .getExternalId())); - setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), coordAction.getLastModifiedTime(), - coordAction.getStatusStr()); - } - else if (coordAction.getStatus() != CoordinatorAction.Status.WAITING) { - WorkflowJobBean wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(coordAction - .getExternalId())); - setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime()); - } - } - return isJobModified; - } - - private boolean processSummaryBeanForWorkflowAction(SLASummaryBean summaryBean, String jobId) - throws JPAExecutorException { - boolean isJobModified = false; - WorkflowActionBean wfAction = null; - wfAction = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(jobId)); - if (!wfAction.getStatusStr().equals(summaryBean.getJobStatus())) { - LOG.trace("Workflow action status is " + wfAction.getStatusStr() + "and summary bean status is " - + summaryBean.getJobStatus()); - isJobModified = true; - summaryBean.setJobStatus(wfAction.getStatusStr()); - if (wfAction.inTerminalState()) { - setEndForSLASummaryBean(summaryBean, wfAction.getStartTime(), wfAction.getEndTime(), wfAction.getStatusStr()); - } - else if (wfAction.getStatus() != WorkflowAction.Status.PREP) { - setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfAction.getStartTime()); - } - } - return isJobModified; - } - - private boolean processSummaryBeanForWorkflowJob(SLASummaryBean summaryBean, String jobId) - throws JPAExecutorException { - boolean isJobModified = false; - WorkflowJobBean wfJob = null; - wfJob = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(jobId)); - if (!wfJob.getStatusStr().equals(summaryBean.getJobStatus())) { - LOG.trace("Workflow job status is " + wfJob.getStatusStr() + "and summary bean status is " - + summaryBean.getJobStatus()); - isJobModified = true; - summaryBean.setJobStatus(wfJob.getStatusStr()); - if (wfJob.inTerminalState()) { - setEndForSLASummaryBean(summaryBean, wfJob.getStartTime(), wfJob.getEndTime(), wfJob.getStatusStr()); - } - else if (wfJob.getStatus() != WorkflowJob.Status.PREP) { - setStartForSLASummaryBean(summaryBean, summaryBean.getEventProcessed(), wfJob.getStartTime()); - } - } - return isJobModified; - } - - private void setEndForSLASummaryBean(SLASummaryBean summaryBean, Date startTime, Date endTime, String status) { - byte eventProc = summaryBean.getEventProcessed(); - summaryBean.setEventProcessed(8); - summaryBean.setActualStart(startTime); - summaryBean.setActualEnd(endTime); - long actualDuration = endTime.getTime() - startTime.getTime(); - summaryBean.setActualDuration(actualDuration); - if (eventProc < 4) { - if (status.equals(WorkflowJob.Status.SUCCEEDED.name()) || status.equals(WorkflowAction.Status.OK.name()) - || status.equals(CoordinatorAction.Status.SUCCEEDED.name())) { - if (endTime.getTime() <= summaryBean.getExpectedEnd().getTime()) { - summaryBean.setSLAStatus(SLAStatus.MET); - } - else { - summaryBean.setSLAStatus(SLAStatus.MISS); - } - } - else { - summaryBean.setSLAStatus(SLAStatus.MISS); - } - } - - } - - private void setStartForSLASummaryBean(SLASummaryBean summaryBean, byte eventProc, Date startTime) { - if (((eventProc & 1) == 0)) { - eventProc += 1; - summaryBean.setEventProcessed(eventProc); - } - if (summaryBean.getSLAStatus().equals(SLAStatus.NOT_STARTED)) { - summaryBean.setSLAStatus(SLAStatus.IN_PROCESS); - } - summaryBean.setActualStart(startTime); - } - @Override public int size() { return slaMap.size(); } + @VisibleForTesting + public Set<String> getHistorySet(){ + return historySet; + } + @Override public SLACalcStatus get(String jobId) throws JPAExecutorException { SLACalcStatus memObj; memObj = slaMap.get(jobId); if (memObj == null && historySet.contains(jobId)) { - memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), - SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); + memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance() + .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get( + SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); } return memObj; } @@ -460,13 +202,13 @@ public class SLACalculatorMemory implements SLACalculator { SLACalcStatus memObj; memObj = slaMap.get(jobId); if (memObj == null) { - memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), - SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); + memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance() + .get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), SLARegistrationQueryExecutor.getInstance().get( + SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); } return memObj; } - @Override public Iterator<String> iterator() { return slaMap.keySet().iterator(); @@ -488,12 +230,17 @@ public class SLACalculatorMemory implements SLACalculator { */ protected void updateJobSla(String jobId) throws Exception { SLACalcStatus slaCalc = slaMap.get(jobId); + + if (slaCalc == null) { + // job might be processed and removed from map by addJobStatus + return; + } synchronized (slaCalc) { - boolean change = false; // get eventProcessed on DB for validation in HA SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get( SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); byte eventProc = summaryBean.getEventProcessed(); + slaCalc.setEventProcessed(eventProc); if (eventProc >= 7) { if (eventProc == 7) { historySet.add(jobId); @@ -503,127 +250,69 @@ public class SLACalculatorMemory implements SLACalculator { } else { if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { - //Update last modified time. + // Update last modified time. slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); reloadExpectedTimeAndConfig(slaCalc); LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); } - slaCalc.setEventProcessed(eventProc); - SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); - // calculation w.r.t current time and status - if ((eventProc & 1) == 0) { // first bit (start-processed) unset - if (reg.getExpectedStart() != null) { - if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) { - confirmWithDB(slaCalc); - eventProc = slaCalc.getEventProcessed(); - if (eventProc != 8 && (eventProc & 1) == 0) { - // Some DB exception - slaCalc.setEventStatus(EventStatus.START_MISS); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - eventProc++; - } - change = true; - } - } - else { - eventProc++; // disable further processing for optional start sla condition - change = true; - } - } - // check if second bit (duration-processed) is unset - if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { - if (reg.getExpectedDuration() == -1) { - eventProc += 2; - change = true; - } - else if (slaCalc.getActualStart() != null) { - if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc - .getActualStart().getTime())) { - slaCalc.setEventProcessed(eventProc); - confirmWithDB(slaCalc); - eventProc = slaCalc.getEventProcessed(); - if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { - // Some DB exception - slaCalc.setEventStatus(EventStatus.DURATION_MISS); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - eventProc += 2; - } - change = true; - } - } - } - if (eventProc < 4) { - if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) { - slaCalc.setEventProcessed(eventProc); - confirmWithDB(slaCalc); - eventProc = slaCalc.getEventProcessed(); - change = true; - } - } - if (change) { + if (isChanged(slaCalc)) { + LOG.debug("{0} job has SLA event change. EventProc = {1}, status = {2}", slaCalc.getId(), + slaCalc.getEventProcessed(), slaCalc.getJobStatus()); try { - boolean locked = true; - slaCalc.acquireLock(); - locked = slaCalc.isLocked(); - if (locked) { - // no more processing, no transfer to history set - if (slaCalc.getEventProcessed() >= 8) { - eventProc = 8; - // Should not be > 8. But to handle any corner cases - slaCalc.setEventProcessed(8); - slaMap.remove(jobId); - LOG.trace("Removed Job [{0}] from map after Event-processed=8", jobId); - } - else { - slaCalc.setEventProcessed(eventProc); - } - writetoDB(slaCalc, eventProc); - if (eventProc == 7) { - historySet.add(jobId); - slaMap.remove(jobId); - LOG.trace("Removed Job [{0}] from map after Event-processed=7", jobId); - } - } - } - catch (InterruptedException e) { - throw new XException(ErrorCode.E0606, slaCalc.getId(), slaCalc.getLockTimeOut()); + SLAXCommandFactory.getSLAEventXCommand(slaCalc).call(); + checkEventProc(slaCalc); } - finally { - slaCalc.releaseLock(); + catch (XException e) { + if (e.getErrorCode().equals(ErrorCode.E0604) || e.getErrorCode().equals(ErrorCode.E0605)) { + LOG.debug("job [{0}] is is not in DB, removing from Memory", slaCalc.getId()); + slaMap.remove(jobId); + } } } + } } } - private void writetoDB(SLACalcStatus slaCalc, byte eventProc) throws JPAExecutorException { - SLASummaryBean slaSummaryBean = new SLASummaryBean(); - slaSummaryBean.setId(slaCalc.getId()); - slaSummaryBean.setEventProcessed(eventProc); - slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus()); - slaSummaryBean.setEventStatus(slaCalc.getEventStatus()); - slaSummaryBean.setActualEnd(slaCalc.getActualEnd()); - slaSummaryBean.setActualStart(slaCalc.getActualStart()); - slaSummaryBean.setActualDuration(slaCalc.getActualDuration()); - slaSummaryBean.setJobStatus(slaCalc.getJobStatus()); - slaSummaryBean.setLastModifiedTime(new Date()); + private boolean isChanged(SLACalcStatus slaCalc) { + SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); + byte eventProc = slaCalc.getEventProcessed(); - SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, - slaSummaryBean); - LOG.trace("Stored SLA SummaryBean Job [{0}] with Event-processed=[{1}]", slaCalc.getId(), - slaSummaryBean.getEventProcessed()); + if ((eventProc & 1) == 0) { // first bit (start-processed) unset + if (reg.getExpectedStart() != null) { + if (reg.getExpectedStart().getTime() + jobEventLatency < System.currentTimeMillis()) { + return true; + } + } + else { + return true; + } + } + if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { + if (reg.getExpectedDuration() == -1) { + return true; + } + else if (slaCalc.getActualStart() != null) { + if ((reg.getExpectedDuration() + jobEventLatency) < (System.currentTimeMillis() - slaCalc + .getActualStart().getTime())) { + return true; + } + } + } + if (eventProc < 4) { + if (reg.getExpectedEnd().getTime() + jobEventLatency < System.currentTimeMillis()) { + return true; + } + } + return false; } @SuppressWarnings("rawtypes") - private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) - throws JPAExecutorException { + private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) throws JPAExecutorException { updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean())); slaCalc.setLastModifiedTime(new Date()); - updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME, new SLASummaryBean(slaCalc))); + updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME, + new SLASummaryBean(slaCalc))); } @SuppressWarnings("rawtypes") @@ -641,7 +330,6 @@ public class SLACalculatorMemory implements SLACalculator { BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); } - /** * Periodically run by the SLAService worker threads to update SLA status by * iterating through all the jobs in the map @@ -770,410 +458,62 @@ public class SLACalculatorMemory implements SLACalculator { @Override public boolean addJobStatus(String jobId, String jobStatus, JobEvent.EventStatus jobEventStatus, Date startTime, Date endTime) throws JPAExecutorException, ServiceException { + LOG.debug( + "Received addJobStatus request for job [{0}] jobStatus = [{1}], jobEventStatus = [{2}], startTime = [{3}], " + + "endTime = [{4}] ", jobId, jobStatus, jobEventStatus, startTime, endTime); SLACalcStatus slaCalc = slaMap.get(jobId); - SLASummaryBean slaInfo = null; - boolean hasSla = false; - if (slaCalc == null) { - if (historySet.contains(jobId)) { - slaInfo = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId); - if (slaInfo == null) { - throw new JPAExecutorException(ErrorCode.E0604, jobId); - } - slaInfo.setJobStatus(jobStatus); - slaInfo.setActualStart(startTime); - slaInfo.setActualEnd(endTime); - if (endTime != null) { - slaInfo.setActualDuration(endTime.getTime() - startTime.getTime()); - } - slaInfo.setEventProcessed(8); - historySet.remove(jobId); - slaInfo.setLastModifiedTime(new Date()); - SLASummaryQueryExecutor.getInstance().executeUpdate( - SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo); - hasSla = true; - } - else if (Services.get().get(JobsConcurrencyService.class).isHighlyAvailableMode()) { - // jobid might not exist in slaMap in HA Setting - SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( - SLARegQuery.GET_SLA_REG_ALL, jobId); - if (slaRegBean != null) { // filter out jobs picked by SLA job event listener - // but not actually configured for SLA - SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get( - SLASummaryQuery.GET_SLA_SUMMARY, jobId); - if (slaSummaryBean.getEventProcessed() < 7) { - slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean); - slaMap.put(jobId, slaCalc); - } - } - } - } - if (slaCalc != null) { - synchronized (slaCalc) { - try { - // only get ZK lock when multiple servers running - boolean locked = true; - slaCalc.acquireLock(); - locked = slaCalc.isLocked(); - if (locked) { - // get eventProcessed on DB for validation in HA - SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get( - SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); - byte eventProc = summaryBean.getEventProcessed(); - - if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { - //Update last modified time. - slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); - reloadExpectedTimeAndConfig(slaCalc); - LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); - } - slaCalc.setEventProcessed(eventProc); - slaCalc.setJobStatus(jobStatus); - switch (jobEventStatus) { - case STARTED: - slaInfo = processJobStartSLA(slaCalc, startTime); - break; - case SUCCESS: - slaInfo = processJobEndSuccessSLA(slaCalc, startTime, endTime); - break; - case FAILURE: - slaInfo = processJobEndFailureSLA(slaCalc, startTime, endTime); - break; - default: - LOG.debug("Unknown Job Status for SLA purpose[{0}]", jobEventStatus); - slaInfo = getSLASummaryBean(slaCalc); - } - if (slaCalc.getEventProcessed() == 7) { - slaInfo.setEventProcessed(8); - slaMap.remove(jobId); - } - slaInfo.setLastModifiedTime(new Date()); - SLASummaryQueryExecutor.getInstance().executeUpdate( - SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaInfo); - hasSla = true; - } - } - catch (InterruptedException e) { - throw new ServiceException(ErrorCode.E0606, slaCalc.getEntityKey(), slaCalc.getLockTimeOut()); - } - finally { - slaCalc.releaseLock(); - } + if (slaCalc == null) { + SLARegistrationBean slaRegBean = SLARegistrationQueryExecutor.getInstance().get( + SLARegQuery.GET_SLA_REG_ALL, jobId); + if (slaRegBean != null) { // filter out jobs picked by SLA job event listener + // but not actually configured for SLA + SLASummaryBean slaSummaryBean = SLASummaryQueryExecutor.getInstance().get( + SLASummaryQuery.GET_SLA_SUMMARY, jobId); + slaCalc = new SLACalcStatus(slaSummaryBean, slaRegBean); + slaMap.put(jobId, slaCalc); } - LOG.trace("SLA Status Event - Job:" + jobId + " Status:" + slaCalc.getSLAStatus()); } + else { + SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get( + SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); + byte eventProc = summaryBean.getEventProcessed(); + if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { + // Update last modified time. + slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); + reloadExpectedTimeAndConfig(slaCalc); + LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); - return hasSla; - } - - /** - * Process SLA for jobs that started running. Also update actual-start time - * - * @param slaCalc - * @param actualStart - * @return SLASummaryBean - */ - private SLASummaryBean processJobStartSLA(SLACalcStatus slaCalc, Date actualStart) { - slaCalc.setActualStart(actualStart); - if (slaCalc.getSLAStatus().equals(SLAStatus.NOT_STARTED)) { - slaCalc.setSLAStatus(SLAStatus.IN_PROCESS); - } - SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); - Date expecStart = reg.getExpectedStart(); - byte eventProc = slaCalc.getEventProcessed(); - // set event proc here - if (((eventProc & 1) == 0)) { - if (expecStart != null) { - if (actualStart.getTime() > expecStart.getTime()) { - slaCalc.setEventStatus(EventStatus.START_MISS); - } - else { - slaCalc.setEventStatus(EventStatus.START_MET); - } - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } } - eventProc += 1; - slaCalc.setEventProcessed(eventProc); - } - return getSLASummaryBean(slaCalc); - } - - /** - * Process SLA for jobs that ended successfully. Also update actual-start - * and end time - * - * @param slaCalc - * @param actualStart - * @param actualEnd - * @return SLASummaryBean - * @throws JPAExecutorException - */ - private SLASummaryBean processJobEndSuccessSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException { - SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); - slaCalc.setActualStart(actualStart); - slaCalc.setActualEnd(actualEnd); - long expectedDuration = reg.getExpectedDuration(); - long actualDuration = actualEnd.getTime() - actualStart.getTime(); - slaCalc.setActualDuration(actualDuration); - //check event proc - byte eventProc = slaCalc.getEventProcessed(); - if (((eventProc >> 1) & 1) == 0) { - processDurationSLA(expectedDuration, actualDuration, slaCalc); - eventProc += 2; slaCalc.setEventProcessed(eventProc); } - - if (eventProc < 4) { - Date expectedEnd = reg.getExpectedEnd(); - if (actualEnd.getTime() > expectedEnd.getTime()) { - slaCalc.setEventStatus(EventStatus.END_MISS); - slaCalc.setSLAStatus(SLAStatus.MISS); - } - else { - slaCalc.setEventStatus(EventStatus.END_MET); - slaCalc.setSLAStatus(SLAStatus.MET); - } - eventProc += 4; - slaCalc.setEventProcessed(eventProc); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (slaCalc != null) { + try { + SLAXCommandFactory.getSLAEventXCommand(slaCalc, + ConfigurationService.getLong(SLAService.CONF_SLA_CALC_LOCK_TIMEOUT, 20 * 1000)).call(); + checkEventProc(slaCalc); } - } - return getSLASummaryBean(slaCalc); - } - - /** - * Process SLA for jobs that ended in failure. Also update actual-start and - * end time - * - * @param slaCalc - * @param actualStart - * @param actualEnd - * @return SLASummaryBean - * @throws JPAExecutorException - */ - private SLASummaryBean processJobEndFailureSLA(SLACalcStatus slaCalc, Date actualStart, Date actualEnd) throws JPAExecutorException { - slaCalc.setActualStart(actualStart); - slaCalc.setActualEnd(actualEnd); - if (actualStart == null) { // job failed before starting - if (slaCalc.getEventProcessed() < 4) { - slaCalc.setEventStatus(EventStatus.END_MISS); - slaCalc.setSLAStatus(SLAStatus.MISS); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - slaCalc.setEventProcessed(7); - return getSLASummaryBean(slaCalc); + catch (XException e) { + LOG.error(e); + throw new ServiceException(e); } + return true; } - SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); - long expectedDuration = reg.getExpectedDuration(); - long actualDuration = actualEnd.getTime() - actualStart.getTime(); - slaCalc.setActualDuration(actualDuration); - - byte eventProc = slaCalc.getEventProcessed(); - if (((eventProc >> 1) & 1) == 0) { - if (expectedDuration != -1) { - slaCalc.setEventStatus(EventStatus.DURATION_MISS); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - } - eventProc += 2; - slaCalc.setEventProcessed(eventProc); - } - if (eventProc < 4) { - slaCalc.setEventStatus(EventStatus.END_MISS); - slaCalc.setSLAStatus(SLAStatus.MISS); - eventProc += 4; - slaCalc.setEventProcessed(eventProc); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - } - return getSLASummaryBean(slaCalc); - } - - private SLASummaryBean getSLASummaryBean (SLACalcStatus slaCalc) { - SLASummaryBean slaSummaryBean = new SLASummaryBean(); - slaSummaryBean.setActualStart(slaCalc.getActualStart()); - slaSummaryBean.setActualEnd(slaCalc.getActualEnd()); - slaSummaryBean.setActualDuration(slaCalc.getActualDuration()); - slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus()); - slaSummaryBean.setEventStatus(slaCalc.getEventStatus()); - slaSummaryBean.setEventProcessed(slaCalc.getEventProcessed()); - slaSummaryBean.setId(slaCalc.getId()); - slaSummaryBean.setJobStatus(slaCalc.getJobStatus()); - return slaSummaryBean; - } - - private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) { - if (expected != -1) { - if (actual > expected) { - slaCalc.setEventStatus(EventStatus.DURATION_MISS); - } - else if (actual <= expected) { - slaCalc.setEventStatus(EventStatus.DURATION_MET); - } - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } + else { + return false; } } - /* - * Confirm alerts against source of truth - DB. Also required in case of High Availability - */ - private void confirmWithDB(SLACalcStatus slaCalc) { - boolean ended = false, isEndMiss = false; - try { - switch (slaCalc.getAppType()) { - case WORKFLOW_JOB: - WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(slaCalc.getId())); - if (wf.getEndTime() != null) { - ended = true; - if (wf.getStatus() == WorkflowJob.Status.KILLED || wf.getStatus() == WorkflowJob.Status.FAILED - || wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { - isEndMiss = true; - } - } - slaCalc.setActualStart(wf.getStartTime()); - slaCalc.setActualEnd(wf.getEndTime()); - slaCalc.setJobStatus(wf.getStatusStr()); - break; - case WORKFLOW_ACTION: - WorkflowActionBean wa = jpaService.execute(new WorkflowActionGetForSLAJPAExecutor(slaCalc.getId())); - if (wa.getEndTime() != null) { - ended = true; - if (wa.isTerminalWithFailure() - || wa.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { - isEndMiss = true; - } - } - slaCalc.setActualStart(wa.getStartTime()); - slaCalc.setActualEnd(wa.getEndTime()); - slaCalc.setJobStatus(wa.getStatusStr()); - break; - case COORDINATOR_ACTION: - CoordinatorActionBean ca = jpaService.execute(new CoordActionGetForSLAJPAExecutor(slaCalc.getId())); - if (ca.isTerminalWithFailure()) { - isEndMiss = ended = true; - slaCalc.setActualEnd(ca.getLastModifiedTime()); - } - if (ca.getExternalId() != null) { - wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ca.getExternalId())); - if (wf.getEndTime() != null) { - ended = true; - if (wf.getEndTime().getTime() > slaCalc.getExpectedEnd().getTime()) { - isEndMiss = true; - } - } - slaCalc.setActualEnd(wf.getEndTime()); - slaCalc.setActualStart(wf.getStartTime()); - } - slaCalc.setJobStatus(ca.getStatusStr()); - break; - default: - LOG.debug("Unsupported App-type for SLA - " + slaCalc.getAppType()); - } - - byte eventProc = slaCalc.getEventProcessed(); - if (ended) { - if (isEndMiss) { - slaCalc.setSLAStatus(SLAStatus.MISS); - } - else { - slaCalc.setSLAStatus(SLAStatus.MET); - } - if (slaCalc.getActualStart() != null) { - if ((eventProc & 1) == 0) { - if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) { - slaCalc.setEventStatus(EventStatus.START_MISS); - } - else { - slaCalc.setEventStatus(EventStatus.START_MET); - } - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - } - slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime()); - if (((eventProc >> 1) & 1) == 0) { - processDurationSLA(slaCalc.getExpectedDuration(), slaCalc.getActualDuration(), slaCalc); - } - } - if (eventProc < 4) { - if (isEndMiss) { - slaCalc.setEventStatus(EventStatus.END_MISS); - } - else { - slaCalc.setEventStatus(EventStatus.END_MET); - } - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - } - slaCalc.setEventProcessed(8); - } - else { - if (slaCalc.getActualStart() != null) { - slaCalc.setSLAStatus(SLAStatus.IN_PROCESS); - } - if ((eventProc & 1) == 0) { - if (slaCalc.getActualStart() != null) { - if (slaCalc.getExpectedStart().getTime() < slaCalc.getActualStart().getTime()) { - slaCalc.setEventStatus(EventStatus.START_MISS); - } - else { - slaCalc.setEventStatus(EventStatus.START_MET); - } - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - eventProc++; - } - else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) { - slaCalc.setEventStatus(EventStatus.START_MISS); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - eventProc++; - } - } - if (((eventProc >> 1) & 1) == 0 && slaCalc.getActualStart() != null - && slaCalc.getExpectedDuration() != -1) { - if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) { - slaCalc.setEventStatus(EventStatus.DURATION_MISS); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - eventProc += 2; - } - } - if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) { - slaCalc.setEventStatus(EventStatus.END_MISS); - slaCalc.setSLAStatus(SLAStatus.MISS); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - eventProc += 4; - } - slaCalc.setEventProcessed(eventProc); - } + private void checkEventProc(SLACalcStatus slaCalc){ + byte eventProc = slaCalc.getEventProcessed(); + if (slaCalc.getEventProcessed() >= 8) { + slaMap.remove(slaCalc.getId()); + LOG.debug("Removed Job [{0}] from map after Event-processed=8", slaCalc.getId()); } - catch (Exception e) { - LOG.warn("Error while confirming SLA against DB for jobid= " + slaCalc.getId() + ". Exception is " - + e.getClass().getName() + ": " + e.getMessage()); - if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) { - slaCalc.setEventStatus(EventStatus.END_MISS); - slaCalc.setSLAStatus(SLAStatus.MISS); - if (shouldAlert(slaCalc)) { - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4); - } + if (eventProc == 7) { + historySet.add(slaCalc.getId()); + slaMap.remove(slaCalc.getId()); + LOG.debug("Removed Job [{0}] from map after Event-processed=7", slaCalc.getId()); } } @@ -1235,21 +575,21 @@ public class SLACalculatorMemory implements SLACalculator { return enableAlert(getSLAJobsforParents(parentJobIds)); } - @Override public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException { boolean isJobFound = false; @SuppressWarnings("rawtypes") List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); - for (String jobId : jobIds) { - SLACalcStatus slaCalc = getSLACalcStatus(jobId); - if (slaCalc != null) { - slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true)); - updateDBSlaConfig(slaCalc, updateList); - isJobFound = true; - } + for (String jobId : jobIds) { + SLACalcStatus slaCalc = getSLACalcStatus(jobId); + if (slaCalc != null) { + slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, + Boolean.toString(true)); + updateDBSlaConfig(slaCalc, updateList); + isJobFound = true; } + } executeBatchQuery(updateList); return isJobFound; } @@ -1260,19 +600,19 @@ public class SLACalculatorMemory implements SLACalculator { } @Override - public boolean changeDefinition(List<Pair<String, Map<String,String>>> jobIdsSLAPair ) throws JPAExecutorException, - ServiceException{ + public boolean changeDefinition(List<Pair<String, Map<String, String>>> jobIdsSLAPair) throws JPAExecutorException, + ServiceException { boolean isJobFound = false; @SuppressWarnings("rawtypes") List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); - for (Pair<String, Map<String,String>> jobIdSLAPair : jobIdsSLAPair) { - SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist()); - if (slaCalc != null) { - updateParams(slaCalc, jobIdSLAPair.getSecond()); - updateDBSlaExpectedValues(slaCalc, updateList); - isJobFound = true; - } + for (Pair<String, Map<String, String>> jobIdSLAPair : jobIdsSLAPair) { + SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist()); + if (slaCalc != null) { + updateParams(slaCalc, jobIdSLAPair.getSecond()); + updateDBSlaExpectedValues(slaCalc, updateList); + isJobFound = true; } + } executeBatchQuery(updateList); return isJobFound; } @@ -1292,11 +632,7 @@ public class SLACalculatorMemory implements SLACalculator { } } - private boolean shouldAlert(SLACalcStatus slaObj) { - return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT); - } - - private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException{ + private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException { List<String> childJobIds = new ArrayList<String>(); for (String jobId : parentJobIds) { List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList( @@ -1307,4 +643,5 @@ public class SLACalculatorMemory implements SLACalculator { } return childJobIds; } + }
http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java index ef1ea98..3b2cebd 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java +++ b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java @@ -52,8 +52,6 @@ import org.json.simple.JSONObject; @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES", query = "update SLASummaryBean w set w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.eventProcessed = :eventProcessed, w.jobStatus = :jobStatus, w.lastModifiedTS = :lastModifiedTS, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration where w.jobId = :jobId"), - @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration, w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"), - @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES", query = "update SLASummaryBean w set w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration , w.lastModifiedTS = :lastModTime where w.jobId = :jobId"), @NamedQuery(name = "UPDATE_SLA_SUMMARY_EVENTPROCESSED", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed where w.jobId = :jobId"), http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java b/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java new file mode 100644 index 0000000..e6298f5 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/sla/SLAXCommandFactory.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.sla; + +import org.apache.oozie.command.sla.SLACoordActionJobEventXCommand; +import org.apache.oozie.command.sla.SLACoordActionJobHistoryXCommand; +import org.apache.oozie.command.sla.SLAJobEventXCommand; +import org.apache.oozie.command.sla.SLAJobHistoryXCommand; +import org.apache.oozie.command.sla.SLAWorkflowActionJobEventXCommand; +import org.apache.oozie.command.sla.SLAWorkflowActionJobHistoryXCommand; +import org.apache.oozie.command.sla.SLAWorkflowJobEventXCommand; +import org.apache.oozie.command.sla.SLAWorkflowJobHistoryXCommand; + +/** + * A factory for creating SLACommand objects. + */ +public class SLAXCommandFactory { + + /** + * Gets the SLA job history command. + * + * @param jobId the job id + * @return the SLA job history x command + */ + public static SLAJobHistoryXCommand getSLAJobHistoryXCommand(String jobId) { + if (jobId.endsWith("-W")) { + return new SLAWorkflowJobHistoryXCommand(jobId); + } + else if (jobId.contains("-W@")) { + return new SLAWorkflowActionJobHistoryXCommand(jobId); + + } + else if (jobId.contains("-C@")) { + return new SLACoordActionJobHistoryXCommand(jobId); + } + + else { + return null; + } + } + + /** + * Gets the SLAevent command. + * + * @param slaCalc the sla calc + * @return the SLA event x command + */ + public static SLAJobEventXCommand getSLAEventXCommand(SLACalcStatus slaCalc) { + return getSLAEventXCommand(slaCalc, 0); + } + + /** + * Gets the SLA event x command. + * + * @param slaCalc the sla calc + * @param lockTimeOut the lock time out + * @return the SLA event x command + */ + public static SLAJobEventXCommand getSLAEventXCommand(SLACalcStatus slaCalc, long lockTimeOut) { + if (slaCalc.getId().endsWith("-W")) { + return new SLAWorkflowJobEventXCommand(slaCalc, lockTimeOut); + } + else if (slaCalc.getId().contains("-W@")) { + return new SLAWorkflowActionJobEventXCommand(slaCalc, lockTimeOut); + + } + else if (slaCalc.getId().contains("-C@")) { + return new SLACoordActionJobEventXCommand(slaCalc, lockTimeOut); + } + + else { + return null; + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java index 52560e6..e239084 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordActionsKillXCommand.java @@ -19,6 +19,7 @@ package org.apache.oozie.command.coord; import java.util.Date; + import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.WorkflowJobBean; @@ -28,9 +29,10 @@ import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.executor.jpa.CoordActionGetForCheckJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.LiteWorkflowStoreService; import org.apache.oozie.service.Services; @@ -84,7 +86,9 @@ public class TestCoordActionsKillXCommand extends XDataTestCase { assertEquals(CoordinatorAction.Status.KILLED, action.getStatus()); sleep(100); - WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ids[3])); + WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, + ids[3]); + assertEquals(WorkflowJob.Status.KILLED, wf.getStatus()); CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(ids[0])); @@ -118,7 +122,8 @@ public class TestCoordActionsKillXCommand extends XDataTestCase { assertEquals(CoordinatorAction.Status.KILLED, action.getStatus()); sleep(100); - WorkflowJobBean wf = jpaService.execute(new WorkflowJobGetForSLAJPAExecutor(ids[3])); + WorkflowJobBean wf = WorkflowJobQueryExecutor.getInstance().get(WorkflowJobQuery.GET_WORKFLOW_FOR_SLA, + ids[3]); assertEquals(WorkflowJob.Status.KILLED, wf.getStatus()); CoordinatorJobBean job = jpaService.execute(new CoordJobGetJPAExecutor(ids[0])); http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java index 5914b3b..1daecdc 100644 --- a/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestSLASummaryQueryExecutor.java @@ -131,7 +131,7 @@ public class TestSLASummaryQueryExecutor extends XDataTestCase { bean.setActualDuration(endTime.getTime() - startTime.getTime()); bean.setLastModifiedTime(new Date()); bean.setEventProcessed(8); - SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, bean); + SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, bean); retBean = SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, bean.getId()); assertEquals(bean.getActualStartTimestamp(), retBean.getActualStartTimestamp()); assertEquals(bean.getActualEndTimestamp(), retBean.getActualEndTimestamp()); http://git-wip-us.apache.org/repos/asf/oozie/blob/ba7a7b85/core/src/test/java/org/apache/oozie/service/TestHASLAService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java index 795db37..3af263e 100644 --- a/core/src/test/java/org/apache/oozie/service/TestHASLAService.java +++ b/core/src/test/java/org/apache/oozie/service/TestHASLAService.java @@ -101,17 +101,18 @@ public class TestHASLAService extends ZKXTestCase { // Case 1 workflow job submitted to dummy server, // but before start running, the dummy server is down - createWorkflow("job-1"); - SLARegistrationBean sla1 = TestSLAService._createSLARegistration("job-1", AppType.WORKFLOW_JOB); + WorkflowJobBean wfJob1 = createWorkflow("job-1-W"); + SLARegistrationBean sla1 = TestSLAService._createSLARegistration("job-1-W", AppType.WORKFLOW_JOB); sla1.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2 hr before sla1.setExpectedEnd(new Date(System.currentTimeMillis() - 1 * 3600 * 1000)); // 1 hr before sla1.setExpectedDuration(10 * 60 * 1000); // 10 mins dummyCalc.addRegistration(sla1.getId(), sla1); + dummyCalc.updateAllSlaStatus(); // Case 2. workflow job submitted to dummy server, start running, // then the dummy server is down - createWorkflow("job-2"); - SLARegistrationBean sla2 = TestSLAService._createSLARegistration("job-2", AppType.WORKFLOW_JOB); + WorkflowJobBean wfJob2 = createWorkflow("job-2-W"); + SLARegistrationBean sla2 = TestSLAService._createSLARegistration("job-2-W", AppType.WORKFLOW_JOB); sla2.setExpectedStart(new Date(System.currentTimeMillis() - 2 * 3600 * 1000)); // 2hr before sla2.setExpectedEnd(new Date(System.currentTimeMillis() + 1 * 3600 * 1000)); // 1hr ahead sla2.setExpectedDuration(10 * 60 * 1000); // 10 mins @@ -138,6 +139,12 @@ public class TestHASLAService extends ZKXTestCase { ehs.new EventWorker().run(); assertTrue(output.toString().contains(sla1.getId() + " Sla START - MISS!!!")); + wfJob1.setStatus(WorkflowJob.Status.SUCCEEDED); + wfJob1.setEndTime(new Date()); + wfJob1.setStartTime(new Date()); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob1); + // Job 1 succeeded on the living server --> duration met and end miss slaCalcMem.addJobStatus(sla1.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(), new Date()); @@ -145,6 +152,12 @@ public class TestHASLAService extends ZKXTestCase { assertTrue(output.toString().contains(sla1.getId() + " Sla DURATION - MET!!!")); assertTrue(output.toString().contains(sla1.getId() + " Sla END - MISS!!!")); + wfJob2.setStatus(WorkflowJob.Status.SUCCEEDED); + wfJob2.setEndTime(new Date()); + wfJob2.setStartTime(new Date()); + WorkflowJobQueryExecutor.getInstance().executeUpdate( + WorkflowJobQuery.UPDATE_WORKFLOW_STATUS_INSTANCE_MOD_START_END, wfJob2); + // Job 2 succeeded on the living server --> duration met and end met slaCalcMem.addJobStatus(sla2.getId(), WorkflowJob.Status.SUCCEEDED.name(), EventStatus.SUCCESS, new Date(), new Date()); @@ -161,13 +174,14 @@ public class TestHASLAService extends ZKXTestCase { } } - public void updateCoordAction(String id, String status) throws JPAExecutorException { + public CoordinatorActionBean updateCoordAction(String id, String status) throws JPAExecutorException { CoordinatorActionBean action = new CoordinatorActionBean(); action.setId(id); action.setStatusStr(status); action.setLastModifiedTime(new Date()); CoordActionQueryExecutor.getInstance().executeUpdate(CoordActionQuery.UPDATE_COORD_ACTION_STATUS_PENDING_TIME, action); + return action; } public void testSLAUpdateWithHA() throws Exception { @@ -187,8 +201,8 @@ public class TestHASLAService extends ZKXTestCase { createDBEntry(id3, expectedStartTS, expectedEndTS1); createDBEntry(id4, expectedStartTS, expectedEndTS1); // Coord Action of jobs 5-6 already started and currently running (to test history set) - createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2); - createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2); + createDBEntryForStarted(id5, expectedStartTS, expectedEndTS2, 1); + createDBEntryForStarted(id6, expectedStartTS, expectedEndTS2, 1); SLAService slas = Services.get().get(SLAService.class); SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator(); @@ -198,7 +212,10 @@ public class TestHASLAService extends ZKXTestCase { while (itr.hasNext()) { slaMapKeys.add(itr.next()); } - assertEquals(6, slaMapKeys.size()); + //4 jobs expected end is not yet reached + //2 jobs has end miss, waiting for job to complete + assertEquals(4, slaMapKeys.size()); + assertEquals(2, slaCalcMem.getHistorySet().size()); DummyZKOozie dummyOozie_1 = null; try { @@ -214,8 +231,8 @@ public class TestHASLAService extends ZKXTestCase { while (itr.hasNext()) { slaMapKeys.add(itr.next()); } - assertEquals(6, slaMapKeys.size()); - + assertEquals(4, slaMapKeys.size()); + assertEquals(2, dummySlaCalcMem.getHistorySet().size()); // Coord Action 1,3 run and update status on *non-dummy* server updateCoordAction(id1, "RUNNING"); slaCalcMem @@ -314,14 +331,12 @@ public class TestHASLAService extends ZKXTestCase { public void testNoDuplicateEventsInHA() throws Exception { String id1 = "0000001-130521183438837-oozie-test-C@1"; - Date expectedStartTS = new Date(System.currentTimeMillis() - 2 * 3600 * 1000); // get MISS - Date expectedEndTS = new Date(System.currentTimeMillis() - 1 * 3600 * 1000); // get MISS - createDBEntry(id1, expectedStartTS, expectedEndTS); SLAService slas = Services.get().get(SLAService.class); SLACalculatorMemory slaCalcMem = (SLACalculatorMemory) slas.getSLACalculator(); slaCalcMem.init(Services.get().getConf()); // loads the job in sla map + EventHandlerService ehs = Services.get().get(EventHandlerService.class); EventQueue ehs_q = ehs.getEventQueue(); @@ -336,20 +351,30 @@ public class TestHASLAService extends ZKXTestCase { dummyEhs.init(Services.get()); EventQueue dummyEhs_q = dummyEhs.getEventQueue(); + Date expectedStartTS = new Date(System.currentTimeMillis() + 2 * 3600 * 1000); // get MISS + Date expectedEndTS = new Date(System.currentTimeMillis() + 1 * 3600 * 1000); // get MISS + SLASummaryBean sla = createDBEntryForStarted(id1, expectedStartTS, expectedEndTS, 0); + sla.setExpectedDuration(-1); + sla.setLastModifiedTime(new Date()); + SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES, + sla); + // Action started on Server 1 updateCoordAction(id1, "RUNNING"); + slaCalcMem .addJobStatus(id1, CoordinatorAction.Status.RUNNING.name(), EventStatus.STARTED, new Date(), null); + assertEquals(1, ehs_q.size()); SLACalcStatus s1 = (SLACalcStatus) ehs_q.poll(); - assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus()); + assertEquals(SLAStatus.IN_PROCESS, s1.getSLAStatus()); // Action ended on Server 2 updateCoordAction(id1, "FAILED"); dummySlaCalcMem.addJobStatus(id1, CoordinatorAction.Status.FAILED.name(), EventStatus.FAILURE, new Date( System.currentTimeMillis() - 1800 * 1000), new Date()); - dummyEhs_q.poll(); // getting rid of the duration_miss event SLACalcStatus s2 = (SLACalcStatus) dummyEhs_q.poll(); + assertEquals(SLAStatus.MISS, s2.getSLAStatus()); slaCalcMem.updateAllSlaStatus(); @@ -464,7 +489,8 @@ public class TestHASLAService extends ZKXTestCase { BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); } - private void createDBEntryForStarted(String actionId, Date expectedStartTS, Date expectedEndTS) throws Exception { + private SLASummaryBean createDBEntryForStarted(String actionId, Date expectedStartTS, Date expectedEndTS, + int eventProcessed) throws Exception { ArrayList<JsonBean> insertList = new ArrayList<JsonBean>(); Date modTime = new Date(); WorkflowJobBean wf = addRecordToWfJobTable(WorkflowJob.Status.RUNNING, WorkflowInstance.Status.RUNNING, @@ -494,7 +520,7 @@ public class TestHASLAService extends ZKXTestCase { sla.setAppType(AppType.COORDINATOR_ACTION); sla.setJobStatus("RUNNING"); sla.setSLAStatus(SLAStatus.IN_PROCESS); - sla.setEventProcessed(1); + sla.setEventProcessed(eventProcessed); sla.setLastModifiedTime(modTime); sla.setExpectedStart(expectedStartTS); sla.setActualStart(expectedStartTS); @@ -508,9 +534,10 @@ public class TestHASLAService extends ZKXTestCase { insertList.add(sla); insertList.add(reg); BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); + return sla; } - private void createWorkflow(String id) throws Exception { + private WorkflowJobBean createWorkflow(String id) throws Exception { List<JsonBean> insertList = new ArrayList<JsonBean>(); WorkflowJobBean workflow = new WorkflowJobBean(); workflow.setId(id); @@ -519,6 +546,7 @@ public class TestHASLAService extends ZKXTestCase { workflow.setSlaXml("<sla></sla>"); insertList.add(workflow); BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(insertList, null, null); + return workflow; } public static class DummySLAEventListener extends SLAEventListener {
