This is an automated email from the ASF dual-hosted git repository.

jihao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fbc28d8  [TE] Add timeout for all tasks and clean up RUNNING states if timeout (#4554)
fbc28d8 is described below

commit fbc28d8aec1f1b1fb4a7573809914ddf582fd8e2
Author: Vincent Chen <[email protected]>
AuthorDate: Fri Aug 30 11:41:00 2019 -0700

    [TE] Add timeout for all tasks and clean up RUNNING states if timeout (#4554)
    
    Add timeout for all tasks and clean up RUNNING states if timeout
---
 .../anomaly/monitor/MonitorTaskRunner.java         | 15 +++++--
 .../pinot/thirdeye/anomaly/task/TaskDriver.java    | 47 +++++++++++++++++-----
 .../anomaly/task/TaskDriverConfiguration.java      |  9 +++++
 .../pinot/thirdeye/datalayer/bao/TaskManager.java  |  2 +
 .../datalayer/bao/jdbc/TaskManagerImpl.java        |  7 ++++
 5 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
index b774d82..3b0b3cb 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/monitor/MonitorTaskRunner.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
 public class MonitorTaskRunner implements TaskRunner {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MonitorJobRunner.class);
-  private static final long MAX_TASK_TIME = TimeUnit.MINUTES.toMillis(30);
+  private static final long MAX_TASK_TIME = TimeUnit.HOURS.toMillis(6);
 
   private DAORegistry DAO_REGISTRY = DAORegistry.getInstance();
 
@@ -64,9 +64,19 @@ public class MonitorTaskRunner implements TaskRunner {
 
   private void executeMonitorUpdate(MonitorTaskInfo monitorTaskInfo) {
     LOG.info("Execute monitor update {}", monitorTaskInfo);
+    int jobRetentionDays = monitorTaskInfo.getDefaultRetentionDays();
     try {
+      // Mark expired tasks with RUNNING states as TIMEOUT
+      List<TaskDTO> timeoutTasks = 
DAO_REGISTRY.getTaskDAO().findTimeoutTasksWithinDays(jobRetentionDays, 
MAX_TASK_TIME);
+      if (!timeoutTasks.isEmpty()) {
+        for (TaskDTO task : timeoutTasks) {
+          DAO_REGISTRY.getTaskDAO().updateStatusAndTaskEndTime(task.getId(), 
TaskStatus.RUNNING, TaskStatus.TIMEOUT,
+              System.currentTimeMillis(), "TIMEOUT status updated by 
MonitorTaskRunner");
+        }
+        LOG.warn("TIMEOUT tasks {}", timeoutTasks);
+      }
+
       // Find all jobs in SCHEDULED status
-      int jobRetentionDays = monitorTaskInfo.getDefaultRetentionDays();
       Map<Long, JobDTO> scheduledJobs = 
findScheduledJobsWithinDays(jobRetentionDays);
 
       // Remove SCHEDULED jobs that has WAITING tasks
@@ -199,7 +209,6 @@ public class MonitorTaskRunner implements TaskRunner {
   private Set<Long> findTimeoutJobsWithinDays(int days) {
     Set<Long> timeoutJobs = new HashSet<>();
     List<TaskDTO> timeoutTasks = 
DAO_REGISTRY.getTaskDAO().findByStatusWithinDays(TaskStatus.TIMEOUT, days);
-    
timeoutTasks.addAll(DAO_REGISTRY.getTaskDAO().findTimeoutTasksWithinDays(days, 
MAX_TASK_TIME));
     for (TaskDTO task : timeoutTasks) {
       timeoutJobs.add(task.getJobId());
     }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
index 59e9f4a..1a2571d 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriver.java
@@ -20,6 +20,9 @@
 package org.apache.pinot.thirdeye.anomaly.task;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import 
org.apache.pinot.thirdeye.anomaly.classification.classifier.AnomalyClassifierFactory;
 import org.apache.pinot.thirdeye.anomaly.utils.AnomalyUtils;
 import org.apache.pinot.thirdeye.anomaly.utils.ThirdeyeMetricsUtil;
@@ -56,6 +59,7 @@ public class TaskDriver {
   private static final DAORegistry DAO_REGISTRY = DAORegistry.getInstance();
 
   private ExecutorService taskExecutorService;
+  private ExecutorService taskWatcherExecutorService;
 
   private final TaskManager taskDAO;
   private TaskContext taskContext;
@@ -74,6 +78,9 @@ public class TaskDriver {
     taskExecutorService = Executors.newFixedThreadPool(
             driverConfiguration.getMaxParallelTasks(),
             new 
ThreadFactoryBuilder().setNameFormat("task-executor-%d").build());
+    taskWatcherExecutorService = Executors.newFixedThreadPool(
+            driverConfiguration.getMaxParallelTasks(),
+            new 
ThreadFactoryBuilder().setNameFormat("task-watcher-%d").setDaemon(true).build());
     taskContext = new TaskContext();
     taskContext.setAnomalyFunctionFactory(anomalyFunctionFactory);
     taskContext.setThirdEyeAnomalyConfiguration(thirdEyeAnomalyConfiguration);
@@ -84,9 +91,19 @@ public class TaskDriver {
   }
 
   public void start() throws Exception {
+    // Mark all assigned tasks with RUNNING as FAILED
+    List<TaskDTO> leftoverTasks = 
DAO_REGISTRY.getTaskDAO().findByStatusAndWorkerId(workerId, TaskStatus.RUNNING);
+    if (!leftoverTasks.isEmpty()) {
+      LOG.info("Found {} RUNNING tasks with worker id {} at start", 
leftoverTasks.size(), workerId);
+      for (TaskDTO task : leftoverTasks) {
+        LOG.info("Update task {} from RUNNING to FAILED", task.getId());
+        DAO_REGISTRY.getTaskDAO().updateStatusAndTaskEndTime(task.getId(), 
TaskStatus.RUNNING, TaskStatus.FAILED,
+            System.currentTimeMillis(), "FAILED status updated by the worker 
at start");
+      }
+    }
     for (int i = 0; i < driverConfiguration.getMaxParallelTasks(); i++) {
-      Callable callable = new Callable() {
-        @Override public Object call() throws Exception {
+      Runnable runnable = new Runnable() {
+        @Override public void run() {
           while (!shutdown) {
             LOG.info("Finding next task to execute");
 
@@ -98,21 +115,33 @@ public class TaskDriver {
               ThirdeyeMetricsUtil.taskCounter.inc();
 
               try {
-                LOG.info("Executing task: {} {}", anomalyTaskSpec.getJobName(),
-                    anomalyTaskSpec.getTaskInfo());
+                LOG.info("Executing task: {} {}", 
anomalyTaskSpec.getJobName(), anomalyTaskSpec.getTaskInfo());
 
                 // execute the selected task
                 TaskType taskType = anomalyTaskSpec.getTaskType();
                 TaskRunner taskRunner = 
TaskRunnerFactory.getTaskRunnerFromTaskType(taskType);
                 TaskInfo taskInfo = 
TaskInfoFactory.getTaskInfoFromTaskType(taskType, 
anomalyTaskSpec.getTaskInfo());
-
                 updateTaskStartTime(anomalyTaskSpec.getId());
-                List<TaskResult> taskResults = taskRunner.execute(taskInfo, 
taskContext);
+                Future<List<TaskResult>> future = 
taskExecutorService.submit(new Callable<List<TaskResult>>() {
+                  @Override
+                  public List<TaskResult> call() throws Exception {
+                    return taskRunner.execute(taskInfo, taskContext);
+                  }
+                });
+                try {
+                  List<TaskResult> taskResults = 
future.get(driverConfiguration.getMaxTaskRunTimeMillis(), 
TimeUnit.MILLISECONDS);
+                } catch (TimeoutException e) {
+                  LOG.error("Timeout on executing task", e);
+                  future.cancel(true);
+                  LOG.info("Executor thread gets cancelled successfully: {}", 
future.isCancelled());
+                  updateStatusAndTaskEndTime(anomalyTaskSpec.getId(),
+                      TaskStatus.RUNNING, TaskStatus.TIMEOUT, e.getMessage());
+                  continue;
+                }
                 LOG.info("DONE Executing task: {}", anomalyTaskSpec.getId());
                 // update status to COMPLETED
                 updateStatusAndTaskEndTime(anomalyTaskSpec.getId(), 
TaskStatus.RUNNING, TaskStatus.COMPLETED, "");
                 ThirdeyeMetricsUtil.taskSuccessCounter.inc();
-
               } catch (Exception e) {
                 ThirdeyeMetricsUtil.taskExceptionCounter.inc();
                 LOG.error("Exception in electing and executing task", e);
@@ -133,10 +162,9 @@ public class TaskDriver {
             }
           }
           LOG.info("Thread safely quiting");
-          return 0;
         }
       };
-      taskExecutorService.submit(callable);
+      taskWatcherExecutorService.submit(runnable);
       LOG.info("Starting task driver");
     }
   }
@@ -144,6 +172,7 @@ public class TaskDriver {
   public void shutdown() {
     shutdown = true;
     AnomalyUtils.safelyShutdownExecutionService(taskExecutorService, 
this.getClass());
+    AnomalyUtils.safelyShutdownExecutionService(taskWatcherExecutorService, 
this.getClass());
   }
 
   /**
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriverConfiguration.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriverConfiguration.java
index 9c6a03f..2c63192 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriverConfiguration.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/anomaly/task/TaskDriverConfiguration.java
@@ -25,6 +25,7 @@ public class TaskDriverConfiguration {
   private int randomDelayCapInMillis = 15_000; // 15 seconds
   private int taskFetchSizeCap = 50;
   private int maxParallelTasks = 5;
+  private long maxTaskRunTimeMillis = 6 * 60 * 60_000; // 6 hours
 
   public int getNoTaskDelayInMillis() {
     return noTaskDelayInMillis;
@@ -65,4 +66,12 @@ public class TaskDriverConfiguration {
   public void setMaxParallelTasks(int maxParallelTasks) {
     this.maxParallelTasks = maxParallelTasks;
   }
+
+  public long getMaxTaskRunTimeMillis() {
+    return maxTaskRunTimeMillis;
+  }
+
+  public void setMaxTaskRunTimeMillis(long maxTaskRunTimeMillis) {
+    this.maxTaskRunTimeMillis = maxTaskRunTimeMillis;
+  }
 }
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
index ce22cb4..499cb78 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/TaskManager.java
@@ -37,6 +37,8 @@ public interface TaskManager extends AbstractManager<TaskDTO>{
 
   List<TaskDTO> findByStatusOrderByCreateTime(TaskStatus status, int 
fetchSize, boolean asc);
 
+  List<TaskDTO> findByStatusAndWorkerId(Long workerId, TaskStatus status);
+
   boolean updateStatusAndWorkerId(Long workerId, Long id, Set<TaskStatus> 
allowedOldStatus,
       TaskStatus newStatus, int expectedVersion);
 
diff --git a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
index 4154c79..d41bf80 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datalayer/bao/jdbc/TaskManagerImpl.java
@@ -174,6 +174,13 @@ public class TaskManagerImpl extends AbstractManagerImpl<TaskDTO> implements Tas
   }
 
   @Override
+  public List<TaskDTO> findByStatusAndWorkerId(Long workerId, TaskStatus 
status) {
+    Predicate statusPredicate = Predicate.EQ("status", status.toString());
+    Predicate workerIdPredicate = Predicate.EQ("workerId", workerId);
+    return findByPredicate(Predicate.AND(statusPredicate, workerIdPredicate));
+  }
+
+  @Override
   public int countWaiting() {
     // NOTE: this aggregation should be supported by genericPojoDAO directly
     // ensure each resource is closed at the end of the statement


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to