This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ea2232  [TE] Skip scheduling detection task if one is already in the queue (#3660)
0ea2232 is described below

commit 0ea2232cdc6c5a2d8d8ce9001c64f80459cf7712
Author: Jihao Zhang <[email protected]>
AuthorDate: Mon Jan 7 13:42:45 2019 -0800

    [TE] Skip scheduling detection task if one is already in the queue (#3660)
    
    This PR changes the detection scheduler to skip scheduling another detection
    task if one is already in the queue, to avoid overloading the system.
---
 .../thirdeye/detection/DetectionPipelineJob.java   | 26 +++++++++++++++++++++-
 1 file changed, 25 insertions(+), 1 deletion(-)

diff --git 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
index 65b7452..e232b72 100644
--- 
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
+++ 
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
@@ -23,7 +23,11 @@ import 
com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
 import com.linkedin.thirdeye.datalayer.bao.TaskManager;
 import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
 import com.linkedin.thirdeye.datalayer.dto.TaskDTO;
+import com.linkedin.thirdeye.datalayer.util.Predicate;
 import com.linkedin.thirdeye.datasource.DAORegistry;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import org.quartz.Job;
 import org.quartz.JobExecutionContext;
 import org.quartz.JobExecutionException;
@@ -37,6 +41,7 @@ public class DetectionPipelineJob implements Job {
   private TaskManager taskDAO = DAORegistry.getInstance().getTaskDAO();
   private DetectionConfigManager detectionDAO = 
DAORegistry.getInstance().getDetectionConfigManager();
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
 
   @Override
   public void execute(JobExecutionContext jobExecutionContext) throws 
JobExecutionException {
@@ -45,6 +50,25 @@ public class DetectionPipelineJob implements Job {
     DetectionConfigDTO configDTO = detectionDAO.findById(id);
     DetectionPipelineTaskInfo taskInfo = new 
DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(), 
System.currentTimeMillis());
 
+    // check if a task for this detection pipeline is already scheduled
+    String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, 
id);
+    List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+        Predicate.EQ("name", jobName),
+        Predicate.EQ("startTime", taskInfo.getStart()),
+        Predicate.OR(
+            Predicate.EQ("status", 
TaskConstants.TaskStatus.RUNNING.toString()),
+            Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+        )
+      )
+    );
+
+    Optional<TaskDTO> latestScheduledTask = 
scheduledTasks.stream().reduce((task1, task2) -> task1.getEndTime() > 
task2.getEndTime() ? task1 : task2);
+    if (latestScheduledTask.isPresent() && taskInfo.getEnd() - 
latestScheduledTask.get().getEndTime() < DETECTION_TASK_TIMEOUT){
+      // if a task is pending and not time out yet, don't schedule more
+      LOG.info("Skip scheduling detection task for {} with start time {}. Task 
is already in the queue.", jobName, taskInfo.getStart());
+      return;
+    }
+
     String taskInfoJson = null;
     try {
       taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -54,7 +78,7 @@ public class DetectionPipelineJob implements Job {
 
     TaskDTO taskDTO = new TaskDTO();
     taskDTO.setTaskType(TaskConstants.TaskType.DETECTION);
-    taskDTO.setJobName(String.format("%s_%d", 
TaskConstants.TaskType.DETECTION, id));
+    taskDTO.setJobName(jobName);
     taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
     taskDTO.setTaskInfo(taskInfoJson);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to