This is an automated email from the ASF dual-hosted git repository. jihao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 1f9d096 [TE] detection - task scheduler backoff fix (#3866) 1f9d096 is described below commit 1f9d096041ec141040ba7add08039a957d2e6af9 Author: Jihao Zhang <jihzh...@linkedin.com> AuthorDate: Fri Feb 22 15:20:19 2019 -0800 [TE] detection - task scheduler backoff fix (#3866) Fix the task scheduler backoff mechanism to prevent detection tasks from piling up. --- .../thirdeye/detection/DetectionPipelineJob.java | 25 ++++++++++++++++------ .../detection/alert/DetectionAlertJob.java | 19 +++++++++++++++- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java index 2907d7e..3df93d8 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java @@ -21,6 +21,11 @@ package org.apache.pinot.thirdeye.detection; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.pinot.thirdeye.anomaly.task.TaskConstants; import org.apache.pinot.thirdeye.datalayer.bao.DetectionConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.TaskManager; @@ -28,9 +33,6 @@ import org.apache.pinot.thirdeye.datalayer.dto.DetectionConfigDTO; import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO; import org.apache.pinot.thirdeye.datalayer.util.Predicate; import org.apache.pinot.thirdeye.datasource.DAORegistry; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; import org.quartz.Job; import 
org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; @@ -57,7 +59,6 @@ public class DetectionPipelineJob implements Job { String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, id); List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND( Predicate.EQ("name", jobName), - Predicate.EQ("startTime", taskInfo.getStart()), Predicate.OR( Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()), Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString()) @@ -65,10 +66,20 @@ public class DetectionPipelineJob implements Job { ) ); - Optional<TaskDTO> latestScheduledTask = scheduledTasks.stream().reduce((task1, task2) -> task1.getEndTime() > task2.getEndTime() ? task1 : task2); - if (latestScheduledTask.isPresent() && taskInfo.getEnd() - latestScheduledTask.get().getEndTime() < DETECTION_TASK_TIMEOUT){ + List<DetectionPipelineTaskInfo> scheduledTaskInfos = scheduledTasks.stream().map(taskDTO -> { + try { + return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class); + } catch (IOException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + Optional<DetectionPipelineTaskInfo> latestScheduledTask = scheduledTaskInfos.stream() + .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() > taskInfo2.getEnd() ? taskInfo1 : taskInfo2); + if (latestScheduledTask.isPresent() + && taskInfo.getEnd() - latestScheduledTask.get().getEnd() < DETECTION_TASK_TIMEOUT) { // if a task is pending and not time out yet, don't schedule more - LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName, taskInfo.getStart()); + LOG.info("Skip scheduling detection task for {} with start time {}. 
Task is already in the queue.", jobName, + taskInfo.getStart()); return; } diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java index 499e2f5..f0b448e 100644 --- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java +++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/alert/DetectionAlertJob.java @@ -21,11 +21,13 @@ package org.apache.pinot.thirdeye.detection.alert; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; import org.apache.pinot.thirdeye.anomaly.task.TaskConstants; import org.apache.pinot.thirdeye.datalayer.bao.DetectionAlertConfigManager; import org.apache.pinot.thirdeye.datalayer.bao.TaskManager; import org.apache.pinot.thirdeye.datalayer.dto.DetectionAlertConfigDTO; import org.apache.pinot.thirdeye.datalayer.dto.TaskDTO; +import org.apache.pinot.thirdeye.datalayer.util.Predicate; import org.apache.pinot.thirdeye.datasource.DAORegistry; import org.quartz.Job; import org.quartz.JobExecutionContext; @@ -61,6 +63,21 @@ public class DetectionAlertJob implements Job { DetectionAlertTaskInfo taskInfo = new DetectionAlertTaskInfo(detectionAlertConfigId); + // check if a task for this detection alerter is already scheduled + String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, detectionAlertConfigId); + List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND( + Predicate.EQ("name", jobName), + Predicate.OR( + Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()), + Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString()) + )) + ); + + if (!scheduledTasks.isEmpty()){ + // if a task is pending and not time out yet, don't schedule more + LOG.info("Skip scheduling detection 
alerter task for {}. Task is already in the queue.", jobName); + return; + } String taskInfoJson = null; try { taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo); @@ -70,7 +87,7 @@ public class DetectionAlertJob implements Job { TaskDTO taskDTO = new TaskDTO(); taskDTO.setTaskType(TaskConstants.TaskType.DETECTION_ALERT); - taskDTO.setJobName(String.format("%s_%d", TaskConstants.TaskType.DETECTION_ALERT, detectionAlertConfigId)); + taskDTO.setJobName(jobName); taskDTO.setStatus(TaskConstants.TaskStatus.WAITING); taskDTO.setTaskInfo(taskInfoJson); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org