kfaraz commented on code in PR #18851:
URL: https://github.com/apache/druid/pull/18851#discussion_r2631164807


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -1521,206 +1456,215 @@ public void taskAddedOrUpdated(final TaskAnnouncement 
announcement, final Worker
     final Worker worker = workerHolder.getWorker();
 
     log.debug(
-        "Worker[%s] wrote [%s] status for task [%s] on [%s]",
+        "Worker[%s] wrote status[%s] for task[%s] on [%s]",

Review Comment:
   ```suggestion
           "Worker[%s] wrote status[%s] for task[%s] on location[%s]",
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -186,39 +176,28 @@ public class HttpRemoteTaskRunner implements 
WorkerTaskRunner, TaskLogStreamer,
 
   private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
   private final HttpClient httpClient;
-  private final ObjectMapper smileMapper;
+  private final ObjectMapper objectMapper;

Review Comment:
   Looking at the factory class, we are still passing the `@Smile`-annotated 
mapper here. Let's retain the original name of this field to avoid ambiguity 
with the other mapper (typically injected without an annotation or with `@Json`).



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -402,31 +296,54 @@ private ImmutableWorkerInfo findWorkerToRunTask(Task task)
   }
 
   private boolean runTaskOnWorker(

Review Comment:
   Please add a Javadoc to this method that clarifies what each return value 
or exception means:
   - true: successfully assigned
   - false: failed to assign, retry later
   - exception: failed to assign, mark task as completed
   
   I wonder if this tri-state should be captured in the return value itself 
rather than by throwing varying exceptions such as ISE or 
`IllegalStateException` (via `Preconditions`). You could return a value that has 
a `success` boolean, a `retry` boolean and a String `failureReason`, which can 
be used for alerting or the task completion message.
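
   A minimal sketch of what such a return value might look like. The class 
name `AssignResult`, its factory methods and its getters are hypothetical and 
not part of the PR; only the three fields come from the comment above.

   ```java
   /**
    * Hypothetical result of a task assignment attempt. All names here are
    * illustrative assumptions and do not exist in the PR.
    */
   final class AssignResult
   {
     private final boolean success;
     private final boolean retry;
     private final String failureReason;

     private AssignResult(boolean success, boolean retry, String failureReason)
     {
       this.success = success;
       this.retry = retry;
       this.failureReason = failureReason;
     }

     /** Task was successfully assigned to a worker. */
     static AssignResult assigned()
     {
       return new AssignResult(true, false, null);
     }

     /** Assignment failed, but the caller may retry later. */
     static AssignResult retryLater(String reason)
     {
       return new AssignResult(false, true, reason);
     }

     /** Assignment failed for good; the caller should mark the task as completed. */
     static AssignResult failed(String reason)
     {
       return new AssignResult(false, false, reason);
     }

     boolean isSuccess()
     {
       return success;
     }

     boolean shouldRetry()
     {
       return retry;
     }

     /** Null on success; otherwise usable in an alert or task completion message. */
     String getFailureReason()
     {
       return failureReason;
     }
   }
   ```

   With something like this, the caller could branch on `isSuccess()` and 
`shouldRetry()` instead of distinguishing exception types, and the three cases 
listed above would be documented on the factory methods.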



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -112,71 +107,66 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
- * A Remote TaskRunner to manage tasks on Middle Manager nodes using 
internal-discovery({@link DruidNodeDiscoveryProvider})
- * to discover them and Http.
- * Middle Managers manages list of assigned/completed tasks on disk and expose 
3 HTTP endpoints
- * 1. POST request for assigning a task
- * 2. POST request for shutting down a task
- * 3. GET request for getting list of assigned, running, completed tasks on 
Middle Manager and its enable/disable status.
- * This endpoint is implemented to support long poll and holds the request 
till there is a change. This class
- * sends the next request immediately as the previous finishes to keep the 
state up-to-date.
- * <p>
- * ZK_CLEANUP_TODO : As of 0.11.1, it is required to cleanup task status paths 
from ZK which are created by the
- * workers to support deprecated RemoteTaskRunner. So a method 
"scheduleCompletedTaskStatusCleanupFromZk()" is added'
- * which should be removed in the release that removes RemoteTaskRunner legacy 
ZK updation WorkerTaskMonitor class.
+ * HTTP-based distributed task scheduler that manages assignment of tasks to 
slots on workers (MiddleManagers).
+ * State information:
+ *  Task state machine is as follows:
+ *    1. PENDING – Task has been submitted to the scheduler.
+ *    2. PENDING_ASSIGN – Task has been assignment to a worker, but has not 
started running yet.
+ *    3. EXECUTING – Task is running on a worker.
+ *    4. COMPLETE – Task has completed (success/fail).
+ *  Worker state machine is as follows:
+ *    1. READY – Worker is online and ready to receive new tasks.
+ *    2. PENDING_ASSIGN – A task has been submitted to this worker, but has 
not started running yet.
+ *    3. BLACKLISTED – Worker has too many failed tasks.
+ *    4. LAZY – Worker has no more task running and been marked as reapable by 
the worker auto-scaler.

Review Comment:
   This is not needed here and should be moved to the respective enum values.
   Just link them here using `@see WorkerHolder.State` and `@see 
HttpRemoteTaskRunnerWorkItem.State`



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
   // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
   // held. See https://github.com/apache/druid/issues/6201
   private void taskComplete(
-      HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      WorkerHolder workerHolder,
+      String taskId,
+      String workerHost,
       TaskStatus taskStatus
   )
   {
-    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
-    Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
-    Preconditions.checkNotNull(taskStatus, "taskStatus");
-    if (workerHolder != null) {
-      log.info(
-          "Worker[%s] completed task[%s] with status[%s]",
-          workerHolder.getWorker().getHost(),
-          taskStatus.getId(),
-          taskStatus.getStatusCode()
-      );
-      // Worker is done with this task
-      workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
-    }
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current 
thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread 
must not hold taskStateLock.");
 
-    if (taskRunnerWorkItem.getResult().isDone()) {
-      // This is not the first complete event.
-      try {
-        TaskState lastKnownState = 
taskRunnerWorkItem.getResult().get().getStatusCode();
-        if (taskStatus.getStatusCode() != lastKnownState) {
-          log.warn(
-              "The state of the new task complete event is different from its 
last known state. "
-              + "New state[%s], last known state[%s]",
-              taskStatus.getStatusCode(),
-              lastKnownState
-          );
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to 
exist", taskId);
+          if (taskEntry.getResult().isDone()) {

Review Comment:
   Don't use a future here. Instead, keep an additional `TaskStatus 
resultStatus` or similar field in `HttpRemoteTaskRunnerWorkItem`, which gets 
updated on calling `taskEntry.setStatus()`.
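
   Roughly what I have in mind, as a sketch only. `SketchTaskState` and 
`WorkItemSketch` are stand-ins invented for this example, not the real 
`TaskState` or `HttpRemoteTaskRunnerWorkItem`, and the sketch assumes the field 
is only read and written inside `tasks.compute()`, so it is left non-volatile.

   ```java
   /** Stand-in for the real task state enum; only the values the sketch needs. */
   enum SketchTaskState
   {
     RUNNING, SUCCESS, FAILED
   }

   /** Hypothetical slice of a work item that keeps its terminal state in a plain field. */
   final class WorkItemSketch
   {
     // Null until the first completion event arrives.
     private SketchTaskState resultState;

     /**
      * Records the terminal state of the task.
      *
      * @return true only for the first completion event, false for duplicates
      */
     boolean setResult(SketchTaskState state)
     {
       if (resultState != null) {
         return false;
       }
       resultState = state;
       return true;
     }

     boolean isDone()
     {
       return resultState != null;
     }

     /** Last known terminal state, readable without a blocking get() or checked exceptions. */
     SketchTaskState getResultState()
     {
       return resultState;
     }
   }
   ```

   Reading the last known state is then a plain getter call, so no 
`InterruptedException` or `ExecutionException` handling is required.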



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
   // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
   // held. See https://github.com/apache/druid/issues/6201
   private void taskComplete(
-      HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      WorkerHolder workerHolder,
+      String taskId,
+      String workerHost,
       TaskStatus taskStatus
   )
   {
-    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
-    Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
-    Preconditions.checkNotNull(taskStatus, "taskStatus");
-    if (workerHolder != null) {
-      log.info(
-          "Worker[%s] completed task[%s] with status[%s]",
-          workerHolder.getWorker().getHost(),
-          taskStatus.getId(),
-          taskStatus.getStatusCode()
-      );
-      // Worker is done with this task
-      workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
-    }
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current 
thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread 
must not hold taskStateLock.");
 
-    if (taskRunnerWorkItem.getResult().isDone()) {
-      // This is not the first complete event.
-      try {
-        TaskState lastKnownState = 
taskRunnerWorkItem.getResult().get().getStatusCode();
-        if (taskStatus.getStatusCode() != lastKnownState) {
-          log.warn(
-              "The state of the new task complete event is different from its 
last known state. "
-              + "New state[%s], last known state[%s]",
-              taskStatus.getStatusCode(),
-              lastKnownState
-          );
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to 
exist", taskId);
+          if (taskEntry.getResult().isDone()) {
+            // This is not the first complete event.
+            try {
+              TaskState lastKnownState = 
taskEntry.getResult().get().getStatusCode();
+              if (taskStatus.getStatusCode() != lastKnownState) {
+                log.warn(
+                    "The state of the new task complete event is different 
from its last known state. "
+                    + "New state[%s], last known state[%s]",
+                    taskStatus.getStatusCode(),
+                    lastKnownState
+                );
+              }
+            }
+            catch (InterruptedException e) {
+              log.warn(e, "Interrupted while getting the last known task 
status.");
+              Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e) {
+              // This case should not really happen.
+              log.warn(e, "Failed to get the last known task status. Ignoring 
this failure.");
+            }
+          } else {
+            // Notify interested parties

Review Comment:
   We are not notifying here.
   ```suggestion           
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -112,71 +107,55 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
- * A Remote TaskRunner to manage tasks on Middle Manager nodes using 
internal-discovery({@link DruidNodeDiscoveryProvider})
- * to discover them and Http.
- * Middle Managers manages list of assigned/completed tasks on disk and expose 
3 HTTP endpoints
- * 1. POST request for assigning a task
- * 2. POST request for shutting down a task
- * 3. GET request for getting list of assigned, running, completed tasks on 
Middle Manager and its enable/disable status.
- * This endpoint is implemented to support long poll and holds the request 
till there is a change. This class
- * sends the next request immediately as the previous finishes to keep the 
state up-to-date.
- * <p>
- * ZK_CLEANUP_TODO : As of 0.11.1, it is required to cleanup task status paths 
from ZK which are created by the
- * workers to support deprecated RemoteTaskRunner. So a method 
"scheduleCompletedTaskStatusCleanupFromZk()" is added'
- * which should be removed in the release that removes RemoteTaskRunner legacy 
ZK updation WorkerTaskMonitor class.
+ * TODO: add a more descriptive title

Review Comment:
   Can this be reverted/updated now?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -112,71 +107,66 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
- * A Remote TaskRunner to manage tasks on Middle Manager nodes using 
internal-discovery({@link DruidNodeDiscoveryProvider})
- * to discover them and Http.
- * Middle Managers manages list of assigned/completed tasks on disk and expose 
3 HTTP endpoints
- * 1. POST request for assigning a task
- * 2. POST request for shutting down a task
- * 3. GET request for getting list of assigned, running, completed tasks on 
Middle Manager and its enable/disable status.
- * This endpoint is implemented to support long poll and holds the request 
till there is a change. This class
- * sends the next request immediately as the previous finishes to keep the 
state up-to-date.
- * <p>
- * ZK_CLEANUP_TODO : As of 0.11.1, it is required to cleanup task status paths 
from ZK which are created by the
- * workers to support deprecated RemoteTaskRunner. So a method 
"scheduleCompletedTaskStatusCleanupFromZk()" is added'
- * which should be removed in the release that removes RemoteTaskRunner legacy 
ZK updation WorkerTaskMonitor class.
+ * HTTP-based distributed task scheduler that manages assignment of tasks to 
slots on workers (MiddleManagers).

Review Comment:
   ```suggestion
    * HTTP-based distributed task scheduler that manages assignment of tasks to 
slots on workers (MiddleManagers or Indexers).
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
   // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
   // held. See https://github.com/apache/druid/issues/6201
   private void taskComplete(
-      HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      WorkerHolder workerHolder,
+      String taskId,
+      String workerHost,
       TaskStatus taskStatus
   )
   {
-    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
-    Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
-    Preconditions.checkNotNull(taskStatus, "taskStatus");
-    if (workerHolder != null) {
-      log.info(
-          "Worker[%s] completed task[%s] with status[%s]",
-          workerHolder.getWorker().getHost(),
-          taskStatus.getId(),
-          taskStatus.getStatusCode()
-      );
-      // Worker is done with this task
-      workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
-    }
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current 
thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread 
must not hold taskStateLock.");
 
-    if (taskRunnerWorkItem.getResult().isDone()) {
-      // This is not the first complete event.
-      try {
-        TaskState lastKnownState = 
taskRunnerWorkItem.getResult().get().getStatusCode();
-        if (taskStatus.getStatusCode() != lastKnownState) {
-          log.warn(
-              "The state of the new task complete event is different from its 
last known state. "
-              + "New state[%s], last known state[%s]",
-              taskStatus.getStatusCode(),
-              lastKnownState
-          );
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to 
exist", taskId);

Review Comment:
   If we find the entry to be null, just set some flag (similar to 
`taskCompleted`) and return null rather than throwing an exception. We can throw 
the exception outside the `compute`.
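
   Something along these lines, as a sketch. `TaskMapSketch`, `taskMissing` and 
the String-valued map are assumptions made to keep the example self-contained; 
the real map holds work items. Returning null from the remapping function when 
no entry exists leaves the `ConcurrentHashMap` without a mapping for that key.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicBoolean;

   final class TaskMapSketch
   {
     // Values are plain status strings here purely for illustration.
     private final ConcurrentHashMap<String, String> tasks = new ConcurrentHashMap<>();

     void add(String taskId)
     {
       tasks.put(taskId, "RUNNING");
     }

     String statusOf(String taskId)
     {
       return tasks.get(taskId);
     }

     /** Returns true if this call was the first to complete the task. */
     boolean complete(String taskId, String finalStatus)
     {
       AtomicBoolean taskMissing = new AtomicBoolean(false);
       AtomicBoolean taskCompleted = new AtomicBoolean(false);

       tasks.compute(
           taskId,
           (key, entry) -> {
             if (entry == null) {
               // Only record the problem here; returning null adds no mapping.
               taskMissing.set(true);
               return null;
             }
             if (!"RUNNING".equals(entry)) {
               // Duplicate completion event: keep the first recorded status.
               return entry;
             }
             taskCompleted.set(true);
             return finalStatus;
           }
       );

       // The exception is thrown only after compute() has returned.
       if (taskMissing.get()) {
         throw new IllegalStateException("Expected task[" + taskId + "] to exist");
       }
       return taskCompleted.get();
     }
   }
   ```

   This keeps the lambda free of exceptional control flow, in line with the 
`ConcurrentHashMap` guidance that the computation should be short and simple.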



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
   // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
   // held. See https://github.com/apache/druid/issues/6201
   private void taskComplete(
-      HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      WorkerHolder workerHolder,
+      String taskId,
+      String workerHost,
       TaskStatus taskStatus
   )
   {
-    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
-    Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
-    Preconditions.checkNotNull(taskStatus, "taskStatus");
-    if (workerHolder != null) {
-      log.info(
-          "Worker[%s] completed task[%s] with status[%s]",
-          workerHolder.getWorker().getHost(),
-          taskStatus.getId(),
-          taskStatus.getStatusCode()
-      );
-      // Worker is done with this task
-      workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
-    }
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current 
thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread 
must not hold taskStateLock.");
 
-    if (taskRunnerWorkItem.getResult().isDone()) {
-      // This is not the first complete event.
-      try {
-        TaskState lastKnownState = 
taskRunnerWorkItem.getResult().get().getStatusCode();
-        if (taskStatus.getStatusCode() != lastKnownState) {
-          log.warn(
-              "The state of the new task complete event is different from its 
last known state. "
-              + "New state[%s], last known state[%s]",
-              taskStatus.getStatusCode(),
-              lastKnownState
-          );
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to 
exist", taskId);
+          if (taskEntry.getResult().isDone()) {
+            // This is not the first complete event.
+            try {
+              TaskState lastKnownState = 
taskEntry.getResult().get().getStatusCode();
+              if (taskStatus.getStatusCode() != lastKnownState) {
+                log.warn(
+                    "The state of the new task complete event is different 
from its last known state. "
+                    + "New state[%s], last known state[%s]",
+                    taskStatus.getStatusCode(),
+                    lastKnownState
+                );
+              }
+            }
+            catch (InterruptedException e) {
+              log.warn(e, "Interrupted while getting the last known task 
status.");
+              Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e) {
+              // This case should not really happen.

Review Comment:
   This try-catch will not be needed at all, if we don't use a future.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
   // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
   // held. See https://github.com/apache/druid/issues/6201
   private void taskComplete(
-      HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      WorkerHolder workerHolder,
+      String taskId,
+      String workerHost,
       TaskStatus taskStatus
   )
   {
-    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
-    Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
-    Preconditions.checkNotNull(taskStatus, "taskStatus");
-    if (workerHolder != null) {
-      log.info(
-          "Worker[%s] completed task[%s] with status[%s]",
-          workerHolder.getWorker().getHost(),
-          taskStatus.getId(),
-          taskStatus.getStatusCode()
-      );
-      // Worker is done with this task
-      workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
-    }
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current 
thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread 
must not hold taskStateLock.");
 
-    if (taskRunnerWorkItem.getResult().isDone()) {
-      // This is not the first complete event.
-      try {
-        TaskState lastKnownState = 
taskRunnerWorkItem.getResult().get().getStatusCode();
-        if (taskStatus.getStatusCode() != lastKnownState) {
-          log.warn(
-              "The state of the new task complete event is different from its 
last known state. "
-              + "New state[%s], last known state[%s]",
-              taskStatus.getStatusCode(),
-              lastKnownState
-          );
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to 
exist", taskId);
+          if (taskEntry.getResult().isDone()) {
+            // This is not the first complete event.
+            try {
+              TaskState lastKnownState = 
taskEntry.getResult().get().getStatusCode();
+              if (taskStatus.getStatusCode() != lastKnownState) {
+                log.warn(
+                    "The state of the new task complete event is different 
from its last known state. "
+                    + "New state[%s], last known state[%s]",
+                    taskStatus.getStatusCode(),
+                    lastKnownState
+                );
+              }
+            }
+            catch (InterruptedException e) {
+              log.warn(e, "Interrupted while getting the last known task 
status.");
+              Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e) {
+              // This case should not really happen.
+              log.warn(e, "Failed to get the last known task status. Ignoring 
this failure.");
+            }
+          } else {
+            // Notify interested parties
+            taskEntry.setResult(taskStatus);
+            taskCompleted.set(true);
+          }
+
+          return taskEntry;
         }
-      }
-      catch (InterruptedException e) {
-        log.warn(e, "Interrupted while getting the last known task status.");
-        Thread.currentThread().interrupt();
-      }
-      catch (ExecutionException e) {
-        // This case should not really happen.
-        log.warn(e, "Failed to get the last known task status. Ignoring this 
failure.");
-      }
-    } else {
-      // Notify interested parties
-      taskRunnerWorkItem.setResult(taskStatus);
-      TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), 
taskStatus);
+    );
 
-      // Update success/failure counters, Blacklist node if there are too many 
failures.
-      if (workerHolder != null) {
-        blacklistWorkerIfNeeded(taskStatus, workerHolder);
+    if (workerHost != null) {
+      synchronized (workerStateLock) {

Review Comment:
   Doesn't seem like we need to acquire this lock.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
   // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
   // held. See https://github.com/apache/druid/issues/6201
   private void taskComplete(
-      HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      WorkerHolder workerHolder,
+      String taskId,
+      String workerHost,
       TaskStatus taskStatus
   )
   {
-    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
-    Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
-    Preconditions.checkNotNull(taskStatus, "taskStatus");
-    if (workerHolder != null) {
-      log.info(
-          "Worker[%s] completed task[%s] with status[%s]",
-          workerHolder.getWorker().getHost(),
-          taskStatus.getId(),
-          taskStatus.getStatusCode()
-      );
-      // Worker is done with this task
-      workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
-    }
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current 
thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread 
must not hold taskStateLock.");
 
-    if (taskRunnerWorkItem.getResult().isDone()) {
-      // This is not the first complete event.
-      try {
-        TaskState lastKnownState = 
taskRunnerWorkItem.getResult().get().getStatusCode();
-        if (taskStatus.getStatusCode() != lastKnownState) {
-          log.warn(
-              "The state of the new task complete event is different from its 
last known state. "
-              + "New state[%s], last known state[%s]",
-              taskStatus.getStatusCode(),
-              lastKnownState
-          );
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to 
exist", taskId);
+          if (taskEntry.getResult().isDone()) {
+            // This is not the first complete event.
+            try {
+              TaskState lastKnownState = 
taskEntry.getResult().get().getStatusCode();
+              if (taskStatus.getStatusCode() != lastKnownState) {
+                log.warn(
+                    "The state of the new task complete event is different 
from its last known state. "
+                    + "New state[%s], last known state[%s]",
+                    taskStatus.getStatusCode(),
+                    lastKnownState
+                );
+              }
+            }
+            catch (InterruptedException e) {
+              log.warn(e, "Interrupted while getting the last known task 
status.");
+              Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e) {
+              // This case should not really happen.
+              log.warn(e, "Failed to get the last known task status. Ignoring 
this failure.");
+            }
+          } else {
+            // Notify interested parties
+            taskEntry.setResult(taskStatus);
+            taskCompleted.set(true);
+          }
+
+          return taskEntry;
         }
-      }
-      catch (InterruptedException e) {
-        log.warn(e, "Interrupted while getting the last known task status.");
-        Thread.currentThread().interrupt();
-      }
-      catch (ExecutionException e) {
-        // This case should not really happen.
-        log.warn(e, "Failed to get the last known task status. Ignoring this 
failure.");
-      }
-    } else {
-      // Notify interested parties
-      taskRunnerWorkItem.setResult(taskStatus);
-      TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), 
taskStatus);
+    );
 
-      // Update success/failure counters, Blacklist node if there are too many 
failures.
-      if (workerHolder != null) {
-        blacklistWorkerIfNeeded(taskStatus, workerHolder);
+    if (workerHost != null) {
+      synchronized (workerStateLock) {
+        workers.compute(
+            workerHost,
+            (key, workerHolder) -> {
+              if (workerHolder != null) {
+                log.info(
+                    "Worker[%s] completed task[%s] with status[%s]",
+                    workerHolder.getWorker().getHost(),
+                    taskStatus.getId(),
+                    taskStatus.getStatusCode()
+                );
+                // Worker is done with this task
+                workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
+                blacklistWorkerIfNeeded(taskStatus, workerHolder);
+              } else {
+                log.warn("Could not find worker[%s]", workerHost);

Review Comment:
   Is this really a warning? It is possible that the task finished on the 
worker and then the worker went away.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
   // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
   // held. See https://github.com/apache/druid/issues/6201
   private void taskComplete(
-      HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      WorkerHolder workerHolder,
+      String taskId,
+      String workerHost,
       TaskStatus taskStatus
   )
   {
-    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
-    Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
-    Preconditions.checkNotNull(taskStatus, "taskStatus");
-    if (workerHolder != null) {
-      log.info(
-          "Worker[%s] completed task[%s] with status[%s]",
-          workerHolder.getWorker().getHost(),
-          taskStatus.getId(),
-          taskStatus.getStatusCode()
-      );
-      // Worker is done with this task
-      workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
-    }
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current 
thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread 
must not hold taskStateLock.");
 
-    if (taskRunnerWorkItem.getResult().isDone()) {
-      // This is not the first complete event.
-      try {
-        TaskState lastKnownState = 
taskRunnerWorkItem.getResult().get().getStatusCode();
-        if (taskStatus.getStatusCode() != lastKnownState) {
-          log.warn(
-              "The state of the new task complete event is different from its 
last known state. "
-              + "New state[%s], last known state[%s]",
-              taskStatus.getStatusCode(),
-              lastKnownState
-          );
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to 
exist", taskId);
+          if (taskEntry.getResult().isDone()) {
+            // This is not the first complete event.
+            try {
+              TaskState lastKnownState = 
taskEntry.getResult().get().getStatusCode();
+              if (taskStatus.getStatusCode() != lastKnownState) {
+                log.warn(
+                    "The state of the new task complete event is different 
from its last known state. "
+                    + "New state[%s], last known state[%s]",
+                    taskStatus.getStatusCode(),
+                    lastKnownState
+                );
+              }
+            }
+            catch (InterruptedException e) {
+              log.warn(e, "Interrupted while getting the last known task 
status.");
+              Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e) {
+              // This case should not really happen.
+              log.warn(e, "Failed to get the last known task status. Ignoring 
this failure.");
+            }
+          } else {
+            // Notify interested parties
+            taskEntry.setResult(taskStatus);
+            taskCompleted.set(true);
+          }
+
+          return taskEntry;
         }
-      }
-      catch (InterruptedException e) {
-        log.warn(e, "Interrupted while getting the last known task status.");
-        Thread.currentThread().interrupt();
-      }
-      catch (ExecutionException e) {
-        // This case should not really happen.
-        log.warn(e, "Failed to get the last known task status. Ignoring this 
failure.");
-      }
-    } else {
-      // Notify interested parties
-      taskRunnerWorkItem.setResult(taskStatus);
-      TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), 
taskStatus);
+    );
 
-      // Update success/failure counters, Blacklist node if there are too many 
failures.
-      if (workerHolder != null) {
-        blacklistWorkerIfNeeded(taskStatus, workerHolder);
+    if (workerHost != null) {
+      synchronized (workerStateLock) {
+        workers.compute(
+            workerHost,
+            (key, workerHolder) -> {
+              if (workerHolder != null) {
+                log.info(
+                    "Worker[%s] completed task[%s] with status[%s]",
+                    workerHolder.getWorker().getHost(),
+                    taskStatus.getId(),
+                    taskStatus.getStatusCode()
+                );

Review Comment:
   Nit: This can fit on one line, I feel.
   ```suggestion
                   log.info("Worker[%s] completed task[%s] with status[%s].", workerHost, taskId, taskStatus);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
   // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
   // held. See https://github.com/apache/druid/issues/6201
   private void taskComplete(
-      HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      WorkerHolder workerHolder,
+      String taskId,
+      String workerHost,
       TaskStatus taskStatus
   )
   {
-    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
-    Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
-    Preconditions.checkNotNull(taskStatus, "taskStatus");
-    if (workerHolder != null) {
-      log.info(
-          "Worker[%s] completed task[%s] with status[%s]",
-          workerHolder.getWorker().getHost(),
-          taskStatus.getId(),
-          taskStatus.getStatusCode()
-      );
-      // Worker is done with this task
-      workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
-    }
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current 
thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread 
must not hold taskStateLock.");
 
-    if (taskRunnerWorkItem.getResult().isDone()) {
-      // This is not the first complete event.
-      try {
-        TaskState lastKnownState = 
taskRunnerWorkItem.getResult().get().getStatusCode();
-        if (taskStatus.getStatusCode() != lastKnownState) {
-          log.warn(
-              "The state of the new task complete event is different from its 
last known state. "
-              + "New state[%s], last known state[%s]",
-              taskStatus.getStatusCode(),
-              lastKnownState
-          );
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to 
exist", taskId);
+          if (taskEntry.getResult().isDone()) {
+            // This is not the first complete event.
+            try {
+              TaskState lastKnownState = 
taskEntry.getResult().get().getStatusCode();
+              if (taskStatus.getStatusCode() != lastKnownState) {
+                log.warn(
+                    "The state of the new task complete event is different 
from its last known state. "
+                    + "New state[%s], last known state[%s]",

Review Comment:
   ```suggestion
                       "Ignoring update to status[%s] as task[%s] has already completed with status[%s].",
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -564,20 +498,27 @@ public void nodeViewInitializedTimedOut()
         log.info("Waiting for worker discovery...");
       }
     }
-    log.info("[%s] Workers are discovered.", workers.size());
+    log.info("Discovered [%d] workers.", workers.size());
 
-    // Wait till all worker state is sync'd so that we know which worker is 
running/completed what tasks or else
+    // Wait till all worker state is synced so that we know which worker is 
running/completed what tasks or else
     // We would start assigning tasks which are pretty soon going to be 
reported by discovered workers.
-    for (WorkerHolder worker : workers.values()) {
-      log.info("Waiting for worker[%s] to sync state...", 
worker.getWorker().getHost());
-      worker.waitForInitialization();
-    }
-    log.info("Workers have sync'd state successfully.");
+    workers.forEach((workerHost, workerEntry) -> {
+      try {
+        workerEntry.waitForInitialization();
+      }
+      catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    });
+    log.info("Workers have synced state successfully.");

Review Comment:
   I don't think changing from the regular for loop to a `forEach` makes much 
of a difference. New entries can still be added to the `workers` map after 
the `forEach` iteration has started.
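
To illustrate the point: assuming `workers` is a `ConcurrentHashMap` (an assumption, suggested by the `compute` calls elsewhere in this diff), both a for loop over `values()` and `forEach` are weakly consistent, so either may or may not see entries added mid-iteration. If a fixed set of workers is what we want to wait on, one option is to copy the keys first. A minimal sketch, where the class name is hypothetical and plain strings stand in for `WorkerHolder`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class WorkerSnapshotSketch
{
  // Hypothetical helper: returns a fixed copy of the worker hosts known right now.
  // Entries added to the map afterwards do not show up in the returned list.
  static List<String> snapshotHosts(ConcurrentMap<String, String> workers)
  {
    return new ArrayList<>(workers.keySet());
  }

  public static void main(String[] args)
  {
    // Stand-in for the runner's `workers` map (host -> worker label).
    ConcurrentMap<String, String> workers = new ConcurrentHashMap<>();
    workers.put("host1:8091", "worker-1");
    workers.put("host2:8091", "worker-2");

    List<String> snapshot = snapshotHosts(workers);

    // A worker that is discovered after the snapshot was taken.
    workers.put("host3:8091", "worker-3");

    System.out.println(snapshot.size()); // 2
    System.out.println(workers.size());  // 3
  }
}
```

Whether late arrivals should be waited on at all is a separate design question; the sketch only shows that the choice of loop construct does not settle it.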



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
   // because that is attached by TaskQueue to task result future. So, this 
method must not be called with "statusLock"
   // held. See https://github.com/apache/druid/issues/6201
   private void taskComplete(
-      HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
-      WorkerHolder workerHolder,
+      String taskId,
+      String workerHost,
       TaskStatus taskStatus
   )
   {
-    Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread 
must not hold statusLock.");
-    Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
-    Preconditions.checkNotNull(taskStatus, "taskStatus");
-    if (workerHolder != null) {
-      log.info(
-          "Worker[%s] completed task[%s] with status[%s]",
-          workerHolder.getWorker().getHost(),
-          taskStatus.getId(),
-          taskStatus.getStatusCode()
-      );
-      // Worker is done with this task
-      workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
-    }
+    Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current 
thread must not hold workerStateLock.");
+    Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread 
must not hold taskStateLock.");
 
-    if (taskRunnerWorkItem.getResult().isDone()) {
-      // This is not the first complete event.
-      try {
-        TaskState lastKnownState = 
taskRunnerWorkItem.getResult().get().getStatusCode();
-        if (taskStatus.getStatusCode() != lastKnownState) {
-          log.warn(
-              "The state of the new task complete event is different from its 
last known state. "
-              + "New state[%s], last known state[%s]",
-              taskStatus.getStatusCode(),
-              lastKnownState
-          );
+    AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          Preconditions.checkState(taskEntry != null, "Expected task[%s] to 
exist", taskId);
+          if (taskEntry.getResult().isDone()) {
+            // This is not the first complete event.
+            try {
+              TaskState lastKnownState = 
taskEntry.getResult().get().getStatusCode();
+              if (taskStatus.getStatusCode() != lastKnownState) {
+                log.warn(
+                    "The state of the new task complete event is different 
from its last known state. "
+                    + "New state[%s], last known state[%s]",
+                    taskStatus.getStatusCode(),
+                    lastKnownState
+                );
+              }
+            }
+            catch (InterruptedException e) {
+              log.warn(e, "Interrupted while getting the last known task 
status.");
+              Thread.currentThread().interrupt();
+            }
+            catch (ExecutionException e) {
+              // This case should not really happen.
+              log.warn(e, "Failed to get the last known task status. Ignoring 
this failure.");
+            }
+          } else {
+            // Notify interested parties
+            taskEntry.setResult(taskStatus);
+            taskCompleted.set(true);
+          }
+
+          return taskEntry;
         }
-      }
-      catch (InterruptedException e) {
-        log.warn(e, "Interrupted while getting the last known task status.");
-        Thread.currentThread().interrupt();
-      }
-      catch (ExecutionException e) {
-        // This case should not really happen.
-        log.warn(e, "Failed to get the last known task status. Ignoring this 
failure.");
-      }
-    } else {
-      // Notify interested parties
-      taskRunnerWorkItem.setResult(taskStatus);
-      TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), 
taskStatus);
+    );
 
-      // Update success/failure counters, Blacklist node if there are too many 
failures.
-      if (workerHolder != null) {
-        blacklistWorkerIfNeeded(taskStatus, workerHolder);
+    if (workerHost != null) {
+      synchronized (workerStateLock) {
+        workers.compute(
+            workerHost,
+            (key, workerHolder) -> {
+              if (workerHolder != null) {
+                log.info(
+                    "Worker[%s] completed task[%s] with status[%s]",
+                    workerHolder.getWorker().getHost(),
+                    taskStatus.getId(),
+                    taskStatus.getStatusCode()
+                );

Review Comment:
   Also, should we keep this log line outside the `compute`? It doesn't need to 
be inside the compute block.
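
A rough sketch of what this could look like, assuming `workers` is a `ConcurrentHashMap`: the remapping function only touches the entry, and the log line uses values the caller already has. `WorkerEntry`, `markTaskCompleted`, and the class name are hypothetical stand-ins, and `System.out.printf` stands in for the logger. Note that this version logs even when the worker is no longer in the map, which differs slightly from the current code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class LogOutsideComputeSketch
{
  // Hypothetical stand-in for WorkerHolder: only tracks the last completion time.
  static final class WorkerEntry
  {
    final AtomicLong lastCompletedTaskTime = new AtomicLong(0);
  }

  // Returns true if the worker was present in the map and was updated.
  static boolean markTaskCompleted(
      ConcurrentMap<String, WorkerEntry> workers,
      String workerHost,
      String taskId,
      String status,
      long nowMillis
  )
  {
    // The log line needs nothing from the map entry, so it stays outside the remapping function.
    System.out.printf("Worker[%s] completed task[%s] with status[%s].%n", workerHost, taskId, status);

    // Keep the remapping function short: it runs while the map holds an internal lock for this key.
    WorkerEntry updated = workers.computeIfPresent(
        workerHost,
        (host, entry) -> {
          entry.lastCompletedTaskTime.set(nowMillis);
          return entry;
        }
    );
    return updated != null;
  }

  public static void main(String[] args)
  {
    ConcurrentMap<String, WorkerEntry> workers = new ConcurrentHashMap<>();
    workers.put("host1:8091", new WorkerEntry());

    System.out.println(markTaskCompleted(workers, "host1:8091", "task-1", "SUCCESS", 1000L)); // true
    System.out.println(markTaskCompleted(workers, "unknown:8091", "task-2", "FAILED", 2000L)); // false
  }
}
```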



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
