This is an automated email from the ASF dual-hosted git repository.
xhsun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3cd2082 [TE] set max detection task window to 7 days (#4030)
3cd2082 is described below
commit 3cd2082505212a4d43c314d8fd4e089a694d40d9
Author: Xiaohui Sun <[email protected]>
AuthorDate: Fri Mar 29 11:32:02 2019 -0700
[TE] set max detection task window to 7 days (#4030)
---
.../thirdeye/detection/DetectionPipelineJob.java | 54 +++++++++++++---------
1 file changed, 32 insertions(+), 22 deletions(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
index 3df93d8..6349466 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/detection/DetectionPipelineJob.java
@@ -47,37 +47,23 @@ public class DetectionPipelineJob implements Job {
private DetectionConfigManager detectionDAO =
DAORegistry.getInstance().getDetectionConfigManager();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
+ private static final long DETECTION_TASK_MAX_LOOKBACK_WINDOW =
TimeUnit.DAYS.toMillis(7);
@Override
public void execute(JobExecutionContext jobExecutionContext) throws
JobExecutionException {
JobKey jobKey = jobExecutionContext.getJobDetail().getKey();
Long id = getIdFromJobKey(jobKey.getName());
DetectionConfigDTO configDTO = detectionDAO.findById(id);
- DetectionPipelineTaskInfo taskInfo = new
DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(),
System.currentTimeMillis());
- // check if a task for this detection pipeline is already scheduled
+ // Make sure start time is not out of DETECTION_TASK_MAX_LOOKBACK_WINDOW
+ long end = System.currentTimeMillis();
+ long start = Math.max(configDTO.getLastTimestamp(), end -
DETECTION_TASK_MAX_LOOKBACK_WINDOW);
+ DetectionPipelineTaskInfo taskInfo = new
DetectionPipelineTaskInfo(configDTO.getId(), start, end);
+
String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION,
id);
- List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
- Predicate.EQ("name", jobName),
- Predicate.OR(
- Predicate.EQ("status",
TaskConstants.TaskStatus.RUNNING.toString()),
- Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
- )
- )
- );
- List<DetectionPipelineTaskInfo> scheduledTaskInfos =
scheduledTasks.stream().map(taskDTO -> {
- try {
- return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(),
DetectionPipelineTaskInfo.class);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }).collect(Collectors.toList());
- Optional<DetectionPipelineTaskInfo> latestScheduledTask =
scheduledTaskInfos.stream()
- .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() >
taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
- if (latestScheduledTask.isPresent()
- && taskInfo.getEnd() - latestScheduledTask.get().getEnd() <
DETECTION_TASK_TIMEOUT) {
- // if a task is pending and not time out yet, don't schedule more
+ // if a task is pending and not time out yet, don't schedule more
+ if (checkTaskAlreadyRun(jobName, taskInfo)) {
LOG.info("Skip scheduling detection task for {} with start time {}. Task
is already in the queue.", jobName,
taskInfo.getStart());
return;
@@ -106,6 +92,30 @@ public class DetectionPipelineJob implements Job {
String id = tokens[tokens.length - 1];
return Long.valueOf(id);
}
+
+ private boolean checkTaskAlreadyRun(String jobName,
DetectionPipelineTaskInfo taskInfo ) {
+ // check if a task for this detection pipeline is already scheduled
+ List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+ Predicate.EQ("name", jobName),
+ Predicate.OR(
+ Predicate.EQ("status",
TaskConstants.TaskStatus.RUNNING.toString()),
+ Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+ )
+ )
+ );
+
+ List<DetectionPipelineTaskInfo> scheduledTaskInfos =
scheduledTasks.stream().map(taskDTO -> {
+ try {
+ return OBJECT_MAPPER.readValue(taskDTO.getTaskInfo(),
DetectionPipelineTaskInfo.class);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ Optional<DetectionPipelineTaskInfo> latestScheduledTask =
scheduledTaskInfos.stream()
+ .reduce((taskInfo1, taskInfo2) -> taskInfo1.getEnd() >
taskInfo2.getEnd() ? taskInfo1 : taskInfo2);
+ return latestScheduledTask.isPresent()
+ && taskInfo.getEnd() - latestScheduledTask.get().getEnd() <
DETECTION_TASK_TIMEOUT;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]