jasonk000 commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195776038


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock 
while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while 
becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries 
for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared 
after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = 
config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", 
restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", 
tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", 
config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", 
startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears 
entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be 
cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : 
ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.

Review Comment:
   Technically this is blocking now, not for long though.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful 
to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually 
modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage 
facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = 
toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = 
Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, 
Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() 
- prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = 
taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), 
oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(
+        "Synced [%d] tasks from storage. Added [%d] tasks, removed [%d] 
tasks.",
+        newTasks.size(), addedTasks.size(), removedTasks.size()
+    );
 
-  public Map<String, Long> getSuccessfulTaskCount()
-  {
-    Map<String, Long> total = 
CollectionUtils.mapValues(totalSuccessfulTaskCount, AtomicLong::get);
-    synchronized (totalSuccessfulTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, 
prevTotalSuccessfulTaskCount);
-      prevTotalSuccessfulTaskCount = total;
-      return delta;
-    }
+    requestManagement();
   }
 
-  public Map<String, Long> getFailedTaskCount()
+  public Map<String, Long> getAndResetSuccessfulTaskCounts()
   {
-    Map<String, Long> total = CollectionUtils.mapValues(totalFailedTaskCount, 
AtomicLong::get);
-    synchronized (totalFailedTaskCount) {
-      Map<String, Long> delta = getDeltaValues(total, 
prevTotalFailedTaskCount);
-      prevTotalFailedTaskCount = total;
-      return delta;
-    }
+    Map<String, Long> total = new HashMap<>(datasourceToSuccessfulTaskCount);
+    datasourceToSuccessfulTaskCount.clear();

Review Comment:
   This and following fns have a small race if the CHM was changed between the 
call to clear. There could be some alternatives, like iterating entries and 
using replaceAll, or remove() or merge() in a loop, that would allow a more 
atomic unload of the CHM? Or, maybe make the value in the map an AtomicLong, 
and CAS modify it to zero during unload?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock 
while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while 
becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries 
for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared 
after notifyStatus is processed

Review Comment:
   Observation - This is an interesting comment, given that there's no 
guarantee the management has run by now. I wonder if it's stale, or is there an 
issue here? ... , since it has same behaviour as previous code.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -652,35 +569,21 @@ private void notifyStatus(final Task task, final 
TaskStatus taskStatus, String r
     catch (Throwable e) {
       // If task runner shutdown fails, continue with the task shutdown 
routine. We'll come back and try to
       // shut it down again later in manageInternalPostCritical, once it's 
removed from the "tasks" map.
-      log.warn(e, "TaskRunner failed to cleanup task after completion: %s", 
task.getId());
+      log.warn(e, "TaskRunner failed to cleanup task [%s] after completion", 
task.getId());
     }
 
-    // Critical section: remove this task from all of our tracking data 
structures.
-    giant.lock();
-    try {
-      if (removeTaskInternal(task.getId())) {

Review Comment:
   re: earlier note about possibly re-introducing #12901.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful 
to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually 
modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage 
facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = 
toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = 
Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, 
Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() 
- prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = 
taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), 
oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);

Review Comment:
   :+1: much neater!



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock 
while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while 
becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries 
for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared 
after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = 
config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", 
restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", 
tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", 
config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", 
startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears 
entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be 
cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : 
ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for 
activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = 
"using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of 
start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), 
complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - 
(TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - 
MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);
+    }
   }
 
   /**
    * Main task runner management loop. Meant to run forever, or, at least 
until we're stopped.
    */
-  private void manage() throws InterruptedException
+  private void runTaskManagement() throws InterruptedException
   {
-    log.info("Beginning management in %s.", config.getStartDelay());
-    Thread.sleep(config.getStartDelay().getMillis());
-
     // Ignore return value- we'll get the IDs and futures from getKnownTasks 
later.
     taskRunner.restore();
 
     while (active) {
-      manageInternal();
-
-      // awaitNanos because management may become necessary without this 
condition signalling,
-      // due to e.g. tasks becoming ready when other folks mess with the 
TaskLockbox.
-      awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
+      manageTasks();
+      awaitManagementRequest();
     }
   }
 
   @VisibleForTesting
