This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 866289c [TE] distribute detection and notification tasks (#4217)
866289c is described below
commit 866289cf2671316d73195434c5aabd8e315ba18f
Author: Xiaohui Sun <[email protected]>
AuthorDate: Tue May 21 14:20:17 2019 -0700
[TE] distribute detection and notification tasks (#4217)
* [TE] distribute detection and notification tasks
* [TE] fix integration test by adding cron explicitly
* [TE] revert removing notification flag
* [TE] randomization when creating the task
* [TE] randomization when creating the task
* [TE] Distribute job load to add random delays
* [TE] Fix minor bug when logging sleep time
---
.../pinot/thirdeye/detection/DetectionPipelineJob.java | 17 ++++++++++++++---
.../thirdeye/detection/alert/DetectionAlertJob.java | 16 ++++++++++++++--
.../yaml/CompositePipelineConfigTranslator.java | 5 +++++
.../yaml/YamlDetectionAlertConfigTranslator.java | 3 ++-
4 files changed, 35 insertions(+), 6 deletions(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 6349466..99f2708 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
@@ -82,9 +83,19 @@ public class DetectionPipelineJob implements Job {
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
taskDTO.setTaskInfo(taskInfoJson);
- long taskId = taskDAO.save(taskDTO);
- LOG.info("Created detection pipeline task {} with taskId {}", taskDTO,
taskId);
-
+ // TODO: revisit it after identifying bottlenecks
+ // Here will write the task information to mysql.
+ // Sleep random 0 - 5 seconds to distribute load to mysql.
+ Random random = new Random();
+ try {
+ int sleepTime = random.nextInt(5000);
+ LOG.info("Wait for " + sleepTime + " milliseconds.");
+ Thread.sleep(sleepTime);
+ long taskId = taskDAO.save(taskDTO);
+ LOG.info("Created detection pipeline task {} with taskId {}", taskDTO,
taskId);
+ } catch (InterruptedException e) {
+ LOG.error(e.toString());
+ }
}
private Long getIdFromJobKey(String jobKey) {
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
index bf89932..41ebcd5 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java
@@ -22,6 +22,7 @@ package org.apache.pinot.thirdeye.detection.alert;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.List;
+import java.util.Random;
import org.apache.pinot.thirdeye.anomaly.task.TaskConstants;
import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager;
import org.apache.pinot.thirdeye.datalayer.bao.TaskManager;
@@ -91,8 +92,19 @@ public class DetectionAlertJob implements Job {
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
taskDTO.setTaskInfo(taskInfoJson);
- long taskId = taskDAO.save(taskDTO);
- LOG.info("Created subscription task {} with settings {}", taskId, taskDTO);
+ // TODO: revisit it after identifying bottlenecks
+ // Here will write the task information to mysql.
+ // Sleep random 0 - 5 seconds to distribute load to mysql.
+ Random random = new Random();
+ try {
+ int sleepTime = random.nextInt(5000);
+ LOG.info("Wait for " + sleepTime + " milliseconds.");
+ Thread.sleep(sleepTime);
+ long taskId = taskDAO.save(taskDTO);
+ LOG.info("Created subscription task {} with settings {}", taskId,
taskDTO);
+ } catch (InterruptedException e) {
+ LOG.error(e.toString());
+ }
}
private Long getIdFromJobKey(String jobKey) {
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
index 5c30ae2..dde36f4 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/CompositePipelineConfigTranslator.java
@@ -401,6 +401,11 @@ public class CompositePipelineConfigTranslator extends
YamlDetectionConfigTransl
return properties;
}
+ // Default schedule:
+ // minute granularity: every 15 minutes, starts at 0 minute
+ // hourly: every hour, starts at 0 minute
+ // daily: every day, starts at 2 pm UTC
+ // others: every day, start at 12 am UTC
private String buildCron() {
switch (this.datasetConfig.bucketTimeGranularity().getUnit()) {
case MINUTES:
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
index 04501ab..6b90e1b 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/yaml/YamlDetectionAlertConfigTranslator.java
@@ -63,7 +63,8 @@ public class YamlDetectionAlertConfigTranslator {
static final String PROP_ALERT_SUPPRESSORS = "alertSuppressors";
static final String PROP_REFERENCE_LINKS = "referenceLinks";
static final String PROP_TIME_WINDOWS = "timeWindows";
- static final String CRON_SCHEDULE_DEFAULT = "0 0/5 * * * ? *"; // Every 5 min
+ // Every 5 minutes.
+ static final String CRON_SCHEDULE_DEFAULT = "0 0/5 * * * ? *";
private static final String PROP_DIMENSION = "dimension";
private static final String PROP_DIMENSION_RECIPIENTS =
"dimensionRecipients";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]