This is an automated email from the ASF dual-hosted git repository.

xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3cd2082  [TE] set max detection task window to 7 days (#4030)
3cd2082 is described below

commit 3cd2082505212a4d43c314d8fd4e089a694d40d9
Author: Xiaohui Sun <[email protected]>
AuthorDate: Fri Mar 29 11:32:02 2019 -0700

    [TE] set max detection task window to 7 days (#4030)
---
 .../thirdeye/detection/DetectionPipelineJob.java   | 54 +++++++++++++---------
 1 file changed, 32 insertions(+), 22 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 3df93d8..6349466 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -47,37 +47,23 @@ public class DetectionPipelineJob implements Job {
   private DetectionConfigManager detectionDAO = DAORegistry.getInstance().getDetectionConfigManager();
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
   private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
+  private static final long DETECTION_TASK_MAX_LOOKBACK_WINDOW = TimeUnit.DAYS.toMillis(7);
 
   @Override
   public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
     JobKey jobKey = jobExecutionContext.getJobDetail().getKey();
     Long id = getIdFromJobKey(jobKey.getName());
     DetectionConfigDTO configDTO = detectionDAO.findById(id);
-    DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(), System.currentTimeMillis());
 
-    // check if a task for this detection pipeline is already scheduled
+    // Make sure start time is not out of DETECTION_TASK_MAX_LOOKBACK_WINDOW
+    long end = System.currentTimeMillis();
+    long start = Math.max(configDTO.getLastTimestamp(), end  - DETECTION_TASK_MAX_LOOKBACK_WINDOW);
+    DetectionPipelineTaskInfo taskInfo = new DetectionPipelineTaskInfo(configDTO.getId(), start, end);
+
     String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION, id);
-    List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
-        Predicate.EQ("name", jobName),
-        Predicate.OR(
-            Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
-            Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
-        )
-      )
-    );
 
-    List<DetectionPipelineTaskInfo> scheduledTaskInfos = scheduledTasks.stream().map(taskDTO -> {
-      try {
-        return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }).collect(Collectors.toList());
-    Optional<DetectionPipelineTaskInfo> latestScheduledTask = scheduledTaskInfos.stream()
-        .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() > taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
-    if (latestScheduledTask.isPresent()
-        && taskInfo.getEnd() - latestScheduledTask.get().getEnd() < DETECTION_TASK_TIMEOUT) {
-      // if a task is pending and not time out yet, don't schedule more
+    // if a task is pending and not time out yet, don't schedule more
+    if (checkTaskAlreadyRun(jobName, taskInfo)) {
       LOG.info("Skip scheduling detection task for {} with start time {}. Task is already in the queue.", jobName,
           taskInfo.getStart());
       return;
@@ -106,6 +92,30 @@ public class DetectionPipelineJob implements Job {
     String id = tokens[tokens.length - 1];
     return Long.valueOf(id);
   }
+
+  private boolean checkTaskAlreadyRun(String jobName, DetectionPipelineTaskInfo taskInfo ) {
+    // check if a task for this detection pipeline is already scheduled
+    List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+        Predicate.EQ("name", jobName),
+        Predicate.OR(
+            Predicate.EQ("status", TaskConstants.TaskStatus.RUNNING.toString()),
+            Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+        )
+        )
+    );
+
+    List<DetectionPipelineTaskInfo> scheduledTaskInfos = scheduledTasks.stream().map(taskDTO -> {
+      try {
+        return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(), DetectionPipelineTaskInfo.class);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList());
+    Optional<DetectionPipelineTaskInfo> latestScheduledTask = scheduledTaskInfos.stream()
+        .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() > taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
+    return latestScheduledTask.isPresent()
+        && taskInfo.getEnd() - latestScheduledTask.get().getEnd() < DETECTION_TASK_TIMEOUT;
+  }
 }
 
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to