-  void manageInternal()
+  void manageTasks()
   {
-    Set<String> knownTaskIds = new HashSet<>();
-    Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new 
HashMap<>();
-
-    giant.lock();
-
-    try {
-      manageInternalCritical(knownTaskIds, runnerTaskFutures);
-    }
-    finally {
-      giant.unlock();
-    }
-
-    manageInternalPostCritical(knownTaskIds, runnerTaskFutures);
+    runReadyTasks();
+    killUnknownTasks();
   }
 
 
   /**
-   * Management loop critical section tasks.
-   *
-   * @param knownTaskIds will be modified - filled with known task IDs
-   * @param runnerTaskFutures will be modified - filled with futures related 
to getting the running tasks
+   * Submits ready tasks to the TaskRunner.
+   * <p>
+   * This method should be called only by the management thread.
    */
-  @GuardedBy("giant")
-  private void manageInternalCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  private synchronized void runReadyTasks()
   {
     // Task futures available from the taskRunner
+    final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new 
HashMap<>();
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
-      if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
-        // Don't do anything with tasks that have recently finished; 
notifyStatus will handle it.
+      if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) {
+        // Don't do anything with recently completed tasks; notifyStatus will 
handle it.
         runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
       }
     }
+
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final Task task : ImmutableList.copyOf(tasks.values())) {
-      if (recentlyCompletedTasks.contains(task.getId())) {
-        // Don't do anything with tasks that have recently finished; 
notifyStatus will handle it.
-        continue;
+    for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) {
+      final Task task = tasks.get(taskId);
+      if (task == null || recentlyCompletedTaskIds.contains(taskId)) {
+        // Don't do anything for unknown tasks or recently completed tasks
+      } else if (submittedTaskIds.contains(taskId)) {
+        // Re-trigger execution of pending task to avoid unnecessary delays
+        // see https://github.com/apache/druid/pull/6991
+        if (isTaskPending(task)) {
+          taskRunner.run(task);
+        }
+      } else if (runnerTaskFutures.containsKey(taskId)) {
+        attachCallbacks(task, runnerTaskFutures.get(taskId));
+        submittedTaskIds.add(taskId);
+      } else if (isTaskReady(task)) {
+        log.info("Asking taskRunner to run ready task [%s].", taskId);
+        attachCallbacks(task, taskRunner.run(task));
+        submittedTaskIds.add(taskId);
+      } else {
+        // Release all locks (possibly acquired by task.isReady()) if task is 
not ready
+        taskLockbox.unlockAll(task);
       }
+    }
+  }
 
-      knownTaskIds.add(task.getId());
-
-      if (!taskFutures.containsKey(task.getId())) {
-        final ListenableFuture<TaskStatus> runnerTaskFuture;
-        if (runnerTaskFutures.containsKey(task.getId())) {
-          runnerTaskFuture = runnerTaskFutures.get(task.getId());
-        } else {
-          // Task should be running, so run it.
-          final boolean taskIsReady;
-          try {
-            taskIsReady = task.isReady(taskActionClientFactory.create(task));
-          }
-          catch (Exception e) {
-            log.warn(e, "Exception thrown during isReady for task: %s", 
task.getId());
-            final String errorMessage;
-            if (e instanceof MaxAllowedLocksExceededException) {
-              errorMessage = e.getMessage();
-            } else {
-              errorMessage = "Failed while waiting for the task to be ready to 
run. "
-                                          + "See overlord logs for more 
details.";
-            }
-            notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), 
errorMessage);
-            continue;
-          }
-          if (taskIsReady) {
-            log.info("Asking taskRunner to run: %s", task.getId());
-            runnerTaskFuture = taskRunner.run(task);
-          } else {
-            // Task.isReady() can internally lock intervals or segments.
-            // We should release them if the task is not ready.
-            taskLockbox.unlockAll(task);
-            continue;
-          }
-        }
-        taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
-      } else if (isTaskPending(task)) {
-        // if the taskFutures contain this task and this task is pending, also 
let the taskRunner
-        // to run it to guarantee it will be assigned to run
-        // see https://github.com/apache/druid/pull/6991
-        taskRunner.run(task);
+  private boolean isTaskReady(Task task)
+  {
+    try {
+      return task.isReady(taskActionClientFactory.create(task));
+    }
+    catch (Exception e) {
+      log.warn(e, "Error while checking if task [%s] is ready to run.", 
task.getId());
+      final String errorMessage;
+      if (e instanceof MaxAllowedLocksExceededException) {
+        errorMessage = e.getMessage();
+      } else {
+        errorMessage = "Failed while waiting for the task to be ready to run. "
+                       + "See overlord logs for more details.";
       }
+      notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), 
errorMessage);
+      return false;
     }
   }
 
