OOZIE-1913 Devise a way to turn off SLA alerts for bundle/coordinator flexibly
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/0f4b0181 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/0f4b0181 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/0f4b0181 Branch: refs/heads/master Commit: 0f4b0181bc8bdac4696bce2bde854b332bb02d80 Parents: 5228eb8 Author: Purshotam Shah <[email protected]> Authored: Fri Feb 20 15:01:19 2015 -0800 Committer: Purshotam Shah <[email protected]> Committed: Fri Feb 20 15:01:19 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/oozie/cli/OozieCLI.java | 69 ++++- .../org/apache/oozie/client/OozieClient.java | 185 ++++++++++-- .../org/apache/oozie/client/event/SLAEvent.java | 2 +- .../org/apache/oozie/client/rest/JsonTags.java | 2 + .../apache/oozie/client/rest/RestConstants.java | 20 ++ .../main/java/org/apache/oozie/BaseEngine.java | 40 +++ .../java/org/apache/oozie/BundleEngine.java | 42 ++- .../org/apache/oozie/CoordinatorActionBean.java | 49 +-- .../org/apache/oozie/CoordinatorEngine.java | 47 +++ .../org/apache/oozie/CoordinatorJobBean.java | 56 ++-- .../main/java/org/apache/oozie/DagEngine.java | 16 + .../main/java/org/apache/oozie/ErrorCode.java | 2 + .../apache/oozie/command/SLAAlertsXCommand.java | 117 +++++++ .../bundle/BundleSLAAlertsDisableXCommand.java | 44 +++ .../bundle/BundleSLAAlertsEnableXCommand.java | 45 +++ .../command/bundle/BundleSLAAlertsXCommand.java | 149 +++++++++ .../command/bundle/BundleSLAChangeXCommand.java | 57 ++++ .../bundle/BundleStatusTransitXCommand.java | 1 + .../CoordMaterializeTransitionXCommand.java | 19 +- .../coord/CoordSLAAlertsDisableXCommand.java | 71 +++++ .../coord/CoordSLAAlertsEnableXCommand.java | 65 ++++ .../command/coord/CoordSLAAlertsXCommand.java | 233 ++++++++++++++ .../command/coord/CoordSLAChangeXCommand.java | 100 ++++++ .../java/org/apache/oozie/coord/CoordUtils.java | 146 ++++++++- .../executor/jpa/CoordActionQueryExecutor.java 
| 48 ++- ...dJobGetActionIdsForDateRangeJPAExecutor.java | 69 ----- ...dJobGetActionsByDatesForKillJPAExecutor.java | 108 ------- .../CoordJobGetActionsForDatesJPAExecutor.java | 70 ----- .../executor/jpa/CoordJobQueryExecutor.java | 51 +++- .../CoordJobsToBeMaterializedJPAExecutor.java | 2 +- .../jpa/SLARegistrationQueryExecutor.java | 62 +++- .../executor/jpa/SLASummaryQueryExecutor.java | 29 +- .../service/CoordMaterializeTriggerService.java | 2 +- .../oozie/service/EventHandlerService.java | 24 +- .../apache/oozie/servlet/BaseJobServlet.java | 55 ++++ .../org/apache/oozie/servlet/SLAServlet.java | 1 + .../org/apache/oozie/servlet/V0JobServlet.java | 18 +- .../org/apache/oozie/servlet/V1JobServlet.java | 16 + .../org/apache/oozie/servlet/V2JobServlet.java | 74 ++++- .../org/apache/oozie/servlet/V2SLAServlet.java | 21 +- .../org/apache/oozie/sla/SLACalcStatus.java | 12 +- .../org/apache/oozie/sla/SLACalculator.java | 54 ++++ .../apache/oozie/sla/SLACalculatorMemory.java | 302 ++++++++++++++++--- .../org/apache/oozie/sla/SLAOperations.java | 143 +++++---- .../apache/oozie/sla/SLARegistrationBean.java | 28 +- .../org/apache/oozie/sla/SLASummaryBean.java | 33 +- .../apache/oozie/sla/service/SLAService.java | 94 +++++- .../oozie/util/CoordActionsInDateRange.java | 23 +- core/src/main/resources/oozie-default.xml | 9 + .../oozie/command/TestSLAAlertXCommand.java | 300 ++++++++++++++++++ .../command/coord/TestCoordSubmitXCommand.java | 178 +++++++++++ .../jpa/TestCoordActionQueryExecutor.java | 111 +++++++ ...CoordJobGetActionIdsForDatesJPAExecutor.java | 82 ----- ...stCoordJobGetActionsForDatesJPAExecutor.java | 83 ----- .../apache/oozie/service/TestHASLAService.java | 71 +++++ .../apache/oozie/servlet/TestV2SLAServlet.java | 2 - .../oozie/sla/TestSLACalculatorMemory.java | 125 ++++++-- .../oozie/sla/TestSLAEventGeneration.java | 4 + .../sla/TestSLARegistrationGetJPAExecutor.java | 20 +- core/src/test/resources/coord-action-sla.xml | 2 +- 
docs/src/site/twiki/DG_CommandLineTool.twiki | 22 +- docs/src/site/twiki/DG_SLAMonitoring.twiki | 46 +++ docs/src/site/twiki/WebServicesAPI.twiki | 42 +++ release-log.txt | 1 + .../webapp/console/sla/js/oozie-sla-table.js | 1 + .../src/main/webapp/console/sla/oozie-sla.html | 1 + 66 files changed, 3327 insertions(+), 689 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/cli/OozieCLI.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java index 6690869..218edf2 100644 --- a/client/src/main/java/org/apache/oozie/cli/OozieCLI.java +++ b/client/src/main/java/org/apache/oozie/cli/OozieCLI.java @@ -135,17 +135,21 @@ public class OozieCLI { public static final String LOCAL_TIME_OPTION = "localtime"; public static final String TIME_ZONE_OPTION = "timezone"; public static final String QUEUE_DUMP_OPTION = "queuedump"; - public static final String RERUN_COORD_OPTION = "coordinator"; public static final String DATE_OPTION = "date"; public static final String RERUN_REFRESH_OPTION = "refresh"; public static final String RERUN_NOCLEANUP_OPTION = "nocleanup"; public static final String RERUN_FAILED_OPTION = "failed"; public static final String ORDER_OPTION = "order"; + public static final String COORD_OPTION = "coordinator"; public static final String UPDATE_SHARELIB_OPTION = "sharelibupdate"; public static final String LIST_SHARELIB_LIB_OPTION = "shareliblist"; + public static final String SLA_DISABLE_ALERT = "sla_disable"; + public static final String SLA_ENABLE_ALERT = "sla_enable"; + public static final String SLA_CHANGE = "sla_change"; + public static final String AUTH_OPTION = "auth"; @@ -328,7 +332,7 @@ public class OozieCLI { + "(requires -log)"); Option date = new Option(DATE_OPTION, true, 
"coordinator/bundle rerun on action dates (requires -rerun); coordinator log retrieval on action dates (requires -log)"); - Option rerun_coord = new Option(RERUN_COORD_OPTION, true, "bundle rerun on coordinator names (requires -rerun)"); + Option rerun_coord = new Option(COORD_OPTION, true, "bundle rerun on coordinator names (requires -rerun)"); Option rerun_refresh = new Option(RERUN_REFRESH_OPTION, false, "re-materialize the coordinator rerun actions (requires -rerun)"); Option rerun_nocleanup = new Option(RERUN_NOCLEANUP_OPTION, false, @@ -348,6 +352,14 @@ public class OozieCLI { Option interval = new Option(INTERVAL_OPTION, true, "polling interval in minutes (default is 5, requires -poll)"); interval.setType(Integer.class); + Option slaDisableAlert = new Option(SLA_DISABLE_ALERT, true, + "disables sla alerts for the job and its children"); + Option slaEnableAlert = new Option(SLA_ENABLE_ALERT, true, + "enables sla alerts for the job and its children"); + Option slaChange = new Option(SLA_CHANGE, true, + "Update sla param for jobs, supported param are should-start, should-end, nominal-time and max-duration"); + + Option doAs = new Option(DO_AS_OPTION, true, "doAs user, impersonates as the specified user"); OptionGroup actions = new OptionGroup(); @@ -368,6 +380,10 @@ public class OozieCLI { actions.addOption(config_content); actions.addOption(ignore); actions.addOption(poll); + actions.addOption(slaDisableAlert); + actions.addOption(slaEnableAlert); + actions.addOption(slaChange); + actions.setRequired(true); Options jobOptions = new Options(); jobOptions.addOption(oozie); @@ -401,6 +417,7 @@ public class OozieCLI { OptionGroup updateOption = new OptionGroup(); updateOption.addOption(dryrun); jobOptions.addOptionGroup(updateOption); + return jobOptions; } @@ -1014,8 +1031,8 @@ public class OozieCLI { dateScope = commandLine.getOptionValue(DATE_OPTION); } - if (options.contains(RERUN_COORD_OPTION)) { - coordScope = commandLine.getOptionValue(RERUN_COORD_OPTION); 
+ if (options.contains(COORD_OPTION)) { + coordScope = commandLine.getOptionValue(COORD_OPTION); } if (options.contains(RERUN_REFRESH_OPTION)) { @@ -1234,6 +1251,15 @@ public class OozieCLI { boolean verbose = commandLine.hasOption(VERBOSE_OPTION); wc.pollJob(jobId, timeout, interval, verbose); } + else if (options.contains(SLA_ENABLE_ALERT)) { + slaAlertCommand(commandLine.getOptionValue(SLA_ENABLE_ALERT), wc, commandLine, options); + } + else if (options.contains(SLA_DISABLE_ALERT)) { + slaAlertCommand(commandLine.getOptionValue(SLA_DISABLE_ALERT), wc, commandLine, options); + } + else if (options.contains(SLA_CHANGE)) { + slaAlertCommand(commandLine.getOptionValue(SLA_CHANGE), wc, commandLine, options); + } } catch (OozieClientException ex) { throw new OozieCLIException(ex.toString(), ex); @@ -1902,8 +1928,8 @@ public class OozieCLI { "ssh-action-0.2.xsd"))); sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream( "hive2-action-0.1.xsd"))); - sources.add(new StreamSource(Thread.currentThread().getContextClassLoader().getResourceAsStream( - "spark-action-0.1.xsd"))); + sources.add(new StreamSource(Thread.currentThread().getContextClassLoader() + .getResourceAsStream("spark-action-0.1.xsd"))); SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI); Schema schema = factory.newSchema(sources.toArray(new StreamSource[sources.size()])); Validator validator = schema.newValidator(); @@ -2059,4 +2085,35 @@ public class OozieCLI { return allDeps.toString(); } + private void slaAlertCommand(String jobIds, OozieClient wc, CommandLine commandLine, List<String> options) + throws OozieCLIException, OozieClientException { + String actions = null, coordinators = null, dates = null; + + if (options.contains(ACTION_OPTION)) { + actions = commandLine.getOptionValue(ACTION_OPTION); + } + + if (options.contains(DATE_OPTION)) { + dates = commandLine.getOptionValue(DATE_OPTION); + } + + if 
(options.contains(COORD_OPTION)) { + coordinators = commandLine.getOptionValue(COORD_OPTION); + if (coordinators == null) { + throw new OozieCLIException("No value specified for -coordinator option"); + } + } + + if (options.contains(SLA_ENABLE_ALERT)) { + wc.slaEnableAlert(jobIds, actions, dates, coordinators); + } + else if (options.contains(SLA_DISABLE_ALERT)) { + wc.slaDisableAlert(jobIds, actions, dates, coordinators); + } + else if (options.contains(SLA_CHANGE)) { + String newSlaParams = commandLine.getOptionValue(CHANGE_VALUE_OPTION); + wc.slaChange(jobIds, actions, dates, coordinators, newSlaParams); + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/client/OozieClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/OozieClient.java b/client/src/main/java/org/apache/oozie/client/OozieClient.java index e4c93cd..5de25cc 100644 --- a/client/src/main/java/org/apache/oozie/client/OozieClient.java +++ b/client/src/main/java/org/apache/oozie/client/OozieClient.java @@ -52,6 +52,7 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.concurrent.Callable; @@ -101,10 +102,10 @@ public class OozieClient { public static final String EXTERNAL_ID = "oozie.wf.external.id"; - public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url"; - public static final String WORKFLOW_NOTIFICATION_PROXY = "oozie.wf.workflow.notification.proxy"; + public static final String WORKFLOW_NOTIFICATION_URL = "oozie.wf.workflow.notification.url"; + public static final String ACTION_NOTIFICATION_URL = "oozie.wf.action.notification.url"; public static final String COORD_ACTION_NOTIFICATION_URL = "oozie.coord.action.notification.url"; @@ -155,6 +156,14 @@ public class 
OozieClient { public static final String FILTER_CREATED_TIME_END = "endcreatedtime"; + public static final String SLA_DISABLE_ALERT = "oozie.sla.disable.alerts"; + + public static final String SLA_ENABLE_ALERT = "oozie.sla.enable.alerts"; + + public static final String SLA_DISABLE_ALERT_OLDER_THAN = SLA_DISABLE_ALERT + ".older.than"; + + public static final String SLA_DISABLE_ALERT_COORD = SLA_DISABLE_ALERT + ".coord"; + public static final String CHANGE_VALUE_ENDTIME = "endtime"; public static final String CHANGE_VALUE_PAUSETIME = "pausetime"; @@ -1626,33 +1635,137 @@ public class OozieClient { } /** - * Print sla info about coordinator and workflow jobs and actions. + * Sla enable alert. * - * @param start starting offset - * @param len number of results - * @throws OozieClientException + * @param jobIds the job ids + * @param actionIds comma separated list of action ids or action id ranges + * @param dates comma separated list of the nominal times + * @throws OozieClientException the oozie client exception */ - public void getSlaInfo(int start, int len, String filter) throws OozieClientException { - new SlaInfo(start, len, filter).call(); + public void slaEnableAlert(String jobIds, String actions, String dates) throws OozieClientException { + new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, jobIds, actions, dates, null).call(); } - private class SlaInfo extends ClientCallable<Void> { + /** + * Sla enable alert for bundle with coord name/id. 
+ * + * @param bundleId the bundle id + * @param actionIds comma separated list of action ids or action id ranges + * @param dates comma separated list of the nominal times + * @param coords the coordinators + * @throws OozieClientException the oozie client exception + */ + public void slaEnableAlert(String bundleId, String actions, String dates, String coords) + throws OozieClientException { + new UpdateSLA(RestConstants.SLA_ENABLE_ALERT, bundleId, actions, dates, coords).call(); + } - SlaInfo(int start, int len, String filter) { - super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID, - Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len), - RestConstants.JOBS_FILTER_PARAM, filter)); + /** + * Sla disable alert. + * + * @param jobIds the job ids + * @param actionIds comma separated list of action ids or action id ranges + * @param dates comma separated list of the nominal times + * @throws OozieClientException the oozie client exception + */ + public void slaDisableAlert(String jobIds, String actions, String dates) throws OozieClientException { + new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, jobIds, actions, dates, null).call(); + } + + /** + * Sla disable alert for bundle with coord name/id. + * + * @param bundleId the bundle id + * @param actionIds comma separated list of action ids or action id ranges + * @param dates comma separated list of the nominal times + * @param coords the coordinators + * @throws OozieClientException the oozie client exception + */ + public void slaDisableAlert(String bundleId, String actions, String dates, String coords) + throws OozieClientException { + new UpdateSLA(RestConstants.SLA_DISABLE_ALERT, bundleId, actions, dates, coords).call(); + } + + /** + * Sla change definations. 
+ * SLA change definition parameters can be [<key>=<value>,...<key>=<value>] + * Supported parameter key names are should-start, should-end and max-duration + * @param jobIds the job ids + * @param actionIds comma separated list of action ids or action id ranges. + * @param dates comma separated list of the nominal times + * @param newSlaParams the new sla params + * @throws OozieClientException the oozie client exception + */ + public void slaChange(String jobIds, String actions, String dates, String newSlaParams) throws OozieClientException { + new UpdateSLA(RestConstants.SLA_CHANGE, jobIds, actions, dates, null, newSlaParams).call(); + } + + /** + * Sla change defination for bundle with coord name/id. + * SLA change definition parameters can be [<key>=<value>,...<key>=<value>] + * Supported parameter key names are should-start, should-end and max-duration + * @param bundleId the bundle id + * @param actionIds comma separated list of action ids or action id ranges + * @param dates comma separated list of the nominal times + * @param coords the coords + * @param newSlaParams the new sla params + * @throws OozieClientException the oozie client exception + */ + public void slaChange(String bundleId, String actions, String dates, String coords, String newSlaParams) + throws OozieClientException { + new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, newSlaParams).call(); + } + + /** + * Sla change with new sla param as hasmap. 
+ * Supported parameter key names are should-start, should-end and max-duration + * @param bundleId the bundle id + * @param actionIds comma separated list of action ids or action id ranges + * @param dates comma separated list of the nominal times + * @param coords the coords + * @param newSlaParams the new sla params + * @throws OozieClientException the oozie client exception + */ + public void slaChange(String bundleId, String actions, String dates, String coords, Map<String, String> newSlaParams) + throws OozieClientException { + new UpdateSLA(RestConstants.SLA_CHANGE, bundleId, actions, dates, coords, mapToString(newSlaParams)).call(); + } + + /** + * Convert Map to string. + * + * @param map the map + * @return the string + */ + private String mapToString(Map<String, String> map) { + StringBuilder sb = new StringBuilder(); + Iterator<Entry<String, String>> it = map.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, String> e = (Entry<String, String>) it.next(); + sb.append(e.getKey()).append("=").append(e.getValue()).append(";"); + } + return sb.toString(); + } + + private class UpdateSLA extends ClientCallable<Void> { + + UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords) { + super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM, + action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE, + dates, RestConstants.COORDINATORS_PARAM, coords)); + } + + UpdateSLA(String action, String jobIds, String coordActions, String dates, String coords, String newSlaParams) { + super("PUT", RestConstants.JOB, notEmpty(jobIds, "jobIds"), prepareParams(RestConstants.ACTION_PARAM, + action, RestConstants.JOB_COORD_SCOPE_ACTION_LIST, coordActions, RestConstants.JOB_COORD_SCOPE_DATE, + dates, RestConstants.COORDINATORS_PARAM, coords, RestConstants.JOB_CHANGE_VALUE, newSlaParams)); } @Override protected Void call(HttpURLConnection conn) 
throws IOException, OozieClientException { conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { - BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream())); - String line = null; - while ((line = br.readLine()) != null) { - System.out.println(line); - } + System.out.println("Done"); } else { handleError(conn); @@ -1661,6 +1774,42 @@ public class OozieClient { } } + /** + * Print sla info about coordinator and workflow jobs and actions. + * + * @param start starting offset + * @param len number of results + * @throws OozieClientException + */ + public void getSlaInfo(int start, int len, String filter) throws OozieClientException { + new SlaInfo(start, len, filter).call(); + } + + private class SlaInfo extends ClientCallable<Void> { + + SlaInfo(int start, int len, String filter) { + super("GET", WS_PROTOCOL_VERSION_1, RestConstants.SLA, "", prepareParams(RestConstants.SLA_GT_SEQUENCE_ID, + Integer.toString(start), RestConstants.MAX_EVENTS, Integer.toString(len), + RestConstants.JOBS_FILTER_PARAM, filter)); + } + + @Override + protected Void call(HttpURLConnection conn) throws IOException, OozieClientException { + conn.setRequestProperty("content-type", RestConstants.XML_CONTENT_TYPE); + if ((conn.getResponseCode() == HttpURLConnection.HTTP_OK)) { + BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream())); + String line = null; + while ((line = br.readLine()) != null) { + System.out.println(line); + } + } + else { + handleError(conn); + } + return null; + } + } + private class JobIdAction extends ClientCallable<String> { JobIdAction(String externalId) { http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java 
b/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java index 27a0e1f..19d732f 100644 --- a/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java +++ b/client/src/main/java/org/apache/oozie/client/event/SLAEvent.java @@ -157,7 +157,7 @@ public abstract class SLAEvent extends Event { * * @return String slaConfig */ - public abstract String getSlaConfig(); + public abstract String getSLAConfig(); /** * Get the actual start time of job for SLA http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java index b7cf0e7..1022dd7 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java +++ b/client/src/main/java/org/apache/oozie/client/rest/JsonTags.java @@ -172,6 +172,8 @@ public interface JsonTags { public static final String SLA_SUMMARY_JOB_STATUS = "jobStatus"; public static final String SLA_SUMMARY_SLA_STATUS = "slaStatus"; public static final String SLA_SUMMARY_LAST_MODIFIED = "lastModified"; + public static final String SLA_ALERT_STATUS = "slaAlertStatus"; + public static final String TO_STRING = "toString"; http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java index 3c2afc3..4c75d2a 100644 --- a/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java +++ b/client/src/main/java/org/apache/oozie/client/rest/RestConstants.java @@ -186,4 +186,24 @@ public interface RestConstants { public static final String LOG_FILTER_OPTION = "logfilter"; public 
static final String JOB_COORD_RERUN_FAILED_PARAM = "failed"; + + public static final String SLA_DISABLE_ALERT = "sla-disable"; + + public static final String SLA_ENABLE_ALERT = "sla-enable"; + + public static final String SLA_CHANGE = "sla-change"; + + public static final String SLA_ALERT_RANGE = "sla-alert-range"; + + public static final String COORDINATORS_PARAM = "coordinators"; + + public static final String SLA_NOMINAL_TIME = "sla-nominal-time"; + + public static final String SLA_SHOULD_START = "sla-should-start"; + + public static final String SLA_SHOULD_END = "sla-should-end"; + + public static final String SLA_MAX_DURATION = "sla-max-duration"; + + public static final String JOB_COORD_SCOPE_ACTION_LIST = "action-list"; } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/BaseEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BaseEngine.java b/core/src/main/java/org/apache/oozie/BaseEngine.java index bf38a0c..44074ea 100644 --- a/core/src/main/java/org/apache/oozie/BaseEngine.java +++ b/core/src/main/java/org/apache/oozie/BaseEngine.java @@ -239,4 +239,44 @@ public abstract class BaseEngine { * @throws BaseEngineException thrown if the job's status could not be obtained */ public abstract String getJobStatus(String jobId) throws BaseEngineException; + + /** + * Return the status for a Job ID + * + * @param jobId job Id. 
+ * @return the job's status + * @throws BaseEngineException thrown if the job's status could not be obtained + */ + + /** + * Enable SLA alert for job + * @param id + * @param actions + * @param dates + * @param childIds + * @throws BaseEngineException + */ + public abstract void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException; + + /** + * Disable SLA alert for job + * @param id + * @param actions + * @param dates + * @param childIds + * @throws BaseEngineException + */ + public abstract void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException; + + /** + * Change SLA properties for job + * @param id + * @param actions + * @param childIds + * @param newParams + * @throws BaseEngineException + */ + public abstract void changeSLA(String id, String actions, String dates, String childIds, String newParams) + throws BaseEngineException; + } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/BundleEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/BundleEngine.java b/core/src/main/java/org/apache/oozie/BundleEngine.java index 9818acc..659c8e6 100644 --- a/core/src/main/java/org/apache/oozie/BundleEngine.java +++ b/core/src/main/java/org/apache/oozie/BundleEngine.java @@ -30,7 +30,6 @@ import java.util.Map; import java.util.Set; import java.util.StringTokenizer; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.CoordinatorJob; @@ -40,6 +39,9 @@ import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.rest.BulkResponseImpl; import org.apache.oozie.command.BulkJobsXCommand; import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.bundle.BundleSLAAlertsDisableXCommand; +import 
org.apache.oozie.command.bundle.BundleSLAAlertsEnableXCommand; +import org.apache.oozie.command.bundle.BundleSLAChangeXCommand; import org.apache.oozie.command.bundle.BundleJobChangeXCommand; import org.apache.oozie.command.bundle.BundleJobResumeXCommand; import org.apache.oozie.command.bundle.BundleJobSuspendXCommand; @@ -55,6 +57,7 @@ import org.apache.oozie.service.DagXLogInfoService; import org.apache.oozie.service.Services; import org.apache.oozie.service.XLogStreamingService; import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.XLogFilter; import org.apache.oozie.util.XLogUserFilterParam; import org.apache.oozie.util.ParamChecker; @@ -506,4 +509,41 @@ public class BundleEngine extends BaseEngine { throw new BundleEngineException(e); } } + + @Override + public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { + try { + new BundleSLAAlertsEnableXCommand(id, actions, dates, childIds).call(); + } + catch (CommandException e) { + throw new BundleEngineException(e); + } + } + + @Override + public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { + try { + new BundleSLAAlertsDisableXCommand(id, actions, dates, childIds).call(); + } + catch (CommandException e) { + throw new BundleEngineException(e); + } + } + + @Override + public void changeSLA(String id, String actions, String dates, String childIds, String newParams) + throws BaseEngineException { + Map<String, String> slaNewParams = null; + try { + + if (newParams != null) { + slaNewParams = JobUtils.parseChangeValue(newParams); + } + new BundleSLAChangeXCommand(id, actions, dates, childIds, slaNewParams).call(); + } + catch (CommandException e) { + throw new BundleEngineException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java index bd01d14..85b7ed4 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorActionBean.java @@ -18,6 +18,23 @@ package org.apache.oozie; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.Timestamp; +import java.text.MessageFormat; +import java.util.Date; +import java.util.List; + +import javax.persistence.Basic; +import javax.persistence.Column; +import javax.persistence.Entity; +import javax.persistence.Id; +import javax.persistence.Lob; +import javax.persistence.NamedQueries; +import javax.persistence.NamedQuery; +import javax.persistence.Table; + import org.apache.hadoop.io.Writable; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.rest.JsonBean; @@ -30,25 +47,6 @@ import org.apache.openjpa.persistence.jdbc.Strategy; import org.json.simple.JSONArray; import org.json.simple.JSONObject; -import javax.persistence.Basic; -import javax.persistence.Column; -import javax.persistence.ColumnResult; -import javax.persistence.Entity; -import javax.persistence.Id; -import javax.persistence.Lob; -import javax.persistence.NamedNativeQueries; -import javax.persistence.NamedNativeQuery; -import javax.persistence.NamedQueries; -import javax.persistence.NamedQuery; -import javax.persistence.SqlResultSetMapping; -import javax.persistence.Table; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.sql.Timestamp; -import java.text.MessageFormat; -import java.util.Date; -import java.util.List; @Entity @NamedQueries({ @@ -149,13 +147,13 @@ import java.util.List; @NamedQuery(name = "GET_COORD_ACTIONS_FOR_RECOVERY_OLDER_THAN", query = "select a.id, a.jobId, a.statusStr, 
a.externalId, a.pending from CoordinatorActionBean a where a.pending > 0 AND (a.statusStr = 'SUSPENDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'RUNNING') AND a.lastModifiedTimestamp <= :lastModifiedTime"), // Select query used by rerun, requires almost all columns so select * is used - @NamedQuery(name = "GET_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), + @NamedQuery(name = "GET_TERMINATED_ACTIONS_FOR_DATES", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED' OR a.statusStr = 'IGNORED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), // Select query used by log - @NamedQuery(name = "GET_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), + @NamedQuery(name = "GET_TERMINATED_ACTION_IDS_FOR_DATES", query = "select a.id from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'TIMEDOUT' OR a.statusStr = 'SUCCEEDED' OR a.statusStr = 'KILLED' OR a.statusStr = 'FAILED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), // Select query used by rerun, requires almost all columns so select * is used @NamedQuery(name = "GET_ACTION_FOR_NOMINALTIME", query = "select OBJECT(a) from CoordinatorActionBean a where a.jobId = :jobId AND a.nominalTimestamp = :nominalTime"), - @NamedQuery(name = "GET_ACTIONS_BY_DATES_FOR_KILL", query = "select a.id, a.jobId, a.statusStr, a.externalId, 
a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), + @NamedQuery(name = "GET_ACTIVE_ACTIONS_FOR_DATES", query = "select a.id, a.jobId, a.statusStr, a.externalId, a.pending, a.nominalTimestamp, a.createdTimestamp from CoordinatorActionBean a where a.jobId = :jobId AND (a.statusStr = 'WAITING' OR a.statusStr = 'READY' OR a.statusStr = 'SUBMITTED' OR a.statusStr = 'RUNNING' OR a.statusStr = 'SUSPENDED') AND a.nominalTimestamp >= :startTime AND a.nominalTimestamp <= :endTime"), @NamedQuery(name = "GET_COORD_ACTIONS_COUNT", query = "select count(w) from CoordinatorActionBean w"), @@ -163,7 +161,12 @@ import java.util.List; @NamedQuery(name = "GET_COORD_ACTIONS_MAX_MODIFIED_DATE_FOR_RANGE", query = "select max(w.lastModifiedTimestamp) from CoordinatorActionBean w where w.jobId= :jobId and w.id >= :startAction AND w.id <= :endAction"), - @NamedQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.jobId, min(a.lastModifiedTimestamp) from CoordinatorActionBean a where a.statusStr = 'READY' group by a.jobId having min(a.lastModifiedTimestamp) < :lastModifiedTime")}) + @NamedQuery(name = "GET_READY_ACTIONS_GROUP_BY_JOBID", query = "select a.jobId, min(a.lastModifiedTimestamp) from CoordinatorActionBean a where a.statusStr = 'READY' group by a.jobId having min(a.lastModifiedTimestamp) < :lastModifiedTime"), + + @NamedQuery(name = "GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE", query = "select a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.id in (:ids) and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')"), + + @NamedQuery(name = "GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE", query = "select 
a.id, a.nominalTimestamp, a.createdTimestamp, a.actionXml from CoordinatorActionBean a where a.jobId = :jobId and (a.statusStr <> 'FAILED' AND a.statusStr <> 'KILLED' AND a.statusStr <> 'SUCCEEDED' AND a.statusStr <> 'TIMEDOUT' AND a.statusStr <> 'IGNORED')") + }) @Table(name = "COORD_ACTIONS") public class CoordinatorActionBean implements http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/CoordinatorEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java index 136c097..642a82a 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorEngine.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorEngine.java @@ -19,6 +19,7 @@ package org.apache.oozie; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.CoordinatorAction; @@ -36,6 +37,9 @@ import org.apache.oozie.command.coord.CoordJobsXCommand; import org.apache.oozie.command.coord.CoordKillXCommand; import org.apache.oozie.command.coord.CoordRerunXCommand; import org.apache.oozie.command.coord.CoordResumeXCommand; +import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand; +import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand; +import org.apache.oozie.command.coord.CoordSLAChangeXCommand; import org.apache.oozie.command.coord.CoordSubmitXCommand; import org.apache.oozie.command.coord.CoordSuspendXCommand; import org.apache.oozie.command.coord.CoordUpdateXCommand; @@ -49,6 +53,7 @@ import org.apache.oozie.service.Services; import org.apache.oozie.service.XLogStreamingService; import org.apache.oozie.util.CoordActionsInDateRange; import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.Pair; import 
org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XLog; @@ -847,4 +852,46 @@ public class CoordinatorEngine extends BaseEngine { throw new CoordinatorEngineException(e); } } + + @Override + public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { + try { + new CoordSLAAlertsDisableXCommand(id, actions, dates).call(); + + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + @Override + public void changeSLA(String id, String actions, String dates, String childIds, String newParams) + throws BaseEngineException { + Map<String, String> slaNewParams = null; + + try { + + if (newParams != null) { + slaNewParams = JobUtils.parseChangeValue(newParams); + } + + new CoordSLAChangeXCommand(id, actions, dates, slaNewParams).call(); + + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + + @Override + public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { + try { + new CoordSLAAlertsEnableXCommand(id, actions, dates).call(); + + } + catch (CommandException e) { + throw new CoordinatorEngineException(e); + } + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java index 4d6b970..c3ee839 100644 --- a/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java +++ b/core/src/main/java/org/apache/oozie/CoordinatorJobBean.java @@ -18,18 +18,14 @@ package org.apache.oozie; -import org.apache.hadoop.io.Writable; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.client.CoordinatorJob; -import org.apache.oozie.client.rest.JsonBean; -import org.apache.oozie.client.rest.JsonTags; -import 
org.apache.oozie.client.rest.JsonUtils; -import org.apache.oozie.util.DateUtils; -import org.apache.oozie.util.WritableUtils; -import org.apache.openjpa.persistence.jdbc.Index; -import org.apache.openjpa.persistence.jdbc.Strategy; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.sql.Timestamp; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; import javax.persistence.Basic; import javax.persistence.Column; @@ -42,14 +38,19 @@ import javax.persistence.NamedQueries; import javax.persistence.NamedQuery; import javax.persistence.Table; import javax.persistence.Transient; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.sql.Timestamp; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; + +import org.apache.hadoop.io.Writable; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.rest.JsonBean; +import org.apache.oozie.client.rest.JsonTags; +import org.apache.oozie.client.rest.JsonUtils; +import org.apache.oozie.util.DateUtils; +import org.apache.oozie.util.WritableUtils; +import org.apache.openjpa.persistence.jdbc.Index; +import org.apache.openjpa.persistence.jdbc.Strategy; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; @Entity @NamedQueries( { @@ -79,6 +80,10 @@ import java.util.List; @NamedQuery(name = "UPDATE_COORD_JOB_CHANGE", query = "update CoordinatorJobBean w set w.endTimestamp = :endTime, w.statusStr = :status, w.pending = :pending, w.doneMaterialization = :doneMaterialization, w.concurrency = :concurrency, w.pauseTimestamp = :pauseTime, w.lastActionNumber = :lastActionNumber, w.lastActionTimestamp = :lastActionTime, w.nextMaterializedTimestamp = :nextMatdTime, 
w.lastModifiedTimestamp = :lastModifiedTime where w.id = :id"), + @NamedQuery(name = "UPDATE_COORD_JOB_CONF", query = "update CoordinatorJobBean w set w.conf = :conf where w.id = :id"), + + @NamedQuery(name = "UPDATE_COORD_JOB_XML", query = "update CoordinatorJobBean w set w.jobXml = :jobXml where w.id = :id"), + @NamedQuery(name = "DELETE_COORD_JOB", query = "delete from CoordinatorJobBean w where w.id IN (:id)"), @NamedQuery(name = "GET_COORD_JOBS", query = "select OBJECT(w) from CoordinatorJobBean w"), @@ -108,7 +113,7 @@ import java.util.List; //TODO need to remove. @NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN", query = "select OBJECT(w) from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) order by w.lastModifiedTimestamp"), - @NamedQuery(name = "GET_COORD_JOBS_OLDER_FOR_MATERILZATION", query = "select w.id from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND (w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) and w.matThrottling > ( select count(a.jobId) from CoordinatorActionBean a where a.jobId = w.id and a.statusStr = 'WAITING') order by w.lastModifiedTimestamp"), + @NamedQuery(name = "GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION", query = "select w.id from CoordinatorJobBean w where w.startTimestamp <= :matTime AND (w.statusStr = 'PREP' OR w.statusStr = 'RUNNING' or w.statusStr = 'RUNNINGWITHERROR') AND 
(w.nextMaterializedTimestamp < :matTime OR w.nextMaterializedTimestamp IS NULL) AND (w.nextMaterializedTimestamp IS NULL OR (w.endTimestamp > w.nextMaterializedTimestamp AND (w.pauseTimestamp IS NULL OR w.pauseTimestamp > w.nextMaterializedTimestamp))) and w.matThrottling > ( select count(a.jobId) from CoordinatorActionBean a where a.jobId = w.id and a.statusStr = 'WAITING') order by w.lastModifiedTimestamp"), @NamedQuery(name = "GET_COORD_JOBS_OLDER_THAN_STATUS", query = "select OBJECT(w) from CoordinatorJobBean w where w.statusStr = :status AND w.lastModifiedTimestamp <= :lastModTime order by w.lastModifiedTimestamp"), @@ -134,7 +139,13 @@ import java.util.List; @NamedQuery(name = "GET_COORD_JOB_STATUS_PARENTID", query = "select w.statusStr, w.bundleId from CoordinatorJobBean w where w.id = :id"), - @NamedQuery(name = "GET_COORD_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from CoordinatorActionBean a, CoordinatorJobBean w where w.id = a.jobId and a.lastModifiedTimestamp >= :lastModifiedTime and (w.statusStr IN ('PAUSED', 'RUNNING', 'RUNNINGWITHERROR', 'PAUSEDWITHERROR') or w.pending = 1) and w.statusStr <> 'IGNORED'") + @NamedQuery(name = "GET_COORD_IDS_FOR_STATUS_TRANSIT", query = "select DISTINCT w.id from CoordinatorActionBean a, CoordinatorJobBean w where w.id = a.jobId and a.lastModifiedTimestamp >= :lastModifiedTime and (w.statusStr IN ('PAUSED', 'RUNNING', 'RUNNINGWITHERROR', 'PAUSEDWITHERROR') or w.pending = 1) and w.statusStr <> 'IGNORED'"), + + @NamedQuery(name = "GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID", query = "select w.id from CoordinatorJobBean w where ( w.appName IN (:appName) OR w.id IN (:appName) ) AND w.bundleId = :bundleId"), + + @NamedQuery(name = "GET_COORD_JOB_CONF", query = "select w.conf from CoordinatorJobBean w where w.id = :id"), + + @NamedQuery(name = "GET_COORD_JOB_XML", query = "select w.jobXml from CoordinatorJobBean w where w.id = :id") }) @NamedNativeQueries({ @@ -221,7 +232,6 @@ public class CoordinatorJobBean 
implements Writable, CoordinatorJob, JsonBean { private java.sql.Timestamp startTimestamp = null; @Basic - @Index @Column(name = "end_time") private java.sql.Timestamp endTimestamp = null; http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/DagEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/DagEngine.java b/core/src/main/java/org/apache/oozie/DagEngine.java index 50aef2f..ac2e7b1 100644 --- a/core/src/main/java/org/apache/oozie/DagEngine.java +++ b/core/src/main/java/org/apache/oozie/DagEngine.java @@ -585,4 +585,20 @@ public class DagEngine extends BaseEngine { throw new DagEngineException(ex); } } + + @Override + public void enableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { + throw new BaseEngineException(new XException(ErrorCode.E0301, "Not supported for workflow")); + } + + @Override + public void disableSLAAlert(String id, String actions, String dates, String childIds) throws BaseEngineException { + throw new BaseEngineException(new XException(ErrorCode.E0301, "Not supported for workflow")); + } + + @Override + public void changeSLA(String id, String actions, String dates, String childIds, String newParams) throws BaseEngineException { + throw new BaseEngineException(new XException(ErrorCode.E0301, "Not supported for workflow")); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 4444c87..7630c2f 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -209,6 +209,8 @@ public enum ErrorCode { E1023(XLog.STD, "Coord Job update Error: [{0}]"), E1024(XLog.STD, 
"Cannot run ignore command: [{0}]"), E1025(XLog.STD, "Coord status transit error: [{0}]"), + E1026(XLog.STD, "SLA alert update command failed: {0}"), + E1027(XLog.STD, "SLA change command failed. {0}"), E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"), http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/SLAAlertsXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/SLAAlertsXCommand.java b/core/src/main/java/org/apache/oozie/command/SLAAlertsXCommand.java new file mode 100644 index 0000000..baf3a27 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/SLAAlertsXCommand.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command; + +import java.util.Map; + +import org.apache.oozie.ErrorCode; +import org.apache.oozie.client.rest.RestConstants; +import org.apache.oozie.service.ServiceException; +import org.apache.oozie.util.LogUtils; + +public abstract class SLAAlertsXCommand extends XCommand<Void> { + + private String jobId; + + public SLAAlertsXCommand(String jobId, String name, String type) { + super(name, type, 1); + this.jobId = jobId; + } + + @Override + final protected boolean isLockRequired() { + return true; + } + + @Override + final public String getEntityKey() { + return getJobId(); + } + + final public String getJobId() { + return jobId; + } + + @Override + protected void setLogInfo() { + LogUtils.setLogInfo(jobId); + } + + @Override + protected void loadState() throws CommandException { + + } + + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + } + + @Override + protected Void execute() throws CommandException { + try { + if (!executeSlaCommand()) { + if (!isJobRequest()) { + throw new CommandException(ErrorCode.E1026, "No record found"); + } + } + + } + catch (ServiceException e) { + throw new CommandException(e); + } + updateJob(); + return null; + } + + @Override + public String getKey() { + return getName() + "_" + jobId; + } + + protected void validateSLAChangeParam(Map<String, String> slaParams) throws CommandException, PreconditionException { + for (String key : slaParams.keySet()) { + if (key.equals(RestConstants.SLA_NOMINAL_TIME) || key.equals(RestConstants.SLA_SHOULD_START) + || key.equals(RestConstants.SLA_SHOULD_END) || key.equals(RestConstants.SLA_MAX_DURATION)) { + // good. + } + else { + throw new CommandException(ErrorCode.E1027, "Unsupported parameter " + key); + } + } + } + + /** + * Execute sla command. 
+ * + * @return true, if successful + * @throws ServiceException the service exception + * @throws CommandException the command exception + */ + protected abstract boolean executeSlaCommand() throws ServiceException, CommandException; + + /** + * Update job. + * + * @throws CommandException the command exception + */ + protected abstract void updateJob() throws CommandException; + + protected abstract boolean isJobRequest() throws CommandException; + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsDisableXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsDisableXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsDisableXCommand.java new file mode 100644 index 0000000..4f4e2cd --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsDisableXCommand.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.command.bundle; + +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.coord.CoordSLAAlertsDisableXCommand; +import org.apache.oozie.service.ServiceException; + +public class BundleSLAAlertsDisableXCommand extends BundleSLAAlertsXCommand { + + public BundleSLAAlertsDisableXCommand(String jobId, String actions, String dates, String childIds) { + super(jobId, actions, dates, childIds); + + } + + @Override + protected void loadState() throws CommandException { + } + + @Override + protected void updateJob() throws CommandException { + } + + @Override + protected void executeCoordCommand(String id, String actions, String dates) throws ServiceException, + CommandException { + new CoordSLAAlertsDisableXCommand(id, actions, dates).call(); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsEnableXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsEnableXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsEnableXCommand.java new file mode 100644 index 0000000..4d3b75c --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsEnableXCommand.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.bundle; + +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.coord.CoordSLAAlertsEnableXCommand; +import org.apache.oozie.service.ServiceException; + +public class BundleSLAAlertsEnableXCommand extends BundleSLAAlertsXCommand { + + public BundleSLAAlertsEnableXCommand(String jobId, String actions, String dates, String childIds) { + super(jobId, actions, dates, childIds); + + } + + @Override + protected void loadState() throws CommandException { + } + + @Override + protected void executeCoordCommand(String id, String actions, String dates) throws ServiceException, + CommandException { + new CoordSLAAlertsEnableXCommand(id, actions, dates).call(); + } + + @Override + protected void updateJob() throws CommandException { + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsXCommand.java new file mode 100644 index 0000000..1e6f6ae --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAAlertsXCommand.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.command.bundle; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.oozie.CoordinatorJobBean; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.XException; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.SLAAlertsXCommand; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor; +import org.apache.oozie.executor.jpa.CoordJobQueryExecutor.CoordJobQuery; +import org.apache.oozie.service.ServiceException; + +public abstract class BundleSLAAlertsXCommand extends SLAAlertsXCommand { + + private String actions; + + private String dates; + + private String childIds; + + public BundleSLAAlertsXCommand(String jobId, String actions, String dates, String childIds) { + super(jobId, "SLA.command", "SLA.command"); + this.actions = actions; + this.dates = dates; + this.childIds = childIds; + + } + + @Override + protected void loadState() throws CommandException { + } + + /** + * Gets the coord jobs from bundle. 
+ * + * @param id the bundle id + * @param coords the coords name/id + * @return the coord jobs from bundle + * @throws CommandException the command exception + */ + protected Set<String> getCoordJobsFromBundle(String id, String coords) throws CommandException { + Set<String> jobs = new HashSet<String>(); + List<CoordinatorJobBean> coordJobs; + try { + if (coords == null) { + coordJobs = CoordJobQueryExecutor.getInstance() + .getList(CoordJobQuery.GET_COORD_JOBS_WITH_PARENT_ID, id); + } + else { + coordJobs = CoordJobQueryExecutor.getInstance().getList( + CoordJobQuery.GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, Arrays.asList(coords.split(",")), id); + } + } + catch (XException e) { + throw new CommandException(e); + } + for (CoordinatorJobBean jobBean : coordJobs) { + jobs.add(jobBean.getId()); + } + return jobs; + + } + + /** + * Gets the coord jobs. + * + * @return the coord jobs + */ + protected String getCoordJobs() { + return childIds; + } + + /** + * Gets the actions. + * + * @return the actions + */ + protected String getActions() { + return actions; + } + + /** + * Gets the dates. + * + * @return the dates + */ + protected String getDates() { + return dates; + } + + protected boolean isJobRequest() { + return true; + + } + + @Override + protected boolean executeSlaCommand() throws ServiceException, CommandException { + StringBuffer report = new StringBuffer(); + + Set<String> coordJobs = getCoordJobsFromBundle(getJobId(), getCoordJobs()); + + if (coordJobs.isEmpty()) { + throw new CommandException(ErrorCode.E1026, "No record found"); + } + else { + for (String job : coordJobs) { + try { + executeCoordCommand(job, getActions(), getDates()); + } + catch (Exception e) { + // Ignore exception for coords. + String errorMsg = "SLA command for coord job " + job + " failed. 
Error message is : " + e.getMessage(); + LOG.error(errorMsg, e); + report.append(errorMsg).append(System.getProperty("line.separator")); + } + } + if (!report.toString().isEmpty()) { + throw new CommandException(ErrorCode.E1026, report.toString()); + } + return true; + } + } + + protected abstract void executeCoordCommand(String id, String actions, String dates) throws ServiceException, + CommandException; + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAChangeXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAChangeXCommand.java new file mode 100644 index 0000000..6530451 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleSLAChangeXCommand.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.command.bundle; + +import java.util.Map; +import org.apache.oozie.command.CommandException; +import org.apache.oozie.command.PreconditionException; +import org.apache.oozie.command.coord.CoordSLAChangeXCommand; +import org.apache.oozie.service.ServiceException; + +public class BundleSLAChangeXCommand extends BundleSLAAlertsXCommand { + + Map<String, String> newSlaParams; + + public BundleSLAChangeXCommand(String jobId, String actions, String dates, String childIds, + Map<String, String> newSlaParams) { + super(jobId, actions, dates, childIds); + this.newSlaParams = newSlaParams; + + } + + @Override + protected void loadState() throws CommandException { + } + + @Override + protected void executeCoordCommand(String id, String actions, String dates) throws ServiceException, + CommandException { + new CoordSLAChangeXCommand(id, actions, dates, newSlaParams).call(); + } + + @Override + protected void updateJob() throws CommandException { + } + + @Override + protected void verifyPrecondition() throws CommandException, PreconditionException { + validateSLAChangeParam(newSlaParams); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java index d6a3197..953e899 100644 --- a/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/bundle/BundleStatusTransitXCommand.java @@ -90,6 +90,7 @@ public class BundleStatusTransitXCommand extends StatusTransitXCommand { } if (bAction.isPending()) { + LOG.debug(bAction + " has pending flag set"); foundPending = true; } } 
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java index 548946f..39e6ac1 100644 --- a/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordMaterializeTransitionXCommand.java @@ -32,6 +32,7 @@ import org.apache.oozie.command.CommandException; import org.apache.oozie.command.MaterializeTransitionXCommand; import org.apache.oozie.command.PreconditionException; import org.apache.oozie.command.bundle.BundleStatusUpdateXCommand; +import org.apache.oozie.coord.CoordUtils; import org.apache.oozie.coord.TimeUnit; import org.apache.oozie.executor.jpa.BatchQueryExecutor; import org.apache.oozie.executor.jpa.BatchQueryExecutor.UpdateEntry; @@ -486,7 +487,7 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo actionBean.setTimeOut(timeout); if (!dryrun) { - storeToDB(actionBean, action); // Storing to table + storeToDB(actionBean, action, jobConf); // Storing to table } else { @@ -524,26 +525,28 @@ public class CoordMaterializeTransitionXCommand extends MaterializeTransitionXCo } } - private void storeToDB(CoordinatorActionBean actionBean, String actionXml) throws Exception { + private void storeToDB(CoordinatorActionBean actionBean, String actionXml, Configuration jobConf) throws Exception { LOG.debug("In storeToDB() coord action id = " + actionBean.getId() + ", size of actionXml = " + actionXml.length()); actionBean.setActionXml(actionXml); insertList.add(actionBean); - writeActionSlaRegistration(actionXml, actionBean); + writeActionSlaRegistration(actionXml, actionBean, jobConf); } - 
private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean) throws Exception { + private void writeActionSlaRegistration(String actionXml, CoordinatorActionBean actionBean, Configuration jobConf) + throws Exception { Element eAction = XmlUtils.parseXml(actionXml); Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info", eAction.getNamespace("sla")); - SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), SlaAppType.COORDINATOR_ACTION, coordJob - .getUser(), coordJob.getGroup(), LOG); - if(slaEvent != null) { + SLAEventBean slaEvent = SLADbOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), + SlaAppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getGroup(), LOG); + if (slaEvent != null) { insertList.add(slaEvent); } // inserting into new table also SLAOperations.createSlaRegistrationEvent(eSla, actionBean.getId(), actionBean.getJobId(), - AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false); + AppType.COORDINATOR_ACTION, coordJob.getUser(), coordJob.getAppName(), LOG, false, + CoordUtils.isSlaAlertDisabled(actionBean, coordJob.getAppName(), jobConf)); } private void updateJobMaterializeInfo(CoordinatorJobBean job) throws CommandException { http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsDisableXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsDisableXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsDisableXCommand.java new file mode 100644 index 0000000..11daa41 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsDisableXCommand.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.ArrayList;
+
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.util.XConfiguration;
+
+/**
+ * Coordinator command that switches SLA alerts off, either for the whole job
+ * or for the actions selected through the action/date scope passed to the
+ * constructor.
+ */
+public class CoordSLAAlertsDisableXCommand extends CoordSLAAlertsXCommand {
+
+    /**
+     * @param id coordinator job id
+     * @param actions action scope, or null for a job-wide request
+     * @param dates date scope, or null for a job-wide request
+     */
+    public CoordSLAAlertsDisableXCommand(String id, String actions, String dates) {
+        super(id, "SLA.alerts.disable", "SLA.alerts.disable", actions, dates);
+    }
+
+    /**
+     * Asks the {@link SLAService} to disable alerts.
+     *
+     * @return whatever the SLA service reports for the request
+     */
+    @Override
+    protected boolean executeSlaCommand() throws ServiceException, CommandException {
+        SLAService slaService = Services.get().get(SLAService.class);
+        if (getActionList() == null) {
+            // A null action list means the disable command applies to every child of this job.
+            // A plain list is used instead of an anonymous ArrayList subclass, which would
+            // capture a reference to this command.
+            ArrayList<String> parentJobIds = new ArrayList<String>();
+            parentJobIds.add(getJobId());
+            return slaService.disableChildJobAlert(parentJobIds);
+        }
+        return slaService.disableAlert(getActionList());
+    }
+
+    /**
+     * Records the disabled scope in the job configuration under
+     * {@link OozieClient#SLA_DISABLE_ALERT}: {@link SLAOperations#ALL_VALUE} for a
+     * job-wide request, otherwise the requested action/date list.
+     */
+    @Override
+    protected void updateJob() throws CommandException {
+        String disabledScope = isJobRequest() ? SLAOperations.ALL_VALUE : getActionDateListAsString();
+        // Log the value that is really stored (the action-scoped branch used to log ALL_VALUE).
+        LOG.debug("Updating job property " + OozieClient.SLA_DISABLE_ALERT + " = " + disabledScope);
+
+        XConfiguration conf = new XConfiguration();
+        conf.set(OozieClient.SLA_DISABLE_ALERT, disabledScope);
+        updateJobConf(conf);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsEnableXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsEnableXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsEnableXCommand.java
new file mode 100644
index 0000000..936f13d
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsEnableXCommand.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.util.ArrayList;
+
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.util.XConfiguration;
+
+/**
+ * Coordinator command that switches SLA alerts back on, either for the whole
+ * job or for the actions selected through the action/date scope passed to the
+ * constructor.
+ */
+public class CoordSLAAlertsEnableXCommand extends CoordSLAAlertsXCommand {
+
+    /**
+     * @param id coordinator job id
+     * @param actions action scope, or null for a job-wide request
+     * @param dates date scope, or null for a job-wide request
+     */
+    public CoordSLAAlertsEnableXCommand(String id, String actions, String dates) {
+        super(id, "SLA.alerts.enable", "SLA.alerts.enable", actions, dates);
+    }
+
+    /**
+     * Asks the {@link SLAService} to enable alerts.
+     *
+     * @return whatever the SLA service reports for the request
+     */
+    @Override
+    protected boolean executeSlaCommand() throws ServiceException, CommandException {
+        SLAService slaService = Services.get().get(SLAService.class);
+        if (getActionList() == null) {
+            // A null action list means the enable command applies to every child of this job.
+            // A plain list is used instead of an anonymous ArrayList subclass, which would
+            // capture a reference to this command.
+            ArrayList<String> parentJobIds = new ArrayList<String>();
+            parentJobIds.add(getJobId());
+            return slaService.enableChildJobAlert(parentJobIds);
+        }
+        return slaService.enableAlert(getActionList());
+    }
+
+    /**
+     * Records the request in the job configuration: a job-wide request blanks
+     * {@link OozieClient#SLA_DISABLE_ALERT}, an action-scoped request stores the
+     * action/date list under {@link OozieClient#SLA_ENABLE_ALERT}.
+     */
+    @Override
+    protected void updateJob() throws CommandException {
+        XConfiguration conf = new XConfiguration();
+        if (isJobRequest()) {
+            conf.set(OozieClient.SLA_DISABLE_ALERT, "");
+            LOG.debug("Updating job property " + OozieClient.SLA_DISABLE_ALERT + " = ");
+        }
+        else {
+            String enabledScope = getActionDateListAsString();
+            conf.set(OozieClient.SLA_ENABLE_ALERT, enabledScope);
+            // Log the key that is really written (this branch used to log SLA_DISABLE_ALERT).
+            LOG.debug("Updating job property " + OozieClient.SLA_ENABLE_ALERT + " = " + enabledScope);
+        }
+        updateJobConf(conf);
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java
new file mode 100644
index 0000000..b8affd6
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAAlertsXCommand.java
@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.command.coord;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.CoordinatorJobBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.XException;
+import org.apache.oozie.client.rest.RestConstants;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.SLAAlertsXCommand;
+import org.apache.oozie.coord.CoordUtils;
+import org.apache.oozie.executor.jpa.CoordJobQueryExecutor;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.sla.SLAOperations;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.JDOMException;
+
+/**
+ * Base class of the coordinator SLA alert commands.
+ * <p>
+ * It resolves the optional action/date scope into coordinator action ids and
+ * offers helpers to persist changes to the stored job configuration and to the
+ * SLA section of the stored job XML.
+ */
+public abstract class CoordSLAAlertsXCommand extends SLAAlertsXCommand {
+
+    private final String scope;
+    private final String dates;
+    private List<String> actionIds;
+
+    /**
+     * @param jobId coordinator job id
+     * @param name command name
+     * @param type command type
+     * @param actions action scope; null or empty when the request is job-wide
+     * @param dates date scope; null or empty when the request is job-wide
+     */
+    public CoordSLAAlertsXCommand(String jobId, String name, String type, String actions, String dates) {
+        super(jobId, name, type);
+        this.scope = actions;
+        this.dates = dates;
+    }
+
+    @Override
+    protected void loadState() throws CommandException {
+        actionIds = getActionListForScopeAndDate(getJobId(), scope, dates);
+    }
+
+    /**
+     * Update job conf: copies every entry of {@code newConf} over the stored job
+     * configuration and writes the result back.
+     *
+     * @param newConf the new conf
+     * @throws CommandException the command exception
+     */
+    protected void updateJobConf(Configuration newConf) throws CommandException {
+        try {
+            XConfiguration conf = getJobConf();
+            XConfiguration.copy(newConf, conf);
+
+            CoordinatorJobBean job = new CoordinatorJobBean();
+            job.setId(getJobId());
+            job.setConf(XmlUtils.prettyPrint(conf).toString());
+            CoordJobQueryExecutor.getInstance().executeUpdate(
+                    CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_CONF, job);
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+    }
+
+    /**
+     * Update job sla: rewrites the nominal-time, should-start, should-end and
+     * max-duration tags of the stored job XML for every parameter present in
+     * {@code newParams}, then stores the XML again.
+     *
+     * @param newParams the new params; null leaves the SLA tags untouched
+     * @throws CommandException the command exception
+     */
+    protected void updateJobSLA(Map<String, String> newParams) throws CommandException {
+        try {
+            CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(
+                    CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_XML, getJobId());
+
+            Element eAction;
+            try {
+                eAction = XmlUtils.parseXml(job.getJobXml());
+            }
+            catch (JDOMException e) {
+                throw new CommandException(ErrorCode.E1005, e.getMessage(), e);
+            }
+            Element eSla = eAction.getChild("action", eAction.getNamespace()).getChild("info",
+                    eAction.getNamespace("sla"));
+
+            if (newParams != null) {
+                applySlaParam(eSla, newParams, RestConstants.SLA_NOMINAL_TIME, SLAOperations.NOMINAL_TIME);
+                applySlaParam(eSla, newParams, RestConstants.SLA_SHOULD_START, SLAOperations.SHOULD_START);
+                applySlaParam(eSla, newParams, RestConstants.SLA_SHOULD_END, SLAOperations.SHOULD_END);
+                applySlaParam(eSla, newParams, RestConstants.SLA_MAX_DURATION, SLAOperations.MAX_DURATION);
+            }
+
+            job.setJobXml(XmlUtils.prettyPrint(eAction).toString());
+            job.setId(getJobId());
+
+            CoordJobQueryExecutor.getInstance().executeUpdate(CoordJobQueryExecutor.CoordJobQuery.UPDATE_COORD_JOB_XML,
+                    job);
+        }
+        catch (XException e) {
+            throw new CommandException(e);
+        }
+    }
+
+    /**
+     * Copies one request parameter into its SLA tag when the parameter is present.
+     */
+    private void applySlaParam(Element eSla, Map<String, String> newParams, String paramName, String tagName) {
+        String value = newParams.get(paramName);
+        if (value != null) {
+            updateSlaTagElement(eSla, tagName, value);
+        }
+    }
+
+    /**
+     * Gets the action and date list as string: the dates first, then the action
+     * scope, separated by a comma when both are present.
+     *
+     * @return the action date list as string, empty when neither is set
+     */
+    protected String getActionDateListAsString() {
+        // The buffer never leaves this method, so the unsynchronized builder is enough.
+        StringBuilder joined = new StringBuilder();
+        if (!StringUtils.isEmpty(dates)) {
+            joined.append(dates);
+        }
+        if (!StringUtils.isEmpty(scope)) {
+            if (joined.length() > 0) {
+                joined.append(",");
+            }
+            joined.append(scope);
+        }
+        return joined.toString();
+    }
+
+    /**
+     * Gets the action list for scope and date.
+     * <p>
+     * Empty strings are treated like null so that this method and
+     * {@link #isJobRequest()} always agree on what a job-wide request is.
+     *
+     * @param id the id
+     * @param scope the scope
+     * @param dates the dates
+     * @return the action list for scope and date, or null for a job-wide request
+     * @throws CommandException the command exception
+     */
+    private List<String> getActionListForScopeAndDate(String id, String scope, String dates) throws CommandException {
+        boolean hasDates = !StringUtils.isEmpty(dates);
+        boolean hasScope = !StringUtils.isEmpty(scope);
+        if (!hasDates && !hasScope) {
+            return null;
+        }
+
+        List<String> parsed = new ArrayList<String>();
+        if (hasDates) {
+            for (CoordinatorActionBean action : CoordUtils.getCoordActionsFromDates(id, dates, true)) {
+                parsed.add(action.getId());
+            }
+        }
+        if (hasScope) {
+            parsed.addAll(CoordUtils.getActionsIds(id, scope));
+        }
+        return parsed;
+    }
+
+    /**
+     * Gets the action list.
+     *
+     * @return the action ids resolved by {@link #loadState()}, or null when the
+     *         request is job-wide or the state has not been loaded yet
+     */
+    protected List<String> getActionList() {
+        return actionIds;
+    }
+
+    /**
+     * @return true when neither an action scope nor a date scope was given
+     */
+    protected boolean isJobRequest() {
+        return StringUtils.isEmpty(dates) && StringUtils.isEmpty(scope);
+    }
+
+    /**
+     * Update Sla tag element. Does nothing when the element is null or has no
+     * child with the given tag name in the "sla" namespace.
+     *
+     * @param elem the elem
+     * @param tagName the tag name
+     * @param value the value
+     */
+    public void updateSlaTagElement(Element elem, String tagName, String value) {
+        if (elem == null) {
+            return;
+        }
+        Element tag = elem.getChild(tagName, elem.getNamespace("sla"));
+        if (tag != null) {
+            tag.setText(value);
+        }
+    }
+
+    /**
+     * Loads the stored configuration of the coordinator job.
+     *
+     * @return the parsed job configuration
+     * @throws JPAExecutorException when the job cannot be read
+     * @throws CommandException with E1005 when the stored configuration cannot be parsed
+     */
+    protected XConfiguration getJobConf() throws JPAExecutorException, CommandException {
+        CoordinatorJobBean job = CoordJobQueryExecutor.getInstance().get(
+                CoordJobQueryExecutor.CoordJobQuery.GET_COORD_JOB_CONF, getJobId());
+        try {
+            return new XConfiguration(new StringReader(job.getConf()));
+        }
+        catch (IOException e) {
+            throw new CommandException(ErrorCode.E1005, e.getMessage(), e);
+        }
+    }
+
+}
