http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java
new file mode 100644
index 0000000..4d24388
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/command/coord/CoordSLAChangeXCommand.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.command.coord;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.CoordinatorActionBean;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.PreconditionException;
+import org.apache.oozie.coord.CoordELEvaluator;
+import org.apache.oozie.coord.CoordELFunctions;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor;
+import org.apache.oozie.executor.jpa.CoordActionQueryExecutor.CoordActionQuery;
+import org.apache.oozie.executor.jpa.JPAExecutorException;
+import org.apache.oozie.service.ServiceException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.sla.service.SLAService;
+import org.apache.oozie.util.ELEvaluator;
+import org.apache.oozie.util.Pair;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+
+/**
+ * Command that changes the SLA definition of a coordinator job's actions.
+ * <p>
+ * Every new SLA parameter value is evaluated as an EL expression against each affected action before the resulting
+ * definitions are passed to the {@link SLAService}.
+ */
+public class CoordSLAChangeXCommand extends CoordSLAAlertsXCommand {
+
+    /** New SLA parameter values keyed by parameter name; each value is run through the EL evaluator. */
+    Map<String, String> newParams;
+
+    /**
+     * Creates the command.
+     *
+     * @param jobId coordinator job id, handed to the parent command
+     * @param actions action scope, handed to the parent command
+     * @param dates date scope, handed to the parent command
+     * @param newParams SLA parameter names mapped to their new values
+     */
+    public CoordSLAChangeXCommand(String jobId, String actions, String dates, Map<String, String> newParams) {
+        super(jobId, "SLA.alerts.change", "SLA.alerts.change", actions, dates);
+        this.newParams = newParams;
+    }
+
+    /**
+     * Evaluates the new SLA parameters for every action returned by {@link #getNotTerminatedActions()} and asks the
+     * {@link SLAService} to change the SLA definitions of those actions.
+     *
+     * @return the value returned by {@code SLAService.changeDefinition}
+     * @throws CommandException with {@link ErrorCode#E1027}, wrapping any exception raised while evaluating or changing
+     */
+    @Override
+    protected boolean executeSlaCommand() throws ServiceException, CommandException {
+        try {
+            List<Pair<String, Map<String, String>>> idSlaDefinitionList = new ArrayList<Pair<String, Map<String, String>>>();
+            List<CoordinatorActionBean> coordinatorActionBeanList = getNotTerminatedActions();
+            Configuration conf = getJobConf();
+            for (CoordinatorActionBean coordAction : coordinatorActionBeanList) {
+                Map<String, String> slaDefinitionMap = new HashMap<String, String>(newParams);
+                if (!slaDefinitionMap.isEmpty()) {
+                    // The parsed action XML and the evaluator depend only on the action, so they are built once per
+                    // action instead of once per parameter; nothing is parsed when there is no parameter to evaluate.
+                    Element eAction = XmlUtils.parseXml(coordAction.getActionXml().toString());
+                    ELEvaluator evalSla = CoordELEvaluator.createSLAEvaluator(eAction, coordAction, conf);
+                    for (Map.Entry<String, String> slaParam : slaDefinitionMap.entrySet()) {
+                        slaParam.setValue(CoordELFunctions.evalAndWrap(evalSla, slaParam.getValue()));
+                    }
+                }
+                idSlaDefinitionList.add(new Pair<String, Map<String, String>>(coordAction.getId(), slaDefinitionMap));
+            }
+            return Services.get().get(SLAService.class).changeDefinition(idSlaDefinitionList);
+        }
+        catch (Exception e) {
+            throw new CommandException(ErrorCode.E1027, e.getMessage(), e);
+        }
+
+    }
+
+    /**
+     * Applies the new parameters through {@code updateJobSLA} when {@code isJobRequest()} is true; otherwise does nothing.
+     */
+    @Override
+    protected void updateJob() throws CommandException {
+        if (isJobRequest()) {
+            updateJobSLA(newParams);
+        }
+    }
+
+    /**
+     * Loads the actions to change: by job id when {@code isJobRequest()} is true, otherwise by {@code getActionList()}.
+     *
+     * @return the beans produced by the matching {@code GET_ACTIVE_ACTIONS_*_FOR_SLA_CHANGE} query
+     * @throws JPAExecutorException if the query fails
+     */
+    private List<CoordinatorActionBean> getNotTerminatedActions() throws JPAExecutorException {
+        if (isJobRequest()) {
+            return CoordActionQueryExecutor.getInstance().getList(
+                    CoordActionQuery.GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE, getJobId());
+        }
+        else {
+            return CoordActionQueryExecutor.getInstance().getList(
+                    CoordActionQuery.GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE, getActionList());
+        }
+
+    }
+
+    /**
+     * Validates the new parameters with {@code validateSLAChangeParam} before the command runs.
+     */
+    @Override
+    protected void verifyPrecondition() throws CommandException, PreconditionException {
+        validateSLAChangeParam(newParams);
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/coord/CoordUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java index 4643d73..90050b3 100644 --- a/core/src/main/java/org/apache/oozie/coord/CoordUtils.java +++ b/core/src/main/java/org/apache/oozie/coord/CoordUtils.java @@ -20,11 +20,14 @@ package org.apache.oozie.coord; import java.text.ParseException; import java.util.ArrayList; +import java.util.Arrays; import java.util.Date; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.ErrorCode; @@ -35,15 +38,19 @@ import org.apache.oozie.command.CommandException; import org.apache.oozie.executor.jpa.CoordActionGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetActionForNominalTimeJPAExecutor; import org.apache.oozie.executor.jpa.JPAExecutorException; +import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.service.XLogService; +import org.apache.oozie.sla.SLAOperations; import org.apache.oozie.util.CoordActionsInDateRange; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XLog; import org.jdom.Element; +import com.google.common.annotations.VisibleForTesting; + public class CoordUtils { public static final String HADOOP_USER = "user.name"; @@ -92,7 +99,8 @@ public class CoordUtils { * @return the list of Coordinator actions for the date range * @throws CommandException thrown if failed to get coordinator actions by given date range */ - static List<CoordinatorActionBean> 
getCoordActionsFromDates(String jobId, String scope, boolean active) + @VisibleForTesting + public static List<CoordinatorActionBean> getCoordActionsFromDates(String jobId, String scope, boolean active) throws CommandException { JPAService jpaService = Services.get().get(JPAService.class); ParamChecker.notEmpty(jobId, "jobId"); @@ -132,7 +140,12 @@ public class CoordUtils { throw new CommandException(ErrorCode.E0302, s.trim(), e); } catch (JPAExecutorException e) { - throw new CommandException(e); + if (e.getErrorCode() == ErrorCode.E0605) { + XLog.getLog(CoordUtils.class).info("No action for nominal time:" + s + ". Skipping over"); + } + else { + throw new CommandException(e); + } } } @@ -145,16 +158,7 @@ public class CoordUtils { return coordActions; } - /** - * Get the list of actions for given id ranges - * - * @param jobId coordinator job id - * @param scope a comma-separated list of action ranges. The action range is specified with two action numbers separated by '-' - * @return the list of all Coordinator actions for action range - * @throws CommandException thrown if failed to get coordinator actions by given id range - */ - public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException { - JPAService jpaService = Services.get().get(JPAService.class); + public static Set<String> getActionsIds(String jobId, String scope) throws CommandException { ParamChecker.notEmpty(jobId, "jobId"); ParamChecker.notEmpty(scope, "scope"); @@ -202,6 +206,21 @@ public class CoordUtils { actions.add(jobId + "@" + s); } } + return actions; + } + + /** + * Get the list of actions for given id ranges + * + * @param jobId coordinator job id + * @param scope a comma-separated list of action ranges. 
The action range is specified with two action numbers separated by '-' + * @return the list of all Coordinator actions for action range + * @throws CommandException thrown if failed to get coordinator actions by given id range + */ + @VisibleForTesting + public static List<CoordinatorActionBean> getCoordActionsFromIds(String jobId, String scope) throws CommandException { + JPAService jpaService = Services.get().get(JPAService.class); + Set<String> actions = getActionsIds(jobId, scope); // Retrieve the actions using the corresponding actionIds List<CoordinatorActionBean> coordActions = new ArrayList<CoordinatorActionBean>(); for (String id : actions) {
@@ -225,4 +244,115 @@ public class CoordUtils {
         return coordActions;
     }
 
+    /**
+     * Check if sla alert is disabled for action.
+     * <p>
+     * The checks run in this order and the first match decides: the action's nominal time is more than
+     * {@link OozieClient#SLA_DISABLE_ALERT_OLDER_THAN} hours in the past (only when that threshold is positive), the
+     * coordinator name or job id is listed in {@link OozieClient#SLA_DISABLE_ALERT_COORD}, the action is covered by
+     * {@link OozieClient#SLA_DISABLE_ALERT}. In every other case the alert is reported as not disabled.
+     *
+     * @param actionBean coordinator action to check; its nominal time, job id and action number are read
+     * @param coordName coordinator name, looked up in the comma-separated {@code SLA_DISABLE_ALERT_COORD} list
+     * @param jobConf job configuration holding the SLA alert properties
+     * @return true if the SLA alert is disabled for the action, false otherwise
+     * @throws ParseException propagated from parsing a date range of an alert list
+     */
+    public static boolean isSlaAlertDisabled(CoordinatorActionBean actionBean, String coordName, Configuration jobConf)
+            throws ParseException {
+
+        int disableSlaNotificationOlderThan = jobConf.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN,
+                ConfigurationService.getInt(OozieClient.SLA_DISABLE_ALERT_OLDER_THAN));
+
+        if (disableSlaNotificationOlderThan > 0) {
+            // Disable alert for catchup jobs
+            long timeDiffinHrs = TimeUnit.MILLISECONDS.toHours(new Date().getTime()
+                    - actionBean.getNominalTime().getTime());
+            // The threshold was already resolved above; no need to read the configuration a second time.
+            if (timeDiffinHrs > disableSlaNotificationOlderThan) {
+                return true;
+            }
+        }
+
+        if (jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD) != null) {
+            String coords = jobConf.get(OozieClient.SLA_DISABLE_ALERT_COORD);
+            Set<String> coordsToDisableFor = new HashSet<String>(Arrays.asList(coords.split(",")));
+            if (coordsToDisableFor.contains(coordName)) {
+                return true;
+            }
+            if (coordsToDisableFor.contains(actionBean.getJobId())) {
+                return true;
+            }
+        }
+
+        // Check if sla alert is disabled for that action
+        if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_DISABLE_ALERT))
+                && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_DISABLE_ALERT)) {
+            return true;
+        }
+
+        // Check if sla alert is enabled for that action
+        if (!StringUtils.isEmpty(jobConf.get(OozieClient.SLA_ENABLE_ALERT))
+                && getCoordActionSLAAlertStatus(actionBean, coordName, jobConf, OozieClient.SLA_ENABLE_ALERT)) {
+            return false;
+        }
+
+        return false;
+    }
+
+    /**
+     * Get coord action SLA alert status.
+     * <p>
+     * Reads the comma-separated list configured under {@code slaAlertType} and reports whether the action is covered
+     * by it. An entry is either {@link SLAOperations#ALL_VALUE}, a nominal-time range {@code start::end}, an action
+     * number range {@code start-end}, or a single action number. Both bounds of a range are inclusive.
+     *
+     * @param actionBean coordinator action whose nominal time and action number are matched
+     * @param coordName coordinator name (not used by the matching)
+     * @param jobConf job configuration the list is read from
+     * @param slaAlertType configuration key of the list to check
+     * @return true if an entry of the list covers the action, false if the list is empty or nothing matches
+     * @throws ParseException propagated from {@code DateUtils.parseDateOozieTZ} for a nominal-time range
+     */
+    private static boolean getCoordActionSLAAlertStatus(CoordinatorActionBean actionBean, String coordName,
+            Configuration jobConf, String slaAlertType) throws ParseException {
+        String slaAlertList = jobConf.get(slaAlertType);
+        if (StringUtils.isEmpty(slaAlertList)) {
+            return false;
+        }
+        // check if ALL or date/action-num range
+        if (slaAlertList.equalsIgnoreCase(SLAOperations.ALL_VALUE)) {
+            return true;
+        }
+        for (String value : slaAlertList.split(",")) {
+            value = value.trim();
+            if (value.contains("::")) {
+                String[] datesInRange = value.split("::");
+                Date start = DateUtils.parseDateOozieTZ(datesInRange[0].trim());
+                Date end = DateUtils.parseDateOozieTZ(datesInRange[1].trim());
+                // nominal time is in range only when it satisfies BOTH bounds (an OR would match every action)
+                if (actionBean.getNominalTime().compareTo(start) >= 0
+                        && actionBean.getNominalTime().compareTo(end) <= 0) {
+                    return true;
+                }
+            }
+            else if (value.contains("-")) {
+                String[] actionsInRange = value.split("-");
+                int start = Integer.parseInt(actionsInRange[0].trim());
+                int end = Integer.parseInt(actionsInRange[1].trim());
+                // action number is in range only when it satisfies BOTH bounds (an OR would match every action)
+                if (actionBean.getActionNumber() >= start && actionBean.getActionNumber() <= end) {
+                    return true;
+                }
+            }
+            else {
+                int actionNumber = Integer.parseInt(value);
+                if (actionBean.getActionNumber() == actionNumber) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
 }
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java index e6ab09b..c6a60a1 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordActionQueryExecutor.java @@ -28,6 +28,7 @@ import javax.persistence.Query; import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.ErrorCode; +import org.apache.oozie.StringBlob; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; @@ -51,7 +52,12 @@ public class CoordActionQueryExecutor extends GET_COORD_ACTIVE_ACTIONS_COUNT_BY_JOBID, GET_COORD_ACTIONS_BY_LAST_MODIFIED_TIME, GET_COORD_ACTIONS_STATUS_UNIGNORED, - GET_COORD_ACTIONS_PENDING_COUNT + GET_COORD_ACTIONS_PENDING_COUNT, + GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE, + GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE, + GET_TERMINATED_ACTIONS_FOR_DATES, + GET_TERMINATED_ACTION_IDS_FOR_DATES, + GET_ACTIVE_ACTIONS_FOR_DATES }; private static CoordActionQueryExecutor instance = new CoordActionQueryExecutor(); @@ -180,6 +186,19 @@ public class CoordActionQueryExecutor extends case GET_COORD_ACTIONS_PENDING_COUNT: query.setParameter("jobId", parameters[0]); break; + case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE: + query.setParameter("ids", parameters[0]); + break; + case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE: + query.setParameter("jobId", parameters[0]); + break; + case GET_TERMINATED_ACTIONS_FOR_DATES: + case GET_TERMINATED_ACTION_IDS_FOR_DATES: + case GET_ACTIVE_ACTIONS_FOR_DATES: + 
query.setParameter("jobId", parameters[0]); + query.setParameter("startTime", new Timestamp(((Date) parameters[1]).getTime())); + query.setParameter("endTime", new Timestamp(((Date) parameters[2]).getTime())); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " @@ -247,6 +266,33 @@ public class CoordActionQueryExecutor extends bean.setStatusStr((String)arr[0]); bean.setPending((Integer)arr[1]); break; + case GET_ACTIVE_ACTIONS_IDS_FOR_SLA_CHANGE: + case GET_ACTIVE_ACTIONS_JOBID_FOR_SLA_CHANGE: + arr = (Object[]) ret; + bean = new CoordinatorActionBean(); + bean.setId((String)arr[0]); + bean.setNominalTime((Timestamp)arr[1]); + bean.setCreatedTime((Timestamp)arr[2]); + bean.setActionXmlBlob((StringBlob)arr[3]); + break; + case GET_TERMINATED_ACTIONS_FOR_DATES: + bean = (CoordinatorActionBean) ret; + break; + case GET_TERMINATED_ACTION_IDS_FOR_DATES: + bean = new CoordinatorActionBean(); + bean.setId((String) ret); + break; + case GET_ACTIVE_ACTIONS_FOR_DATES: + arr = (Object[]) ret; + bean = new CoordinatorActionBean(); + bean.setId((String)arr[0]); + bean.setJobId((String)arr[1]); + bean.setStatusStr((String) arr[2]); + bean.setExternalId((String) arr[3]); + bean.setPending((Integer) arr[4]); + bean.setNominalTime((Timestamp) arr[5]); + bean.setCreatedTime((Timestamp) arr[6]); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct action bean for " http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java deleted file mode 100644 index 1862c7c..0000000 --- 
a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionIdsForDateRangeJPAExecutor.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.ErrorCode; -import org.apache.oozie.util.ParamChecker; - -/** - * Load coordinator action ids by date range. 
- */ -public class CoordJobGetActionIdsForDateRangeJPAExecutor implements JPAExecutor<List<String>> { - - private String jobId = null; - private Date startDate, endDate; - - public CoordJobGetActionIdsForDateRangeJPAExecutor(String jobId, Date startDate, Date endDate) { - ParamChecker.notNull(jobId, "jobId"); - this.jobId = jobId; - this.startDate = startDate; - this.endDate = endDate; - } - - @Override - public String getName() { - return "CoordJobGetActionIdsForDateRangeJPAExecutor"; - } - - @Override - @SuppressWarnings("unchecked") - public List<String> execute(EntityManager em) throws JPAExecutorException { - try { - Query q = em.createNamedQuery("GET_ACTION_IDS_FOR_DATES"); - q.setParameter("jobId", jobId); - q.setParameter("startTime", new Timestamp(startDate.getTime())); - q.setParameter("endTime", new Timestamp(endDate.getTime())); - List<String> coordActionIds= q.getResultList(); - return coordActionIds; - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java deleted file mode 100644 index eb95591..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsByDatesForKillJPAExecutor.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.ErrorCode; -import org.apache.oozie.client.CoordinatorAction; -import org.apache.oozie.util.DateUtils; -import org.apache.oozie.util.ParamChecker; - -/** - * Load non-terminal coordinator actions by dates. 
- */ -public class CoordJobGetActionsByDatesForKillJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> { - - private String jobId = null; - private Date startDate, endDate; - - public CoordJobGetActionsByDatesForKillJPAExecutor(String jobId, Date startDate, Date endDate) { - ParamChecker.notNull(jobId, "jobId"); - this.jobId = jobId; - this.startDate = startDate; - this.endDate = endDate; - } - - @Override - public String getName() { - return "CoordJobGetActionsByDatesForKillJPAExecutor"; - } - - @Override - @SuppressWarnings("unchecked") - public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException { - List<CoordinatorActionBean> actionList = new ArrayList<CoordinatorActionBean>(); - try { - Query q = em.createNamedQuery("GET_ACTIONS_BY_DATES_FOR_KILL"); - q.setParameter("jobId", jobId); - q.setParameter("startTime", new Timestamp(startDate.getTime())); - q.setParameter("endTime", new Timestamp(endDate.getTime())); - List<Object[]> actions = q.getResultList(); - - for (Object[] a : actions) { - CoordinatorActionBean aa = getBeanForRunningCoordAction(a); - actionList.add(aa); - } - return actionList; - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - } - - private CoordinatorActionBean getBeanForRunningCoordAction(Object[] arr) { - CoordinatorActionBean action = new CoordinatorActionBean(); - if (arr[0] != null) { - action.setId((String) arr[0]); - } - - if (arr[1] != null) { - action.setJobId((String) arr[1]); - } - - if (arr[2] != null) { - action.setStatus(CoordinatorAction.Status.valueOf((String) arr[2])); - } - - if (arr[3] != null) { - action.setExternalId((String) arr[3]); - } - - if (arr[4] != null) { - action.setPending((Integer) arr[4]); - } - - if (arr[5] != null) { - action.setNominalTime(DateUtils.toDate((Timestamp) arr[5])); - } - - if (arr[6] != null) { - action.setCreatedTime(DateUtils.toDate((Timestamp) arr[6])); - } - return action; - } -} 
http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java deleted file mode 100644 index d1856ae..0000000 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobGetActionsForDatesJPAExecutor.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.executor.jpa; - -import java.sql.Timestamp; -import java.util.ArrayList; -import java.util.Date; -import java.util.List; - -import javax.persistence.EntityManager; -import javax.persistence.Query; - -import org.apache.oozie.CoordinatorActionBean; -import org.apache.oozie.ErrorCode; -import org.apache.oozie.util.ParamChecker; - -/** - * Load coordinator actions by dates. 
- */ -public class CoordJobGetActionsForDatesJPAExecutor implements JPAExecutor<List<CoordinatorActionBean>> { - - private String jobId = null; - private Date startDate, endDate; - - public CoordJobGetActionsForDatesJPAExecutor(String jobId, Date startDate, Date endDate) { - ParamChecker.notNull(jobId, "jobId"); - this.jobId = jobId; - this.startDate = startDate; - this.endDate = endDate; - } - - @Override - public String getName() { - return "CoordJobGetActionsForDatesJPAExecutor"; - } - - @Override - @SuppressWarnings("unchecked") - public List<CoordinatorActionBean> execute(EntityManager em) throws JPAExecutorException { - List<CoordinatorActionBean> actions; - try { - Query q = em.createNamedQuery("GET_ACTIONS_FOR_DATES"); - q.setParameter("jobId", jobId); - q.setParameter("startTime", new Timestamp(startDate.getTime())); - q.setParameter("endTime", new Timestamp(endDate.getTime())); - actions = q.getResultList(); - return actions; - } - catch (Exception e) { - throw new JPAExecutorException(ErrorCode.E0603, e.getMessage(), e); - } - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java index 4bccef4..1518686 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobQueryExecutor.java @@ -26,7 +26,6 @@ import java.util.List; import javax.persistence.EntityManager; import javax.persistence.Query; -import org.apache.oozie.CoordinatorActionBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.StringBlob; @@ -53,6 +52,8 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, 
Coo UPDATE_COORD_JOB_STATUS_PENDING_TIME, UPDATE_COORD_JOB_MATERIALIZE, UPDATE_COORD_JOB_CHANGE, + UPDATE_COORD_JOB_CONF, + UPDATE_COORD_JOB_XML, GET_COORD_JOB, GET_COORD_JOB_USER_APPNAME, GET_COORD_JOB_INPUT_CHECK, @@ -63,9 +64,13 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo GET_COORD_JOB_STATUS, GET_COORD_JOB_STATUS_PARENTID, GET_COORD_JOBS_CHANGED, - GET_COORD_JOBS_OLDER_FOR_MATERILZATION, + GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, GET_COORD_FOR_ABANDONEDCHECK, - GET_COORD_IDS_FOR_STATUS_TRANSIT + GET_COORD_IDS_FOR_STATUS_TRANSIT, + GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID, + GET_COORD_JOBS_WITH_PARENT_ID, + GET_COORD_JOB_CONF, + GET_COORD_JOB_XML }; private static CoordJobQueryExecutor instance = new CoordJobQueryExecutor(); @@ -177,6 +182,15 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo query.setParameter("lastModifiedTime", cjBean.getLastModifiedTimestamp()); query.setParameter("id", cjBean.getId()); break; + case UPDATE_COORD_JOB_CONF: + query.setParameter("conf", cjBean.getConfBlob()); + query.setParameter("id", cjBean.getId()); + break; + case UPDATE_COORD_JOB_XML: + query.setParameter("jobXml", cjBean.getJobXmlBlob()); + query.setParameter("id", cjBean.getId()); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + namedQuery.name()); @@ -198,12 +212,14 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo case GET_COORD_JOB_SUSPEND_KILL: case GET_COORD_JOB_STATUS: case GET_COORD_JOB_STATUS_PARENTID: + case GET_COORD_JOB_CONF: + case GET_COORD_JOB_XML: query.setParameter("id", parameters[0]); break; case GET_COORD_JOBS_CHANGED: query.setParameter("lastModifiedTime", new Timestamp(((Date)parameters[0]).getTime())); break; - case GET_COORD_JOBS_OLDER_FOR_MATERILZATION: + case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION: query.setParameter("matTime", new Timestamp(((Date)parameters[0]).getTime())); 
int limit = (Integer) parameters[1]; if (limit > 0) { @@ -218,7 +234,13 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo case GET_COORD_IDS_FOR_STATUS_TRANSIT: query.setParameter("lastModifiedTime", new Timestamp(((Date) parameters[0]).getTime())); break; - + case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID: + query.setParameter("appName", parameters[0]); + query.setParameter("bundleId", parameters[1]); + break; + case GET_COORD_JOBS_WITH_PARENT_ID: + query.setParameter("parentId", parameters[0]); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + namedQuery.name()); @@ -335,7 +357,15 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo case GET_COORD_JOBS_CHANGED: bean = (CoordinatorJobBean) ret; break; - case GET_COORD_JOBS_OLDER_FOR_MATERILZATION: + case GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION: + bean = new CoordinatorJobBean(); + bean.setId((String) ret); + break; + case GET_COORD_JOBS_FOR_BUNDLE_BY_APPNAME_ID: + bean = new CoordinatorJobBean(); + bean.setId((String) ret); + break; + case GET_COORD_JOBS_WITH_PARENT_ID: bean = new CoordinatorJobBean(); bean.setId((String) ret); break; @@ -347,11 +377,18 @@ public class CoordJobQueryExecutor extends QueryExecutor<CoordinatorJobBean, Coo bean.setGroup((String) arr[2]); bean.setAppName((String) arr[3]); break; - case GET_COORD_IDS_FOR_STATUS_TRANSIT: bean = new CoordinatorJobBean(); bean.setId((String) ret); break; + case GET_COORD_JOB_CONF: + bean = new CoordinatorJobBean(); + bean.setConfBlob((StringBlob) ret); + break; + case GET_COORD_JOB_XML: + bean = new CoordinatorJobBean(); + bean.setJobXmlBlob((StringBlob) ret); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java index 5e018c7..6d13ed1 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/CoordJobsToBeMaterializedJPAExecutor.java @@ -56,7 +56,7 @@ public class CoordJobsToBeMaterializedJPAExecutor implements JPAExecutor<List<Co public List<CoordinatorJobBean> execute(EntityManager em) throws JPAExecutorException { List<CoordinatorJobBean> cjBeans; try { - Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_FOR_MATERILZATION"); + Query q = em.createNamedQuery("GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION"); q.setParameter("matTime", new Timestamp(this.dateInput.getTime())); if (limit > 0) { q.setMaxResults(limit); http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java index e220c01..bded634 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLARegistrationQueryExecutor.java @@ -18,6 +18,8 @@ package org.apache.oozie.executor.jpa; +import java.sql.Timestamp; +import java.util.ArrayList; import java.util.List; import javax.persistence.EntityManager; @@ -36,8 +38,13 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB public enum SLARegQuery { UPDATE_SLA_REG_ALL, + UPDATE_SLA_CONFIG, + UPDATE_SLA_EXPECTED_VALUE, GET_SLA_REG_ALL, - GET_SLA_REG_ON_RESTART + 
GET_SLA_EXPECTED_VALUE_CONFIG, + GET_SLA_REG_FOR_PARENT_ID, + GET_SLA_REG_ON_RESTART, + GET_SLA_CONFIGS }; private static SLARegistrationQueryExecutor instance = new SLARegistrationQueryExecutor(); @@ -70,6 +77,17 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB query.setParameter("parentId", bean.getParentId()); query.setParameter("jobData", bean.getJobData()); break; + case UPDATE_SLA_EXPECTED_VALUE: + query.setParameter("jobId", bean.getId()); + query.setParameter("expectedStartTime", bean.getExpectedStartTimestamp()); + query.setParameter("expectedEndTime", bean.getExpectedEndTimestamp()); + query.setParameter("expectedDuration", bean.getExpectedDuration()); + break; + case UPDATE_SLA_CONFIG: + query.setParameter("jobId", bean.getId()); + query.setParameter("slaConfig", bean.getSlaConfig()); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + namedQuery.name()); @@ -86,6 +104,16 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB case GET_SLA_REG_ON_RESTART: query.setParameter("id", parameters[0]); break; + case GET_SLA_CONFIGS: + query.setParameter("ids", parameters[0]); + break; + case GET_SLA_EXPECTED_VALUE_CONFIG: + query.setParameter("id", parameters[0]); + break; + case GET_SLA_REG_FOR_PARENT_ID: + query.setParameter("parentId", parameters[0]); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + namedQuery.name()); @@ -120,9 +148,13 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB JPAService jpaService = Services.get().get(JPAService.class); EntityManager em = jpaService.getEntityManager(); Query query = getSelectQuery(namedQuery, em, parameters); - @SuppressWarnings("unchecked") - List<SLARegistrationBean> beanList = (List<SLARegistrationBean>) jpaService.executeGetList(namedQuery.name(), - query, em); + List<?> retList = (List<?>) 
jpaService.executeGetList(namedQuery.name(), query, em); + List<SLARegistrationBean> beanList = new ArrayList<SLARegistrationBean>(); + if (retList != null) { + for (Object ret : retList) { + beanList.add(constructBean(namedQuery, ret)); + } + } return beanList; } @@ -145,6 +177,28 @@ public class SLARegistrationQueryExecutor extends QueryExecutor<SLARegistrationB bean.setSlaConfig((String) arr[2]); bean.setJobData((String) arr[3]); break; + case GET_SLA_CONFIGS: + bean = new SLARegistrationBean(); + arr = (Object[]) ret; + bean.setId((String) arr[0]); + bean.setSlaConfig((String) arr[1]); + break; + case GET_SLA_EXPECTED_VALUE_CONFIG: + bean = new SLARegistrationBean(); + arr = (Object[]) ret; + bean.setId((String) arr[0]); + bean.setSlaConfig((String) arr[1]); + bean.setExpectedStart((Timestamp)arr[2]); + bean.setExpectedEnd((Timestamp)arr[3]); + bean.setExpectedDuration((Long)arr[4]); + bean.setNominalTime((Timestamp)arr[5]); + break; + case GET_SLA_REG_FOR_PARENT_ID: + bean = new SLARegistrationBean(); + arr = (Object[]) ret; + bean.setId((String) arr[0]); + bean.setSlaConfig((String) arr[1]); + break; default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " + namedQuery.name()); http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java index c3197b7..0057c89 100644 --- a/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java +++ b/core/src/main/java/org/apache/oozie/executor/jpa/SLASummaryQueryExecutor.java @@ -18,6 +18,7 @@ package org.apache.oozie.executor.jpa; +import java.sql.Timestamp; import java.util.List; import javax.persistence.EntityManager; @@ -39,8 +40,11 @@ 
public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu UPDATE_SLA_SUMMARY_FOR_ACTUAL_TIMES, UPDATE_SLA_SUMMARY_ALL, UPDATE_SLA_SUMMARY_EVENTPROCESSED, + UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES, + UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME, GET_SLA_SUMMARY, - GET_SLA_SUMMARY_EVENTPROCESSED + GET_SLA_SUMMARY_EVENTPROCESSED, + GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED }; private static SLASummaryQueryExecutor instance = new SLASummaryQueryExecutor(); @@ -95,10 +99,24 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu query.setParameter("actualStartTS", bean.getActualStartTimestamp()); query.setParameter("jobId", bean.getId()); break; + case UPDATE_SLA_SUMMARY_FOR_EXPECTED_TIMES: + query.setParameter("nominalTime", bean.getNominalTimestamp()); + query.setParameter("expectedStartTime", bean.getExpectedStartTimestamp()); + query.setParameter("expectedEndTime", bean.getExpectedEndTimestamp()); + query.setParameter("expectedDuration", bean.getExpectedDuration()); + query.setParameter("lastModTime", bean.getLastModifiedTimestamp()); + query.setParameter("jobId", bean.getId()); + break; + case UPDATE_SLA_SUMMARY_EVENTPROCESSED: query.setParameter("eventProcessed", bean.getEventProcessed()); query.setParameter("jobId", bean.getId()); break; + case UPDATE_SLA_SUMMARY_LAST_MODIFIED_TIME: + query.setParameter("lastModifiedTS", bean.getLastModifiedTime()); + query.setParameter("jobId", bean.getId()); + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot set parameters for " + namedQuery.name()); @@ -113,6 +131,7 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu switch (namedQuery) { case GET_SLA_SUMMARY: case GET_SLA_SUMMARY_EVENTPROCESSED: + case GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED: query.setParameter("id", parameters[0]); break; } @@ -174,6 +193,14 @@ public class SLASummaryQueryExecutor extends QueryExecutor<SLASummaryBean, SLASu bean = new 
SLASummaryBean(); bean.setEventProcessed(((Byte)ret).intValue()); break; + case GET_SLA_SUMMARY_EVENTPROCESSED_LAST_MODIFIED: + Object[] arr = (Object[]) ret; + bean = new SLASummaryBean(); + bean.setEventProcessed((Byte)arr[0]); + bean.setLastModifiedTime((Timestamp)arr[1]); + + break; + default: throw new JPAExecutorException(ErrorCode.E0603, "QueryExecutor cannot construct job bean for " + namedQuery.name()); http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java index fa16d1d..1cbd474 100644 --- a/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java +++ b/core/src/main/java/org/apache/oozie/service/CoordMaterializeTriggerService.java @@ -160,7 +160,7 @@ public class CoordMaterializeTriggerService implements Service { throws JPAExecutorException { try { List<CoordinatorJobBean> materializeJobs = CoordJobQueryExecutor.getInstance().getList( - CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERILZATION, currDate, limit); + CoordJobQuery.GET_COORD_JOBS_OLDER_FOR_MATERIALIZATION, currDate, limit); LOG.info("CoordMaterializeTriggerService - Curr Date= " + DateUtils.formatDateOozieTZ(currDate) + ", Num jobs to materialize = " + materializeJobs.size()); for (CoordinatorJobBean coordJob : materializeJobs) { http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/service/EventHandlerService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java index 7c0d3be..22c6fb0 100644 --- 
a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java +++ b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java @@ -19,32 +19,32 @@ package org.apache.oozie.service; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.hadoop.conf.Configuration; import org.apache.oozie.ErrorCode; -import org.apache.oozie.event.BundleJobEvent; -import org.apache.oozie.event.CoordinatorActionEvent; -import org.apache.oozie.event.CoordinatorJobEvent; import org.apache.oozie.client.event.Event; import org.apache.oozie.client.event.Event.MessageType; import org.apache.oozie.client.event.JobEvent; +import org.apache.oozie.client.event.SLAEvent; +import org.apache.oozie.event.BundleJobEvent; +import org.apache.oozie.event.CoordinatorActionEvent; +import org.apache.oozie.event.CoordinatorJobEvent; import org.apache.oozie.event.EventQueue; import org.apache.oozie.event.MemoryEventQueue; import org.apache.oozie.event.WorkflowActionEvent; import org.apache.oozie.event.WorkflowJobEvent; import org.apache.oozie.event.listener.JobEventListener; import org.apache.oozie.sla.listener.SLAEventListener; -import org.apache.oozie.client.event.SLAEvent; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.XLog; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * Service class that handles the events system - creating events queue, * managing configured properties and managing and invoking various event http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java 
b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java index 5690787..a581f8b 100644 --- a/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/BaseJobServlet.java @@ -177,6 +177,27 @@ public abstract class BaseJobServlet extends JsonRestServlet { startCron(); sendJsonResponse(response, HttpServletResponse.SC_OK, json); } + else if (action.equals(RestConstants.SLA_ENABLE_ALERT)) { + validateContentType(request, RestConstants.XML_CONTENT_TYPE); + stopCron(); + slaEnableAlert(request, response); + startCron(); + response.setStatus(HttpServletResponse.SC_OK); + } + else if (action.equals(RestConstants.SLA_DISABLE_ALERT)) { + validateContentType(request, RestConstants.XML_CONTENT_TYPE); + stopCron(); + slaDisableAlert(request, response); + startCron(); + response.setStatus(HttpServletResponse.SC_OK); + } + else if (action.equals(RestConstants.SLA_CHANGE)) { + validateContentType(request, RestConstants.XML_CONTENT_TYPE); + stopCron(); + slaChange(request, response); + startCron(); + response.setStatus(HttpServletResponse.SC_OK); + } else { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0303, RestConstants.ACTION_PARAM, action); @@ -498,4 +519,38 @@ public abstract class BaseJobServlet extends JsonRestServlet { */ abstract String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException; + + /** + * Abstract method to enable SLA alert. + * + * @param request the request + * @param response the response + * @throws XServletException the x servlet exception + * @throws IOException Signals that an I/O exception has occurred. + */ + abstract void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException; + + /** + * Abstract method to disable SLA alert. 
+ * + * @param request the request + * @param response the response + * @throws XServletException the x servlet exception + * @throws IOException Signals that an I/O exception has occurred. + */ + abstract void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException; + + /** + * Abstract method to change SLA definition. + * + * @param request the request + * @param response the response + * @throws XServletException the x servlet exception + * @throws IOException Signals that an I/O exception has occurred. + */ + abstract void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException; + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java b/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java index 2578e41..f897652 100644 --- a/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/SLAServlet.java @@ -31,6 +31,7 @@ import java.util.StringTokenizer; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; + import org.apache.oozie.ErrorCode; import org.apache.oozie.SLAEventBean; import org.apache.oozie.client.OozieClient; http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java index 3e186f9..3cb9168 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java +++ 
b/core/src/main/java/org/apache/oozie/servlet/V0JobServlet.java @@ -237,6 +237,22 @@ public class V0JobServlet extends BaseJobServlet { @Override protected String getJobStatus(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { - throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + } + + @Override + void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + } + + @Override + void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); + } + + @Override + void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v0"); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java index 64b97c2..d4564c6 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V1JobServlet.java @@ -1103,4 +1103,20 @@ public class V1JobServlet extends BaseJobServlet { IOException { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); } + + @Override + void slaEnableAlert(HttpServletRequest request, 
HttpServletResponse response) throws XServletException, IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + } + + @Override + void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + } + + @Override + void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ErrorCode.E0302, "Not supported in v1"); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java index 5238426..7100c98 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V2JobServlet.java @@ -149,6 +149,53 @@ public class V2JobServlet extends V1JobServlet { } + @Override + protected void slaEnableAlert(HttpServletRequest request, HttpServletResponse response) throws XServletException, + IOException { + String jobId = getResourceName(request); + String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); + String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); + String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM); + try { + getBaseEngine(jobId, getUser(request)).enableSLAAlert(jobId, actions, dates, childIds); + } + catch (BaseEngineException e) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); + } + + } + + @Override + protected void slaDisableAlert(HttpServletRequest request, HttpServletResponse response) throws 
XServletException, + IOException { + String jobId = getResourceName(request); + String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); + String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); + String childIds = request.getParameter(RestConstants.COORDINATORS_PARAM); + try { + getBaseEngine(jobId, getUser(request)).disableSLAAlert(jobId, actions, dates, childIds); + } + catch (BaseEngineException e) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); + } + } + + @Override + protected void slaChange(HttpServletRequest request, HttpServletResponse response) throws XServletException, IOException { + String jobId = getResourceName(request); + String actions = request.getParameter(RestConstants.JOB_COORD_SCOPE_ACTION_LIST); + String dates = request.getParameter(RestConstants.JOB_COORD_SCOPE_DATE); + String newParams = request.getParameter(RestConstants.JOB_CHANGE_VALUE); + String coords = request.getParameter(RestConstants.COORDINATORS_PARAM); + + try { + getBaseEngine(jobId, getUser(request)).changeSLA(jobId, actions, dates, coords, newParams); + } + catch (BaseEngineException e) { + throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, e); + } + } + /** * Ignore a coordinator job/action * @@ -199,21 +246,18 @@ public class V2JobServlet extends V1JobServlet { String status; String jobId = getResourceName(request); try { - if (jobId.endsWith("-B")) { - BundleEngine engine = Services.get().get(BundleEngineService.class).getBundleEngine(getUser(request)); - status = engine.getJobStatus(jobId); - } else if (jobId.endsWith("-W")) { - DagEngine engine = Services.get().get(DagEngineService.class).getDagEngine(getUser(request)); - status = engine.getJobStatus(jobId); - } else { - CoordinatorEngine engine = - Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(getUser(request)); - if (jobId.contains("-C@")) { - status = engine.getActionStatus(jobId); - } else { - status = 
engine.getJobStatus(jobId); - } + if (jobId.endsWith("-B") || jobId.endsWith("-W")) { + status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId); + } + else if (jobId.contains("C@")) { + CoordinatorEngine engine = Services.get().get(CoordinatorEngineService.class) + .getCoordinatorEngine(getUser(request)); + status = engine.getActionStatus(jobId); } + else { + status = getBaseEngine(jobId, getUser(request)).getJobStatus(jobId); + } + } catch (BaseEngineException ex) { throw new XServletException(HttpServletResponse.SC_BAD_REQUEST, ex); } @@ -251,7 +295,7 @@ public class V2JobServlet extends V1JobServlet { else if (jobId.endsWith("-B")) { return Services.get().get(BundleEngineService.class).getBundleEngine(user); } - else if (jobId.endsWith("-C")) { + else if (jobId.contains("-C")) { return Services.get().get(CoordinatorEngineService.class).getCoordinatorEngine(user); } else { http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java b/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java index a0fe1b6..57170e1 100644 --- a/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/V2SLAServlet.java @@ -22,7 +22,9 @@ import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.URLDecoder; import java.text.ParseException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -31,15 +33,19 @@ import java.util.Set; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; + import org.apache.oozie.ErrorCode; import org.apache.oozie.XException; import org.apache.oozie.client.OozieClient; import 
org.apache.oozie.client.rest.RestConstants; import org.apache.oozie.command.CommandException; +import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor; +import org.apache.oozie.executor.jpa.SLARegistrationQueryExecutor.SLARegQuery; import org.apache.oozie.executor.jpa.sla.SLASummaryGetForFilterJPAExecutor; import org.apache.oozie.executor.jpa.sla.SLASummaryGetForFilterJPAExecutor.SLASummaryFilter; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; +import org.apache.oozie.sla.SLARegistrationBean; import org.apache.oozie.sla.SLASummaryBean; import org.apache.oozie.util.DateUtils; import org.apache.oozie.util.XLog; @@ -146,7 +152,20 @@ public class V2SLAServlet extends SLAServlet { else { XLog.getLog(getClass()).error(ErrorCode.E0610); } - return SLASummaryBean.toJSONObject(slaSummaryList, timeZoneId); + + List<String> jobIds = new ArrayList<String>(); + for(SLASummaryBean summaryBean:slaSummaryList){ + jobIds.add(summaryBean.getId()); + } + List<SLARegistrationBean> SLARegistrationList = SLARegistrationQueryExecutor.getInstance().getList( + SLARegQuery.GET_SLA_CONFIGS, jobIds); + + Map<String, Map<String, String>> jobIdSLAConfigMap = new HashMap<String, Map<String, String>>(); + for(SLARegistrationBean registrationBean:SLARegistrationList){ + jobIdSLAConfigMap.put(registrationBean.getId(), registrationBean.getSLAConfigMap()); + } + + return SLASummaryBean.toJSONObject(slaSummaryList, jobIdSLAConfigMap, timeZoneId); } catch (XException ex) { throw new CommandException(ex); http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java index 189d5ea..0d7123a 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java +++ 
b/core/src/main/java/org/apache/oozie/sla/SLACalcStatus.java @@ -20,8 +20,10 @@ package org.apache.oozie.sla; import java.util.Date; +import java.util.Map; import org.apache.oozie.AppType; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.event.SLAEvent; import org.apache.oozie.lock.LockToken; import org.apache.oozie.service.JobsConcurrencyService; @@ -65,6 +67,10 @@ public class SLACalcStatus extends SLAEvent { reg.setAlertContact(regBean.getAlertContact()); reg.setAlertEvents(regBean.getAlertEvents()); reg.setJobData(regBean.getJobData()); + if (regBean.getSLAConfigMap().containsKey(OozieClient.SLA_DISABLE_ALERT)) { + reg.addToSLAConfigMap(OozieClient.SLA_DISABLE_ALERT, + regBean.getSLAConfigMap().get(OozieClient.SLA_DISABLE_ALERT)); + } reg.setId(summary.getId()); reg.setAppType(summary.getAppType()); reg.setUser(summary.getUser()); @@ -267,10 +273,14 @@ public class SLACalcStatus extends SLAEvent { } @Override - public String getSlaConfig() { + public String getSLAConfig() { return regBean.getSlaConfig(); } + public Map<String, String> getSLAConfigMap() { + return regBean.getSLAConfigMap(); + } + @Override public MessageType getMsgType() { return regBean.getMsgType(); http://git-wip-us.apache.org/repos/asf/oozie/blob/0f4b0181/core/src/main/java/org/apache/oozie/sla/SLACalculator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/sla/SLACalculator.java b/core/src/main/java/org/apache/oozie/sla/SLACalculator.java index 20f93b5..f238321 100644 --- a/core/src/main/java/org/apache/oozie/sla/SLACalculator.java +++ b/core/src/main/java/org/apache/oozie/sla/SLACalculator.java @@ -20,11 +20,14 @@ package org.apache.oozie.sla; import java.util.Date; import java.util.Iterator; +import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.event.JobEvent.EventStatus; import 
org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.service.ServiceException; +import org.apache.oozie.util.Pair; public interface SLACalculator { @@ -51,4 +54,55 @@ public interface SLACalculator { SLACalcStatus get(String jobId) throws JPAExecutorException; + /** + * Enable jobs sla alert. + * + * @param jobId the job ids + * @return true, if successful + * @throws JPAExecutorException the JPA executor exception + * @throws ServiceException the service exception + */ + boolean enableAlert(List<String> jobId) throws JPAExecutorException, ServiceException; + + /** + * Enable sla alert for child jobs. + * @param parentJobIds the parent job ids + * @return true, if successful + * @throws JPAExecutorException the JPA executor exception + * @throws ServiceException the service exception + */ + boolean enableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException; + + /** + * Disable jobs Sla alert. + * + * @param jobId the job ids + * @return true, if successful + * @throws JPAExecutorException the JPA executor exception + * @throws ServiceException the service exception + */ + boolean disableAlert(List<String> jobId) throws JPAExecutorException, ServiceException; + + + /** + * Disable Sla alert for child jobs. + * @param parentJobIds the parent job ids + * @return true, if successful + * @throws JPAExecutorException the JPA executor exception + * @throws ServiceException the service exception + */ + boolean disableChildJobAlert(List<String> parentJobIds) throws JPAExecutorException, ServiceException; + + /** + * Change jobs Sla definitions. + * It takes a list of pairs of job id and key/value pairs of EL-evaluated sla definitions. + * Supported definitions are sla-should-start, sla-should-end, sla-nominal-time and sla-max-duration. + * + * @param jobIdsSLAPair the job ids sla pair + * @return true, if successful + * @throws JPAExecutorException the JPA executor exception + * @throws ServiceException the service exception + */ + public boolean changeDefinition(List<Pair<String, Map<String,String>>> jobIdsSLAPair ) throws JPAExecutorException, + ServiceException; }
