kfaraz commented on code in PR #14293:
URL: https://github.com/apache/druid/pull/14293#discussion_r1195890450


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -156,314 +153,248 @@ public TaskQueue(
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
   }
 
-  @VisibleForTesting
-  void setActive(boolean active)
-  {
-    this.active = active;
-  }
-
   /**
    * Starts this task queue. Allows {@link #add(Task)} to accept new tasks.
    */
   @LifecycleStart
-  public void start()
+  public synchronized void start()
   {
-    giant.lock();
-
-    try {
-      Preconditions.checkState(!active, "queue must be stopped");
-      active = true;
-      syncFromStorage();
-      // Mark these tasks as failed as they could not reacuire the lock
-      // Clean up needs to happen after tasks have been synced from storage
-      Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
-      for (Task task : tasksToFail) {
-        shutdown(task.getId(),
-                 "Shutting down forcefully as task failed to reacquire lock while becoming leader");
+    Preconditions.checkState(!active, "queue must be stopped");
+
+    // Mark queue as active only after first sync is complete
+    syncFromStorage();
+    active = true;
+
+    // Mark these tasks as failed as they could not reacquire locks
+    Set<Task> tasksToFail = taskLockbox.syncFromStorage().getTasksToFail();
+    for (Task task : tasksToFail) {
+      shutdown(
+          task.getId(),
+          "Shutting down forcefully as task failed to reacquire lock while becoming leader"
+      );
+    }
+    requestManagement();
+    // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
+    // This is called after requesting management as locks need to be cleared after notifyStatus is processed
+    for (Task task : tasksToFail) {
+      for (TaskLock lock : taskStorage.getLocks(task.getId())) {
+        taskStorage.removeLock(task.getId(), lock);
       }
-      managerExec.submit(
-          new Runnable()
-          {
-            @Override
-            public void run()
-            {
-              while (true) {
-                try {
-                  manage();
-                  break;
-                }
-                catch (InterruptedException e) {
-                  log.info("Interrupted, exiting!");
-                  break;
-                }
-                catch (Exception e) {
-                  final long restartDelay = config.getRestartDelay().getMillis();
-                  log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
-                  try {
-                    Thread.sleep(restartDelay);
-                  }
-                  catch (InterruptedException e2) {
-                    log.info("Interrupted, exiting!");
-                    break;
-                  }
-                }
-              }
+    }
+    log.info("Cleaned up [%d] tasks which failed to reacquire locks.", tasksToFail.size());
+
+    // Submit task management job
+    managerExec.submit(
+        () -> {
+          log.info("Beginning task management in [%s].", config.getStartDelay());
+          long startDelayMillis = config.getStartDelay().getMillis();
+          while (active) {
+            try {
+              Thread.sleep(startDelayMillis);
+              runTaskManagement();
+            }
+            catch (InterruptedException e) {
+              log.info("Interrupted, stopping task management.");
+              break;
+            }
+            catch (Exception e) {
+              startDelayMillis = config.getRestartDelay().getMillis();
+              log.makeAlert(e, "Failed to manage").addData("restartDelay", startDelayMillis).emit();
             }
           }
-      );
-      ScheduledExecutors.scheduleAtFixedRate(
-          storageSyncExec,
-          config.getStorageSyncRate(),
-          new Callable<ScheduledExecutors.Signal>()
-          {
-            @Override
-            public ScheduledExecutors.Signal call()
-            {
-              try {
-                syncFromStorage();
-              }
-              catch (Exception e) {
-                if (active) {
-                  log.makeAlert(e, "Failed to sync with storage").emit();
-                }
-              }
-              if (active) {
-                return ScheduledExecutors.Signal.REPEAT;
-              } else {
-                return ScheduledExecutors.Signal.STOP;
-              }
+        }
+    );
+
+    // Schedule storage sync job
+    ScheduledExecutors.scheduleAtFixedRate(
+        storageSyncExec,
+        config.getStorageSyncRate(),
+        () -> {
+          if (!active) {
+            log.info("Stopping storage sync as TaskQueue has been stopped");
+            return ScheduledExecutors.Signal.STOP;
+          }
+
+          try {
+            syncFromStorage();
+          }
+          catch (Exception e) {
+            if (active) {
+              log.makeAlert(e, "Failed to sync with storage").emit();
             }
           }
-      );
-      requestManagement();
-      // Remove any unacquired locks from storage (shutdown only clears entries for which a TaskLockPosse was acquired)
-      // This is called after requesting management as locks need to be cleared after notifyStatus is processed
-      for (Task task : tasksToFail) {
-        for (TaskLock lock : taskStorage.getLocks(task.getId())) {
-          taskStorage.removeLock(task.getId(), lock);
+
+          return active ? ScheduledExecutors.Signal.REPEAT : ScheduledExecutors.Signal.STOP;
         }
-      }
-    }
-    finally {
-      giant.unlock();
-    }
+    );
   }
 
   /**
    * Shuts down the queue.
    */
   @LifecycleStop
-  public void stop()
+  public synchronized void stop()
   {
-    giant.lock();
-
-    try {
-      tasks.clear();
-      taskFutures.clear();
-      active = false;
-      managerExec.shutdownNow();
-      storageSyncExec.shutdownNow();
-      requestManagement();
-    }
-    finally {
-      giant.unlock();
-    }
-  }
-
-  public boolean isActive()
-  {
-    return active;
+    active = false;
+    tasks.clear();
+    submittedTaskIds.clear();
+    recentlyCompletedTaskIds.clear();
+    managerExec.shutdownNow();
+    storageSyncExec.shutdownNow();
+    requestManagement();
   }
 
   /**
    * Request management from the management thread. Non-blocking.

Review Comment:
   Hmm, let me reevaluate this. Maybe we can just stick with the 
`BlockingDeque` that you had earlier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to