This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0ea2232 [TE] Skip scheduling detection task if one is already in the
queue (#3660)
0ea2232 is described below
commit 0ea2232cdc6c5a2d8d8ce9001c64f80459cf7712
Author: Jihao Zhang <[email protected]>
AuthorDate: Mon Jan 7 13:42:45 2019 -0800
[TE] Skip scheduling detection task if one is already in the queue (#3660)
This PR changes the detection scheduler to skip scheduling another detection
task if one is already in the queue, to avoid overloading the system.
---
.../thirdeye/detection/DetectionPipelineJob.java | 26 +++++++++++++++++++++-
1 file changed, 25 insertions(+), 1 deletion(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
index 65b7452..e232b72 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
@@ -23,7 +23,11 @@ import
com.linkedin.thirdeye.datalayer.bao.DetectionConfigManager;
import com.linkedin.thirdeye.datalayer.bao.TaskManager;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.TaskDTO;
+import com.linkedin.thirdeye.datalayer.util.Predicate;
import com.linkedin.thirdeye.datasource.DAORegistry;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -37,6 +41,7 @@ public class DetectionPipelineJob implements Job {
private TaskManager taskDAO = DAORegistry.getInstance().getTaskDAO();
private DetectionConfigManager detectionDAO =
DAORegistry.getInstance().getDetectionConfigManager();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
@Override
public void execute(JobExecutionContext jobExecutionContext) throws
JobExecutionException {
@@ -45,6 +50,25 @@ public class DetectionPipelineJob implements Job {
DetectionConfigDTO configDTO = detectionDAO.findById(id);
DetectionPipelineTaskInfo taskInfo = new
DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(),
System.currentTimeMillis());
+ // check if a task for this detection pipeline is already scheduled
+ String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION,
id);
+ List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+ Predicate.EQ("name", jobName),
+ Predicate.EQ("startTime", taskInfo.getStart()),
+ Predicate.OR(
+ Predicate.EQ("status",
TaskConstants.TaskStatus.RUNNING.toString()),
+ Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+ )
+ )
+ );
+
+ Optional<TaskDTO> latestScheduledTask =
scheduledTasks.stream().reduce((task1, task2) -> task1.getEndTime() >
task2.getEndTime() ? task1 : task2);
+ if (latestScheduledTask.isPresent() && taskInfo.getEnd() -
latestScheduledTask.get().getEndTime() < DETECTION_TASK_TIMEOUT){
+ // if a task is pending and not time out yet, don't schedule more
+ LOG.info("Skip scheduling detection task for {} with start time {}. Task
is already in the queue.", jobName, taskInfo.getStart());
+ return;
+ }
+
String taskInfoJson = null;
try {
taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -54,7 +78,7 @@ public class DetectionPipelineJob implements Job {
TaskDTO taskDTO = new TaskDTO();
taskDTO.setTaskType(TaskConstants.TaskType.DETECTION);
- taskDTO.setJobName(String.format("%s_%d",
TaskConstants.TaskType.DETECTION, id));
+ taskDTO.setJobName(jobName);
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
taskDTO.setTaskInfo(taskInfoJson);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]