This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch distribute_tasks
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 1ebdc7fbfc43bcd96a507d7a76166d61c480c3f6
Author: Xiaohui Sun <[email protected]>
AuthorDate: Fri May 17 12:19:19 2019 -0700

    [TE] distribute detection and notification tasks
---
 .../pinot/thirdeye/detection/DetectionPipelineJob.java  | 13 ++++++++++---
 .../thirdeye/detection/alert/DetectionAlertJob.java     | 12 ++++++++++--
 .../detection/alert/DetectionAlertTaskRunner.java       |  7 -------
 .../yaml/CompositePipelineConfigTranslator.java         | 17 +++++++++++++----
 .../yaml/YamlDetectionAlertConfigTranslator.java        | 15 +++++++--------
 .../yaml/YamlDetectionAlertConfigTranslatorTest.java    |  2 +-
 6 files changed, 41 insertions(+), 25 deletions(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 6349466..875d728 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
@@ -82,9 +83,15 @@ public class DetectionPipelineJob implements Job {
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
     taskDTO.setTaskInfo(taskInfoJson);
 
-    long taskId = taskDAO.save(taskDTO);
-    LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, 
taskId);
-
+    // Sleep random 0 - 1000 milliseconds to distribute load to mysql.
+    Random random = new Random();
+    try {
+      Thread.sleep(random.nextInt(1000));
+      long taskId = taskDAO.save(taskDTO);
+      LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, 
taskId);
+    } catch (InterruptedException e) {
+      LOG.error(e.toString());
+    }
   }
 
   private Long getIdFromJobKey(String jobKey) {
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index bf89932..f11425a 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -22,6 +22,7 @@ package org.apache.pinot.thirdeye.detection.alert;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.List;
+import java.util.Random;
 import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
@@ -91,8 +92,15 @@ public class DetectionAlertJob implements Job {
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
     taskDTO.setTaskInfo(taskInfoJson);
 
-    long taskId = taskDAO.save(taskDTO);
-    LOG.info("Created subscription task {} with settings {}", taskId, taskDTO);
+    // Sleep random 0 - 1000 milliseconds to distribute load to mysql.
+    Random random = new Random();
+    try {
+      Thread.sleep(random.nextInt(1000));
+      long taskId = taskDAO.save(taskDTO);
+      LOG.info("Created subscription task {} with settings {}", taskId, 
taskDTO);
+    } catch (InterruptedException e) {
+      LOG.error(e.toString());
+    }
   }
 
   private Long getIdFromJobKey(String jobKey) {
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
index b4fba8f..d977c39 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java
@@ -107,13 +107,6 @@ public class DetectionAlertTaskRunner implements 
TaskRunner {
       DetectionAlertFilter alertFilter = 
detAlertTaskFactory.loadAlertFilter(alertConfig, System.currentTimeMillis());
       DetectionAlertFilterResult result = alertFilter.run();
 
-      // TODO: The old UI relies on notified tag to display the anomalies. 
After the migration
-      // we need to clean up all references to notified tag.
-      for (MergedAnomalyResultDTO anomaly : result.getAllAnomalies()) {
-        anomaly.setNotified(true);
-        mergedAnomalyDAO.update(anomaly);
-      }
-
       // Suppress alerts if any and get the filtered anomalies to be notified
       Set<DetectionAlertSuppressor> alertSuppressors = 
detAlertTaskFactory.loadAlertSuppressors(alertConfig);
       for (DetectionAlertSuppressor alertSuppressor : alertSuppressors) {
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 9f14ded..e5377e9 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -31,6 +31,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.collections.MapUtils;
@@ -379,16 +380,24 @@ public class CompositePipelineConfigTranslator extends 
YamlDetectionConfigTransl
     return properties;
   }
 
+  //  Default schedule:
+  //  minute granularity: every 15 minutes, starts at 0 minute
+  //  hourly: every hour, starts at 0 minute
+  //  daily: every day, starts at 2 pm UTC
+  //  others: every day, start at 12 am UTC
   private String buildCron() {
+    // starts at random second to reduce task spike
+    Random random = new Random();
+    String second = Integer.toString(random.nextInt(59));
     switch (this.datasetConfig.bucketTimeGranularity().getUnit()) {
       case MINUTES:
-        return "0 0/15 * * * ? *";
+        return second + " 0/15 * * * ? *";
       case HOURS:
-        return "0 0 * * * ? *";
+        return second + " 0 * * * ? *";
       case DAYS:
-        return "0 0 14 * * ? *";
+        return second + " 0 14 * * ? *";
       default:
-        return "0 0 0 * * ?";
+        return second + " 0 0 * * ?";
     }
   }
 
diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
index 9f6d8d9..6b15e98 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
@@ -20,19 +20,14 @@
 package org.apache.pinot.thirdeye.detection.yaml;
 
 import com.google.common.base.CaseFormat;
-import com.google.common.base.Preconditions;
+import java.util.Random;
 import java.util.stream.Collectors;
-import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
 import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager;
-import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO;
 import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO;
-import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO;
 import org.apache.pinot.thirdeye.datalayer.pojo.AlertConfigBean;
 import org.apache.pinot.thirdeye.datalayer.util.Predicate;
-import org.apache.pinot.thirdeye.datasource.DAORegistry;
 import org.apache.pinot.thirdeye.detection.ConfigUtils;
 import 
org.apache.pinot.thirdeye.detection.annotation.registry.DetectionAlertRegistry;
-import 
org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -68,7 +63,8 @@ public class YamlDetectionAlertConfigTranslator {
   static final String PROP_ALERT_SUPPRESSORS = "alertSuppressors";
   static final String PROP_REFERENCE_LINKS = "referenceLinks";
   static final String PROP_TIME_WINDOWS = "timeWindows";
-  static final String CRON_SCHEDULE_DEFAULT = "0 0/5 * * * ? *"; // Every 5 min
+  // Every 5 minutes. Second needs to be provided.
+  static final String CRON_SCHEDULE_DEFAULT_NO_SECOND = " 0/5 * * * ? *";
 
   private static final String PROP_ONLY_FETCH_LEGACY_ANOMALIES = 
"onlyFetchLegacyAnomalies";
   private static final String PROP_DIMENSION = "dimension";
@@ -168,7 +164,10 @@ public class YamlDetectionAlertConfigTranslator {
     alertConfigDTO.setApplication(MapUtils.getString(yamlAlertConfig, 
PROP_APPLICATION));
     alertConfigDTO.setFrom(MapUtils.getString(yamlAlertConfig, PROP_FROM));
 
-    alertConfigDTO.setCronExpression(MapUtils.getString(yamlAlertConfig, 
PROP_CRON, CRON_SCHEDULE_DEFAULT));
+    // starts at random second to reduce task spike
+    Random random = new Random();
+    String second = Integer.toString(random.nextInt(59));
+    alertConfigDTO.setCronExpression(MapUtils.getString(yamlAlertConfig, 
PROP_CRON, second + CRON_SCHEDULE_DEFAULT_NO_SECOND));
     alertConfigDTO.setActive(MapUtils.getBooleanValue(yamlAlertConfig, 
PROP_ACTIVE, true));
 
     alertConfigDTO.setSubjectType(AlertConfigBean.SubjectType.valueOf(
diff --git 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
index cb57d1c..f2aaeb3 100644
--- 
a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
+++ 
b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java
@@ -35,7 +35,7 @@ public class YamlDetectionAlertConfigTranslatorTest {
     alertYamlConfigs.put(PROP_SUBS_GROUP_NAME, "test_group_name");
     alertYamlConfigs.put(PROP_APPLICATION, "test_application");
     alertYamlConfigs.put(PROP_FROM, "thirdeye@thirdeye");
-    alertYamlConfigs.put(PROP_CRON, CRON_SCHEDULE_DEFAULT);
+    alertYamlConfigs.put(PROP_CRON, 0 + CRON_SCHEDULE_DEFAULT_NO_SECOND);
     alertYamlConfigs.put(PROP_ACTIVE, true);
     alertYamlConfigs.put(PROP_DETECTION_NAMES, 
Collections.singletonList("test_pipeline_1"));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to