jtuglu1 commented on code in PR #18851:
URL: https://github.com/apache/druid/pull/18851#discussion_r2968013721


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -1190,197 +1153,197 @@ private void startPendingTaskHandling()
                  .emit();
             }
             finally {
-              log.info("PendingTaskExecution loop exited.");
+              log.info("PendingTaskExecution loop exited");
             }
           }
       );
     }
   }
 
-  private void pendingTasksExecutionLoop()
+  @VisibleForTesting
+  void pendingTasksExecutionLoop()
   {
     while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
       try {
-        // Find one pending task to run and a worker to run on
-        HttpRemoteTaskRunnerWorkItem taskItem = null;
-        ImmutableWorkerInfo immutableWorker = null;
-
-        synchronized (statusLock) {
-          Iterator<String> iter = pendingTaskIds.iterator();
-          while (iter.hasNext()) {
-            String taskId = iter.next();
-            HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
-
-            if (ti == null || !ti.getState().isPending()) {
-              // happens if the task was shutdown, failed or observed running by a worker
-              iter.remove();
-              continue;
-            }
-
-            if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
-              // picked up by another pending task executor thread which is in the process of trying to
-              // run it on a worker, skip to next.
-              continue;
-            }
-
-            if (ti.getTask() == null) {
-              // this is not supposed to happen except for a bug, we want to mark this task failed but
-              // taskComplete(..) can not be called while holding statusLock. See the javadoc on that
-              // method.
-              // so this will get marked failed afterwards outside of current synchronized block.
-              taskItem = ti;
-              break;
-            }
-
-            immutableWorker = findWorkerToRunTask(ti.getTask());
-            if (immutableWorker == null) {
-              continue;
-            }
-
-            String prevUnackedTaskId = workersWithUnacknowledgedTask.putIfAbsent(
-                immutableWorker.getWorker().getHost(),
-                taskId
-            );
-            if (prevUnackedTaskId != null) {
-              log.makeAlert(
-                  "Found worker[%s] with unacked task[%s] but still was identified to run task[%s].",
-                  immutableWorker.getWorker().getHost(),
-                  prevUnackedTaskId,
-                  taskId
-              ).emit();
-            }
-
-            // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked
-            // up by another task execution thread.
-            // note that we can't simply delete this task item from pendingTaskIds or else we would have to add it
-            // back if this thread couldn't run this task for any reason, which we will know at some later time
-            // and also we will need to add it back to its old position in the list. that becomes complex quickly.
-            // Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up.
-            // And, it is automatically removed by any of the task execution threads when they notice that
-            // ti.getState().isPending() is false (at the beginning of this loop)
-            ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN);
-            taskItem = ti;
-            break;
-          }
-
-          if (taskItem == null) {
-            // Either no pending task is found or no suitable worker is found for any of the pending tasks.
-            // statusLock.notifyAll() is called whenever a new task shows up or if there is a possibility for a task
-            // to successfully get worker to run, for example when a new worker shows up, a task slot opens up
-            // because some task completed etc.
-            statusLock.wait(TimeUnit.MINUTES.toMillis(1));
+        final PendingTaskQueueItem taskItem = pendingTasks.poll(1, TimeUnit.MINUTES);
+        if (taskItem == null) {
+          log.info("Found no available tasks. Waiting for tasks to assign");
+          continue;
+        }
+        final String taskId = taskItem.getTask().getId();
+
+        ImmutableWorkerInfo workerToAssign;
+        // Set to false inside tasks.compute() if the task is no longer PENDING when we attempt
+        // to transition it to PENDING_WORKER_ASSIGN. A task can complete concurrently (e.g. via
+        // cancellation) without holding workerStateLock, so we cannot rely on a pre-check.
+        final AtomicBoolean taskWasPending = new AtomicBoolean(true);
+
+        synchronized (workerStateLock) {
+          workerToAssign = findWorkerToRunTask(taskItem.getTask());
+
+          if (workerToAssign == null) {

Review Comment:
   > The old logic would potentially iterate the entire pendingTaskIds looking 
for an assignable task, essentially allowing tasks to skip the line in case 
they required different capacity or different affinity workers. Please update 
the new logic to handle this case.
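The difference the reviewer describes can be shown with a toy model. This is a hypothetical sketch: `PendingTask`, `requiredCategory`, and `AssignmentOrder` are illustrative names, and a single "category" string stands in for whatever capacity or affinity requirements a real task has. `pickHeadOnly` models a strict FIFO queue, where an unassignable head blocks every task behind it; `pickFirstAssignable` models the old loop, which walked `pendingTaskIds` in order and took the first task some worker could run.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical pending task: an id plus the worker category (standing in for capacity/affinity) it needs. */
final class PendingTask
{
  final String id;
  final String requiredCategory;

  PendingTask(String id, String requiredCategory)
  {
    this.id = id;
    this.requiredCategory = requiredCategory;
  }
}

final class AssignmentOrder
{
  /** Strict FIFO: only the head is considered, so an unassignable head blocks everything behind it. */
  static String pickHeadOnly(List<PendingTask> pending, Set<String> freeCategories)
  {
    if (pending.isEmpty()) {
      return null;
    }
    PendingTask head = pending.get(0);
    return freeCategories.contains(head.requiredCategory) ? head.id : null;
  }

  /** Scan-and-skip, like the old loop: the first task, in order, that some free worker can run. */
  static String pickFirstAssignable(List<PendingTask> pending, Set<String> freeCategories)
  {
    for (PendingTask task : pending) {
      if (freeCategories.contains(task.requiredCategory)) {
        return task.id;
      }
    }
    return null;
  }
}
```

With task `A` (needs `large`) at the head, task `B` (needs `small`) behind it, and only a `small` slot free, `pickHeadOnly` returns `null` while `pickFirstAssignable` returns `"B"`.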
   
   Yes, I thought about this. The old logic was cumbersome and hard to read, and its locking was overly conservative (slow). Would you be opposed to simply rescheduling the task when no suitable worker is found? I was thinking of extending this into a priority/backoff queue to address the line-skipping case.
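One way the rescheduling idea could look is a `java.util.concurrent.DelayQueue` with exponential backoff: a task that finds no suitable worker is re-offered with a growing delay, so tasks queued behind it become eligible first. This is a minimal sketch, assuming a task id is enough to identify a queue entry; `DelayedTask`, `BackoffPendingQueue`, and the base/max delay parameters are hypothetical and not part of the PR.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical queue entry: a task id, how often it was rescheduled, and when it is eligible again. */
final class DelayedTask implements Delayed
{
  final String taskId;
  final int attempts;
  private final long readyAtNanos;

  DelayedTask(String taskId, int attempts, long delayMillis)
  {
    this.taskId = taskId;
    this.attempts = attempts;
    this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
  }

  @Override
  public long getDelay(TimeUnit unit)
  {
    return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
  }

  @Override
  public int compareTo(Delayed other)
  {
    return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
  }
}

/** Hypothetical pending-task queue that backs off tasks for which no worker was found. */
final class BackoffPendingQueue
{
  private final DelayQueue<DelayedTask> queue = new DelayQueue<>();
  private final long baseDelayMillis;
  private final long maxDelayMillis;

  BackoffPendingQueue(long baseDelayMillis, long maxDelayMillis)
  {
    this.baseDelayMillis = baseDelayMillis;
    this.maxDelayMillis = maxDelayMillis;
  }

  /** New tasks are eligible immediately. */
  void add(String taskId)
  {
    queue.offer(new DelayedTask(taskId, 0, 0));
  }

  /** Exponential backoff: base * 2^attempts, capped at maxDelayMillis. */
  long backoffMillis(int attempts)
  {
    long delay = baseDelayMillis;
    for (int i = 0; i < attempts && delay < maxDelayMillis; i++) {
      delay *= 2;
    }
    return Math.min(delay, maxDelayMillis);
  }

  /** Re-offers a task that found no suitable worker, delayed so tasks behind it get a turn first. */
  void reschedule(DelayedTask task)
  {
    queue.offer(new DelayedTask(task.taskId, task.attempts + 1, backoffMillis(task.attempts)));
  }

  /** Blocking poll, mirroring pendingTasks.poll(1, TimeUnit.MINUTES) in the new loop. */
  DelayedTask poll(long timeout, TimeUnit unit) throws InterruptedException
  {
    return queue.poll(timeout, unit);
  }

  /** Non-blocking poll: returns null if no task is currently eligible. */
  DelayedTask pollReady()
  {
    return queue.poll();
  }
}
```

In the new loop, the `workerToAssign == null` branch would call `reschedule(...)` instead of waiting on the head task. One trade-off: a `DelayQueue` orders entries only by remaining delay, so strict submission order is lost once tasks start backing off, and a backed-off task may sit out its delay even after a suitable worker frees up.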

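Separately, the `taskWasPending` guard in the new loop relies on `tasks.compute()` to make the check-and-transition a single atomic step. The sketch below isolates that pattern, assuming `tasks` behaves like a `ConcurrentHashMap` (whose `compute` runs the whole remapping function atomically per key); the `TaskState` enum and `TaskStates` class are simplified, hypothetical stand-ins for `HttpRemoteTaskRunnerWorkItem` and its state. An `AtomicBoolean` is used because a lambda can only capture effectively final locals, so it needs a mutable holder to report what happened inside `compute`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, simplified task states (stand-in for HttpRemoteTaskRunnerWorkItem.State). */
enum TaskState
{
  PENDING, PENDING_WORKER_ASSIGN, RUNNING, COMPLETE
}

final class TaskStates
{
  // Assumption: task id -> state, held in a ConcurrentHashMap so compute() is atomic per key.
  final ConcurrentHashMap<String, TaskState> tasks = new ConcurrentHashMap<>();

  /**
   * Atomically moves a task from PENDING to PENDING_WORKER_ASSIGN.
   * Returns false if the task was missing or had already left PENDING
   * (for example, because it was cancelled concurrently).
   */
  boolean tryMarkAssigning(String taskId)
  {
    final AtomicBoolean taskWasPending = new AtomicBoolean(true);
    tasks.compute(taskId, (id, state) -> {
      if (state != TaskState.PENDING) {
        taskWasPending.set(false);
        // Keep the current value; returning null for an absent key leaves it absent.
        return state;
      }
      return TaskState.PENDING_WORKER_ASSIGN;
    });
    return taskWasPending.get();
  }
}
```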



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
