jihaozh closed pull request #3660: [TE] Skip scheduling detection task if one is already in the queue
URL: https://github.com/apache/incubator-pinot/pull/3660
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would not otherwise show it for a merged fork):
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
index 65b7452a55..e232b72a54 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/detection/DetectionPipelineJob.java
@@ -23,7 +23,11 @@
import com.linkedin.thirdeye.datalayer.bao.TaskManager;
import com.linkedin.thirdeye.datalayer.dto.DetectionConfigDTO;
import com.linkedin.thirdeye.datalayer.dto.TaskDTO;
+import com.linkedin.thirdeye.datalayer.util.Predicate;
import com.linkedin.thirdeye.datasource.DAORegistry;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@@ -37,6 +41,7 @@
private TaskManager taskDAO = DAORegistry.getInstance().getTaskDAO();
private DetectionConfigManager detectionDAO =
DAORegistry.getInstance().getDetectionConfigManager();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final long DETECTION_TASK_TIMEOUT = TimeUnit.DAYS.toMillis(1);
@Override
public void execute(JobExecutionContext jobExecutionContext) throws
JobExecutionException {
@@ -45,6 +50,25 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
DetectionConfigDTO configDTO = detectionDAO.findById(id);
DetectionPipelineTaskInfo taskInfo = new
DetectionPipelineTaskInfo(configDTO.getId(), configDTO.getLastTimestamp(),
System.currentTimeMillis());
+ // check if a task for this detection pipeline is already scheduled
+ String jobName = String.format("%s_%d", TaskConstants.TaskType.DETECTION,
id);
+ List<TaskDTO> scheduledTasks = taskDAO.findByPredicate(Predicate.AND(
+ Predicate.EQ("name", jobName),
+ Predicate.EQ("startTime", taskInfo.getStart()),
+ Predicate.OR(
+ Predicate.EQ("status",
TaskConstants.TaskStatus.RUNNING.toString()),
+ Predicate.EQ("status", TaskConstants.TaskStatus.WAITING.toString())
+ )
+ )
+ );
+
+ Optional<TaskDTO> latestScheduledTask =
scheduledTasks.stream().reduce((task1, task2) -> task1.getEndTime() >
task2.getEndTime() ? task1 : task2);
+ if (latestScheduledTask.isPresent() && taskInfo.getEnd() -
latestScheduledTask.get().getEndTime() < DETECTION_TASK_TIMEOUT){
+ // if a task is pending and not time out yet, don't schedule more
+ LOG.info("Skip scheduling detection task for {} with start time {}. Task
is already in the queue.", jobName, taskInfo.getStart());
+ return;
+ }
+
String taskInfoJson = null;
try {
taskInfoJson = OBJECT_MAPPER.writeValueAsString(taskInfo);
@@ -54,7 +78,7 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
TaskDTO taskDTO = new TaskDTO();
taskDTO.setTaskType(TaskConstants.TaskType.DETECTION);
- taskDTO.setJobName(String.format("%s_%d",
TaskConstants.TaskType.DETECTION, id));
+ taskDTO.setJobName(jobName);
taskDTO.setStatus(TaskConstants.TaskStatus.WAITING);
taskDTO.setTaskInfo(taskInfoJson);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]