This is an automated email from the ASF dual-hosted git repository. jihao pushed a commit to branch severity-alerter in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 45d34f050ac628588634e17102c598b7588fd723 Author: Jihao Zhang <[email protected]> AuthorDate: Tue Sep 1 13:54:04 2020 -0700 severity alerter & unit tests --- .../anomaly/monitor/MonitorTaskRunner.java | 10 + .../pinot/thirdeye/constant/AnomalySeverity.java | 35 ++++ .../datalayer/dto/MergedAnomalyResultDTO.java | 10 + .../datalayer/pojo/MergedAnomalyResultBean.java | 10 + .../detection/DetectionPipelineTaskRunner.java | 30 +++ .../thirdeye/detection/DetectionResource.java | 22 +- .../detection/alert/DetectionAlertJob.java | 14 +- .../alert/filter/AnomalySeverityAlertFilter.java | 163 +++++++++++++++ .../translator/SubscriptionConfigTranslator.java | 3 +- .../detection/alert/filter/AlertFilterUtils.java | 9 +- .../filter/AnomalySeverityAlertFilterTest.java | 232 +++++++++++++++++++++ 11 files changed, 526 insertions(+), 12 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java index 763e75b..33fafee 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java @@ -281,6 +281,16 @@ public class MonitorTaskRunner implements TaskRunner { } catch (Exception e) { LOG.error("Exception when deleting old evaluations.", e); } + + // Delete old anomaly subscription notifications. 
+ try { + int deletedRecords = DAO_REGISTRY.getAnomalySubscriptionGroupNotificationManager() + .deleteRecordsOlderThanDays(monitorTaskInfo.getDefaultRetentionDays()); + LOG.info("Deleted {} anomaly subscription notifications that are older than {} days.", deletedRecords, + monitorTaskInfo.getDefaultRetentionDays()); + } catch (Exception e) { + LOG.error("Exception when deleting old anomaly subscription notifications.", e); + } } private Map<Long, JobDTO> findScheduledJobsWithinDays(int days) { diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalySeverity.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalySeverity.java new file mode 100644 index 0000000..48e542e --- /dev/null +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/constant/AnomalySeverity.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.pinot.thirdeye.constant; +/** Severity levels that can be attached to an anomaly; each constant carries a lower-case, user-readable name. */ +public enum AnomalySeverity { + LOW("low"), MODERATE("moderate"), HIGH("high"), CRITICAL("critical"), DEFAULT("default"); + /** Display name handed out by getUserReadableName(); fixed at construction time. */ + private final String userReadableName; + + AnomalySeverity(String userReadableName) { + this.userReadableName = userReadableName; + } + + public String getUserReadableName() { + return this.userReadableName; + } +} diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java index 415a252..620b0de 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/dto/MergedAnomalyResultDTO.java @@ -44,6 +44,8 @@ public class MergedAnomalyResultDTO extends MergedAnomalyResultBean implements A private Set<MergedAnomalyResultDTO> children = new HashSet<>(); + private boolean renotify; + public MergedAnomalyResultDTO() { setCreatedTime(System.currentTimeMillis()); } @@ -94,6 +96,14 @@ public class MergedAnomalyResultDTO extends MergedAnomalyResultBean implements A return function; } + public boolean shouldRenotify() { + return renotify; + } + + public void setRenotify(boolean renotify) { + this.renotify = renotify; + } + @Deprecated public void setFunction(AnomalyFunctionDTO function) { this.function = function; diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java index 834356d..e29de67 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/pojo/MergedAnomalyResultBean.java @@ -29,6 +29,7 @@ 
import java.util.Map; import java.util.Objects; import java.util.Set; import org.apache.commons.lang3.ObjectUtils; +import org.apache.pinot.thirdeye.constant.AnomalySeverity; public class MergedAnomalyResultBean extends AbstractBean implements Comparable<MergedAnomalyResultBean>, Serializable { @@ -60,6 +61,7 @@ public class MergedAnomalyResultBean extends AbstractBean implements Comparable< private Set<Long> childIds; // ids of the anomalies this anomaly merged from private boolean isChild; private AnomalyType type; + private AnomalySeverity severity = AnomalySeverity.DEFAULT; public Set<Long> getChildIds() { return childIds; @@ -264,6 +266,14 @@ public class MergedAnomalyResultBean extends AbstractBean implements Comparable< this.type = type; } + public AnomalySeverity getSeverity() { + return severity; + } + + public void setSeverity(AnomalySeverity severity) { + this.severity = severity; + } + @Override public int hashCode() { return Objects.hash(getId(), startTime, endTime, collection, metric, dimensions, score, impactToGlobal, avgBaselineVal, avgCurrentVal, anomalyResultSource, metricUrn, detectionConfigId, childIds, isChild); diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java index 86126c7..183be6f 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineTaskRunner.java @@ -27,6 +27,7 @@ import org.apache.pinot.thirdeye.anomaly.task.TaskInfo; import org.apache.pinot.thirdeye.anomaly.task.TaskResult; import org.apache.pinot.thirdeye.anomaly.task.TaskRunner; import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil; +import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager; import 
org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager; @@ -34,9 +35,11 @@ import org.apache.pinot.thirdeye.datalayer.bao.EventManager; import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager; import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.TaskManager; +import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO; import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO; import org.apache.pinot.thirdeye.datalayer.dto.EvaluationDTO; import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; +import org.apache.pinot.thirdeye.datalayer.util.Predicate; import org.apache.pinot.thirdeye.datasource.DAORegistry; import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry; import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader; @@ -57,6 +60,7 @@ public class DetectionPipelineTaskRunner implements TaskRunner { private final MergedAnomalyResultManager anomalyDAO; private final EvaluationManager evaluationDAO; private final TaskManager taskDAO; + private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationDAO; private final DetectionPipelineLoader loader; private final DataProvider provider; private final ModelMaintenanceFlow maintenanceFlow; @@ -76,6 +80,8 @@ public class DetectionPipelineTaskRunner implements TaskRunner { this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO(); this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager(); this.taskDAO = DAORegistry.getInstance().getTaskDAO(); + this.anomalySubscriptionGroupNotificationDAO = + DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager(); MetricConfigManager metricDAO = DAORegistry.getInstance().getMetricConfigDAO(); DatasetConfigManager datasetDAO 
= DAORegistry.getInstance().getDatasetConfigDAO(); EventManager eventDAO = DAORegistry.getInstance().getEventDAO(); @@ -112,6 +118,8 @@ public class DetectionPipelineTaskRunner implements TaskRunner { this.loader = loader; this.provider = provider; this.taskDAO = DAORegistry.getInstance().getTaskDAO(); + this.anomalySubscriptionGroupNotificationDAO = + DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager(); this.maintenanceFlow = new ModelRetuneFlow(this.provider, DetectionRegistry.getInstance()); } @@ -167,6 +175,28 @@ public class DetectionPipelineTaskRunner implements TaskRunner { } this.detectionDAO.update(config); + // re-notify the anomalies if any + for (MergedAnomalyResultDTO anomaly : result.getAnomalies()) { + // if an anomaly should be re-notified, update the notification lookup table in the database + if (anomaly.shouldRenotify()) { + List<AnomalySubscriptionGroupNotificationDTO> subscriptionGroupNotificationDTOs = + this.anomalySubscriptionGroupNotificationDAO.findByPredicate(Predicate.EQ("anomalyId", anomaly.getId())); + AnomalySubscriptionGroupNotificationDTO anomalyNotificationDTO; + if (subscriptionGroupNotificationDTOs.isEmpty()) { + // no notification record exists for this anomaly yet: + // create one keyed by the anomaly id and its detection config id + anomalyNotificationDTO = new AnomalySubscriptionGroupNotificationDTO(); + anomalyNotificationDTO.setAnomalyId(anomaly.getId()); + anomalyNotificationDTO.setDetectionConfigId(anomaly.getDetectionConfigId()); + } else { + // a record already exists: clear its notified subscription group ids so the anomaly is sent again + anomalyNotificationDTO = subscriptionGroupNotificationDTOs.get(0); + anomalyNotificationDTO.setNotifiedSubscriptionGroupIds(Collections.emptyList()); + } + this.anomalySubscriptionGroupNotificationDAO.save(anomalyNotificationDTO); + } + } + ThirdeyeMetricsUtil.detectionTaskSuccessCounter.inc(); LOG.info("End detection for config {} between {} and {}. 
Detected {} anomalies.", config.getId(), info.start, info.end, result.getAnomalies()); diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java index c0fdb51..f94f700 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionResource.java @@ -123,6 +123,7 @@ public class DetectionResource { private final DetectionAlertConfigFormatter subscriptionConfigFormatter; private final AggregationLoader aggregationLoader; private final DetectionConfigurationResource detectionConfigurationResource; + private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationManager; @Inject public DetectionResource( @@ -137,6 +138,7 @@ public class DetectionResource { this.detectionAlertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager(); this.evaluationDAO = DAORegistry.getInstance().getEvaluationManager(); this.taskDAO = DAORegistry.getInstance().getTaskDAO(); + this.anomalySubscriptionGroupNotificationManager = DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager(); TimeSeriesLoader timeseriesLoader = new DefaultTimeSeriesLoader(metricDAO, datasetDAO, ThirdEyeCacheRegistry.getInstance().getQueryCache(), ThirdEyeCacheRegistry.getInstance().getTimeSeriesCache()); @@ -573,14 +575,18 @@ public class DetectionResource { return Response.ok(health).build(); } - @GET - @Path(value = "/alert") - public Response alert() { - AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationManager = DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager(); - AnomalySubscriptionGroupNotificationDTO anomalySubscriptionGroupNotificationDTO = new AnomalySubscriptionGroupNotificationDTO(); - 
anomalySubscriptionGroupNotificationDTO.setAnomalyId(1L); - anomalySubscriptionGroupNotificationDTO.setDetectionConfigId(2L); - anomalySubscriptionGroupNotificationManager.save(anomalySubscriptionGroupNotificationDTO); + @POST + @Path(value = "/re-notify") + @ApiOperation("Resend the notification for the anomalies to the subscribed notification groups, if the subscription group supports re-notify") + public Response alert(@QueryParam("id") List<Long> anomalyIds) { + List<MergedAnomalyResultDTO> anomalies = this.anomalyDAO.findByIds(anomalyIds); + for (MergedAnomalyResultDTO anomaly : anomalies) { + AnomalySubscriptionGroupNotificationDTO anomalySubscriptionGroupNotificationDTO = + new AnomalySubscriptionGroupNotificationDTO(); + anomalySubscriptionGroupNotificationDTO.setAnomalyId(anomaly.getId()); + anomalySubscriptionGroupNotificationDTO.setDetectionConfigId(anomaly.getDetectionConfigId()); + anomalySubscriptionGroupNotificationManager.save(anomalySubscriptionGroupNotificationDTO); + } return Response.ok().build(); } } diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java index 70b318c..4026869 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java @@ -24,9 +24,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; import java.util.Map; import org.apache.pinot.thirdeye.anomaly.task.TaskConstants; +import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager; import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager; import org.apache.pinot.thirdeye.datalayer.bao.TaskManager; +import 
org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO; import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO; import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO; import org.apache.pinot.thirdeye.datalayer.util.Predicate; @@ -48,12 +50,14 @@ public class DetectionAlertJob implements Job { private DetectionAlertConfigManager alertConfigDAO; private TaskManager taskDAO; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); - private MergedAnomalyResultManager anomalyDAO; + private final MergedAnomalyResultManager anomalyDAO; + private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationDAO; public DetectionAlertJob() { this.alertConfigDAO = DAORegistry.getInstance().getDetectionAlertConfigManager(); this.taskDAO = DAORegistry.getInstance().getTaskDAO(); this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO(); + this.anomalySubscriptionGroupNotificationDAO = DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager(); } @Override @@ -110,6 +114,8 @@ public class DetectionAlertJob implements Job { * For example, if previous anomaly is from t1 to t2 generated at t3, then the timestamp in vectorLock is t3. * If there is a new anomaly from t2 to t4 generated at t5, then we can still get this anomaly as t5 > t3. * + * Also, check if there is any anomaly that needs re-notifying + * * @param configDTO The Subscription Configuration. * @return true if it needs notification task. false otherwise. 
*/ @@ -123,6 +129,10 @@ public class DetectionAlertJob implements Job { return true; } } - return false; + // in addition to checking the watermarks, check if any anomalies need to be re-notified by querying the anomaly subscription group notification table + List<AnomalySubscriptionGroupNotificationDTO> anomalySubscriptionGroupNotifications = + this.anomalySubscriptionGroupNotificationDAO.findByPredicate( + Predicate.IN("detectionConfigId", vectorLocks.keySet().toArray())); + return !anomalySubscriptionGroupNotifications.isEmpty(); } } diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilter.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilter.java new file mode 100644 index 0000000..1042ac7 --- /dev/null +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilter.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.thirdeye.detection.alert.filter; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.pinot.thirdeye.constant.AnomalySeverity; +import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager; +import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO; +import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO; +import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; +import org.apache.pinot.thirdeye.datalayer.util.Predicate; +import org.apache.pinot.thirdeye.datasource.DAORegistry; +import org.apache.pinot.thirdeye.detection.ConfigUtils; +import org.apache.pinot.thirdeye.detection.DataProvider; +import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterNotification; +import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult; +import org.apache.pinot.thirdeye.detection.alert.StatefulDetectionAlertFilter; +import org.apache.pinot.thirdeye.detection.annotation.AlertFilter; + + +/** + * The detection alert filter that can send notifications through multiple channels + * to a set of unconditional and another set of conditional recipients, based on the + * severity of each anomaly. + * + * You can configure anomaly severities along with a variety of alerting + * channels and reference links. + * + * This alert pipeline can re-notify anomalies if an anomaly's severity + * changes after it is created. 
+ * + * <pre> + * severityRecipients: + * - severity: + * - LOW + * notify: + * jiraScheme: + * project: PROJECT + * assignee: oncall + * emailScheme: + * recipients: + * to: + * - "[email protected]" + * - severity: + * - HIGH + * - CRITICAL + * notify: + * jiraScheme: + * project: PROJECT + * assignee: manager + * </pre> + */ +@AlertFilter(type = "SEVERITY_ALERTER_PIPELINE") +public class AnomalySeverityAlertFilter extends StatefulDetectionAlertFilter { + public static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds"; + public static final String PROP_SEVERITY = "severity"; + public static final String PROP_NOTIFY = "notify"; + public static final String PROP_REF_LINKS = "referenceLinks"; + public static final String PROP_SEVERITY_RECIPIENTS = "severityRecipients"; + + private final List<Map<String, Object>> severityRecipients; + private final List<Long> detectionConfigIds; + + private final AnomalySubscriptionGroupNotificationManager anomalySubscriptionGroupNotificationDAO; + + /** Reads the severity-recipient entries and the subscribed detection config ids from the subscription config's properties. */ + public AnomalySeverityAlertFilter(DataProvider provider, DetectionAlertConfigDTO config, long endTime) { + super(provider, config, endTime); + this.severityRecipients = ConfigUtils.getList(this.config.getProperties().get(PROP_SEVERITY_RECIPIENTS)); + this.detectionConfigIds = ConfigUtils.getLongs(this.config.getProperties().get(PROP_DETECTION_CONFIG_IDS)); + this.anomalySubscriptionGroupNotificationDAO = + DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager(); + } + + @Override + public DetectionAlertFilterResult run() { + DetectionAlertFilterResult result = new DetectionAlertFilterResult(); + + // retrieve the anomalies based on vector clocks + Set<MergedAnomalyResultDTO> anomalies = this.filter(this.makeVectorClocks(this.detectionConfigIds)); + // also pick up the anomalies that need re-notifying and that this subscription group has not sent yet
+ anomalies.addAll(this.retrieveRenotifyAnomalies(this.detectionConfigIds)); + // Map each severity-recipient entry to the anomalies whose severity is listed in that entry + for (Map<String, Object> severityRecipient : this.severityRecipients) { + List<AnomalySeverity> severities = ConfigUtils.getList(severityRecipient.get(PROP_SEVERITY)) + .stream() + .map(s -> AnomalySeverity.valueOf((String) s)) + .collect(Collectors.toList()); + Set<MergedAnomalyResultDTO> notifyAnomalies = new HashSet<>(); + for (MergedAnomalyResultDTO anomaly : anomalies) { + if (severities.contains(anomaly.getSeverity())) { + notifyAnomalies.add(anomaly); + } + } + + if (!notifyAnomalies.isEmpty()) { + DetectionAlertConfigDTO subsConfig = SubscriptionUtils.makeChildSubscriptionConfig(config, + ConfigUtils.getMap(severityRecipient.get(PROP_NOTIFY)), + ConfigUtils.getMap(severityRecipient.get(PROP_REF_LINKS))); + result.addMapping(new DetectionAlertFilterNotification(subsConfig), notifyAnomalies); + } + } + + // Anomalies matched by no severity-recipient entry are mapped to the parent subscription config (default recipients) + Set<MergedAnomalyResultDTO> allNotifiedAnomalies = new HashSet<>(result.getAllAnomalies()); + Set<MergedAnomalyResultDTO> defaultAnomalies = new HashSet<>(); + for (MergedAnomalyResultDTO anomaly : anomalies) { + if (!allNotifiedAnomalies.contains(anomaly)) { + defaultAnomalies.add(anomaly); + } + } + if (!defaultAnomalies.isEmpty()) { + result.addMapping(new DetectionAlertFilterNotification(config), defaultAnomalies); + } + + return result; + } + + /** Returns the anomalies with a notification record this subscription group has not sent yet, and marks each such record as sent by this group. */ + protected Collection<MergedAnomalyResultDTO> retrieveRenotifyAnomalies(Collection<Long> detectionConfigIds) { + // load the notification records that belong to the given detection configs + List<AnomalySubscriptionGroupNotificationDTO> anomalySubscriptionGroupNotificationDTOs = + this.anomalySubscriptionGroupNotificationDAO.findByPredicate( + Predicate.IN("detectionConfigId", detectionConfigIds.toArray())); + + List<Long> anomalyIds = new ArrayList<>(); + for (AnomalySubscriptionGroupNotificationDTO anomalySubscriptionGroupNotification : 
anomalySubscriptionGroupNotificationDTOs) { + // notify the anomaly only if this subscription group has not sent it out yet + if (!anomalySubscriptionGroupNotification.getNotifiedSubscriptionGroupIds().contains(this.config.getId())) { + anomalyIds.add(anomalySubscriptionGroupNotification.getAnomalyId()); + // add this subscription group to the notification record and persist the record + anomalySubscriptionGroupNotification.getNotifiedSubscriptionGroupIds().add(this.config.getId()); + this.anomalySubscriptionGroupNotificationDAO.save(anomalySubscriptionGroupNotification); + } + } + return anomalyIds.isEmpty() ? Collections.emptyList() + : DAORegistry.getInstance().getMergedAnomalyResultDAO().findByIds(anomalyIds); + } +} diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java index 9d4044f..a3fec01 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/translator/SubscriptionConfigTranslator.java @@ -68,10 +68,11 @@ public class SubscriptionConfigTranslator extends ConfigTranslator<DetectionAler private static final String PROP_DIMENSION = "dimension"; private static final String PROP_DIMENSION_RECIPIENTS = "dimensionRecipients"; + private static final String PROP_SEVERITY_RECIPIENTS = "severityRecipients"; private static final DetectionAlertRegistry DETECTION_ALERT_REGISTRY = DetectionAlertRegistry.getInstance(); private static final Set<String> PROPERTY_KEYS = new HashSet<>( - Arrays.asList(PROP_RECIPIENTS, PROP_DIMENSION, PROP_DIMENSION_RECIPIENTS)); + Arrays.asList(PROP_RECIPIENTS, PROP_DIMENSION, PROP_DIMENSION_RECIPIENTS, PROP_SEVERITY_RECIPIENTS)); private final DetectionConfigManager 
detectionConfigDAO; public SubscriptionConfigTranslator(DetectionConfigManager detectionConfigDAO, String yamlConfig) { diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java index a1091cf..72f64db 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AlertFilterUtils.java @@ -26,6 +26,7 @@ import java.util.Map; import java.util.Set; import org.apache.pinot.thirdeye.anomaly.AnomalyType; import org.apache.pinot.thirdeye.common.dimension.DimensionMap; +import org.apache.pinot.thirdeye.constant.AnomalySeverity; import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFeedbackDTO; import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO; import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO; @@ -89,9 +90,14 @@ public class AlertFilterUtils { return new DetectionAlertFilterNotification(subsConfig); } - static MergedAnomalyResultDTO makeAnomaly(Long configId, long baseTime, long start, long end, Map<String, String> dimensions, AnomalyFeedbackDTO feedback) { + return makeAnomaly(configId, baseTime, start, end, dimensions, feedback, AnomalySeverity.DEFAULT); + } + + + static MergedAnomalyResultDTO makeAnomaly(Long configId, long baseTime, long start, long end, + Map<String, String> dimensions, AnomalyFeedbackDTO feedback, AnomalySeverity severity) { MergedAnomalyResultDTO anomaly = DetectionTestUtils.makeAnomaly(configId, baseTime + start, baseTime + end); anomaly.setType(AnomalyType.DEVIATION); anomaly.setChildIds(Collections.emptySet()); @@ -108,6 +114,7 @@ public class AlertFilterUtils { anomaly.setCreatedBy("no-auth-user"); anomaly.setUpdatedBy("no-auth-user"); + anomaly.setSeverity(severity); 
anomaly.setId(DAORegistry.getInstance().getMergedAnomalyResultDAO().save(anomaly)); if (feedback != null) { diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilterTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilterTest.java new file mode 100644 index 0000000..38f9189 --- /dev/null +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/alert/filter/AnomalySeverityAlertFilterTest.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ *
+ */
+
+package org.apache.pinot.thirdeye.detection.alert.filter;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.thirdeye.constant.AnomalySeverity;
+import org.apache.pinot.thirdeye.datalayer.bao.AnomalySubscriptionGroupNotificationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.dto.AnomalySubscriptionGroupNotificationDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.detection.MockDataProvider;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterNotification;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterResult;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.thirdeye.detection.alert.filter.AlertFilterUtils.*;
+
+
+/**
+ * Tests for {@link AnomalySeverityAlertFilter}.
+ *
+ * <p>The fixture configures two severity rules (LOW/MODERATE and HIGH), each with its own recipients, plus a
+ * default alert scheme on the subscription config. The tests expect the filter result to contain three
+ * notifications: one per severity rule and one carrying the default scheme for the anomalies that were created
+ * without an explicit severity.
+ */
+public class AnomalySeverityAlertFilterTest {
+  private static final String PROP_RECIPIENTS = "recipients";
+  private static final String PROP_EMAIL_SCHEME = "emailScheme";
+  private static final String PROP_TO = "to";
+  private static final String PROP_CC = "cc";
+  private static final String PROP_BCC = "bcc";
+  private static final Set<String> PROP_TO_FOR_VALUE =
+      new HashSet<>(Arrays.asList("[email protected]", "[email protected]"));
+  private static final Set<String> PROP_TO_FOR_ANOTHER_VALUE =
+      new HashSet<>(Arrays.asList("[email protected]", "[email protected]"));
+  private static final String PROP_DETECTION_CONFIG_IDS = "detectionConfigIds";
+  private static final String PROP_SEVERITY_TO = "severityRecipients";
+  // Keys of a single entry in the "severityRecipients" list.
+  private static final String PROP_SEVERITY = "severity";
+  private static final String PROP_NOTIFY = "notify";
+
+  private DetectionAlertFilter alertFilter;
+
+  // NOTE(review): this provider is obtained in beforeMethod() and never released in this class - confirm whether
+  // DAOTestBase requires an explicit cleanup call after each test.
+  private DAOTestBase testDAOProvider;
+  private DetectionAlertConfigDTO alertConfig;
+  private List<MergedAnomalyResultDTO> detectionAnomalies;
+  private long baseTime;
+  private List<Long> detectionConfigIds;
+  private MergedAnomalyResultDTO renotifyAnomaly;
+  private final MockDataProvider provider = new MockDataProvider();
+  private final Map<String, Object> notify1 = new HashMap<>();
+  private final Map<String, Object> notify2 = new HashMap<>();
+  private final Map<String, Object> defaultScheme = new HashMap<>();
+
+  /**
+   * Saves two detection configs, creates the anomalies used by the tests and builds the alert config.
+   *
+   * @throws InterruptedException if one of the pauses between anomaly creations is interrupted
+   */
+  @BeforeMethod
+  public void beforeMethod() throws InterruptedException {
+    testDAOProvider = DAOTestBase.getInstance();
+
+    DetectionConfigDTO detectionConfig1 = new DetectionConfigDTO();
+    detectionConfig1.setName("test detection 1");
+    detectionConfig1.setActive(true);
+    long detectionConfigId1 = DAORegistry.getInstance().getDetectionConfigManager().save(detectionConfig1);
+
+    DetectionConfigDTO detectionConfig2 = new DetectionConfigDTO();
+    detectionConfig2.setName("test detection 2");
+    detectionConfig2.setActive(true);
+    long detectionConfigId2 = DAORegistry.getInstance().getDetectionConfigManager().save(detectionConfig2);
+
+    detectionConfigIds = Arrays.asList(detectionConfigId1, detectionConfigId2);
+
+    // Anomaly notification is tracked through create time. Start and end time don't matter here.
+    // The sleeps between the calls presumably give each anomaly a distinct create time - verify against
+    // AlertFilterUtils.makeAnomaly.
+    this.detectionAnomalies = new ArrayList<>();
+    // Created before baseTime is taken; only testRenotifyAnomaly expects it in the result.
+    renotifyAnomaly =
+        makeAnomaly(detectionConfigId1, System.currentTimeMillis(), 0, 50, Collections.singletonMap("key", "value"),
+            null, AnomalySeverity.LOW);
+    Thread.sleep(100);
+    this.baseTime = System.currentTimeMillis();
+    Thread.sleep(100);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId1, this.baseTime, 0, 100, Collections.singletonMap("key", "value"), null,
+            AnomalySeverity.LOW));
+    Thread.sleep(10);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId1, this.baseTime, 0, 110, Collections.singletonMap("key", "anotherValue"), null,
+            AnomalySeverity.MODERATE));
+    Thread.sleep(20);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId1, this.baseTime, 0, 120, Collections.singletonMap("key", "unknownValue"), null,
+            AnomalySeverity.HIGH));
+    Thread.sleep(30);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId2, this.baseTime, 110, 150, Collections.singletonMap("unknownKey", "value"),
+            null));
+    Thread.sleep(10);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId2, this.baseTime, 120, 160, Collections.singletonMap("key", "value"), null));
+    Thread.sleep(40);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId1, this.baseTime, 150, 200, Collections.<String, String>emptyMap(), null));
+    Thread.sleep(200);
+    this.detectionAnomalies.add(
+        makeAnomaly(detectionConfigId2, this.baseTime, 300, 400, Collections.singletonMap("key", "value"), null));
+    Thread.sleep(100);
+
+    this.alertConfig = createDetectionAlertConfig();
+  }
+
+  /**
+   * Builds the alert config under test: two severity rules, a default email scheme and vector clocks set to
+   * {@code baseTime} for both detection configs.
+   *
+   * @return the populated alert config
+   */
+  private DetectionAlertConfigDTO createDetectionAlertConfig() {
+    DetectionAlertConfigDTO config = new DetectionAlertConfigDTO();
+
+    notify1.put(PROP_SEVERITY, Arrays.asList("LOW", "MODERATE"));
+    notify1.put(PROP_NOTIFY,
+        ImmutableMap.of(PROP_EMAIL_SCHEME, ImmutableMap.of(PROP_RECIPIENTS, PROP_TO_FOR_VALUE)));
+    notify2.put(PROP_SEVERITY, Collections.singleton("HIGH"));
+    notify2.put(PROP_NOTIFY,
+        ImmutableMap.of(PROP_EMAIL_SCHEME, ImmutableMap.of(PROP_RECIPIENTS, PROP_TO_FOR_ANOTHER_VALUE)));
+
+    // Build a fresh list for every config. This method runs once per test method, so appending to a shared
+    // static list would add the same two rules again on every run.
+    List<Object> severityRecipients = new ArrayList<>();
+    severityRecipients.add(notify1);
+    severityRecipients.add(notify2);
+
+    Map<String, Object> properties = new HashMap<>();
+    properties.put(PROP_DETECTION_CONFIG_IDS, detectionConfigIds);
+    properties.put(PROP_SEVERITY_TO, severityRecipients);
+    config.setProperties(properties);
+
+    Map<String, Object> emailScheme = new HashMap<>();
+    Map<String, Set<String>> recipients = new HashMap<>();
+    recipients.put(PROP_TO, AlertFilterUtils.PROP_TO_VALUE);
+    recipients.put(PROP_CC, AlertFilterUtils.PROP_CC_VALUE);
+    recipients.put(PROP_BCC, AlertFilterUtils.PROP_BCC_VALUE);
+    emailScheme.put(PROP_RECIPIENTS, recipients);
+    defaultScheme.put(PROP_EMAIL_SCHEME, emailScheme);
+    config.setAlertSchemes(defaultScheme);
+
+    Map<Long, Long> vectorClocks = new HashMap<>();
+    vectorClocks.put(detectionConfigIds.get(0), this.baseTime);
+    vectorClocks.put(detectionConfigIds.get(1), this.baseTime);
+    config.setVectorClocks(vectorClocks);
+
+    return config;
+  }
+
+  /** Expects anomalies 0 and 1 under the LOW/MODERATE rule, 2 under HIGH and 3-5 under the default scheme. */
+  @Test
+  public void testAlertFilterRecipients() throws Exception {
+    this.alertFilter = new AnomalySeverityAlertFilter(provider, alertConfig, this.baseTime + 350L);
+
+    DetectionAlertFilterResult result = this.alertFilter.run();
+    assertSeverityRouting(result, makeSet(0, 1));
+  }
+
+  /**
+   * Saves a subscription-group notification record for {@code renotifyAnomaly} and expects that anomaly to be
+   * reported together with anomalies 0 and 1 under the LOW/MODERATE rule.
+   */
+  @Test
+  public void testRenotifyAnomaly() throws Exception {
+    AnomalySubscriptionGroupNotificationManager renotificationManager =
+        DAORegistry.getInstance().getAnomalySubscriptionGroupNotificationManager();
+    AnomalySubscriptionGroupNotificationDTO anomalySubscriptionGroupNotification =
+        new AnomalySubscriptionGroupNotificationDTO();
+    anomalySubscriptionGroupNotification.setAnomalyId(renotifyAnomaly.getId());
+    anomalySubscriptionGroupNotification.setDetectionConfigId(renotifyAnomaly.getDetectionConfigId());
+    renotificationManager.save(anomalySubscriptionGroupNotification);
+
+    this.alertFilter = new AnomalySeverityAlertFilter(provider, alertConfig, this.baseTime + 350L);
+
+    DetectionAlertFilterResult result = this.alertFilter.run();
+    assertSeverityRouting(result, makeSet(renotifyAnomaly, 0, 1));
+  }
+
+  /**
+   * Asserts that the result holds exactly three notifications and that each expected anomaly group carries the
+   * expected alert scheme.
+   *
+   * @param result the filter output to check
+   * @param lowAndModerateAnomalies the anomalies expected under the LOW/MODERATE rule ({@code notify1})
+   */
+  private void assertSeverityRouting(DetectionAlertFilterResult result,
+      Set<MergedAnomalyResultDTO> lowAndModerateAnomalies) {
+    Assert.assertEquals(result.getResult().size(), 3);
+
+    Set<MergedAnomalyResultDTO> highAnomalies = makeSet(2);
+    Set<MergedAnomalyResultDTO> defaultAnomalies = makeSet(3, 4, 5);
+
+    int verifiedResult = 0;
+    for (Map.Entry<DetectionAlertFilterNotification, Set<MergedAnomalyResultDTO>> entry : result.getResult()
+        .entrySet()) {
+      if (entry.getValue().equals(lowAndModerateAnomalies)) {
+        Assert.assertEquals(entry.getKey().getSubscriptionConfig().getAlertSchemes(), notify1.get(PROP_NOTIFY));
+        verifiedResult++;
+      } else if (entry.getValue().equals(highAnomalies)) {
+        Assert.assertEquals(entry.getKey().getSubscriptionConfig().getAlertSchemes(), notify2.get(PROP_NOTIFY));
+        verifiedResult++;
+      } else if (entry.getValue().equals(defaultAnomalies)) {
+        Assert.assertEquals(entry.getKey().getSubscriptionConfig().getAlertSchemes(), defaultScheme);
+        verifiedResult++;
+      }
+    }
+    // Every one of the three notifications must have matched one of the expected groups.
+    Assert.assertEquals(verifiedResult, 3);
+  }
+
+  /** Returns the anomalies at the given indices of {@code detectionAnomalies} plus {@code anomaly}. */
+  private Set<MergedAnomalyResultDTO> makeSet(MergedAnomalyResultDTO anomaly, int... anomalyIndices) {
+    Set<MergedAnomalyResultDTO> set = makeSet(anomalyIndices);
+    set.add(anomaly);
+    return set;
+  }
+
+  /** Returns a mutable set of the anomalies at the given indices of {@code detectionAnomalies}. */
+  private Set<MergedAnomalyResultDTO> makeSet(int... anomalyIndices) {
+    Set<MergedAnomalyResultDTO> output = new HashSet<>();
+    for (int anomalyIndex : anomalyIndices) {
+      output.add(this.detectionAnomalies.get(anomalyIndex));
+    }
+    return output;
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
