http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java index fdce6b5..42313fd 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalculatorMemory.java @@ -38,13 +38,17 @@ import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.XException; import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.event.JobEvent; import org.apache.oozie.client.event.SLAEvent.EventStatus; import org.apache.oozie.client.event.SLAEvent.SLAStatus; import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.command.CommandException; import org.apache.oozie.executor.jpa.BatchQueryExecutor; +import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; import org.apache.oozie.executor.jpa.CoordActionGetForSLAJPAExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; @@ -52,17 +56,16 @@ import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; -import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor; +import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; import org.apache.oozie.executor.jpa.WorkflowActionGetForSLAJPAExecutor; -import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQuery; +import org.apache.oozie.executor.jpa.WorkflowJobGetForSLAJPAExecutor; +import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor; import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery; import org.apache.oozie.executor.jpa.sla.SLASummaryGetRecordsOnRestartJPAExecutor; -import org.apache.oozie.executor.jpa.SLASummaryQueryExecutor.SLASummaryQuery; -import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; import org.apache.oozie.lock.LockToken; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.EventHandlerService; @@ -76,7 +79,7 @@ import org.apache.oozie.sla.service.SLAService; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.XLog; - +import org.apache.oozie.util.Pair; import com.google.common.annotations.VisibleForTesting; @@ -453,6 +456,17 @@ public class SLACalculatorMemory implements SLACalculator { return memObj; } + private SLACalcStatus getSLACalcStatus(String jobId) throws JPAExecutorException { + SLACalcStatus memObj; + memObj = slaMap.get(jobId); + if (memObj == null) { + memObj = new SLACalcStatus(SLASummaryQueryExecutor.getInstance().get(SLASummaryQuery.GET_SLA_SUMMARY, jobId), + SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ON_RESTART, jobId)); + } + return memObj; + } + + @Override public Iterator<String> iterator() { return slaMap.keySet().iterator(); @@ -477,9 +491,9 @@ public class SLACalculatorMemory implements SLACalculator { synchronized (slaCalc) { boolean change = false; // get eventProcessed on DB for validation in HA - Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).getSingleValue( - SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId); - byte eventProc = ((Byte) eventProcObj).byteValue(); + SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get( + SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); + byte eventProc = summaryBean.getEventProcessed(); if (eventProc >= 7) { if (eventProc == 7) { historySet.add(jobId); @@ -488,6 +502,12 @@ public class SLACalculatorMemory implements SLACalculator { LOG.trace("Removed Job [{0}] from map as SLA processed", jobId); } else { + if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { + //Update last modified time. + slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); + reloadExpectedTimeAndConfig(slaCalc); + LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); + } slaCalc.setEventProcessed(eventProc); SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); // calculation w.r.t current time and status @@ -499,7 +519,9 @@ public class SLACalculatorMemory implements SLACalculator { if (eventProc != 8 && (eventProc & 1) == 0) { // Some DB exception slaCalc.setEventStatus(EventStatus.START_MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } eventProc++; } change = true; @@ -525,7 +547,9 @@ public class SLACalculatorMemory implements SLACalculator { if (eventProc != 8 && ((eventProc >> 1) & 1) == 0) { // Some DB exception slaCalc.setEventStatus(EventStatus.DURATION_MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } eventProc += 2; } change = true; @@ -552,26 +576,16 @@ public class SLACalculatorMemory implements SLACalculator { // Should not be > 8. But to handle any corner cases slaCalc.setEventProcessed(8); slaMap.remove(jobId); + LOG.trace("Removed Job [{0}] from map after Event-processed=8", jobId); } else { slaCalc.setEventProcessed(eventProc); } - SLASummaryBean slaSummaryBean = new SLASummaryBean(); - slaSummaryBean.setId(slaCalc.getId()); - slaSummaryBean.setEventProcessed(eventProc); - slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus()); - slaSummaryBean.setEventStatus(slaCalc.getEventStatus()); - slaSummaryBean.setActualEnd(slaCalc.getActualEnd()); - slaSummaryBean.setActualStart(slaCalc.getActualStart()); - slaSummaryBean.setActualDuration(slaCalc.getActualDuration()); - slaSummaryBean.setJobStatus(slaCalc.getJobStatus()); - slaSummaryBean.setLastModifiedTime(new Date()); - SLASummaryQueryExecutor.getInstance().executeUpdate( - SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, slaSummaryBean); + writetoDB(slaCalc, eventProc); if (eventProc == 7) { historySet.add(jobId); slaMap.remove(jobId); - LOG.trace("Removed Job [{0}] from map after End-processed", jobId); + LOG.trace("Removed Job [{0}] from map after Event-processed=7", jobId); } } } @@ -586,6 +600,48 @@ public class SLACalculatorMemory implements SLACalculator { } } + private void writetoDB(SLACalcStatus slaCalc, byte eventProc) throws JPAExecutorException { + SLASummaryBean slaSummaryBean = new SLASummaryBean(); + slaSummaryBean.setId(slaCalc.getId()); + slaSummaryBean.setEventProcessed(eventProc); + slaSummaryBean.setSLAStatus(slaCalc.getSLAStatus()); + slaSummaryBean.setEventStatus(slaCalc.getEventStatus()); + slaSummaryBean.setActualEnd(slaCalc.getActualEnd()); + slaSummaryBean.setActualStart(slaCalc.getActualStart()); + slaSummaryBean.setActualDuration(slaCalc.getActualDuration()); + slaSummaryBean.setJobStatus(slaCalc.getJobStatus()); + slaSummaryBean.setLastModifiedTime(new Date()); + + SLASummaryQueryExecutor.getInstance().executeUpdate(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_STATUS_ACTUAL_TIMES, + slaSummaryBean); + LOG.trace("Stored SLA SummaryBean Job [{0}] with Event-processed=[{1}]", slaCalc.getId(), + slaSummaryBean.getEventProcessed()); + } + + @SuppressWarnings("rawtypes") + private void updateDBSlaConfig(SLACalcStatus slaCalc, List<UpdateEntry> updateList) + throws JPAExecutorException { + updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_CONFIG, slaCalc.getSLARegistrationBean())); + slaCalc.setLastModifiedTime(new Date()); + updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME, new SLASummaryBean(slaCalc))); + } + + @SuppressWarnings("rawtypes") + private void updateDBSlaExpectedValues(SLACalcStatus slaCalc, List<UpdateEntry> updateList) + throws JPAExecutorException { + slaCalc.setLastModifiedTime(new Date()); + updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_EXPECTED_VALUE, slaCalc + .getSLARegistrationBean())); + updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES, + new SLASummaryBean(slaCalc))); + } + + @SuppressWarnings("rawtypes") + private void executeBatchQuery(List<UpdateEntry> updateList) throws JPAExecutorException { + BatchQueryExecutor.getInstance().executeBatchInsertUpdateDelete(null, updateList, null); + } + + /** * Periodically run by the SLAService worker threads to update SLA status by * iterating through all the jobs in the map @@ -673,6 +729,8 @@ public class SLACalculatorMemory implements SLACalculator { slaCalc.setSLAStatus(SLAStatus.NOT_STARTED); slaCalc.setJobStatus(getJobStatus(reg.getAppType())); slaMap.put(jobId, slaCalc); + + @SuppressWarnings("rawtypes") List<UpdateEntry> updateList = new ArrayList<UpdateEntry>(); updateList.add(new UpdateEntry<SLARegQuery>(SLARegQuery.UPDATE_SLA_REG_ALL, reg)); updateList.add(new UpdateEntry<SLASummaryQuery>(SLASummaryQuery.UPDATE_SLA_SUMMARY_ALL, @@ -758,9 +816,17 @@ public class SLACalculatorMemory implements SLACalculator { locked = slaCalc.isLocked(); if (locked) { // get eventProcessed on DB for validation in HA - Object eventProcObj = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()) - .getSingleValue(SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED, jobId); - byte eventProc = ((Byte) eventProcObj).byteValue(); + SLASummaryBean summaryBean = ((SLASummaryQueryExecutor) SLASummaryQueryExecutor.getInstance()).get( + SLASummaryQuery.GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED, jobId); + byte eventProc = summaryBean.getEventProcessed(); + + if (!slaCalc.getLastModifiedTime().equals(summaryBean.getLastModifiedTime())) { + //Update last modified time. + slaCalc.setLastModifiedTime(summaryBean.getLastModifiedTime()); + reloadExpectedTimeAndConfig(slaCalc); + LOG.debug("Last modified time has changed for job " + jobId + " reloading config from DB"); + } + slaCalc.setEventProcessed(eventProc); slaCalc.setJobStatus(jobStatus); switch (jobEventStatus) { @@ -824,7 +890,9 @@ public class SLACalculatorMemory implements SLACalculator { else { slaCalc.setEventStatus(EventStatus.START_MET); } - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } } eventProc += 1; slaCalc.setEventProcessed(eventProc); @@ -869,7 +937,9 @@ public class SLACalculatorMemory implements SLACalculator { } eventProc += 4; slaCalc.setEventProcessed(eventProc); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } } return getSLASummaryBean(slaCalc); } @@ -891,7 +961,9 @@ public class SLACalculatorMemory implements SLACalculator { if (slaCalc.getEventProcessed() < 4) { slaCalc.setEventStatus(EventStatus.END_MISS); slaCalc.setSLAStatus(SLAStatus.MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } slaCalc.setEventProcessed(7); return getSLASummaryBean(slaCalc); } @@ -905,7 +977,9 @@ public class SLACalculatorMemory implements SLACalculator { if (((eventProc >> 1) & 1) == 0) { if (expectedDuration != -1) { slaCalc.setEventStatus(EventStatus.DURATION_MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } } eventProc += 2; slaCalc.setEventProcessed(eventProc); @@ -915,7 +989,9 @@ public class SLACalculatorMemory implements SLACalculator { slaCalc.setSLAStatus(SLAStatus.MISS); eventProc += 4; slaCalc.setEventProcessed(eventProc); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } } return getSLASummaryBean(slaCalc); } @@ -934,13 +1010,16 @@ public class SLACalculatorMemory implements SLACalculator { } private void processDurationSLA(long expected, long actual, SLACalcStatus slaCalc) { - if (expected != -1 && actual > expected) { - slaCalc.setEventStatus(EventStatus.DURATION_MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); - } - else if (expected != -1 && actual <= expected) { - slaCalc.setEventStatus(EventStatus.DURATION_MET); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (expected != -1) { + if (actual > expected) { + slaCalc.setEventStatus(EventStatus.DURATION_MISS); + } + else if (actual <= expected) { + slaCalc.setEventStatus(EventStatus.DURATION_MET); + } + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } } } @@ -1016,7 +1095,9 @@ public class SLACalculatorMemory implements SLACalculator { else { slaCalc.setEventStatus(EventStatus.START_MET); } - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } } slaCalc.setActualDuration(slaCalc.getActualEnd().getTime() - slaCalc.getActualStart().getTime()); if (((eventProc >> 1) & 1) == 0) { @@ -1030,7 +1111,9 @@ public class SLACalculatorMemory implements SLACalculator { else { slaCalc.setEventStatus(EventStatus.END_MET); } - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } } slaCalc.setEventProcessed(8); } @@ -1046,12 +1129,16 @@ public class SLACalculatorMemory implements SLACalculator { else { slaCalc.setEventStatus(EventStatus.START_MET); } - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } eventProc++; } else if (slaCalc.getExpectedStart().getTime() < System.currentTimeMillis()) { slaCalc.setEventStatus(EventStatus.START_MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } eventProc++; } } @@ -1059,14 +1146,18 @@ public class SLACalculatorMemory implements SLACalculator { && slaCalc.getExpectedDuration() != -1) { if (System.currentTimeMillis() - slaCalc.getActualStart().getTime() > slaCalc.getExpectedDuration()) { slaCalc.setEventStatus(EventStatus.DURATION_MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } eventProc += 2; } } if (eventProc < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) { slaCalc.setEventStatus(EventStatus.END_MISS); slaCalc.setSLAStatus(SLAStatus.MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } eventProc += 4; } slaCalc.setEventProcessed(eventProc); @@ -1078,12 +1169,36 @@ public class SLACalculatorMemory implements SLACalculator { if (slaCalc.getEventProcessed() < 4 && slaCalc.getExpectedEnd().getTime() < System.currentTimeMillis()) { slaCalc.setEventStatus(EventStatus.END_MISS); slaCalc.setSLAStatus(SLAStatus.MISS); - eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + if (shouldAlert(slaCalc)) { + eventHandler.queueEvent(new SLACalcStatus(slaCalc)); + } slaCalc.setEventProcessed(slaCalc.getEventProcessed() + 4); } } } + public void reloadExpectedTimeAndConfig(SLACalcStatus slaCalc) throws JPAExecutorException { + SLARegistrationBean regBean = SLARegistrationQueryExecutor.getInstance().get( + SLARegQuery.GET_SLA_EXPECTED_VALUE_CONFIG, slaCalc.getId()); + + if (regBean.getExpectedDuration() > 0) { + slaCalc.getSLARegistrationBean().setExpectedDuration(regBean.getExpectedDuration()); + } + if (regBean.getExpectedEnd() != null) { + slaCalc.getSLARegistrationBean().setExpectedEnd(regBean.getExpectedEnd()); + } + if (regBean.getExpectedStart() != null) { + slaCalc.getSLARegistrationBean().setExpectedStart(regBean.getExpectedStart()); + } + if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) { + slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, + regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)); + } + if (regBean.getNominalTime() != null) { + slaCalc.getSLARegistrationBean().setNominalTime(regBean.getNominalTime()); + } + } + @VisibleForTesting public boolean isJobIdInSLAMap(String jobId) { return this.slaMap.containsKey(jobId); @@ -1097,4 +1212,99 @@ public class SLACalculatorMemory implements SLACalculator { private void setLogPrefix(String jobId) { LOG = LogUtils.setLogInfo(LOG, jobId, null, null); } + + @Override + public boolean enableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException { + boolean isJobFound = false; + @SuppressWarnings("rawtypes") + List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); + for (String jobId : jobIds) { + SLACalcStatus slaCalc = getSLACalcStatus(jobId); + if (slaCalc != null) { + slaCalc.getSLARegistrationBean().removeFromSLAConfigMap(OozieClient.SLA_DISABLE_ALERT); + updateDBSlaConfig(slaCalc, updateList); + isJobFound = true; + } + } + executeBatchQuery(updateList); + return isJobFound; + } + + @Override + public boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException { + return enableAlert(getSLAJobsforParents(parentJobIds)); + } + + + @Override + public boolean disableAlert(List<String> jobIds) throws JPAExecutorException, ServiceException { + boolean isJobFound = false; + @SuppressWarnings("rawtypes") + List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); + + for (String jobId : jobIds) { + SLACalcStatus slaCalc = getSLACalcStatus(jobId); + if (slaCalc != null) { + slaCalc.getSLARegistrationBean().addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(true)); + updateDBSlaConfig(slaCalc, updateList); + isJobFound = true; + } + } + executeBatchQuery(updateList); + return isJobFound; + } + + @Override + public boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException { + return disableAlert(getSLAJobsforParents(parentJobIds)); + } + + @Override + public boolean changeDefinition(List<Pair<String, Map<String,String>>> jobIdsSLAPair ) throws JPAExecutorException, + ServiceException{ + boolean isJobFound = false; + @SuppressWarnings("rawtypes") + List<UpdateEntry> updateList = new ArrayList<BatchQueryExecutor.UpdateEntry>(); + for (Pair<String, Map<String,String>> jobIdSLAPair : jobIdsSLAPair) { + SLACalcStatus slaCalc = getSLACalcStatus(jobIdSLAPair.getFist()); + if (slaCalc != null) { + updateParams(slaCalc, jobIdSLAPair.getSecond()); + updateDBSlaExpectedValues(slaCalc, updateList); + isJobFound = true; + } + } + executeBatchQuery(updateList); + return isJobFound; + } + + private void updateParams(SLACalcStatus slaCalc, Map<String, String> newParams) throws ServiceException { + SLARegistrationBean reg = slaCalc.getSLARegistrationBean(); + if (newParams != null) { + try { + Date newNominal = SLAOperations.setNominalTime(newParams.get(RestConstants.SLA_NOMINAL_TIME), reg); + SLAOperations.setExpectedStart(newParams.get(RestConstants.SLA_SHOULD_START), newNominal, reg); + SLAOperations.setExpectedEnd(newParams.get(RestConstants.SLA_SHOULD_END), newNominal, reg); + SLAOperations.setExpectedDuration(newParams.get(RestConstants.SLA_MAX_DURATION), reg); + } + catch (CommandException ce) { + throw new ServiceException(ce); + } + } + } + + private boolean shouldAlert(SLACalcStatus slaObj) { + return !slaObj.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT); + } + + private List<String> getSLAJobsforParents(List<String> parentJobIds) throws JPAExecutorException{ + List<String> childJobIds = new ArrayList<String>(); + for (String jobId : parentJobIds) { + List<SLARegistrationBean> registrationBeanList = SLARegistrationQueryExecutor.getInstance().getList( + SLARegQuery.GET_SLA_REG_FOR_PARENT_ID, jobId); + for (SLARegistrationBean bean : registrationBeanList) { + childJobIds.add(bean.getId()); + } + } + return childJobIds; + } }
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLAOperations.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLAOperations.java b/core/src/main/java/org/apache/oozie/sla/SLAOperations.java index f5fc826..3905003 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLAOperations.java +++ b/core/src/main/java/org/apache/oozie/sla/SLAOperations.java @@ -23,15 +23,14 @@ import java.util.Date; import org.apache.oozie.AppType; import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.event.SLAEvent.EventStatus; import org.apache.oozie.command.CommandException; import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; -import org.apache.oozie.service.JPAService; import org.apache.oozie.service.ServiceException; import org.apache.oozie.service.Services; -import org.apache.oozie.sla.SLARegistrationBean; import org.apache.oozie.sla.service.SLAService; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.XLog; @@ -40,14 +39,20 @@ import org.jdom.Element; public class SLAOperations { - private static final String NOMINAL_TIME = "nominal-time"; - private static final String SHOULD_START = "should-start"; - private static final String SHOULD_END = "should-end"; - private static final String MAX_DURATION = "max-duration"; - private static final String ALERT_EVENTS = "alert-events"; + public static final String NOMINAL_TIME = "nominal-time"; + public static final String SHOULD_START = "should-start"; + public static final String SHOULD_END = "should-end"; + public static final String MAX_DURATION = "max-duration"; + public static final String ALERT_EVENTS = "alert-events"; + public static final String ALL_VALUE = "ALL"; + + + static public XLog LOG = XLog.getLog(SLAOperations.class); + public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, String parentId, - AppType appType, String user, String appName, XLog log, boolean rerun) throws CommandException { + AppType appType, String user, String appName, XLog log, boolean rerun, boolean disableAlert) + throws CommandException { if (eSla == null || !SLAService.isEnabled()) { log.debug("Not registering SLA for job [{0}]. Sla-Xml null OR SLAService not enabled", jobId); return null; @@ -56,56 +61,19 @@ public class SLAOperations { // Setting nominal time String strNominalTime = getTagElement(eSla, NOMINAL_TIME); - if (strNominalTime == null || strNominalTime.length() == 0) { - throw new CommandException(ErrorCode.E1101, NOMINAL_TIME); - } - Date nominalTime; - try { - nominalTime = DateUtils.parseDateOozieTZ(strNominalTime); - sla.setNominalTime(nominalTime); - } - catch (ParseException pex) { - throw new CommandException(ErrorCode.E0302, strNominalTime, pex); - } + Date nominalTime = setNominalTime(strNominalTime, sla); // Setting expected start time String strExpectedStart = getTagElement(eSla, SHOULD_START); - if (strExpectedStart != null) { - float expectedStart = Float.parseFloat(strExpectedStart); - if (expectedStart < 0) { - throw new CommandException(ErrorCode.E0302, strExpectedStart, "for SLA Expected start time"); - } - else { - Date expectedStartTime = new Date(nominalTime.getTime() + (long) (expectedStart * 60 * 1000)); - sla.setExpectedStart(expectedStartTime); - } - } + setExpectedStart(strExpectedStart, nominalTime, sla); // Setting expected end time String strExpectedEnd = getTagElement(eSla, SHOULD_END); - if (strExpectedEnd == null || strExpectedEnd.length() == 0) { - throw new CommandException(ErrorCode.E1101, SHOULD_END); - } - float expectedEnd = Float.parseFloat(strExpectedEnd); - if (expectedEnd < 0) { - throw new CommandException(ErrorCode.E0302, strExpectedEnd, "for SLA Expected end time"); - } - else { - Date expectedEndTime = new Date(nominalTime.getTime() + (long) (expectedEnd * 60 * 1000)); - sla.setExpectedEnd(expectedEndTime); - } + setExpectedEnd(strExpectedEnd, nominalTime, sla); // Setting expected duration in milliseconds String expectedDurationStr = getTagElement(eSla, MAX_DURATION); - if (expectedDurationStr != null && expectedDurationStr.length() > 0) { - float expectedDuration = Float.parseFloat(expectedDurationStr); - if (expectedDuration > 0) { - sla.setExpectedDuration((long) (expectedDuration * 60 * 1000)); - } - } - else if (sla.getExpectedStart() != null) { - sla.setExpectedDuration(sla.getExpectedEnd().getTime() - sla.getExpectedStart().getTime()); - } + setExpectedDuration(expectedDurationStr, sla); // Parse desired alert-types i.e. start-miss, end-miss, start-met etc.. String alertEvents = getTagElement(eSla, ALERT_EVENTS); @@ -134,6 +102,10 @@ public class SLAOperations { sla.setAlertContact(getTagElement(eSla, "alert-contact")); sla.setUpstreamApps(getTagElement(eSla, "upstream-apps")); + //disable Alert flag in slaConfig + if (disableAlert) { + sla.addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, Boolean.toString(disableAlert)); + } // Oozie defined sla.setId(jobId); sla.setAppType(appType); @@ -158,6 +130,68 @@ public class SLAOperations { return sla; } + public static Date setNominalTime(String strNominalTime, SLARegistrationBean sla) throws CommandException { + if (strNominalTime == null || strNominalTime.length() == 0) { + return sla.getNominalTime(); + } + Date nominalTime; + try { + nominalTime = DateUtils.parseDateOozieTZ(strNominalTime); + sla.setNominalTime(nominalTime); + } + catch (ParseException pex) { + throw new CommandException(ErrorCode.E0302, strNominalTime, pex); + } + return nominalTime; + } + + public static void setExpectedStart(String strExpectedStart, Date nominalTime, SLARegistrationBean sla) + throws CommandException { + if (strExpectedStart != null) { + float expectedStart = Float.parseFloat(strExpectedStart); + if (expectedStart < 0) { + throw new CommandException(ErrorCode.E0302, strExpectedStart, "for SLA Expected start time"); + } + else { + Date expectedStartTime = new Date(nominalTime.getTime() + (long) (expectedStart * 60 * 1000)); + sla.setExpectedStart(expectedStartTime); + LOG.debug("Setting expected start to " + expectedStartTime + " for job " + sla.getId()); + } + } + } + + public static void setExpectedEnd(String strExpectedEnd, Date nominalTime, SLARegistrationBean sla) + throws CommandException { + if (strExpectedEnd != null) { + float expectedEnd = Float.parseFloat(strExpectedEnd); + if (expectedEnd < 0) { + throw new CommandException(ErrorCode.E0302, strExpectedEnd, "for SLA Expected end time"); + } + else { + Date expectedEndTime = new Date(nominalTime.getTime() + (long) (expectedEnd * 60 * 1000)); + sla.setExpectedEnd(expectedEndTime); + LOG.debug("Setting expected end to " + expectedEndTime + " for job " + sla.getId()); + + } + } + } + + public static void setExpectedDuration(String expectedDurationStr, SLARegistrationBean sla) { + if (expectedDurationStr != null && expectedDurationStr.length() > 0) { + float expectedDuration = Float.parseFloat(expectedDurationStr); + if (expectedDuration > 0) { + long duration = (long) (expectedDuration * 60 * 1000); + LOG.debug("Setting expected duration to " + duration + " for job " + sla.getId()); + sla.setExpectedDuration(duration); + } + } + else if (sla.getExpectedStart() != null) { + long duration = sla.getExpectedEnd().getTime() - sla.getExpectedStart().getTime(); + LOG.debug("Setting expected duration to " + duration + " for job " + sla.getId()); + sla.setExpectedDuration(sla.getExpectedEnd().getTime() - sla.getExpectedStart().getTime()); + } + } + /** * Retrieve registration event * @param jobId the jobId @@ -165,7 +199,6 @@ public class SLAOperations { * @throws JPAExecutorException */ public static void updateRegistrationEvent(String jobId) throws CommandException, JPAExecutorException { - JPAService jpaService = Services.get().get(JPAService.class); SLAService slaService = Services.get().get(SLAService.class); try { SLARegistrationBean reg = SLARegistrationQueryExecutor.getInstance().get(SLARegQuery.GET_SLA_REG_ALL, jobId); @@ -203,7 +236,15 @@ public class SLAOperations { return createSlaRegistrationEvent(eSla, jobId, null, appType, user, null, log, false); } - private static String getTagElement(Element elem, String tagName) { + /* + * default disableAlert flag + */ + public static SLARegistrationBean createSlaRegistrationEvent(Element eSla, String jobId, String parentId, + AppType appType, String user, String appName, XLog log, boolean rerun) throws CommandException { + return createSlaRegistrationEvent(eSla, jobId, null, appType, user, appName, log, rerun, false); + } + + public static String getTagElement(Element elem, String tagName) { if (elem != null && elem.getChild(tagName, elem.getNamespace("sla")) != null) { return elem.getChild(tagName, elem.getNamespace("sla")).getText().trim(); } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java b/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java index 0770bd3..1b8370f 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java +++ b/core/src/main/java/org/apache/oozie/sla/SLARegistrationBean.java @@ -33,7 +33,6 @@ import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; import javax.persistence.Table; import javax.persistence.Transient; - import org.apache.oozie.AppType; import org.apache.oozie.client.event.Event.MessageType; import org.apache.oozie.client.rest.JsonBean; @@ -48,9 +47,21 @@ import org.json.simple.JSONObject; @NamedQuery(name = "UPDATE_SLA_REG_ALL", query = "update SLARegistrationBean w set w.jobId = :jobId, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.slaConfig = :slaConfig, w.notificationMsg = :notificationMsg, w.upstreamApps = :upstreamApps, w.appType = :appType, w.appName = :appName, w.user = :user, w.parentId = :parentId, w.jobData = :jobData where w.jobId = :jobId"), + @NamedQuery(name = "UPDATE_SLA_CONFIG", query = "update SLARegistrationBean w set w.slaConfig = :slaConfig where w.jobId = :jobId"), + + @NamedQuery(name = "UPDATE_SLA_EXPECTED_VALUE", query = "update SLARegistrationBean w set w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime , w.expectedDuration = :expectedDuration where w.jobId = :jobId"), + @NamedQuery(name = "GET_SLA_REG_ON_RESTART", query = "select w.notificationMsg, w.upstreamApps, w.slaConfig, w.jobData from SLARegistrationBean w where w.jobId = :id"), - @NamedQuery(name = "GET_SLA_REG_ALL", query = "select OBJECT(w) from SLARegistrationBean w where w.jobId = :id") }) + @NamedQuery(name = "GET_SLA_REG_ALL", query = "select OBJECT(w) from SLARegistrationBean w where w.jobId = :id"), + + @NamedQuery(name = "GET_SLA_CONFIGS", query = "select w.jobId, w.slaConfig from SLARegistrationBean w where w.jobId IN (:ids)"), + + @NamedQuery(name = "GET_SLA_EXPECTED_VALUE_CONFIG", query = "select w.jobId, w.slaConfig, w.expectedStartTS, w.expectedEndTS, w.expectedDuration, w.nominalTimeTS from SLARegistrationBean w where w.jobId = :id"), + + @NamedQuery(name = "GET_SLA_REG_FOR_PARENT_ID", query = "select w.jobId, w.slaConfig from SLARegistrationBean w where w.parentId = :parentId") + }) + public class SLARegistrationBean implements JsonBean { @Id @@ -281,10 +292,21 @@ public class SLARegistrationBean implements JsonBean { slaConfig = slaConfigMapToString(); } - public Map<String, String> getSlaConfigMap() { + + public Map<String, String> getSLAConfigMap() { return slaConfigMap; } + public void addToSLAConfigMap(String key, String value) { + slaConfigMap.put(key, value); + slaConfig = slaConfigMapToString(); + } + + public void removeFromSLAConfigMap(String key) { + slaConfigMap.remove(key); + slaConfig = slaConfigMapToString(); + } + private void slaConfigStringToMap() { if (slaConfig != null) { String[] splitString = slaConfig.split("},"); http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java index 9907dd0..a88dcf6 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java +++ b/core/src/main/java/org/apache/oozie/sla/SLASummaryBean.java @@ -21,6 +21,7 @@ package org.apache.oozie.sla; import java.sql.Timestamp; import java.util.Date; import java.util.List; +import java.util.Map; import javax.persistence.Basic; import javax.persistence.Column; @@ -31,6 +32,7 @@ import javax.persistence.NamedQuery; import javax.persistence.Table; import org.apache.oozie.AppType; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.event.SLAEvent; import org.apache.oozie.client.rest.JsonBean; import org.apache.oozie.client.rest.JsonTags; @@ -50,15 +52,22 @@ import org.json.simple.JSONObject; @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed, w.actualStartTS = :actualStartTS, w.actualEndTS = :actualEndTS, w.actualEndTS = :actualEndTS, w.actualDuration = :actualDuration, w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"), + @NamedQuery(name = "UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES", query = "update SLASummaryBean w set w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration , w.lastModifiedTS = :lastModTime where w.jobId = :jobId"), + @NamedQuery(name = "UPDATE_SLA_SUMMARY_EVENTPROCESSED", query = "update SLASummaryBean w set w.eventProcessed = :eventProcessed where w.jobId = :jobId"), + @NamedQuery(name = "UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME", query = "update SLASummaryBean w set w.lastModifiedTS = :lastModifiedTS where w.jobId = :jobId"), + @NamedQuery(name = "UPDATE_SLA_SUMMARY_ALL", query = "update SLASummaryBean w set w.jobId = :jobId, w.appName = :appName, w.appType = :appType, w.nominalTimeTS = :nominalTime, w.expectedStartTS = :expectedStartTime, w.expectedEndTS = :expectedEndTime, w.expectedDuration = :expectedDuration, w.jobStatus = :jobStatus, w.slaStatus = :slaStatus, w.eventStatus = :eventStatus, w.lastModifiedTS = :lastModTime, w.user = :user, w.parentId = :parentId, w.eventProcessed = :eventProcessed, w.actualDuration = :actualDuration, w.actualEndTS = :actualEndTS, w.actualStartTS = :actualStartTS where w.jobId = :jobId"), @NamedQuery(name = "GET_SLA_SUMMARY", query = "select OBJECT(w) from SLASummaryBean w where w.jobId = :id"), @NamedQuery(name = "GET_SLA_SUMMARY_RECORDS_RESTART", query = "select OBJECT(w) from SLASummaryBean w where w.eventProcessed <= 7 AND w.lastModifiedTS >= :lastModifiedTime"), - @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED", query = "select w.eventProcessed from SLASummaryBean w where w.jobId = :id") + @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED", query = "select w.eventProcessed from SLASummaryBean w where w.jobId = :id"), + + @NamedQuery(name = "GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED", query = "select w.eventProcessed, w.lastModifiedTS from SLASummaryBean w where w.jobId = :id") + }) /** @@ -431,6 +440,7 @@ public class SLASummaryBean implements JsonBean { json.put(JsonTags.SLA_SUMMARY_JOB_STATUS, jobStatus); json.put(JsonTags.SLA_SUMMARY_SLA_STATUS, slaStatus); json.put(JsonTags.SLA_SUMMARY_LAST_MODIFIED, JsonUtils.formatDateRfc822(lastModifiedTS, timeZoneId)); + return json; } } @@ -455,4 +465,25 @@ public class SLASummaryBean implements JsonBean { return json; } + @SuppressWarnings("unchecked") + public static JSONObject toJSONObject(List<? extends SLASummaryBean> slaSummaryList, + Map<String, Map<String, String>> slaConfigMap, String timeZoneId) { + JSONObject json = new JSONObject(); + JSONArray array = new JSONArray(); + if (slaSummaryList != null) { + for (SLASummaryBean summary : slaSummaryList) { + JSONObject slaJson = summary.toJSONObject(timeZoneId); + String slaAlertStatus = ""; + if (slaConfigMap.containsKey(summary.getId())) { + slaAlertStatus = slaConfigMap.get(summary.getId()).containsKey( + OozieClient.SLA_DISABLE_ALERT) ? "Disabled" : "Enabled"; + } + slaJson.put(JsonTags.SLA_ALERT_STATUS, slaAlertStatus); + array.add(slaJson); + } + } + json.put(JsonTags.SLA_SUMMARY_LIST, array); + return json; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/service/SLAService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java index a4562e7..ef1d335 100644 --- a/core/src/main/java/org/apache/oozie/sla/service/SLAService.java +++ b/core/src/main/java/org/apache/oozie/sla/service/SLAService.java @@ -19,6 +19,8 @@ package org.apache.oozie.sla.service; import java.util.Date; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ErrorCode; @@ -33,6 +35,7 @@ import org.apache.oozie.service.Services; import org.apache.oozie.sla.SLACalculator; import org.apache.oozie.sla.SLACalculatorMemory; import org.apache.oozie.sla.SLARegistrationBean; +import org.apache.oozie.util.Pair; import org.apache.oozie.util.XLog; import com.google.common.annotations.VisibleForTesting; @@ -107,7 +110,6 @@ public class SLAService implements Service { return calcImpl; } - @VisibleForTesting public void runSLAWorker() { new SLAWorker(calcImpl).run(); } @@ -181,4 +183,94 @@ public class SLAService implements Service { calcImpl.removeRegistration(jobId); } + /** + * Enable jobs sla alert. + * + * @param jobIds the job ids + * @param isParentJob, if jobIds are parent job + * @return true, if successful + * @throws ServiceException the service exception + */ + public boolean enableAlert(List<String> jobIds) throws ServiceException { + try { + return calcImpl.enableAlert(jobIds); + } + catch (JPAExecutorException jpe) { + LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0)); + throw new ServiceException(jpe); + } + } + + /** + * Enable child jobs sla alert. + * + * @param jobIds the parent job ids + * @param isParentJob, if jobIds are parent job + * @return true, if successful + * @throws ServiceException the service exception + */ + public boolean enableChildJobAlert(List<String> parentJobIds) throws ServiceException { + try { + return calcImpl.enableChildJobAlert(parentJobIds); + } + catch (JPAExecutorException jpe) { + LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0)); + throw new ServiceException(jpe); + } + } + + /** + * Disable jobs Sla alert. + * + * @param jobIds the job ids + * @param isParentJob, if jobIds are parent job + * @return true, if successful + * @throws ServiceException the service exception + */ + public boolean disableAlert(List<String> jobIds) throws ServiceException { + try { + return calcImpl.disableAlert(jobIds); + } + catch (JPAExecutorException jpe) { + LOG.error("Exception while updating SLA alerting for Job [{0}]", jobIds.get(0)); + throw new ServiceException(jpe); + } + } + + /** + * Disable child jobs Sla alert. + * + * @param jobIds the parent job ids + * @param isParentJob, if jobIds are parent job + * @return true, if successful + * @throws ServiceException the service exception + */ + public boolean disableChildJobAlert(List<String> parentJobIds) throws ServiceException { + try { + return calcImpl.disableChildJobAlert(parentJobIds); + } + catch (JPAExecutorException jpe) { + LOG.error("Exception while updating SLA alerting for Job [{0}]", parentJobIds.get(0)); + throw new ServiceException(jpe); + } + } + + /** + * Change jobs Sla definitions + * It takes list of pairs of jobid and key/value pairs of el evaluated sla definition. + * Support definition are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration. + * + * @param jobIdsSLAPair the job ids sla pair + * @return true, if successful + * @throws ServiceException the service exception + */ + public boolean changeDefinition(List<Pair<String, Map<String, String>>> idSlaDefinitionList) + throws ServiceException { + try { + return calcImpl.changeDefinition(idSlaDefinitionList); + } + catch (JPAExecutorException jpe) { + throw new ServiceException(jpe); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java b/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java index 7c2620c..1c565ef 100644 --- a/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java +++ b/core/src/main/java/org/apache/oozie/util/CoordActionsInDateRange.java @@ -30,12 +30,11 @@ import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.XException; import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionModifiedDateForRangeJPAExecutor; -import org.apache.oozie.executor.jpa.CoordJobGetActionIdsForDateRangeJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionRunningCountForRangeJPAExecutor; -import org.apache.oozie.executor.jpa.CoordJobGetActionsByDatesForKillJPAExecutor; -import org.apache.oozie.executor.jpa.CoordJobGetActionsForDatesJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; @@ -139,10 +138,13 @@ public class CoordActionsInDateRange { throw new XException(ErrorCode.E0308, "'" + range + "'. Start date '" + start + "' is older than end date: '" + end + "'"); } - List<String> list = null; - JPAService jpaService = Services.get().get(JPAService.class); - list = jpaService.execute(new CoordJobGetActionIdsForDateRangeJPAExecutor(jobId, start, end)); - return list; + List<CoordinatorActionBean> listOfActions = CoordActionQueryExecutor.getInstance().getList( + CoordActionQuery.GET_TERMINATED_ACTIONS_FOR_DATES, jobId, start, end); + List<String> idsList = new ArrayList<String>(); + for ( CoordinatorActionBean bean : listOfActions){ + idsList.add(bean.getId()); + } + return idsList; } /** @@ -156,12 +158,13 @@ public class CoordActionsInDateRange { private static List<CoordinatorActionBean> getActionsFromDateRange(String jobId, Date start, Date end, boolean active) throws XException { List<CoordinatorActionBean> list; - JPAService jpaService = Services.get().get(JPAService.class); if (!active) { - list = jpaService.execute(new CoordJobGetActionsForDatesJPAExecutor(jobId, start, end)); + list = CoordActionQueryExecutor.getInstance().getList( + CoordActionQuery.GET_TERMINATED_ACTIONS_FOR_DATES, jobId, start, end); } else { - list = jpaService.execute(new CoordJobGetActionsByDatesForKillJPAExecutor(jobId, start, end)); + list = CoordActionQueryExecutor.getInstance().getList( + CoordActionQuery.GET_ACTIVE_ACTIONS_FOR_DATES, jobId, start, end); } return list; } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 6f76b07..b40fec0 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -2211,6 +2211,15 @@ </description> </property> + <property> + <name>oozie.sla.disable.alerts.older.than</name> + <value>48</value> + <description> + Time threshold, in HOURS, for disabling SLA alerting for jobs whose + nominal time is older than this. + </description> + </property> + <!-- ZooKeeper configuration --> <property> <name>oozie.zookeeper.connection.string</name> http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/test/java/org/apache/oozie/command/TestSLAAlertXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/TestSLAAlertXCommand.java b/core/src/test/java/org/apache/oozie/command/TestSLAAlertXCommand.java new file mode 100644 index 0000000..ce59885 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/command/TestSLAAlertXCommand.java @@ -0,0 +1,300 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command; + +import java.io.StringReader; +import java.util.Date; + +import org.apache.hadoop.conf.Configuration; +import org.apache.oozie.AppType; +import org.apache.oozie.BaseEngineException; +import org.apache.oozie.BundleEngine; +import org.apache.oozie.BundleJobBean; +import org.apache.oozie.CoordinatorEngine; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.Job; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; +import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.service.ConfigurationService; +import org.apache.oozie.service.Services; +import org.apache.oozie.sla.SLACalcStatus; +import org.apache.oozie.sla.SLACalculatorMemory; +import org.apache.oozie.sla.SLAOperations; +import org.apache.oozie.sla.SLARegistrationBean; +import org.apache.oozie.sla.service.SLAService; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.XConfiguration; + +public class TestSLAAlertXCommand extends XDataTestCase { + private Services services; + SLACalculatorMemory slaCalcMemory; + BundleJobBean bundle; + CoordinatorJobBean coord1, coord2; + final BundleEngine bundleEngine = new BundleEngine("u"); + Date startTime; + final Date endTime = new Date(System.currentTimeMillis() + 1 * 1 * 3600 * 1000); + final int timeInSec = 60 * 1000; + final String data = "2014-01-01T00:00Z"; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + Configuration conf = services.get(ConfigurationService.class).getConf(); + conf.set(Services.CONF_SERVICE_EXT_CLASSES, "org.apache.oozie.service.EventHandlerService," + + "org.apache.oozie.sla.service.SLAService"); + conf.setInt(SLAService.CONF_SLA_CHECK_INTERVAL, 600); + services.init(); + + } + + @Override + protected void tearDown() throws Exception { + LocalOozie.stop(); + services.destroy(); + super.tearDown(); + } + + public void testBundleSLAAlertCommands() throws Exception { + setupSLAJobs(); + String jobIdsStr = bundle.getId(); + String actions = "1,2"; + String coords = null; + bundleEngine.disableSLAAlert(jobIdsStr, actions, null, coords); + checkSLAStatus(coord1.getId() + "@1", true); + checkSLAStatus(coord1.getId() + "@2", true); + checkSLAStatus(coord1.getId() + "@3", false); + checkSLAStatus(coord1.getId() + "@5", false); + checkSLAStatus(coord1.getId() + "@4", false); + checkSLAStatus(coord2.getId() + "@1", true); + checkSLAStatus(coord2.getId() + "@1", true); + + bundleEngine.enableSLAAlert(jobIdsStr, null, null, null); + checkSLAStatus(coord1.getId() + "@1", false); + checkSLAStatus(coord1.getId() + "@2", false); + checkSLAStatus(coord1.getId() + "@3", false); + checkSLAStatus(coord1.getId() + "@5", false); + checkSLAStatus(coord1.getId() + "@4", false); + checkSLAStatus(coord2.getId() + "@1", false); + checkSLAStatus(coord2.getId() + "@2", false); + + CoordinatorJobBean job1 = CoordJobQueryExecutor.getInstance().get( + CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coord1.getId()); + XConfiguration xConf = new XConfiguration(new StringReader(job1.getConf())); + assertEquals(xConf.get(OozieClient.SLA_DISABLE_ALERT), null); + + CoordinatorJobBean job2 = CoordJobQueryExecutor.getInstance().get( + CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coord2.getId()); + xConf = new XConfiguration(new StringReader(job2.getConf())); + assertEquals(xConf.get(OozieClient.SLA_DISABLE_ALERT), null); + + bundleEngine.disableSLAAlert(jobIdsStr, null, null, "coord1"); + checkSLAStatus(coord1.getId() + "@1", true); + checkSLAStatus(coord1.getId() + "@2", true); + checkSLAStatus(coord1.getId() + "@3", true); + checkSLAStatus(coord1.getId() + "@4", true); + checkSLAStatus(coord1.getId() + "@5", true); + checkSLAStatus(coord2.getId() + "@1", false); + checkSLAStatus(coord2.getId() + "@2", false); + + job1 = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, + coord1.getId()); + xConf = new XConfiguration(new StringReader(job1.getConf())); + assertEquals(xConf.get(OozieClient.SLA_DISABLE_ALERT), SLAOperations.ALL_VALUE); + bundleEngine.disableSLAAlert(jobIdsStr, null, null, "coord2"); + // with multiple coordID. + + String dates = "2014-01-01T00:00Z::2014-01-03T00:00Z"; + bundleEngine.enableSLAAlert(jobIdsStr, null, dates, "coord1," + coord2.getId()); + checkSLAStatus(coord1.getId() + "@1", false); + checkSLAStatus(coord1.getId() + "@2", false); + checkSLAStatus(coord1.getId() + "@3", false); + checkSLAStatus(coord1.getId() + "@4", true); + checkSLAStatus(coord1.getId() + "@5", true); + checkSLAStatus(coord2.getId() + "@1", false); + checkSLAStatus(coord2.getId() + "@2", false); + checkSLAStatus(coord2.getId() + "@3", false); + checkSLAStatus(coord2.getId() + "@4", true); + + try { + bundleEngine.disableSLAAlert(jobIdsStr, null, null, "dummy"); + fail("Should throw Exception"); + } + catch (BaseEngineException e) { + assertEquals(e.getErrorCode(), ErrorCode.E1026); + } + + } + + public void testSLAChangeCommand() throws Exception { + setupSLAJobs(); + String newParams = RestConstants.SLA_SHOULD_END + "=10"; + String jobIdsStr = bundle.getId(); + String coords = coord1.getAppName(); + bundleEngine.changeSLA(jobIdsStr, null, null, coords, newParams); + + assertEquals(getSLACalcStatus(coord1.getId() + "@1").getExpectedEnd().getTime(), + getSLACalcStatus(coord1.getId() + "@1").getNominalTime().getTime() + 10 * timeInSec); + assertEquals(getSLACalcStatus(coord1.getId() + "@2").getExpectedEnd().getTime(), + getSLACalcStatus(coord1.getId() + "@2").getNominalTime().getTime() + 10 * timeInSec); + + assertEquals(getSLACalcStatus(coord1.getId() + "@5").getExpectedEnd().getTime(), + getSLACalcStatus(coord1.getId() + "@5").getNominalTime().getTime() + 10 * timeInSec); + newParams = "non-valid-param=10"; + try { + bundleEngine.changeSLA(jobIdsStr, null, null, coords, newParams); + fail("Should throw Exception"); + } + catch (BaseEngineException e) { + assertEquals(e.getErrorCode(), ErrorCode.E1027); + } + try { + new CoordinatorEngine().changeSLA(coord1.getId(), null, null, null, newParams); + fail("Should throw Exception"); + } + catch (BaseEngineException e) { + assertEquals(e.getErrorCode(), ErrorCode.E1027); + } + } + + public void testCoordSLAAlertCommands() throws Exception { + setupSLAJobs(); + + final CoordinatorEngine engine = new CoordinatorEngine("u"); + String jobIdsStr = coord1.getId(); + String actions = "1-3,5"; + String coords = null; + engine.disableSLAAlert(jobIdsStr, actions, null, coords); + checkSLAStatus(coord1.getId() + "@1", true); + checkSLAStatus(coord1.getId() + "@2", true); + checkSLAStatus(coord1.getId() + "@3", true); + checkSLAStatus(coord1.getId() + "@5", true); + checkSLAStatus(coord1.getId() + "@4", false); + + actions = "1-3"; + engine.enableSLAAlert(jobIdsStr, actions, null, null); + checkSLAStatus(coord1.getId() + "@1", false); + checkSLAStatus(coord1.getId() + "@2", false); + checkSLAStatus(coord1.getId() + "@3", false); + checkSLAStatus(coord1.getId() + "@5", true); + checkSLAStatus(coord1.getId() + "@4", false); + + engine.enableSLAAlert(jobIdsStr, null, null, null); + checkSLAStatus(coord1.getId() + "@1", false); + checkSLAStatus(coord1.getId() + "@2", false); + checkSLAStatus(coord1.getId() + "@3", false); + checkSLAStatus(coord1.getId() + "@5", false); + checkSLAStatus(coord1.getId() + "@4", false); + CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get( + CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, jobIdsStr); + XConfiguration xConf = new XConfiguration(new StringReader(job.getConf())); + assertEquals(xConf.get(OozieClient.SLA_DISABLE_ALERT), null); + + } + + private void setupSLAJobs() throws Exception { + + coord1 = addRecordToCoordJobTable(Job.Status.RUNNING, true, false); + Date nominalTime1 = DateUtils.parseDateUTC(data); + addRecordToCoordActionTable(coord1.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1, + nominalTime1); + Date nominalTime2 = org.apache.commons.lang.time.DateUtils.addDays(nominalTime1, 1); + + addRecordToCoordActionTable(coord1.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1, + nominalTime2); + + Date nominalTime3 = org.apache.commons.lang.time.DateUtils.addDays(nominalTime1, 2); + addRecordToCoordActionTable(coord1.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1, + nominalTime3); + + Date nominalTime4 = org.apache.commons.lang.time.DateUtils.addDays(nominalTime1, 3); + addRecordToCoordActionTable(coord1.getId(), 4, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1, + nominalTime4); + Date nominalTime5 = org.apache.commons.lang.time.DateUtils.addDays(nominalTime1, 4); + addRecordToCoordActionTable(coord1.getId(), 5, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 1, + nominalTime5); + + coord2 = addRecordToCoordJobTable(Job.Status.RUNNING, true, false); + addRecordToCoordActionTable(coord2.getId(), 1, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, + nominalTime1); + addRecordToCoordActionTable(coord2.getId(), 2, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, + nominalTime2); + addRecordToCoordActionTable(coord2.getId(), 3, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, + nominalTime3); + addRecordToCoordActionTable(coord2.getId(), 4, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0, + nominalTime4); + + bundle = addRecordToBundleJobTable(Job.Status.RUNNING, true); + coord1.setBundleId(bundle.getId()); + coord1.setAppName("coord1"); + coord1.setStartTime(nominalTime1); + coord1.setMatThrottling(12); + coord1.setLastActionNumber(5); + coord2.setBundleId(bundle.getId()); + coord2.setAppName("coord2"); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord1); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQuery.UPDATE_COORD_JOB, coord2); + registerSLABean(coord1.getId(), AppType.COORDINATOR_JOB, null, null); + registerSLABean(coord2.getId(), AppType.COORDINATOR_JOB, null, null); + registerSLABean(coord1.getId() + "@1", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime1); + registerSLABean(coord1.getId() + "@2", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime2); + registerSLABean(coord1.getId() + "@3", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime3); + registerSLABean(coord1.getId() + "@4", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime4); + registerSLABean(coord1.getId() + "@5", AppType.COORDINATOR_ACTION, coord1.getId(), nominalTime5); + registerSLABean(coord2.getId() + "@1", AppType.COORDINATOR_ACTION, coord2.getId(), nominalTime1); + registerSLABean(coord2.getId() + "@2", AppType.COORDINATOR_ACTION, coord2.getId(), nominalTime2); + registerSLABean(coord2.getId() + "@3", AppType.COORDINATOR_ACTION, coord2.getId(), nominalTime3); + registerSLABean(coord2.getId() + "@4", AppType.COORDINATOR_ACTION, coord2.getId(), nominalTime4); + + checkSLAStatus(coord1.getId() + "@1", false); + checkSLAStatus(coord1.getId() + "@2", false); + checkSLAStatus(coord1.getId() + "@3", false); + checkSLAStatus(coord1.getId() + "@5", false); + checkSLAStatus(coord1.getId() + "@4", false); + } + + private void registerSLABean(String jobId, AppType appType, String parentId, Date nominalTime) throws Exception { + SLARegistrationBean slaRegBean = new SLARegistrationBean(); + slaRegBean.setNominalTime(nominalTime); + slaRegBean.setId(jobId); + slaRegBean.setAppType(appType); + startTime = new Date(System.currentTimeMillis() - 1 * 1 * 3600 * 1000); // 1 hour back + slaRegBean.setExpectedStart(startTime); + slaRegBean.setExpectedDuration(3600 * 1000); + slaRegBean.setParentId(parentId); + slaRegBean.setExpectedEnd(endTime); // 1 hour ahead + Services.get().get(SLAService.class).addRegistrationEvent(slaRegBean); + } + + private void checkSLAStatus(String id, boolean status) throws JPAExecutorException { + assertEquals(getSLACalcStatus(id).getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT), status); + } + + private SLACalcStatus getSLACalcStatus(String jobId) throws JPAExecutorException { + return Services.get().get(SLAService.class).getSLACalculator().get(jobId); + + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java index 5ce9a7f..5f72e57 100644 --- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordSubmitXCommand.java @@ -23,6 +23,7 @@ import java.io.FileWriter; import java.io.Reader; import java.io.Writer; import java.net.URI; +import java.util.Date; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -31,13 +32,22 @@ import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.service.EventHandlerService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; +import org.apache.oozie.sla.SLACalcStatus; +import org.apache.oozie.sla.SLACalculator; +import org.apache.oozie.sla.SLAOperations; +import org.apache.oozie.sla.service.SLAService; import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; @@ -51,6 +61,8 @@ public class TestCoordSubmitXCommand extends XDataTestCase { protected void setUp() throws Exception { super.setUp(); services = new Services(); + services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, + EventHandlerService.class.getName() + "," + SLAService.class.getName()); services.init(); } @@ -1319,4 +1331,170 @@ public class TestCoordSubmitXCommand extends XDataTestCase { assertEquals(job.getTimeout(), 43200); } + public void testSubmitWithSLAAlertsDisable() throws Exception { + Configuration conf = new XConfiguration(); + File appPathFile = new File(getTestCaseDir(), "coordinator.xml"); + + // CASE 1: Failure case i.e. multiple data-in instances + Reader reader = IOUtils.getResourceAsReader("coord-action-sla.xml", -1); + Writer writer = new FileWriter(appPathFile); + IOUtils.copyCharStream(reader, writer); + conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString()); + conf.set("start", DateUtils.formatDateOozieTZ(new Date())); + conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1))); + conf.set("frequency", "coord:days(1)"); + conf.set(OozieClient.USER_NAME, getTestUser()); + reader = IOUtils.getResourceAsReader("wf-credentials.xml", -1); + appPathFile = new File(getTestCaseDir(), "workflow.xml"); + writer = new FileWriter(appPathFile); + IOUtils.copyCharStream(reader, writer); + conf.set("wfAppPath", appPathFile.getPath()); + Date nominalTime = new Date(); + conf.set("nominal_time", DateUtils.formatDateOozieTZ(nominalTime)); + + String coordId = new CoordSubmitXCommand(conf).call(); + new CoordMaterializeTransitionXCommand(coordId, 3600).call(); + SLAService slaService = services.get(SLAService.class); + SLACalculator calc = slaService.getSLACalculator(); + SLACalcStatus slaCalc = calc.get(coordId + "@" + 1); + assertFalse(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT))); + + Configuration conf1=new Configuration(conf); + // CASE I: "ALL" + conf1.set(OozieClient.SLA_DISABLE_ALERT, "ALL"); + coordId = new CoordSubmitXCommand(conf1).call(); + new CoordMaterializeTransitionXCommand(coordId, 3600).call(); + + slaService = services.get(SLAService.class); + calc = slaService.getSLACalculator(); + slaCalc = calc.get(coordId + "@" + 1); + assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT))); + + // CASE II: Date Range + Configuration conf2=new Configuration(conf); + Date startRangeDate = new Date(nominalTime.getTime() - 3600 * 1000); + conf2.set(OozieClient.SLA_DISABLE_ALERT, + DateUtils.formatDateOozieTZ(startRangeDate) + "::" + DateUtils.formatDateOozieTZ(nominalTime)); + coordId = new CoordSubmitXCommand(conf2).call(); + new CoordMaterializeTransitionXCommand(coordId, 3600).call(); + + slaCalc = calc.get(coordId + "@" + 1); + assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT))); + + // CASE III: Coord name (negative test) + Configuration conf3=new Configuration(conf); + conf3.set(OozieClient.SLA_DISABLE_ALERT_COORD, "test-coord-sla-x"); + coordId = new CoordSubmitXCommand(conf3).call(); + new CoordMaterializeTransitionXCommand(coordId, 3600).call(); + slaCalc = calc.get(coordId + "@" + 1); + assertFalse(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT))); + + // CASE IV: Older than n(hours) + Date otherNominalTime = new Date(nominalTime.getTime() - 73 * 3600 * 1000); + conf = new XConfiguration(); + appPathFile = new File(getTestCaseDir(), "coordinator.xml"); + conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString()); + conf.set("wfAppPath", appPathFile.getPath()); + conf.set("start", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), -1))); + conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1))); + + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("nominal_time", DateUtils.formatDateOozieTZ(otherNominalTime)); + conf.setInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN, 72); + coordId = new CoordSubmitXCommand(conf).call(); + new CoordMaterializeTransitionXCommand(coordId, 3600).call(); + slaCalc = calc.get(coordId + "@" + 1); + assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT))); + + // catchup mode + conf = new XConfiguration(); + conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString()); + conf.set("wfAppPath", appPathFile.getPath()); + conf.set("start", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), -1))); + conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1))); + + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("nominal_time", + DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), -1))); + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("nominal_time", + DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), -1))); + coordId = new CoordSubmitXCommand(conf).call(); + new CoordMaterializeTransitionXCommand(coordId, 3600).call(); + slaCalc = calc.get(coordId + "@" + 1); + assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT))); + + // normal mode + conf = new XConfiguration(); + conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString()); + conf.set("wfAppPath", appPathFile.getPath()); + conf.set("start", DateUtils.formatDateOozieTZ(new Date())); + conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1))); + + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("nominal_time", DateUtils.formatDateOozieTZ(new Date())); + conf.set(OozieClient.USER_NAME, getTestUser()); + conf.set("nominal_time", DateUtils.formatDateOozieTZ(new Date())); + coordId = new CoordSubmitXCommand(conf).call(); + new CoordMaterializeTransitionXCommand(coordId, 3600).call(); + slaCalc = calc.get(coordId + "@" + 1); + assertFalse(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT))); + + } + + public void testSLAAlertWithNewlyCreatedActions() throws Exception { + Configuration conf = new XConfiguration(); + File appPathFile = new File(getTestCaseDir(), "coordinator.xml"); + + // CASE 1: Failure case i.e. multiple data-in instances + Reader reader = IOUtils.getResourceAsReader("coord-action-sla.xml", -1); + Writer writer = new FileWriter(appPathFile); + IOUtils.copyCharStream(reader, writer); + conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString()); + conf.set("start", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addDays(new Date(), -1))); + conf.set("end", DateUtils.formatDateOozieTZ(org.apache.commons.lang.time.DateUtils.addMonths(new Date(), 1))); + conf.set(OozieClient.USER_NAME, getTestUser()); + reader = IOUtils.getResourceAsReader("wf-credentials.xml", -1); + appPathFile = new File(getTestCaseDir(), "workflow.xml"); + writer = new FileWriter(appPathFile); + IOUtils.copyCharStream(reader, writer); + conf.set("wfAppPath", appPathFile.getPath()); + Date nominalTime = new Date(); + conf.set("nominal_time", DateUtils.formatDateOozieTZ(nominalTime)); + + String coordId = new CoordSubmitXCommand(conf).call(); + CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get( + CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId); + job.setMatThrottling(1); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job); + new CoordMaterializeTransitionXCommand(coordId, 3600).call(); + SLAService slaService = services.get(SLAService.class); + SLACalculator calc = slaService.getSLACalculator(); + SLACalcStatus slaCalc = calc.get(coordId + "@" + 1); + assertFalse(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT))); + assertEquals(slaCalc.getExpectedDuration(), 1800000); + job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId); + assertEquals(job.getLastActionNumber(), 1); + + String newParams = RestConstants.SLA_MAX_DURATION + "=${5 * MINUTES}"; + + new CoordSLAChangeXCommand(coordId, null, null, JobUtils.parseChangeValue(newParams)).call(); + new CoordSLAAlertsDisableXCommand(coordId, null, null).call(); + + job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId); + job.setMatThrottling(2); + CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB, job); + + job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId); + + new CoordMaterializeTransitionXCommand(coordId, 3600).call(); + job = CoordJobQueryExecutor.getInstance().get(CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB, coordId); + slaCalc = calc.get(coordId + "@" + job.getLastActionNumber()); + assertEquals(slaCalc.getExpectedDuration(), 300000); + // newly action should have sla disable after coord disable command on coord job + assertTrue(Boolean.valueOf(slaCalc.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT))); + Element eAction = XmlUtils.parseXml(job.getJobXml()); + Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); + assertEquals(SLAOperations.getTagElement(eSla, "max-duration"), "${5 * MINUTES}"); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionQueryExecutor.java b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionQueryExecutor.java new file mode 100644 index 0000000..85ff5d2 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/executor/jpa/TestCoordActionQueryExecutor.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.executor.jpa; + +import java.util.Date; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.oozie.CoordinatorActionBean; +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery; +import org.apache.oozie.service.Services; +import org.apache.oozie.test.XDataTestCase; +import org.apache.oozie.util.DateUtils; + +public class TestCoordActionQueryExecutor extends XDataTestCase { + + Services services; + + @Override + protected void setUp() throws Exception { + super.setUp(); + services = new Services(); + services.init(); + } + + @Override + protected void tearDown() throws Exception { + services.destroy(); + super.tearDown(); + } + + public void testGetTerminatedActionForDates() throws Exception { + int actionNum = 1; + CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0); + + Path appPath = new Path(getFsTestCaseDir(), "coord"); + String actionXml = getCoordActionXml(appPath, "coord-action-get.xml"); + String actionNomialTime = getActionNominalTime(actionXml); + Date nominalTime = DateUtils.parseDateOozieTZ(actionNomialTime); + + Date d1 = new Date(nominalTime.getTime() - 1000); + Date d2 = new Date(nominalTime.getTime() + 1000); + _testGetTerminatedActionForDates(job.getId(), d1, d2, 1); + + d1 = new Date(nominalTime.getTime() + 1000); + d2 = new Date(nominalTime.getTime() + 2000); + _testGetTerminatedActionForDates(job.getId(), d1, d2, 0); + + cleanUpDBTables(); + job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + _testGetTerminatedActionForDates(job.getId(), d1, d2, 0); + } + + private void _testGetTerminatedActionForDates(String jobId, Date d1, Date d2, int expected) throws Exception { + List<CoordinatorActionBean> actionIds = CoordActionQueryExecutor.getInstance().getList( + CoordActionQuery.GET_TERMINATED_ACTIONS_FOR_DATES, jobId, d1, d2); + assertEquals(expected, actionIds.size()); + } + + public void testGetTerminatedActionIdsForDates() throws Exception { + int actionNum = 1; + CoordinatorJobBean job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.FAILED, "coord-action-get.xml", 0); + + Path appPath = new Path(getFsTestCaseDir(), "coord"); + String actionXml = getCoordActionXml(appPath, "coord-action-get.xml"); + String actionNomialTime = getActionNominalTime(actionXml); + Date nominalTime = DateUtils.parseDateOozieTZ(actionNomialTime); + + Date d1 = new Date(nominalTime.getTime() - 1000); + Date d2 = new Date(nominalTime.getTime() + 1000); + _testGetTerminatedActionIdsForDates(job.getId(), d1, d2, 1); + + d1 = new Date(nominalTime.getTime() + 1000); + d2 = new Date(nominalTime.getTime() + 2000); + _testGetTerminatedActionIdsForDates(job.getId(), d1, d2, 0); + + cleanUpDBTables(); + job = addRecordToCoordJobTable(CoordinatorJob.Status.RUNNING, false, false); + addRecordToCoordActionTable(job.getId(), actionNum, CoordinatorAction.Status.WAITING, "coord-action-get.xml", 0); + _testGetTerminatedActionIdsForDates(job.getId(), d1, d2, 0); + } + + private void _testGetTerminatedActionIdsForDates(String jobId, Date d1, Date d2, int expected) throws Exception { + List<CoordinatorActionBean> actions = CoordActionQueryExecutor.getInstance().getList( + CoordActionQuery.GET_TERMINATED_ACTION_IDS_FOR_DATES, jobId, d1, d2); + assertEquals(expected, actions.size()); + } + +}
