This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d3c2695  [TE] Skip creating notification task when there is no anomaly 
to notify (#4544)
d3c2695 is described below

commit d3c269534ceacfbeac6568311276b9d101cd3e90
Author: Xiaohui Sun <[email protected]>
AuthorDate: Wed Aug 21 08:16:58 2019 -0700

    [TE] Skip creating notification task when there is no anomaly to notify 
(#4544)
---
 .../detection/alert/DetectionAlertJob.java         |  30 +++-
 .../pinot/thirdeye/datalayer/DaoTestUtils.java     |  40 +++++
 .../integration/NotificationTaskSchedulerTest.java | 183 +++++++++++++++++++++
 .../src/test/resources/sample-alert-config.yml     |  38 +++++
 .../src/test/resources/sample-detection-config.yml |  19 +++
 5 files changed, 309 insertions(+), 1 deletion(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index 8a86170..619bc1f 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -22,9 +22,10 @@ package org.apache.pinot.thirdeye.detection.alert;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.List;
-import java.util.Random;
+import java.util.Map;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
@@ -46,10 +47,12 @@ public class DetectionAlertJob implements Job {
   private DetectionAlertConfigManager alertConfigDAO;
   private TaskManager taskDAO;
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private MergedAnomalyResultManager anomalyDAO;
 
   public DetectionAlertJob() {
     this.alertConfigDAO = 
DAORegistry.getInstance().getDetectionAlertConfigManager();
     this.taskDAO = DAORegistry.getInstance().getTaskDAO();
+    this.anomalyDAO = DAORegistry.getInstance().getMergedAnomalyResultDAO();
   }
 
   @Override
@@ -79,6 +82,11 @@ public class DetectionAlertJob implements Job {
       return;
     }
 
+    if (configDTO != null && !needNotification(configDTO)) {
+      LOG.info("Skip scheduling subscription task {}. No anomaly to notify.", 
jobName);
+      return;
+    }
+
     String taskInfoJson = null;
     try {
       taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -96,6 +104,26 @@ public class DetectionAlertJob implements Job {
     LOG.info("Created subscription task {} with settings {}", taskId, taskDTO);
   }
 
+  /**
+   * Check if we need to create a notification task.
+   *
+   * A task is needed as soon as the anomaly DAO returns at least one non-child anomaly for any subscribed
+   * detection in the window from that detection's last notified time (its vector clock entry) till now.
+   * When no vector clocks are recorded (null or empty map) nothing can be ruled out, so the task is created,
+   * exactly as it was before this check was introduced.
+   *
+   * @param configDTO The DetectionAlert Configuration.
+   * @return true if it needs notification task. false otherwise.
+   */
+  private boolean needNotification(DetectionAlertConfigDTO configDTO) {
+    Map<Long, Long> vectorClocks = configDTO.getVectorClocks();
+    if (vectorClocks == null || vectorClocks.isEmpty()) {
+      // Without a watermark we cannot prove there is nothing to send; never suppress the task silently.
+      return true;
+    }
+
+    // Read the clock once so every detection is checked against the same window end.
+    long now = System.currentTimeMillis();
+    for (Map.Entry<Long, Long> vectorClock : vectorClocks.entrySet()) {
+      long configId = vectorClock.getKey();
+      long lastNotifiedTime = vectorClock.getValue();
+      boolean hasNewAnomaly = anomalyDAO.findByStartTimeInRangeAndDetectionConfigId(lastNotifiedTime, now, configId)
+          .stream().anyMatch(x -> !x.isChild());
+      if (hasNewAnomaly) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   private Long getIdFromJobKey(String jobKey) {
     String[] tokens = jobKey.split("_");
     String id = tokens[tokens.length - 1];
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/DaoTestUtils.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/DaoTestUtils.java
index cf92740..43d0fb5 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/DaoTestUtils.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/datalayer/DaoTestUtils.java
@@ -19,6 +19,8 @@ package org.apache.pinot.thirdeye.datalayer;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
+import java.io.IOException;
+import org.apache.commons.io.IOUtils;
 import org.apache.pinot.thirdeye.alert.commons.AnomalyFeedConfig;
 import org.apache.pinot.thirdeye.alert.commons.AnomalyFetcherConfig;
 import org.apache.pinot.thirdeye.alert.commons.AnomalyNotifiedStatus;
@@ -33,6 +35,7 @@ import 
org.apache.pinot.thirdeye.anomalydetection.performanceEvaluation.Performa
 import org.apache.pinot.thirdeye.common.dimension.DimensionMap;
 import org.apache.pinot.thirdeye.common.metric.MetricType;
 import org.apache.pinot.thirdeye.constant.MetricAggFunction;
+import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
 import org.apache.pinot.thirdeye.datalayer.dto.AlertConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.AlertSnapshotDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
@@ -41,6 +44,8 @@ import 
org.apache.pinot.thirdeye.datalayer.dto.ClassificationConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.ConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DataCompletenessConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionStatusDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.EntityToEntityMappingDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.JobDTO;
@@ -51,7 +56,11 @@ import 
org.apache.pinot.thirdeye.datalayer.dto.OverrideConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.RootcauseSessionDTO;
 import org.apache.pinot.thirdeye.datalayer.pojo.AlertConfigBean;
 import org.apache.pinot.thirdeye.datasource.pinot.PinotThirdEyeDataSource;
+import org.apache.pinot.thirdeye.detection.DataProvider;
 import 
org.apache.pinot.thirdeye.detection.alert.DetectionAlertFilterRecipients;
+import org.apache.pinot.thirdeye.detection.validators.DetectionConfigValidator;
+import 
org.apache.pinot.thirdeye.detection.yaml.translator.DetectionConfigTranslator;
+import 
org.apache.pinot.thirdeye.detection.yaml.translator.SubscriptionConfigTranslator;
 import org.apache.pinot.thirdeye.detector.email.filter.AlphaBetaAlertFilter;
 import org.apache.pinot.thirdeye.detector.metric.transfer.ScalingFactor;
 import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
@@ -65,6 +74,37 @@ import org.joda.time.DateTime;
 
 
 public class DaoTestUtils {
+
+  /**
+   * Build a detection config for tests from a YAML resource on the classpath.
+   *
+   * The YAML is translated with {@link DetectionConfigTranslator}. The result gets a "timezone" property of
+   * "UTC" and the cron "0/10 * * * * ?", is validated with {@link DetectionConfigValidator}, and finally has
+   * its last timestamp set to two days before now. Nothing is persisted by this method.
+   *
+   * @param provider data provider handed to the translator and the validator
+   * @param detectionConfigFile classpath resource path of the detection YAML
+   * @return the translated and validated detection config
+   * @throws IOException if the resource cannot be found or read
+   */
+  public static DetectionConfigDTO getTestDetectionConfig(DataProvider provider, String detectionConfigFile)
+      throws IOException {
+    String yamlConfig;
+    // Close the resource stream and decode explicitly instead of relying on the platform default charset.
+    try (java.io.InputStream yamlStream = DaoTestUtils.class.getResourceAsStream(detectionConfigFile)) {
+      if (yamlStream == null) {
+        throw new IOException("Detection config resource not found on classpath: " + detectionConfigFile);
+      }
+      yamlConfig = IOUtils.toString(yamlStream, "UTF-8");
+    }
+
+    // Translate
+    DetectionConfigTranslator translator = new DetectionConfigTranslator(yamlConfig, provider);
+    DetectionConfigDTO detectionConfig = translator.translate();
+
+    Map<String, Object> properties = detectionConfig.getProperties();
+    properties.put("timezone", "UTC");
+    detectionConfig.setProperties(properties);
+    detectionConfig.setCron("0/10 * * * * ?");
+
+    DetectionConfigValidator validator = new DetectionConfigValidator(provider);
+    validator.validateConfig(detectionConfig);
+
+    detectionConfig.setLastTimestamp(DateTime.now().minusDays(2).getMillis());
+    return detectionConfig;
+  }
+
+  public static DetectionAlertConfigDTO 
getTestDetectionAlertConfig(DetectionConfigManager detectionConfigManager, 
String alertConfigFile) throws IOException {
+
+    String yamlConfig = 
IOUtils.toString(DaoTestUtils.class.getResourceAsStream(alertConfigFile));
+
+    DetectionAlertConfigDTO alertConfig = new SubscriptionConfigTranslator(
+        detectionConfigManager, yamlConfig).translate();
+    alertConfig.setCronExpression("0/10 * * * * ?");
+    return alertConfig;
+  }
+
   public static AnomalyFunctionDTO getTestFunctionSpec(String metricName, 
String collection) {
     AnomalyFunctionDTO functionSpec = new AnomalyFunctionDTO();
     functionSpec.setFunctionName("integration test function 1");
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/integration/NotificationTaskSchedulerTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/integration/NotificationTaskSchedulerTest.java
new file mode 100644
index 0000000..8c26244
--- /dev/null
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/integration/NotificationTaskSchedulerTest.java
@@ -0,0 +1,183 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pinot.thirdeye.integration;
+
+import java.util.List;
+import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
+import org.apache.pinot.thirdeye.datalayer.DaoTestUtils;
+import org.apache.pinot.thirdeye.datalayer.bao.DAOTestBase;
+import org.apache.pinot.thirdeye.datalayer.bao.DatasetConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EvaluationManager;
+import org.apache.pinot.thirdeye.datalayer.bao.EventManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MergedAnomalyResultManager;
+import org.apache.pinot.thirdeye.datalayer.bao.MetricConfigManager;
+import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
+import org.apache.pinot.thirdeye.datalayer.dto.MergedAnomalyResultDTO;
+import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO;
+import org.apache.pinot.thirdeye.datasource.DAORegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.loader.AggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultAggregationLoader;
+import org.apache.pinot.thirdeye.datasource.loader.DefaultTimeSeriesLoader;
+import org.apache.pinot.thirdeye.datasource.loader.TimeSeriesLoader;
+import org.apache.pinot.thirdeye.detection.DataProvider;
+import org.apache.pinot.thirdeye.detection.DefaultDataProvider;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineLoader;
+import org.apache.pinot.thirdeye.detection.DetectionPipelineScheduler;
+import org.apache.pinot.thirdeye.detection.alert.DetectionAlertScheduler;
+import 
org.apache.pinot.thirdeye.detection.alert.filter.ToAllRecipientsDetectionAlertFilter;
+import org.apache.pinot.thirdeye.detection.alert.scheme.DetectionEmailAlerter;
+import 
org.apache.pinot.thirdeye.detection.annotation.registry.DetectionAlertRegistry;
+import 
org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
+import org.apache.pinot.thirdeye.detection.components.ThresholdRuleDetector;
+import org.quartz.SchedulerException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.thirdeye.datalayer.DaoTestUtils.*;
+
+
+/**
+ * This tests the notification task scheduler.
+ * The notification task scheduler should not schedule notification task if 
there is no anomaly generated.
+ *
+ */
+public class NotificationTaskSchedulerTest {
+
+  private DetectionPipelineScheduler detectionJobScheduler = null;
+  private DetectionAlertScheduler alertJobScheduler = null;
+  private String detectionConfigFile = "/sample-detection-config.yml";
+  private String alertConfigFile = "/sample-alert-config.yml";
+  private String metric = "cost";
+  private String collection = "test-collection";
+  private DAOTestBase testDAOProvider = null;
+  private DAORegistry daoRegistry = null;
+  private MetricConfigManager metricDAO;
+  private DatasetConfigManager datasetDAO;
+  private EventManager eventDAO;
+  private MergedAnomalyResultManager anomalyDAO;
+  private TaskManager taskDAO;
+  private EvaluationManager evaluationDAO;
+  private DetectionPipelineLoader detectionPipelineLoader;
+  private long detectionId;
+
+  @BeforeClass
+  void beforeClass() {
+    testDAOProvider = DAOTestBase.getInstance();
+    daoRegistry = DAORegistry.getInstance();
+    Assert.assertNotNull(daoRegistry.getJobDAO());
+    initDao();
+    initRegistries();
+  }
+
+  @AfterClass(alwaysRun = true)
+  void afterClass() throws Exception {
+    cleanup_schedulers();
+    testDAOProvider.cleanup();
+  }
+
+  void initRegistries() {
+    DetectionRegistry.registerComponent(ThresholdRuleDetector.class.getName(), 
"THRESHOLD");
+    DetectionAlertRegistry.getInstance().registerAlertScheme("EMAIL", 
DetectionEmailAlerter.class.getName());
+    
DetectionAlertRegistry.getInstance().registerAlertFilter("DEFAULT_ALERTER_PIPELINE",
+        ToAllRecipientsDetectionAlertFilter.class.getName() );
+  }
+
+  void initDao() {
+    daoRegistry = DAORegistry.getInstance();
+    metricDAO = daoRegistry.getMetricConfigDAO();
+    datasetDAO = daoRegistry.getDatasetConfigDAO();
+    eventDAO = daoRegistry.getEventDAO();
+    taskDAO = daoRegistry.getTaskDAO();
+    anomalyDAO = daoRegistry.getMergedAnomalyResultDAO();
+    evaluationDAO = daoRegistry.getEvaluationManager();
+    detectionPipelineLoader = new DetectionPipelineLoader();
+  }
+
+  private void cleanup_schedulers() throws SchedulerException {
+    if (detectionJobScheduler != null) {
+      detectionJobScheduler.shutdown();
+    }
+    if (alertJobScheduler != null) {
+      alertJobScheduler.shutdown();
+    }
+  }
+
+  private void setup() throws Exception {
+    // create test dataset config
+    datasetDAO.save(getTestDatasetConfig(collection));
+    metricDAO.save(getTestMetricConfig(collection, metric, null));
+
+    TimeSeriesLoader timeseriesLoader =
+        new DefaultTimeSeriesLoader(daoRegistry.getMetricConfigDAO(), 
datasetDAO, null);
+    AggregationLoader aggregationLoader =
+        new DefaultAggregationLoader(metricDAO, datasetDAO, 
ThirdEyeCacheRegistry.getInstance().getQueryCache(),
+            ThirdEyeCacheRegistry.getInstance().getDatasetMaxDataTimeCache());
+
+    DataProvider provider = new DefaultDataProvider(metricDAO, datasetDAO, 
eventDAO, anomalyDAO, evaluationDAO,
+        timeseriesLoader, aggregationLoader, detectionPipelineLoader);
+
+    detectionId = 
daoRegistry.getDetectionConfigManager().save(DaoTestUtils.getTestDetectionConfig(provider,
 detectionConfigFile));
+    // create test alert configuration
+    daoRegistry.getDetectionAlertConfigManager()
+        
.save(DaoTestUtils.getTestDetectionAlertConfig(daoRegistry.getDetectionConfigManager(),
 alertConfigFile));
+  }
+
+  @Test
+  public void testNotificationJobCreation() throws Exception {
+    // setup test environment
+    setup();
+
+    // start detection scheduler
+    startDetectionScheduler();
+
+    // start alert scheduler
+    startAlertScheduler();
+
+    // check only detection task is created, but detection alert task is not 
created
+    Thread.sleep(10000);
+    List<TaskDTO> tasks = taskDAO.findAll();
+    Assert.assertTrue(tasks.size() > 0);
+    Assert.assertTrue(tasks.stream().anyMatch(x -> x.getTaskType() == 
TaskConstants.TaskType.DETECTION));
+    Assert.assertTrue(tasks.stream().noneMatch(x -> x.getTaskType() == 
TaskConstants.TaskType.DETECTION_ALERT));
+
+    // generate an anomaly
+    MergedAnomalyResultDTO anomaly = new MergedAnomalyResultDTO();
+    anomaly.setDetectionConfigId(detectionId);
+    anomaly.setStartTime(System.currentTimeMillis() - 1000);
+    anomaly.setEndTime(System.currentTimeMillis());
+    anomalyDAO.save(anomaly);
+
+    // check the detection alert task is created
+    Thread.sleep(10000);
+    tasks = taskDAO.findAll();
+    Assert.assertTrue(tasks.size() > 0);
+    Assert.assertTrue(tasks.stream().anyMatch(x -> x.getTaskType() == 
TaskConstants.TaskType.DETECTION_ALERT));
+  }
+
+  private void startAlertScheduler() throws SchedulerException {
+    alertJobScheduler = new DetectionAlertScheduler();
+    alertJobScheduler.start();
+  }
+
+  private void startDetectionScheduler() throws Exception {
+    detectionJobScheduler = new 
DetectionPipelineScheduler(DAORegistry.getInstance().getDetectionConfigManager());
+    detectionJobScheduler.start();
+  }
+}
diff --git a/thirdeye/thirdeye-pinot/src/test/resources/sample-alert-config.yml 
b/thirdeye/thirdeye-pinot/src/test/resources/sample-alert-config.yml
new file mode 100644
index 0000000..f54e851
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/sample-alert-config.yml
@@ -0,0 +1,38 @@
+# Below is a sample subscription group template. You may refer to the documentation and update accordingly.
+
+# The name of the subscription group. You may choose an existing one or provide a new subscription group name.
+subscriptionGroupName: test_subscription_group
+
+# Every alert in ThirdEye is attached to an application. Please specify the registered application name here. You may request a new application by dropping an email to ask_thirdeye.
+application: thirdeye-internal
+
+# The default notification type. See additional settings for details and 
exploring other notification types like dimension alerter.
+type: DEFAULT_ALERTER_PIPELINE
+
+# List of detection names that you want to subscribe. Copy-paste the detection 
name from the above anomaly detection config here.
+subscribedDetections:
+  - sample_detection
+
+# Configure how you want to be alerted. You can receive the standard ThirdEye 
email alert (recommended)
+# or for advanced critical use-cases setup Iris alert by referring to the 
documentation
+alertSchemes:
+  - type: EMAIL
+recipients:
+  to:
+    - "[email protected]"          # Specify alert recipient email address here
+    - "[email protected]"
+  cc:
+    - "[email protected]"
+fromAddress: [email protected]
+
+# The frequency at which you want to be notified. Typically you want to be 
notified immediately after
+# an anomaly is detected. The below cron runs every 5 minutes. Use online 
cronmaker to compute this.
+cron: "0 0/5 * 1/1 * ? *"
+
+# Enable or disable notification of alert
+active: true
+
+# The below links will appear in the email alerts. This will help alert 
recipients to quickly refer and act on.
+referenceLinks:
+  "Oncall Runbook": "http://go/oncall"
+  "Thirdeye FAQs": "http://go/thirdeyefaqs"
diff --git 
a/thirdeye/thirdeye-pinot/src/test/resources/sample-detection-config.yml 
b/thirdeye/thirdeye-pinot/src/test/resources/sample-detection-config.yml
new file mode 100644
index 0000000..442d7e6
--- /dev/null
+++ b/thirdeye/thirdeye-pinot/src/test/resources/sample-detection-config.yml
@@ -0,0 +1,19 @@
+detectionName: sample_detection
+
+# Tell the alert recipients what it means if this alert is fired.
+description: If this alert fires then it means so-and-so and check so-and-so 
for irregularities
+
+# The metric you want to do anomaly detection on. You may type a few 
characters and look ahead (ctrl + space) to auto-fill.
+metric: cost
+
+# The dataset or UMP table name to which the metric belongs. Look ahead should 
auto populate this field.
+dataset: test-collection
+
+rules:
+  - detection:
+      - name: detection_rule_1
+        type: THRESHOLD
+        params:
+          max: 500
+          min: NaN
+          monitoringGranularity: 1_HOURS


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to