This is an automated email from the ASF dual-hosted git repository.
jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fbc28d8 [TE] Add timeout for all tasks and clean up RUNNING states if timeout (#4554)
fbc28d8 is described below
commit fbc28d8aec1f1b1fb4a7573809914ddf582fd8e2
Author: Vincent Chen <[email protected]>
AuthorDate: Fri Aug 30 11:41:00 2019 -0700
[TE] Add timeout for all tasks and clean up RUNNING states if timeout (#4554)
Add timeout for all tasks and clean up RUNNING states if timeout
---
.../anomaly/monitor/MonitorTaskRunner.java | 15 +++++--
.../pinot/thirdeye/anomaly/task/TaskDriver.java | 47 +++++++++++++++++-----
.../anomaly/task/TaskDriverConfiguration.java | 9 +++++
.../pinot/thirdeye/datalayer/bao/TaskManager.java | 2 +
.../datalayer/bao/jdbc/TaskManagerImpl.java | 7 ++++
5 files changed, 68 insertions(+), 12 deletions(-)
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
index b774d82..3b0b3cb 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
public class MonitorTaskRunner implements TaskRunner {
private static final Logger LOG =
LoggerFactory.getLogger(MonitorJobRunner.class);
- private static final long MAX_TASK_TIME = TimeUnit.MINUTES.toMillis(30);
+ private static final long MAX_TASK_TIME = TimeUnit.HOURS.toMillis(6);
private DAORegistry DAO_REGISTRY = DAORegistry.getInstance();
@@ -64,9 +64,19 @@ public class MonitorTaskRunner implements TaskRunner {
private void executeMonitorUpdate(MonitorTaskInfo monitorTaskInfo) {
LOG.info("Execute monitor update {}", monitorTaskInfo);
+ int jobRetentionDays = monitorTaskInfo.getDefaultRetentionDays();
try {
+ // Mark expired tasks with RUNNING states as TIMEOUT
+ List<TaskDTO> timeoutTasks =
DAO_REGISTRY.getTaskDAO().findTimeoutTasksWithinDays(jobRetentionDays,
MAX_TASK_TIME);
+ if (!timeoutTasks.isEmpty()) {
+ for (TaskDTO task : timeoutTasks) {
+ DAO_REGISTRY.getTaskDAO().updateStatusAndTaskEndTime(task.getId(),
TaskStatus.RUNNING, TaskStatus.TIMEOUT,
+ System.currentTimeMillis(), "TIMEOUT status updated by
MonitorTaskRunner");
+ }
+ LOG.warn("TIMEOUT tasks {}", timeoutTasks);
+ }
+
// Find all jobs in SCHEDULED status
- int jobRetentionDays = monitorTaskInfo.getDefaultRetentionDays();
Map<Long, JobDTO> scheduledJobs =
findScheduledJobsWithinDays(jobRetentionDays);
// Remove SCHEDULED jobs that has WAITING tasks
@@ -199,7 +209,6 @@ public class MonitorTaskRunner implements TaskRunner {
private Set<Long> findTimeoutJobsWithinDays(int days) {
Set<Long> timeoutJobs = new HashSet<>();
List<TaskDTO> timeoutTasks =
DAO_REGISTRY.getTaskDAO().findByStatusWithinDays(TaskStatus.TIMEOUT, days);
-
timeoutTasks.addAll(DAO_REGISTRY.getTaskDAO().findTimeoutTasksWithinDays(days,
MAX_TASK_TIME));
for (TaskDTO task : timeoutTasks) {
timeoutJobs.add(task.getJobId());
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
index 59e9f4a..1a2571d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
@@ -20,6 +20,9 @@
package org.apache.pinot.thirdeye.anomaly.task;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import
org.apache.pinot.thirdeye.anomaly.classification.classifier.AnomalyClassifierFactory;
import org.apache.pinot.thirdeye.anomaly.utils.AnomalyUtils;
import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
@@ -56,6 +59,7 @@ public class TaskDriver {
private static final DAORegistry DAO_REGISTRY = DAORegistry.getInstance();
private ExecutorService taskExecutorService;
+ private ExecutorService taskWatcherExecutorService;
private final TaskManager taskDAO;
private TaskContext taskContext;
@@ -74,6 +78,9 @@ public class TaskDriver {
taskExecutorService = Executors.newFixedThreadPool(
driverConfiguration.getMaxParallelTasks(),
new
ThreadFactoryBuilder().setNameFormat("task-executor-%d").build());
+ taskWatcherExecutorService = Executors.newFixedThreadPool(
+ driverConfiguration.getMaxParallelTasks(),
+ new
ThreadFactoryBuilder().setNameFormat("task-watcher-%d").setDaemon(true).build());
taskContext = new TaskContext();
taskContext.setAnomalyFunctionFactory(anomalyFunctionFactory);
taskContext.setThirdEyeAnomalyConfiguration(thirdEyeAnomalyConfiguration);
@@ -84,9 +91,19 @@ public class TaskDriver {
}
public void start() throws Exception {
+ // Mark all assigned tasks with RUNNING as FAILED
+ List<TaskDTO> leftoverTasks =
DAO_REGISTRY.getTaskDAO().findByStatusAndWorkerId(workerId, TaskStatus.RUNNING);
+ if (!leftoverTasks.isEmpty()) {
+ LOG.info("Found {} RUNNING tasks with worker id {} at start",
leftoverTasks.size(), workerId);
+ for (TaskDTO task : leftoverTasks) {
+ LOG.info("Update task {} from RUNNING to FAILED", task.getId());
+ DAO_REGISTRY.getTaskDAO().updateStatusAndTaskEndTime(task.getId(),
TaskStatus.RUNNING, TaskStatus.FAILED,
+ System.currentTimeMillis(), "FAILED status updated by the worker
at start");
+ }
+ }
for (int i = 0; i < driverConfiguration.getMaxParallelTasks(); i++) {
- Callable callable = new Callable() {
- @Override public Object call() throws Exception {
+ Runnable runnable = new Runnable() {
+ @Override public void run() {
while (!shutdown) {
LOG.info("Finding next task to execute");
@@ -98,21 +115,33 @@ public class TaskDriver {
ThirdeyeMetricsUtil.taskCounter.inc();
try {
- LOG.info("Executing task: {} {}", anomalyTaskSpec.getJobName(),
- anomalyTaskSpec.getTaskInfo());
+ LOG.info("Executing task: {} {}",
anomalyTaskSpec.getJobName(), anomalyTaskSpec.getTaskInfo());
// execute the selected task
TaskType taskType = anomalyTaskSpec.getTaskType();
TaskRunner taskRunner =
TaskRunnerFactory.getTaskRunnerFromTaskType(taskType);
TaskInfo taskInfo =
TaskInfoFactory.getTaskInfoFromTaskType(taskType,
anomalyTaskSpec.getTaskInfo());
-
updateTaskStartTime(anomalyTaskSpec.getId());
- List<TaskResult> taskResults = taskRunner.execute(taskInfo,
taskContext);
+ Future<List<TaskResult>> future =
taskExecutorService.submit(new Callable<List<TaskResult>>() {
+ @Override
+ public List<TaskResult> call() throws Exception {
+ return taskRunner.execute(taskInfo, taskContext);
+ }
+ });
+ try {
+ List<TaskResult> taskResults =
future.get(driverConfiguration.getMaxTaskRunTimeMillis(),
TimeUnit.MILLISECONDS);
+ } catch (TimeoutException e) {
+ LOG.error("Timeout on executing task", e);
+ future.cancel(true);
+ LOG.info("Executor thread gets cancelled successfully: {}",
future.isCancelled());
+ updateStatusAndTaskEndTime(anomalyTaskSpec.getId(),
+ TaskStatus.RUNNING, TaskStatus.TIMEOUT, e.getMessage());
+ continue;
+ }
LOG.info("DONE Executing task: {}", anomalyTaskSpec.getId());
// update status to COMPLETED
updateStatusAndTaskEndTime(anomalyTaskSpec.getId(),
TaskStatus.RUNNING, TaskStatus.COMPLETED, "");
ThirdeyeMetricsUtil.taskSuccessCounter.inc();
-
} catch (Exception e) {
ThirdeyeMetricsUtil.taskExceptionCounter.inc();
LOG.error("Exception in electing and executing task", e);
@@ -133,10 +162,9 @@ public class TaskDriver {
}
}
LOG.info("Thread safely quiting");
- return 0;
}
};
- taskExecutorService.submit(callable);
+ taskWatcherExecutorService.submit(runnable);
LOG.info("Starting task driver");
}
}
@@ -144,6 +172,7 @@ public class TaskDriver {
public void shutdown() {
shutdown = true;
AnomalyUtils.safelyShutdownExecutionService(taskExecutorService,
this.getClass());
+ AnomalyUtils.safelyShutdownExecutionService(taskWatcherExecutorService,
this.getClass());
}
/**
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriverConfiguration.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriverConfiguration.java
index 9c6a03f..2c63192 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriverConfiguration.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriverConfiguration.java
@@ -25,6 +25,7 @@ public class TaskDriverConfiguration {
private int randomDelayCapInMillis = 15_000; // 15 seconds
private int taskFetchSizeCap = 50;
private int maxParallelTasks = 5;
+ private long maxTaskRunTimeMillis = 6 * 60 * 60_000; // 6 hours
public int getNoTaskDelayInMillis() {
return noTaskDelayInMillis;
@@ -65,4 +66,12 @@ public class TaskDriverConfiguration {
public void setMaxParallelTasks(int maxParallelTasks) {
this.maxParallelTasks = maxParallelTasks;
}
+
+ public long getMaxTaskRunTimeMillis() {
+ return maxTaskRunTimeMillis;
+ }
+
+ public void setMaxTaskRunTimeMillis(long maxTaskRunTimeMillis) {
+ this.maxTaskRunTimeMillis = maxTaskRunTimeMillis;
+ }
}
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
index ce22cb4..499cb78 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
@@ -37,6 +37,8 @@ public interface TaskManager extends AbstractManager<TaskDTO>{
List<TaskDTO> findByStatusOrderByCreateTime(TaskStatus status, int
fetchSize, boolean asc);
+ List<TaskDTO> findByStatusAndWorkerId(Long workerId, TaskStatus status);
+
boolean updateStatusAndWorkerId(Long workerId, Long id, Set<TaskStatus>
allowedOldStatus,
TaskStatus newStatus, int expectedVersion);
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index 4154c79..d41bf80 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -174,6 +174,13 @@ public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements Tas
}
@Override
+ public List<TaskDTO> findByStatusAndWorkerId(Long workerId, TaskStatus
status) {
+ Predicate statusPredicate = Predicate.EQ("status", status.toString());
+ Predicate workerIdPredicate = Predicate.EQ("workerId", workerId);
+ return findByPredicate(Predicate.AND(statusPredicate, workerIdPredicate));
+ }
+
+ @Override
public int countWaiting() {
// NOTE: this aggregation should be supported by genericPojoDAO directly
// ensure each resource is closed at the end of the statement
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]