This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d3c2695 [TE] Skip creating notification task when there is no anomaly
to notify (#4544)
d3c2695 is described below
commit d3c269534ceacfbeac6568311276b9d101cd3e90
Author: Xiaohui Sun <[email protected]>
AuthorDate: Wed Aug 21 08:16:58 2019 -0700
[TE] Skip creating notification task when there is no anomaly to notify
(#4544)
---
.../detection/alert/DetectionAlertJob.java | 30 +++-
.../pinot/thirdeye/datalayer/DaoTestUtils.java | 40 +++++
.../integration/NotificationTaskSchedulerTest.java | 183 +++++++++++++++++++++
.../src/test/resources/sample-alert-config.yml | 38 +++++
.../src/test/resources/sample-detection-config.yml | 19 +++
5 files changed, 309 insertions(+), 1 deletion(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index 8a86170..619bc1f 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -22,9 +22,10 @@ package org.apache.pinot.thirdeye.detection.alert;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
-import java.util.Random;
+import java.util.Map;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
@@ -46,10 +47,12 @@ public class DetectionAlertJob implements Job {
private DetectionAlertConfigManager alertConfigDAO;
private TaskManager taskDAO;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private MergedAnomalyResultManager anomalyDAO;
public DetectionAlertJob() {
this.alertConfigDAO =
DAORegistry.getInstance().getDetectionAlertConfigManager();
this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+ this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
}
@Override
@@ -79,6 +82,11 @@ public class DetectionAlertJob implements Job {
return;
}
+ if (configDTO != null && !needNotification(configDTO)) {
+ LOG.info("Skip scheduling subscription task {}. No anomaly to notify.",
jobName);
+ return;
+ }
+
String taskInfoJson = null;
try {
taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -96,6 +104,26 @@ public class DetectionAlertJob implements Job {
LOG.info("Created subscription task {} with settings {}", taskId, taskDTO);
}
+ /**
+ * Check if we need to create a notification task.
+ * If there is no anomaly generated between last notification time till now
then no need to create this task.
+ *
+ * @param configDTO The DetectionAlert Configuration.
+ * @return true if it needs notification task. false otherwise.
+ */
+ private boolean needNotification(DetectionAlertConfigDTO configDTO) {
+ Map<Long, Long> vectorLocks = configDTO.getVectorClocks();
+ for (Map.Entry<Long, Long> vectorLock : vectorLocks.entrySet()) {
+ long configId = vectorLock.getKey();
+ long lastNotifiedTime = vectorLock.getValue();
+ if
(anomalyDAO.findByStartTimeInRangeAndDetectionConfigId(lastNotifiedTime,
System.currentTimeMillis(), configId)
+ .stream().anyMatch(x -> !x.isChild())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private Long getIdFromJobKey(String jobKey) {
String[] tokens = jobKey.split("_");
String id = tokens[tokens.length - 1];
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/DaoTestUtils.java
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/DaoTestUtils.java
index cf92740..43d0fb5 100644
---
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/DaoTestUtils.java
+++
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/DaoTestUtils.java
@@ -19,6 +19,8 @@ package org.apache.pinot.thirdeye.datalayer;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
+import java.io.IOException;
+import org.apache.commons.io.IOUtils;
import org.apache.pinot.thirdeye.alert.commons.AnomalyFeedConfig;
import org.apache.pinot.thirdeye.alert.commons.AnomalyFetcherConfig;
import org.apache.pinot.thirdeye.alert.commons.AnomalyNotifiedStatus;
@@ -33,6 +35,7 @@ import
org.apache.pinot.thirdeye.anomalydetection.performanceEvaluation.Performa
import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
import org.apache.pinot.thirdeye.common.metric.MetricType;
import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
import org.apache.pinot.thirdeye.datalayer.dto.AlertConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.AlertSnapshotDTO;
import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
@@ -41,6 +44,8 @@ import
org.apache.pinot.thirdeye.datalayer.dto.ClassificationConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.ConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DataCompletenessConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.DetectionStatusDTO;
import org.apache.pinot.thirdeye.datalayer.dto.EntityToEntityMappingDTO;
import org.apache.pinot.thirdeye.datalayer.dto.JobDTO;
@@ -51,7 +56,11 @@ import
org.apache.pinot.thirdeye.datalayer.dto.OverrideConfigDTO;
import org.apache.pinot.thirdeye.datalayer.dto.RootcauseSessionDTO;
import org.apache.pinot.thirdeye.datalayer.pojo.AlertConfigBean;
import org.apache.pinot.thirdeye.datasource.pinot.PinotThirdEyeDataSource;
+import org.apache.pinot.thirdeye.detection.DataProvider;
import
org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import
org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import
org.apache.pinot.thirdeye.detection.yaml.translator.SubscriptionConfigTranslator;
import org.apache.pinot.thirdeye.detector.email.filter.AlphaBetaAlertFilter;
import org.apache.pinot.thirdeye.detector.metric.transfer.ScalingFactor;
import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
@@ -65,6 +74,37 @@ import org.joda.time.DateTime;
public class DaoTestUtils {
+
+ public static DetectionConfigDTO getTestDetectionConfig(DataProvider
provider, String detectionConfigFile) throws
+
IOException {
+ String yamlConfig =
IOUtils.toString(DaoTestUtils.class.getResourceAsStream(detectionConfigFile));
+
+ // Translate
+ DetectionConfigTranslator translator = new
DetectionConfigTranslator(yamlConfig, provider);
+ DetectionConfigDTO detectionConfig = translator.translate();
+
+ Map<String, Object> properties = detectionConfig.getProperties();
+ properties.put("timezone", "UTC");
+ detectionConfig.setProperties(properties);
+ detectionConfig.setCron("0/10 * * * * ?");
+
+ DetectionConfigValidator validator = new
DetectionConfigValidator(provider);
+ validator.validateConfig(detectionConfig);
+
+ detectionConfig.setLastTimestamp(DateTime.now().minusDays(2).getMillis());
+ return detectionConfig;
+ }
+
+ public static DetectionAlertConfigDTO
getTestDetectionAlertConfig(DetectionConfigManager detectionConfigManager,
String alertConfigFile) throws IOException {
+
+ String yamlConfig =
IOUtils.toString(DaoTestUtils.class.getResourceAsStream(alertConfigFile));
+
+ DetectionAlertConfigDTO alertConfig = new SubscriptionConfigTranslator(
+ detectionConfigManager, yamlConfig).translate();
+ alertConfig.setCronExpression("0/10 * * * * ?");
+ return alertConfig;
+ }
+
public static AnomalyFunctionDTO getTestFunctionSpec(String metricName,
String collection) {
AnomalyFunctionDTO functionSpec = new AnomalyFunctionDTO();
functionSpec.setFunctionName("integration test function 1");
diff --git
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/integration/NotificationTaskSchedulerTest.java
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/integration/NotificationTaskSchedulerTest.java
new file mode 100644
index 0000000..8c26244
--- /dev/null
+++
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/integration/NotificationTaskSchedulerTest.java
@@ -0,0 +1,183 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pinot.thirdeye.integration;
+
+import java.util.List;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.datalayer.DaoTestUtils;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DefaultDataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineScheduler;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertScheduler;
+import
org.apache.pinot.thirdeye.detection.alert.filter.ToAllRecipientsDetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.alert.scheme.DetectionEmailAlerter;
+import
org.apache.pinot.thirdeye.detection.annotation.registry.DetectionAlertRegistry;
+import
org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector;
+import org.quartz.SchedulerException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.thirdeye.datalayer.DaoTestUtils.*;
+
+
+/**
+ * This tests the notification task scheduler.
+ * The notification task scheduler should not schedule notification task if
there is no anomaly generated.
+ *
+ */
+public class NotificationTaskSchedulerTest {
+
+ private DetectionPipelineScheduler detectionJobScheduler = null;
+ private DetectionAlertScheduler alertJobScheduler = null;
+ private String detectionConfigFile = "/sample-detection-config.yml";
+ private String alertConfigFile = "/sample-alert-config.yml";
+ private String metric = "cost";
+ private String collection = "test-collection";
+ private DAOTestBase testDAOProvider = null;
+ private DAORegistry daoRegistry = null;
+ private MetricConfigManager metricDAO;
+ private DatasetConfigManager datasetDAO;
+ private EventManager eventDAO;
+ private MergedAnomalyResultManager anomalyDAO;
+ private TaskManager taskDAO;
+ private EvaluationManager evaluationDAO;
+ private DetectionPipelineLoader detectionPipelineLoader;
+ private long detectionId;
+
+ @BeforeClass
+ void beforeClass() {
+ testDAOProvider = DAOTestBase.getInstance();
+ daoRegistry = DAORegistry.getInstance();
+ Assert.assertNotNull(daoRegistry.getJobDAO());
+ initDao();
+ initRegistries();
+ }
+
+ @AfterClass(alwaysRun = true)
+ void afterClass() throws Exception {
+ cleanup_schedulers();
+ testDAOProvider.cleanup();
+ }
+
+ void initRegistries() {
+ DetectionRegistry.registerComponent(ThresholdRuleDetector.class.getName(),
"THRESHOLD");
+ DetectionAlertRegistry.getInstance().registerAlertScheme("EMAIL",
DetectionEmailAlerter.class.getName());
+
DetectionAlertRegistry.getInstance().registerAlertFilter("DEFAULT_ALERTER_PIPELINE",
+ ToAllRecipientsDetectionAlertFilter.class.getName() );
+ }
+
+ void initDao() {
+ daoRegistry = DAORegistry.getInstance();
+ metricDAO = daoRegistry.getMetricConfigDAO();
+ datasetDAO = daoRegistry.getDatasetConfigDAO();
+ eventDAO = daoRegistry.getEventDAO();
+ taskDAO = daoRegistry.getTaskDAO();
+ anomalyDAO = daoRegistry.getMergedAnomalyResultDAO();
+ evaluationDAO = daoRegistry.getEvaluationManager();
+ detectionPipelineLoader = new DetectionPipelineLoader();
+ }
+
+ private void cleanup_schedulers() throws SchedulerException {
+ if (detectionJobScheduler != null) {
+ detectionJobScheduler.shutdown();
+ }
+ if (alertJobScheduler != null) {
+ alertJobScheduler.shutdown();
+ }
+ }
+
+ private void setup() throws Exception {
+ // create test dataset config
+ datasetDAO.save(getTestDatasetConfig(collection));
+ metricDAO.save(getTestMetricConfig(collection, metric, null));
+
+ TimeSeriesLoader timeseriesLoader =
+ new DefaultTimeSeriesLoader(daoRegistry.getMetricConfigDAO(),
datasetDAO, null);
+ AggregationLoader aggregationLoader =
+ new DefaultAggregationLoader(metricDAO, datasetDAO,
ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+ ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+ DataProvider provider = new DefaultDataProvider(metricDAO, datasetDAO,
eventDAO, anomalyDAO, evaluationDAO,
+ timeseriesLoader, aggregationLoader, detectionPipelineLoader);
+
+ detectionId =
daoRegistry.getDetectionConfigManager().save(DaoTestUtils.getTestDetectionConfig(provider,
detectionConfigFile));
+ // create test alert configuration
+ daoRegistry.getDetectionAlertConfigManager()
+
.save(DaoTestUtils.getTestDetectionAlertConfig(daoRegistry.getDetectionConfigManager(),
alertConfigFile));
+ }
+
+ @Test
+ public void testNotificationJobCreation() throws Exception {
+ // setup test environment
+ setup();
+
+ // start detection scheduler
+ startDetectionScheduler();
+
+ // start alert scheduler
+ startAlertScheduler();
+
+ // check only detection task is created, but detection alert task is not
created
+ Thread.sleep(10000);
+ List<TaskDTO> tasks = taskDAO.findAll();
+ Assert.assertTrue(tasks.size() > 0);
+ Assert.assertTrue(tasks.stream().anyMatch(x -> x.getTaskType() ==
TaskConstants.TaskType.DETECTION));
+ Assert.assertTrue(tasks.stream().noneMatch(x -> x.getTaskType() ==
TaskConstants.TaskType.DETECTION_ALERT));
+
+ // generate an anomaly
+ MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
+ anomaly.setDetectionConfigId(detectionId);
+ anomaly.setStartTime(System.currentTimeMillis() - 1000);
+ anomaly.setEndTime(System.currentTimeMillis());
+ anomalyDAO.save(anomaly);
+
+ // check the detection alert task is created
+ Thread.sleep(10000);
+ tasks = taskDAO.findAll();
+ Assert.assertTrue(tasks.size() > 0);
+ Assert.assertTrue(tasks.stream().anyMatch(x -> x.getTaskType() ==
TaskConstants.TaskType.DETECTION_ALERT));
+ }
+
+ private void startAlertScheduler() throws SchedulerException {
+ alertJobScheduler = new DetectionAlertScheduler();
+ alertJobScheduler.start();
+ }
+
+ private void startDetectionScheduler() throws Exception {
+ detectionJobScheduler = new
DetectionPipelineScheduler(DAORegistry.getInstance().getDetectionConfigManager());
+ detectionJobScheduler.start();
+ }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/sample-alert-config.yml
b/thirdeye/thirdeye-pinot/src/test/resources/sample-alert-config.yml
new file mode 100644
index 0000000..f54e851
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/sample-alert-config.yml
@@ -0,0 +1,38 @@
+# Below is a sample subscription group template. You may refer the
documentation and update accordingly.
+
+# The name of the subscription group. You may choose an existing or a provide
a new subscription group name
+subscriptionGroupName: test_subscription_group
+
+# Every alert in ThirdEye is attached to an application. Please specify the
registered application name here. You may request for a new application by
dropping an email to ask_thirdeye
+application: thirdeye-internal
+
+# The default notification type. See additional settings for details and
exploring other notification types like dimension alerter.
+type: DEFAULT_ALERTER_PIPELINE
+
+# List of detection names that you want to subscribe. Copy-paste the detection
name from the above anomaly detection config here.
+subscribedDetections:
+ - sample_detection
+
+# Configure how you want to be alerted. You can receive the standard ThirdEye
email alert (recommended)
+# or for advanced critical use-cases setup Iris alert by referring to the
documentation
+alertSchemes:
+ - type: EMAIL
+recipients:
+ to:
+ - "[email protected]" # Specify alert recipient email address here
+ - "[email protected]"
+ cc:
+ - "[email protected]"
+fromAddress: [email protected]
+
+# The frequency at which you want to be notified. Typically you want to be
notified immediately after
+# an anomaly is detected. The below cron runs every 5 minutes. Use online
cronmaker to compute this.
+cron: "0 0/5 * 1/1 * ? *"
+
+# Enable or disable notification of alert
+active: true
+
+# The below links will appear in the email alerts. This will help alert
recipients to quickly refer and act on.
+referenceLinks:
+ "Oncall Runbook": "http://go/oncall"
+ "Thirdeye FAQs": "http://go/thirdeyefaqs"
diff --git
a/thirdeye/thirdeye-pinot/src/test/resources/sample-detection-config.yml
b/thirdeye/thirdeye-pinot/src/test/resources/sample-detection-config.yml
new file mode 100644
index 0000000..442d7e6
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/sample-detection-config.yml
@@ -0,0 +1,19 @@
+detectionName: sample_detection
+
+# Tell the alert recipients what it means if this alert is fired.
+description: If this alert fires then it means so-and-so and check so-and-so
for irregularities
+
+# The metric you want to do anomaly detection on. You may type a few
characters and look ahead (ctrl + space) to auto-fill.
+metric: cost
+
+# The dataset or UMP table name to which the metric belongs. Look ahead should
auto populate this field.
+dataset: test-collection
+
+rules:
+ - detection:
+ - name: detection_rule_1
+ type: THRESHOLD
+ params:
+ max: 500
+ min: NaN
+ monitoringGranularity: 1_HOURS
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]