-  @VisibleForTesting
-  private void manageInternalPostCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  /**
+   * Kills tasks not present in the set of known tasks.
+   */
+  private void killUnknownTasks()
   {
-    // Kill tasks that shouldn't be running
-    final Set<String> tasksToKill = 
Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
-    if (!tasksToKill.isEmpty()) {
-      log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
-
-      // On large installations running several thousands of tasks,
-      // concatenating the list of known task ids can be compupationally 
expensive.
-      final boolean logKnownTaskIds = log.isDebugEnabled();
-      final String reason = logKnownTaskIds
-              ? StringUtils.format("Task is not in knownTaskIds[%s]", 
knownTaskIds)
-              : "Task is not in knownTaskIds";
-
-      for (final String taskId : tasksToKill) {
-        try {
-          taskRunner.shutdown(taskId, reason);
-        }
-        catch (Exception e) {
-          log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
-        }
+    final Set<String> knownTaskIds = tasks.keySet();

Review Comment:
   I'm a little wary of reintroducing this bug: 
https://github.com/apache/druid/pull/12901, in the case that `tasks` was 
changed. Can you check over it? It seems like if `tasks` has changed we might 
have an issue. But - you've covered most of them with `synchronized`. Can you 
check in `add()` and `removeTasksInternal`, I think these operate on tasks 
without being synchronized, so it might lead to some race?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -749,131 +651,69 @@ private void handleStatus(final TaskStatus status)
           }
         }
     );
-    return statusFuture;
   }
 
   /**
-   * Resync the contents of this task queue with our storage facility. Useful 
to make sure our in-memory state
-   * corresponds to the storage facility even if the latter is manually 
modified.
+   * Resync the contents of this task queue with our storage facility.
+   * Useful to make sure our in-memory state corresponds to the storage 
facility
+   * even if the latter is manually modified.
+   * <p>
+   * This method must be called only when queue is {@link #active}, except when
+   * starting up.
    */
-  private void syncFromStorage()
+  private synchronized void syncFromStorage()
   {
-    giant.lock();
-
-    try {
-      if (active) {
-        final Map<String, Task> newTasks = 
toTaskIDMap(taskStorage.getActiveTasks());
-        final int tasksSynced = newTasks.size();
-        final Map<String, Task> oldTasks = new HashMap<>(tasks);
-
-        // Calculate differences on IDs instead of Task Objects.
-        Set<String> commonIds = 
Sets.newHashSet(Sets.intersection(newTasks.keySet(), oldTasks.keySet()));
-        for (String taskID : commonIds) {
-          newTasks.remove(taskID);
-          oldTasks.remove(taskID);
-        }
-        Collection<Task> addedTasks = newTasks.values();
-        Collection<Task> removedTasks = oldTasks.values();
-
-        // Clean up removed Tasks
-        for (Task task : removedTasks) {
-          removeTaskInternal(task.getId());
-        }
-
-        // Add newly Added tasks to the queue
-        for (Task task : addedTasks) {
-          addTaskInternal(task);
-        }
-
-        log.info(
-            "Synced %d tasks from storage (%d tasks added, %d tasks removed).",
-            tasksSynced,
-            addedTasks.size(),
-            removedTasks.size()
-        );
-        requestManagement();
-      } else {
-        log.info("Not active. Skipping storage sync.");
-      }
-    }
-    catch (Exception e) {
-      log.warn(e, "Failed to sync tasks from storage!");
-      throw new RuntimeException(e);
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  private static Map<String, Task> toTaskIDMap(List<Task> taskList)
-  {
-    Map<String, Task> rv = new HashMap<>();
-    for (Task task : taskList) {
-      rv.put(task.getId(), task);
-    }
-    return rv;
-  }
-
-  private Map<String, Long> getDeltaValues(Map<String, Long> total, 
Map<String, Long> prev)
-  {
-    return total.entrySet()
-                .stream()
-                .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue() 
- prev.getOrDefault(e.getKey(), 0L)));
-  }
+    final Map<String, Task> newTasks = 
taskStorage.getActiveTasks().stream().collect(
+        Collectors.toMap(Task::getId, Function.identity())
+    );
+    final Map<String, Task> oldTasks = new HashMap<>(tasks);
+
+    // Calculate differences on IDs instead of Task Objects.
+    Set<String> commonIds = Sets.intersection(newTasks.keySet(), 
oldTasks.keySet());
+    for (String taskId : commonIds) {
+      newTasks.remove(taskId);
+      oldTasks.remove(taskId);
+    }
+    Collection<Task> addedTasks = newTasks.values();
+    Collection<Task> removedTasks = oldTasks.values();
+
+    // Add new tasks and clean up removed tasks
+    addedTasks.forEach(this::addTaskInternal);
+    removedTasks.forEach(this::removeTaskInternal);
+    log.info(

Review Comment:
   This existed before, but it might be wise to remove as much as possible, 
including logging, from the `synchronized` block, and tighten it down a bit.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -488,58 +426,56 @@ public boolean add(final Task task) throws 
EntryExistsException
     // Set forceTimeChunkLock before adding task spec to taskStorage, so that 
we can see always consistent task spec.
     task.addToContextIfAbsent(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, 
lockConfig.isForceTimeChunkLock());
     defaultTaskConfig.getContext().forEach(task::addToContextIfAbsent);
-    // Every task shuold use the lineage-based segment allocation protocol 
unless it is explicitly set to
+    // Every task should use the lineage-based segment allocation protocol 
unless it is explicitly set to
     // using the legacy protocol.
     task.addToContextIfAbsent(
         
SinglePhaseParallelIndexTaskRunner.CTX_USE_LINEAGE_BASED_SEGMENT_ALLOCATION_KEY,
         
SinglePhaseParallelIndexTaskRunner.DEFAULT_USE_LINEAGE_BASED_SEGMENT_ALLOCATION
     );
 
-    giant.lock();
-
-    try {
-      Preconditions.checkState(active, "Queue is not active!");
-      Preconditions.checkNotNull(task, "task");
-      Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many 
tasks (max = %,d)", config.getMaxSize());
-
-      // If this throws with any sort of exception, including 
TaskExistsException, we don't want to
-      // insert the task into our queue. So don't catch it.
-      taskStorage.insert(task, TaskStatus.running(task.getId()));
-      addTaskInternal(task);
-      requestManagement();
-      return true;
-    }
-    finally {
-      giant.unlock();
-    }
+    // Do not add the task to queue if insert into metadata fails for any 
reason
+    taskStorage.insert(task, TaskStatus.running(task.getId()));
+    addTaskInternal(task);
+    requestManagement();
+    return true;
   }
 
