This is an automated email from the ASF dual-hosted git repository. xhsun pushed a commit to branch distribute_tasks in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 1ebdc7fbfc43bcd96a507d7a76166d61c480c3f6 Author: Xiaohui Sun <[email protected]> AuthorDate: Fri May 17 12:19:19 2019 -0700 [TE] distribute detection and notification tasks --- .../pinot/thirdeye/detection/DetectionPipelineJob.java | 13 ++++++++++--- .../thirdeye/detection/alert/DetectionAlertJob.java | 12 ++++++++++-- .../detection/alert/DetectionAlertTaskRunner.java | 7 ------- .../yaml/CompositePipelineConfigTranslator.java | 17 +++++++++++++---- .../yaml/YamlDetectionAlertConfigTranslator.java | 15 +++++++-------- .../yaml/YamlDetectionAlertConfigTranslatorTest.java | 2 +- 6 files changed, 41 insertions(+), 25 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java index 6349466..875d728 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.pinot.thirdeye.anomaly.task.TaskConstants; @@ -82,9 +83,15 @@ public class DetectionPipelineJob implements Job { taskDTO.setStatus(TaskConstants.TaskStatus.WAITING); taskDTO.setTaskInfo(taskInfoJson); - long taskId = taskDAO.save(taskDTO); - LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, taskId); - + // Sleep random 0 - 1000 milliseconds to distribute load to mysql. + Random random = new Random(); + try { + Thread.sleep(random.nextInt(1000)); + long taskId = taskDAO.save(taskDTO); + LOG.info("Created detection pipeline task {} with taskId {}", taskDTO, taskId); + } catch (InterruptedException e) { + LOG.error(e.toString()); + } } private Long getIdFromJobKey(String jobKey) { diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java index bf89932..f11425a 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java @@ -22,6 +22,7 @@ package org.apache.pinot.thirdeye.detection.alert; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.util.List; +import java.util.Random; import org.apache.pinot.thirdeye.anomaly.task.TaskConstants; import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.TaskManager; @@ -91,8 +92,15 @@ public class DetectionAlertJob implements Job { taskDTO.setStatus(TaskConstants.TaskStatus.WAITING); taskDTO.setTaskInfo(taskInfoJson); - long taskId = taskDAO.save(taskDTO); - LOG.info("Created subscription task {} with settings {}", taskId, taskDTO); + // Sleep random 0 - 1000 milliseconds to distribute load to mysql. + Random random = new Random(); + try { + Thread.sleep(random.nextInt(1000)); + long taskId = taskDAO.save(taskDTO); + LOG.info("Created subscription task {} with settings {}", taskId, taskDTO); + } catch (InterruptedException e) { + LOG.error(e.toString()); + } } private Long getIdFromJobKey(String jobKey) { diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java index b4fba8f..d977c39 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertTaskRunner.java @@ -107,13 +107,6 @@ public class DetectionAlertTaskRunner implements TaskRunner { DetectionAlertFilter alertFilter = detAlertTaskFactory.loadAlertFilter(alertConfig, System.currentTimeMillis()); DetectionAlertFilterResult result = alertFilter.run(); - // TODO: The old UI relies on notified tag to display the anomalies. After the migration - // we need to clean up all references to notified tag. - for (MergedAnomalyResultDTO anomaly : result.getAllAnomalies()) { - anomaly.setNotified(true); - mergedAnomalyDAO.update(anomaly); - } - // Suppress alerts if any and get the filtered anomalies to be notified Set<DetectionAlertSuppressor> alertSuppressors = detAlertTaskFactory.loadAlertSuppressors(alertConfig); for (DetectionAlertSuppressor alertSuppressor : alertSuppressors) { diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java index 9f14ded..e5377e9 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java @@ -31,6 +31,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.commons.collections.MapUtils; @@ -379,16 +380,24 @@ public class CompositePipelineConfigTranslator extends YamlDetectionConfigTransl return properties; } + // Default schedule: + // minute granularity: every 15 minutes, starts at 0 minute + // hourly: every hour, starts at 0 minute + // daily: every day, starts at 2 pm UTC + // others: every day, start at 12 am UTC private String buildCron() { + // starts at random second to reduce task spike + Random random = new Random(); + String second = Integer.toString(random.nextInt(59)); switch (this.datasetConfig.bucketTimeGranularity().getUnit()) { case MINUTES: - return "0 0/15 * * * ? *"; + return second + " 0/15 * * * ? *"; case HOURS: - return "0 0 * * * ? *"; + return second + " 0 * * * ? *"; case DAYS: - return "0 0 14 * * ? *"; + return second + " 0 14 * * ? *"; default: - return "0 0 0 * * ?"; + return second + " 0 0 * * ?"; } } diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java index 9f6d8d9..6b15e98 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java @@ -20,19 +20,14 @@ package org.apache.pinot.thirdeye.detection.yaml; import com.google.common.base.CaseFormat; -import com.google.common.base.Preconditions; +import java.util.Random; import java.util.stream.Collectors; -import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager; -import org.apache.pinot.thirdeye.datalayer.dto.AnomalyFunctionDTO; import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO; -import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO; import org.apache.pinot.thirdeye.datalayer.pojo.AlertConfigBean; import org.apache.pinot.thirdeye.datalayer.util.Predicate; -import org.apache.pinot.thirdeye.datasource.DAORegistry; import org.apache.pinot.thirdeye.detection.ConfigUtils; import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionAlertRegistry; -import org.apache.pinot.thirdeye.detection.annotation.registry.DetectionRegistry; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -68,7 +63,8 @@ public class YamlDetectionAlertConfigTranslator { static final String PROP_ALERT_SUPPRESSORS = "alertSuppressors"; static final String PROP_REFERENCE_LINKS = "referenceLinks"; static final String PROP_TIME_WINDOWS = "timeWindows"; - static final String CRON_SCHEDULE_DEFAULT = "0 0/5 * * * ? *"; // Every 5 min + // Every 5 minutes. Second needs to be provided. + static final String CRON_SCHEDULE_DEFAULT_NO_SECOND = " 0/5 * * * ? *"; private static final String PROP_ONLY_FETCH_LEGACY_ANOMALIES = "onlyFetchLegacyAnomalies"; private static final String PROP_DIMENSION = "dimension"; @@ -168,7 +164,10 @@ public class YamlDetectionAlertConfigTranslator { alertConfigDTO.setApplication(MapUtils.getString(yamlAlertConfig, PROP_APPLICATION)); alertConfigDTO.setFrom(MapUtils.getString(yamlAlertConfig, PROP_FROM)); - alertConfigDTO.setCronExpression(MapUtils.getString(yamlAlertConfig, PROP_CRON, CRON_SCHEDULE_DEFAULT)); + // starts at random second to reduce task spike + Random random = new Random(); + String second = Integer.toString(random.nextInt(59)); + alertConfigDTO.setCronExpression(MapUtils.getString(yamlAlertConfig, PROP_CRON, second + CRON_SCHEDULE_DEFAULT_NO_SECOND)); alertConfigDTO.setActive(MapUtils.getBooleanValue(yamlAlertConfig, PROP_ACTIVE, true)); alertConfigDTO.setSubjectType(AlertConfigBean.SubjectType.valueOf( diff --git a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java index cb57d1c..f2aaeb3 100644 --- a/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java +++ b/thirdeye/thirdeye-pinot/src/test/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslatorTest.java @@ -35,7 +35,7 @@ public class YamlDetectionAlertConfigTranslatorTest { alertYamlConfigs.put(PROP_SUBS_GROUP_NAME, "test_group_name"); alertYamlConfigs.put(PROP_APPLICATION, "test_application"); alertYamlConfigs.put(PROP_FROM, "thirdeye@thirdeye"); - alertYamlConfigs.put(PROP_CRON, CRON_SCHEDULE_DEFAULT); + alertYamlConfigs.put(PROP_CRON, 0 + CRON_SCHEDULE_DEFAULT_NO_SECOND); alertYamlConfigs.put(PROP_ACTIVE, true); alertYamlConfigs.put(PROP_DETECTION_NAMES, Collections.singletonList("test_pipeline_1")); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
