gianm commented on code in PR #18851:
URL: https://github.com/apache/druid/pull/18851#discussion_r2967771311


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -1521,206 +1480,212 @@ public void taskAddedOrUpdated(final TaskAnnouncement announcement, final Worker
     final Worker worker = workerHolder.getWorker();
 
     log.debug(
-        "Worker[%s] wrote [%s] status for task [%s] on [%s]",
+        "Worker[%s] wrote status[%s] for task[%s] on location[%s]",
         worker.getHost(),
         announcement.getTaskStatus().getStatusCode(),
         taskId,
         announcement.getTaskLocation()
     );
 
-    HttpRemoteTaskRunnerWorkItem taskItem;
-    boolean shouldShutdownTask = false;
-    boolean isTaskCompleted = false;
-
-    synchronized (statusLock) {
-      taskItem = tasks.get(taskId);
-      if (taskItem == null) {
-        // Try to find information about it in the TaskStorage
-        Optional<TaskStatus> knownStatusInStorage = taskStorage.getStatus(taskId);
-
-        if (knownStatusInStorage.isPresent()) {
-          switch (knownStatusInStorage.get().getStatusCode()) {
-            case RUNNING:
-              taskItem = new HttpRemoteTaskRunnerWorkItem(
-                  taskId,
-                  worker,
-                  TaskLocation.unknown(),
-                  null,
-                  announcement.getTaskType(),
-                  HttpRemoteTaskRunnerWorkItem.State.RUNNING
-              );
-              tasks.put(taskId, taskItem);
-              final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
-              metricBuilder.setDimension(DruidMetrics.TASK_ID, taskId);
-              emitter.emit(metricBuilder.setMetric(TASK_DISCOVERED_COUNT, 1L));
-              break;
-            case SUCCESS:
-            case FAILED:
-              if (!announcement.getTaskStatus().isComplete()) {
-                log.info(
-                    "Worker[%s] reported status for completed, known from taskStorage, task[%s]. Ignored.",
-                    worker.getHost(),
-                    taskId
-                );
-              }
-              break;
-            default:
-              log.makeAlert(
-                  "Found unrecognized state[%s] of task[%s] in taskStorage. Notification[%s] from worker[%s] is ignored.",
-                  knownStatusInStorage.get().getStatusCode(),
-                  taskId,
-                  announcement,
-                  worker.getHost()
-              ).emit();
-          }
-        } else {
-          log.warn(
-              "Worker[%s] reported status[%s] for unknown task[%s]. Ignored.",
-              worker.getHost(),
-              announcement.getStatus(),
-              taskId
-          );
-        }
-      }
-
-      if (taskItem == null) {
-        if (!announcement.getTaskStatus().isComplete()) {
-          shouldShutdownTask = true;
-        }
-      } else {
-        switch (announcement.getTaskStatus().getStatusCode()) {
-          case RUNNING:
-            switch (taskItem.getState()) {
-              case PENDING:
-              case PENDING_WORKER_ASSIGN:
-                taskItem.setWorker(worker);
-                taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
-                log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost());
-
-                final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder();
-                IndexTaskUtils.setTaskDimensions(metricBuilder, taskItem.getTask());
-                emitter.emit(metricBuilder.setMetric(
-                    "task/pending/time",
-                    new Duration(taskItem.getCreatedTime(), DateTimes.nowUtc()).getMillis())
-                );
+    final AtomicBoolean shouldShutdownTask = new AtomicBoolean(false);
+    final AtomicBoolean isTaskCompleted = new AtomicBoolean(false);
 
-                // fall through
-              case RUNNING:
-                if (worker.getHost().equals(taskItem.getWorker().getHost())) {
-                  if (!announcement.getTaskLocation().equals(taskItem.getLocation())) {
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          if (taskEntry == null) {
+            // Try to find information about it in the TaskStorage
+            Optional<TaskStatus> knownStatusInStorage = taskStorage.getStatus(taskId);

Review Comment:
   This is going to need to do a metadata call while holding a (partial) lock 
on `tasks`. I see the old code did it under `statusLock`, and also there's a 
resolved conversation about keeping this here. It's fine to keep it here, I 
suppose, but please include a comment about how this does a metadata call and 
may cause contention on `tasks`.
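For illustration, here is a minimal sketch of one way to keep the slow lookup out of the locked region. It assumes `tasks` is a `ConcurrentHashMap`, whose `compute()` may block other updates while the remapping function runs. `TaskMapSketch`, its `State` enum, and the string status standing in for `TaskStatus` are hypothetical simplifications, not Druid code, and only the RUNNING branch of the real logic is modeled.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical, simplified model of the runner's `tasks` map; not Druid code.
class TaskMapSketch
{
  enum State { RUNNING }

  final ConcurrentHashMap<String, State> tasks = new ConcurrentHashMap<>();

  // Stand-in for taskStorage.getStatus(taskId): a potentially slow metadata-store call.
  private final Function<String, Optional<String>> storageLookup;

  TaskMapSketch(Function<String, Optional<String>> storageLookup)
  {
    this.storageLookup = storageLookup;
  }

  void onAnnouncement(String taskId)
  {
    // Consult the metadata store only for unknown tasks, and do it BEFORE compute():
    // ConcurrentHashMap may block some concurrent updates while a remapping function runs.
    final Optional<String> storedStatus =
        tasks.containsKey(taskId) ? Optional.<String>empty() : storageLookup.apply(taskId);

    tasks.compute(taskId, (key, existing) -> {
      if (existing != null) {
        // Known (or added concurrently since the lookup): keep the current entry.
        return existing;
      }
      // Unknown: adopt it only if storage reported RUNNING; returning null leaves no mapping.
      // (If the entry vanished between containsKey() and here, the task is treated as unknown.)
      return "RUNNING".equals(storedStatus.orElse(null)) ? State.RUNNING : null;
    });
  }
}
```

The trade-off is that the storage snapshot can be stale by the time `compute()` runs, so the remapping function still re-checks the map. Keeping the call inside `compute()` with an explanatory comment, as requested, remains the simpler option.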



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java:
##########
@@ -124,7 +124,7 @@ public Response getWorkersWithUnacknowledgedTasks()
       return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
     }
 
-    return Response.ok().entity(httpRemoteTaskRunner.getWorkersWithUnacknowledgedTasks()).build();
+    return Response.ok().entity(httpRemoteTaskRunner.getPendingAssignWorkers()).build();

Review Comment:
   This is also a breaking API change. Please avoid it by introducing a new API 
or a `@QueryParam`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -463,63 +365,77 @@ private boolean runTaskOnWorker(
   }
 
   // CAUTION: This method calls RemoteTaskRunnerWorkItem.setResult(..) which results in TaskQueue.notifyStatus() being called
-  // because that is attached by TaskQueue to task result future. So, this method must not be called with "statusLock"
+  // because that is attached by TaskQueue to task result future. So, this method must not be called with "workerStatusLock"

Review Comment:
   Should refer to `workerStateLock`?
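The CAUTION comment states a rule that could also be enforced at runtime. Below is a hypothetical sketch, not part of the PR, that uses the JDK's `Thread.holdsLock(Object)`. That method reports whether the current thread holds an object's monitor. `LockGuardSketch` and its methods are illustrative stand-ins for the runner.

```java
// Hypothetical model, not the PR's code: enforces "do not call taskComplete(..) while
// holding workerStateLock" using the JDK's Thread.holdsLock(Object).
class LockGuardSketch
{
  private final Object workerStateLock = new Object();
  int completions = 0;

  void taskComplete(String taskId)
  {
    // In the runner, setting the task result triggers TaskQueue.notifyStatus() via a
    // listener on the result future; the CAUTION comment forbids doing that under the lock.
    if (Thread.holdsLock(workerStateLock)) {
      throw new IllegalStateException("taskComplete(" + taskId + ") called while holding workerStateLock");
    }
    completions++;
  }

  void completeAfterReleasingLock(String taskId)
  {
    synchronized (workerStateLock) {
      // ... update worker bookkeeping while holding the lock ...
    }
    taskComplete(taskId); // lock already released: allowed
  }

  void completeWhileHoldingLock(String taskId)
  {
    synchronized (workerStateLock) {
      taskComplete(taskId); // throws IllegalStateException
    }
  }
}
```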



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java:
##########
@@ -72,7 +72,7 @@ public Response getPendingTasksQueue()
       return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build();
     }
 
-    return Response.ok().entity(httpRemoteTaskRunner.getPendingTasksList()).build();
+    return Response.ok().entity(httpRemoteTaskRunner.getPendingTasks()).build();

Review Comment:
   This would be a breaking change in the API response. Please avoid it by 
introducing a new API or something like `@QueryParam("full")`.
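Here is a minimal sketch of the `@QueryParam` approach, modeled in plain Java so it has no dependencies. By default it keeps the legacy response, a list of task IDs, which is what the removed `getPendingTasksList()` returned. Only callers that opt in get the new shape. `PendingTaskInfo` and `PendingTasksEndpointSketch` are hypothetical. In the actual JAX-RS resource, `full` would be a `@QueryParam("full")` method parameter, for example with `@DefaultValue("false")`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the richer element type returned by the PR's getPendingTasks().
class PendingTaskInfo
{
  final String taskId;
  final String type;

  PendingTaskInfo(String taskId, String type)
  {
    this.taskId = taskId;
    this.type = type;
  }
}

// Plain-Java model of the resource method. In the real resource, `full` would be bound
// from the request with @QueryParam("full") instead of being passed directly.
class PendingTasksEndpointSketch
{
  private final List<PendingTaskInfo> pending = new ArrayList<>();

  void add(PendingTaskInfo task)
  {
    pending.add(task);
  }

  Object getPendingTasksQueue(boolean full)
  {
    if (full) {
      // Opt-in: the new, richer response shape.
      return new ArrayList<>(pending);
    }
    // Default: the legacy shape (a plain list of task IDs), so existing clients keep working.
    return pending.stream().map(p -> p.taskId).collect(Collectors.toList());
  }
}
```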



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -1190,197 +1153,197 @@ private void startPendingTaskHandling()
                  .emit();
             }
             finally {
-              log.info("PendingTaskExecution loop exited.");
+              log.info("PendingTaskExecution loop exited");
             }
           }
       );
     }
   }
 
-  private void pendingTasksExecutionLoop()
+  @VisibleForTesting
+  void pendingTasksExecutionLoop()
   {
     while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
       try {
-        // Find one pending task to run and a worker to run on
-        HttpRemoteTaskRunnerWorkItem taskItem = null;
-        ImmutableWorkerInfo immutableWorker = null;
-
-        synchronized (statusLock) {
-          Iterator<String> iter = pendingTaskIds.iterator();
-          while (iter.hasNext()) {
-            String taskId = iter.next();
-            HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
-
-            if (ti == null || !ti.getState().isPending()) {
-              // happens if the task was shutdown, failed or observed running by a worker
-              iter.remove();
-              continue;
-            }
-
-            if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
-              // picked up by another pending task executor thread which is in the process of trying to
-              // run it on a worker, skip to next.
-              continue;
-            }
-
-            if (ti.getTask() == null) {
-              // this is not supposed to happen except for a bug, we want to mark this task failed but
-              // taskComplete(..) can not be called while holding statusLock. See the javadoc on that
-              // method.
-              // so this will get marked failed afterwards outside of current synchronized block.
-              taskItem = ti;
-              break;
-            }
-
-            immutableWorker = findWorkerToRunTask(ti.getTask());
-            if (immutableWorker == null) {
-              continue;
-            }
-
-            String prevUnackedTaskId = workersWithUnacknowledgedTask.putIfAbsent(
-                immutableWorker.getWorker().getHost(),
-                taskId
-            );
-            if (prevUnackedTaskId != null) {
-              log.makeAlert(
-                  "Found worker[%s] with unacked task[%s] but still was identified to run task[%s].",
-                  immutableWorker.getWorker().getHost(),
-                  prevUnackedTaskId,
-                  taskId
-              ).emit();
-            }
-
-            // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked
-            // up by another task execution thread.
-            // note that we can't simply delete this task item from pendingTaskIds or else we would have to add it
-            // back if this thread couldn't run this task for any reason, which we will know at some later time
-            // and also we will need to add it back to its old position in the list. that becomes complex quickly.
-            // Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up.
-            // And, it is automatically removed by any of the task execution threads when they notice that
-            // ti.getState().isPending() is false (at the beginning of this loop)
-            ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN);
-            taskItem = ti;
-            break;
-          }
-
-          if (taskItem == null) {
-            // Either no pending task is found or no suitable worker is found for any of the pending tasks.
-            // statusLock.notifyAll() is called whenever a new task shows up or if there is a possibility for a task
-            // to successfully get worker to run, for example when a new worker shows up, a task slot opens up
-            // because some task completed etc.
-            statusLock.wait(TimeUnit.MINUTES.toMillis(1));
+        final PendingTaskQueueItem taskItem = pendingTasks.poll(1, TimeUnit.MINUTES);
+        if (taskItem == null) {
+          log.info("Found no available tasks. Waiting for tasks to assign");
+          continue;
+        }
+        final String taskId = taskItem.getTask().getId();
+
+        ImmutableWorkerInfo workerToAssign;
+        // Set to false inside tasks.compute() if the task is no longer PENDING when we attempt
+        // to transition it to PENDING_WORKER_ASSIGN. A task can complete concurrently (e.g. via
+        // cancellation) without holding workerStateLock, so we cannot rely on a pre-check.
+        final AtomicBoolean taskWasPending = new AtomicBoolean(true);
+
+        synchronized (workerStateLock) {
+          workerToAssign = findWorkerToRunTask(taskItem.getTask());
+
+          if (workerToAssign == null) {
+            // Park until a worker signals capacity change (task complete, new worker, state → READY).
+            // Bounded wait avoids missed wakeups from spurious interrupts.
+            log.warn("No workers available to run task[%s], waiting for capacity", taskId);
+            workerStateLock.wait(TimeUnit.MINUTES.toMillis(1));
+            pendingTasks.put(taskItem.requeue());
             continue;
           }
-        }
 
-        String taskId = taskItem.getTaskId();
+          // Mark this worker as unassignable while task is being assigned
+          final ImmutableWorkerInfo finalWorkerToAssign = workerToAssign;
+          workers.compute(
+              workerToAssign.getWorker().getHost(), (key, entry) -> {
+                Preconditions.checkState(
+                    entry != null,
+                    "Expected selected worker[%s] to be available",
+                    finalWorkerToAssign.getWorker().getHost()
+                );
+                Preconditions.checkState(
+                    entry.getState() == WorkerHolder.State.READY,
+                    "Expected worker[%s] state to be READY, got [%s]",
+                    entry.getWorker().getHost(),
+                    entry.getState()
+                );
 
-        if (taskItem.getTask() == null) {
-          log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit();
-          // taskComplete(..) must be called outside of statusLock, see comments on method.
-          taskComplete(
-              taskItem,
-              null,
-              TaskStatus.failure(
-                  taskId,
-                  "No payload found for this task. "
-                  + "See overlord logs and middleManager/indexer logs for more details."
-              )
+                entry.setState(WorkerHolder.State.PENDING_ASSIGN);
+                return entry;
+              }
           );
-          continue;
-        }
 
-        if (immutableWorker == null) {
-          throw new ISE("Unexpected state: null immutableWorker");
+          // Mark this task as pending worker assign. A task can complete concurrently (e.g. via
+          // cancellation) without holding workerStateLock, so we handle the non-PENDING case
+          // gracefully instead of asserting. The worker will be reset to READY in the finally block.
+          tasks.compute(
+              taskId,
+              (key, entry) -> {
+                Preconditions.checkState(entry != null, "Expected task[%s] to be in tasks set", taskId);
+                if (entry.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) {
+                  entry.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN);
+                } else {
+                  taskWasPending.set(false);
+                }
+                return entry;
+              }
+          );
         }
 
+        final String workerHost = workerToAssign.getWorker().getHost();
         try {
-          // this will send HTTP request to worker for assigning task
-          if (!runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) {
-            if (taskItem.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
-              taskItem.revertStateFromPendingWorkerAssignToPending();
-            }
+          if (!taskWasPending.get()) {
+            // Task completed (e.g. cancelled) while we were setting up assignment.
+            // Worker state will be reset to READY in the finally block.
+            log.info(
+                "Task[%s] completed concurrently before assignment to worker[%s], skipping",
+                taskId,
+                workerHost
+            );
+          } else if (!runTaskOnWorker(taskId, workerHost)) {
+            log.warn("Failed to assign task[%s] to worker[%s]. Sending to back of queue", taskId, workerHost);
+            pendingTasks.put(taskItem.requeue());
+          } else {
+            log.info("Assigned task[%s] to worker[%s]", taskId, workerHost);
           }
         }
         catch (InterruptedException ex) {
-          log.info("Got InterruptedException while assigning task[%s].", taskId);
+          log.info("Got InterruptedException while assigning task[%s]", taskId);
           throw ex;
         }
         catch (Throwable th) {
           log.makeAlert(th, "Exception while trying to assign task")
              .addData("taskId", taskId)
              .emit();
 
-          // taskComplete(..) must be called outside of statusLock, see comments on method.
+          // taskComplete(..) must be called outside workerStatusLock, see comments on method.
           taskComplete(
-              taskItem,
+              taskId,
               null,
-              TaskStatus.failure(taskId, "Failed to assign this task. See overlord logs for more details.")
+              TaskStatus.failure(
+                  taskId,
+                  StringUtils.format(
+                      "Failed to assign this task to worker[%s]. See overlord logs for more details",
+                      workerHost
+                  )
+              )
           );
         }
         finally {
-          synchronized (statusLock) {
-            workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
-            statusLock.notifyAll();
+          // Allow the worker to accept tasks again
+          synchronized (workerStateLock) {
+            workers.compute(
+                workerHost,
+                (key, entry) -> {
+                  if (entry == null) {
+                    log.warn("Could not find worker[%s]", workerHost);
+                  } else {
+                    // Only reset the worker status if PENDING_ASSIGN
+                    // If LAZY/BLACKLISTED, either the worker is getting trashed eminently or will be auto-reset.
+                    entry.compareAndExchangeState(WorkerHolder.State.PENDING_ASSIGN, WorkerHolder.State.READY);
+                  }
+                  return entry;
+                }
+            );
           }
-        }
 
+          // Reset task state from PENDING_WORKER_ASSIGN -> PENDING if assignment did not complete.
+          // This allows the task to be picked up again from the queue without hitting a precondition failure.
+          tasks.compute(taskId, (key, entry) -> {
+            if (entry != null) {
+              entry.revertToPending();
+            }
+            return entry;
+          });
+
+          notifyWatchers();
+        }
       }
-      catch (InterruptedException ex) {
-        log.info("Interrupted, will Exit.");
+      catch (InterruptedException e) {
+        log.warn("Interrupted, stopping pending task execution loop");
         Thread.currentThread().interrupt();
       }
       catch (Throwable th) {
-        log.makeAlert(th, "Unknown Exception while trying to assign tasks.").emit();
+        log.makeAlert(th, "Unknown Exception while trying to assign tasks").emit();
       }
     }
-  }
 
-  /**
-   * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource}
-   */
-  List<String> getPendingTasksList()
-  {
-    synchronized (statusLock) {
-      return ImmutableList.copyOf(pendingTaskIds);
-    }
+    log.warn("Pending tasks execution loop exited");
   }
 
   @Override
   public void shutdown(String taskId, String reason)
   {
-    if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS)) {

Review Comment:
   Why remove this guard?
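For context on what the removed line protected against, here is a hypothetical sketch of the guard pattern. The diff truncates the body of `if (!lifecycleLock.awaitStarted(1, TimeUnit.SECONDS))`, so returning early is an assumption. A `CountDownLatch` stands in for Druid's `LifecycleLock`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model: a CountDownLatch stands in for LifecycleLock.awaitStarted(..).
class ShutdownGuardSketch
{
  private final CountDownLatch started = new CountDownLatch(1);
  boolean shutdownRequested = false;

  void start()
  {
    started.countDown();
  }

  /** Returns false, doing nothing, if the runner has not started within 1 second. */
  boolean shutdown(String taskId, String reason)
  {
    try {
      if (!started.await(1, TimeUnit.SECONDS)) {
        // Not started (or still starting): worker/task state may not be initialized yet.
        return false;
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    shutdownRequested = true; // stand-in for actually sending the shutdown to the worker
    return true;
  }
}
```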



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -1190,197 +1153,197 @@ private void startPendingTaskHandling()
+        final PendingTaskQueueItem taskItem = pendingTasks.poll(1, TimeUnit.MINUTES);
+        if (taskItem == null) {
+          log.info("Found no available tasks. Waiting for tasks to assign");
+          continue;
+        }
+        final String taskId = taskItem.getTask().getId();
+
+        ImmutableWorkerInfo workerToAssign;
+        // Set to false inside tasks.compute() if the task is no longer PENDING when we attempt
+        // to transition it to PENDING_WORKER_ASSIGN. A task can complete concurrently (e.g. via
+        // cancellation) without holding workerStateLock, so we cannot rely on a pre-check.
+        final AtomicBoolean taskWasPending = new AtomicBoolean(true);
+
+        synchronized (workerStateLock) {
+          workerToAssign = findWorkerToRunTask(taskItem.getTask());
+
+          if (workerToAssign == null) {

Review Comment:
   It looks like this code will park and wait if the next task from 
`pendingTasks` can't be assigned. But there are situations where task A 
can't be assigned while another, later task B can be. For example, there 
may be 1 free slot, with task A having `requiredCapacity: 2` and task B 
having `requiredCapacity: 1`. Another example: strong worker affinity is 
configured, and none of the affinity workers for task A are available, but 
affinity workers for task B *are* available.
   
   The old logic would potentially iterate the entire `pendingTaskIds` looking 
for an assignable task, essentially allowing tasks to skip the line when 
they required different capacity or different affinity workers. Please update 
the new logic to handle this case.
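Here is a minimal sketch of the requested behavior. It scans pending tasks in FIFO order, takes the first one that some worker can run, and parks only when none can. The sketch makes three assumptions:

- The caller holds `workerStateLock`, so the capacity view stays consistent.
- `pendingTasks` supports removal through its iterator. `LinkedBlockingQueue` does, for example, but the actual type isn't visible in the diff.
- Worker selection reduces to free capacity plus an optional affinity set.

`SkipAheadAssignmentSketch`, `PendingTask`, and `Assignment` are hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of "skip the line" assignment; names are illustrative, not Druid's.
class SkipAheadAssignmentSketch
{
  static final class PendingTask
  {
    final String id;
    final int requiredCapacity;
    final Set<String> affinityHosts; // empty set = may run on any host

    PendingTask(String id, int requiredCapacity, Set<String> affinityHosts)
    {
      this.id = id;
      this.requiredCapacity = requiredCapacity;
      this.affinityHosts = affinityHosts;
    }
  }

  static final class Assignment
  {
    final PendingTask task;
    final String host;

    Assignment(PendingTask task, String host)
    {
      this.task = task;
      this.host = host;
    }
  }

  /**
   * Scans the queue in FIFO order and removes the first task that some worker can run.
   * Tasks that cannot be placed keep their position; the caller should only park
   * (e.g. wait on the worker-state lock) when this returns empty.
   */
  static Optional<Assignment> pollAssignable(Queue<PendingTask> queue, Map<String, Integer> freeCapacityByHost)
  {
    for (Iterator<PendingTask> it = queue.iterator(); it.hasNext(); ) {
      PendingTask task = it.next();
      for (Map.Entry<String, Integer> worker : freeCapacityByHost.entrySet()) {
        boolean affinityOk = task.affinityHosts.isEmpty() || task.affinityHosts.contains(worker.getKey());
        if (affinityOk && worker.getValue() >= task.requiredCapacity) {
          it.remove();
          return Optional.of(new Assignment(task, worker.getKey()));
        }
      }
    }
    return Optional.empty();
  }
}
```

Skipping ahead trades strict FIFO order for utilization. A high-capacity task at the head could be starved by a steady stream of smaller ones, which the old `pendingTaskIds` scan also allowed.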



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