-  @GuardedBy("giant")
+  /**
+   * Atomically adds this task to the TaskQueue.
+   */
   private void addTaskInternal(final Task task)
   {
-    final Task existingTask = tasks.putIfAbsent(task.getId(), task);
-
-    if (existingTask == null) {
-      taskLockbox.add(task);
-    } else if (!existingTask.equals(task)) {
-      throw new ISE("Cannot add task ID [%s] with same ID as task that has 
already been added", task.getId());
-    }
+    tasks.computeIfAbsent(
+        task.getId(),
+        taskId -> {

Review Comment:
   It seems the implementation here relies on an implicit lock being taken by 
CHM. Might be worth noting given that it is implementations-specific with CHM, 
in case implementation is switched in the future.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock 
while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while 
becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries 
for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared 
after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = 
config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", 
restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", 
tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", 
config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", 
startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears 
entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be 
cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : 
ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for 
activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = 
"using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of 
start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), 
complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - 
(TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - 
MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);

Review Comment:
   I think this can be a set() or even lazySet(), unless I missed something.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -692,7 +595,7 @@ private ListenableFuture<TaskStatus> attachCallbacks(final 
Task task, final List
           @Override
           public void onSuccess(final TaskStatus status)
           {
-            log.info("Received %s status for task: %s", 
status.getStatusCode(), status.getId());
+            log.info("Received status [%s] for task [%s]", 
status.getStatusCode(), status.getId());

Review Comment:
   :+1: 
   



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock 
while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while 
becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries 
for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared 
after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = 
config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", 
restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", 
tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", 
config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", 
startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears 
entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be 
cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : 
ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.
-   *
-   * Other callers (such as notifyStatus) should trigger activity on the
-   * TaskQueue thread by requesting management here.
+   * <p>
+   * Callers (such as notifyStatus) can trigger task management by calling
+   * this method.
    */
-  void requestManagement()
+  private void requestManagement()
   {
-    // use a BlockingQueue since the offer/poll/wait behaviour is simple
-    // and very easy to reason about
-
-    // the request has to be offer (non blocking), since someone might request
-    // while already holding giant lock
-
-    // do not care if the item fits into the queue:
-    // if the queue is already full, request has been triggered anyway
-    managementMayBeNecessary.offer(this);
+    synchronized (managementRequested) {
+      managementRequested.set(true);
+      managementRequested.notify();
+    }
   }
 
   /**
-   * Await for an event to manage.
-   *
-   * This should only be called from the management thread to wait for 
activity.
+   * Waits for a management request to be triggered by another thread.
    *
-   * @param nanos
-   * @throws InterruptedException
+   * @throws InterruptedException if the thread is interrupted while waiting.
    */
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = 
"using queue as notification mechanism, result has no value")
-  void awaitManagementNanos(long nanos) throws InterruptedException
+  private void awaitManagementRequest() throws InterruptedException
   {
     // mitigate a busy loop, it can get pretty busy when there are a lot of 
start/stops
     try {
-      Thread.sleep(MIN_WAIT_TIME_MS);
+      Thread.sleep(MIN_WAIT_TIME_MILLIS);
     }
     catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
 
-    // wait for an item, if an item arrives (or is already available), 
complete immediately
-    // (does not actually matter what the item is)
-    managementMayBeNecessary.poll(nanos - 
(TimeUnit.MILLISECONDS.toNanos(MIN_WAIT_TIME_MS)), TimeUnit.NANOSECONDS);
-
-    // there may have been multiple requests, clear them all
-    managementMayBeNecessary.clear();
+    // Wait for management to be requested
+    synchronized (managementRequested) {
+      while (!managementRequested.get()) {
+        managementRequested.wait(MANAGEMENT_WAIT_TIMEOUT_MILLIS - 
MIN_WAIT_TIME_MILLIS);
+      }
+      managementRequested.compareAndSet(true, false);
+    }
   }
 
   /**
    * Main task runner management loop. Meant to run forever, or, at least 
until we're stopped.
    */
-  private void manage() throws InterruptedException
+  private void runTaskManagement() throws InterruptedException
   {
-    log.info("Beginning management in %s.", config.getStartDelay());
-    Thread.sleep(config.getStartDelay().getMillis());
-
     // Ignore return value- we'll get the IDs and futures from getKnownTasks 
later.
     taskRunner.restore();
 
     while (active) {
-      manageInternal();
-
-      // awaitNanos because management may become necessary without this 
condition signalling,
-      // due to e.g. tasks becoming ready when other folks mess with the 
TaskLockbox.
-      awaitManagementNanos(MANAGEMENT_WAIT_TIMEOUT_NANOS);
+      manageTasks();
+      awaitManagementRequest();
     }
   }
 
   @VisibleForTesting
-  void manageInternal()
+  void manageTasks()
   {
-    Set<String> knownTaskIds = new HashSet<>();
-    Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new 
HashMap<>();
-
-    giant.lock();
-
-    try {
-      manageInternalCritical(knownTaskIds, runnerTaskFutures);
-    }
-    finally {
-      giant.unlock();
-    }
-
-    manageInternalPostCritical(knownTaskIds, runnerTaskFutures);
+    runReadyTasks();
+    killUnknownTasks();
   }
 
 
   /**
-   * Management loop critical section tasks.
-   *
-   * @param knownTaskIds will be modified - filled with known task IDs
-   * @param runnerTaskFutures will be modified - filled with futures related 
to getting the running tasks
+   * Submits ready tasks to the TaskRunner.
+   * <p>
+   * This method should be called only by the management thread.
    */
-  @GuardedBy("giant")
-  private void manageInternalCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  private synchronized void runReadyTasks()
   {
     // Task futures available from the taskRunner
+    final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = new 
HashMap<>();
     for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
-      if (!recentlyCompletedTasks.contains(workItem.getTaskId())) {
-        // Don't do anything with tasks that have recently finished; 
notifyStatus will handle it.
+      if (!recentlyCompletedTaskIds.contains(workItem.getTaskId())) {
+        // Don't do anything with recently completed tasks; notifyStatus will 
handle it.
         runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
       }
     }
+
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final Task task : ImmutableList.copyOf(tasks.values())) {
-      if (recentlyCompletedTasks.contains(task.getId())) {
-        // Don't do anything with tasks that have recently finished; 
notifyStatus will handle it.
-        continue;
+    for (final String taskId : Lists.newArrayList(activeTaskIdQueue)) {
+      final Task task = tasks.get(taskId);
+      if (task == null || recentlyCompletedTaskIds.contains(taskId)) {
+        // Don't do anything for unknown tasks or recently completed tasks
+      } else if (submittedTaskIds.contains(taskId)) {
+        // Re-trigger execution of pending task to avoid unnecessary delays
+        // see https://github.com/apache/druid/pull/6991
+        if (isTaskPending(task)) {
+          taskRunner.run(task);
+        }
+      } else if (runnerTaskFutures.containsKey(taskId)) {
+        attachCallbacks(task, runnerTaskFutures.get(taskId));
+        submittedTaskIds.add(taskId);
+      } else if (isTaskReady(task)) {
+        log.info("Asking taskRunner to run ready task [%s].", taskId);
+        attachCallbacks(task, taskRunner.run(task));
+        submittedTaskIds.add(taskId);
+      } else {
+        // Release all locks (possibly acquired by task.isReady()) if task is 
not ready
+        taskLockbox.unlockAll(task);
       }
+    }
+  }
 
-      knownTaskIds.add(task.getId());
-
-      if (!taskFutures.containsKey(task.getId())) {
-        final ListenableFuture<TaskStatus> runnerTaskFuture;
-        if (runnerTaskFutures.containsKey(task.getId())) {
-          runnerTaskFuture = runnerTaskFutures.get(task.getId());
-        } else {
-          // Task should be running, so run it.
-          final boolean taskIsReady;
-          try {
-            taskIsReady = task.isReady(taskActionClientFactory.create(task));
-          }
-          catch (Exception e) {
-            log.warn(e, "Exception thrown during isReady for task: %s", 
task.getId());
-            final String errorMessage;
-            if (e instanceof MaxAllowedLocksExceededException) {
-              errorMessage = e.getMessage();
-            } else {
-              errorMessage = "Failed while waiting for the task to be ready to 
run. "
-                                          + "See overlord logs for more 
details.";
-            }
-            notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), 
errorMessage);
-            continue;
-          }
-          if (taskIsReady) {
-            log.info("Asking taskRunner to run: %s", task.getId());
-            runnerTaskFuture = taskRunner.run(task);
-          } else {
-            // Task.isReady() can internally lock intervals or segments.
-            // We should release them if the task is not ready.
-            taskLockbox.unlockAll(task);
-            continue;
-          }
-        }
-        taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
-      } else if (isTaskPending(task)) {
-        // if the taskFutures contain this task and this task is pending, also 
let the taskRunner
-        // to run it to guarantee it will be assigned to run
-        // see https://github.com/apache/druid/pull/6991
-        taskRunner.run(task);
+  private boolean isTaskReady(Task task)
+  {
+    try {
+      return task.isReady(taskActionClientFactory.create(task));
+    }
+    catch (Exception e) {
+      log.warn(e, "Error while checking if task [%s] is ready to run.", 
task.getId());
+      final String errorMessage;
+      if (e instanceof MaxAllowedLocksExceededException) {
+        errorMessage = e.getMessage();
+      } else {
+        errorMessage = "Failed while waiting for the task to be ready to run. "
+                       + "See overlord logs for more details.";
       }
+      notifyStatus(task, TaskStatus.failure(task.getId(), errorMessage), 
errorMessage);
+      return false;
     }
   }
 
-  @VisibleForTesting
-  private void manageInternalPostCritical(
-      final Set<String> knownTaskIds,
-      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures
-  )
+  /**
+   * Kills tasks not present in the set of known tasks.
+   */
+  private void killUnknownTasks()
   {
-    // Kill tasks that shouldn't be running
-    final Set<String> tasksToKill = 
Sets.difference(runnerTaskFutures.keySet(), knownTaskIds);
-    if (!tasksToKill.isEmpty()) {
-      log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
-
-      // On large installations running several thousands of tasks,
-      // concatenating the list of known task ids can be compupationally 
expensive.
-      final boolean logKnownTaskIds = log.isDebugEnabled();
-      final String reason = logKnownTaskIds
-              ? StringUtils.format("Task is not in knownTaskIds[%s]", 
knownTaskIds)
-              : "Task is not in knownTaskIds";
-
-      for (final String taskId : tasksToKill) {
-        try {
-          taskRunner.shutdown(taskId, reason);
-        }
-        catch (Exception e) {
-          log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
-        }
+    final Set<String> knownTaskIds = tasks.keySet();

Review Comment:
   Nb: earlier PR https://github.com/apache/druid/pull/12099 is maybe relevant 
here too for background/history.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to