himanshug commented on a change in pull request #8697: HRTR: make pending task execution handling to go through all tasks on not finding worker slots URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r357028172
########## File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java ########## @@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); - addPendingTaskToExecutor(task.getId()); + pendingTaskIds.add(task.getId()); + + statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { - pendingTasksExec.execute( - () -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { - ImmutableWorkerInfo immutableWorker; - HttpRemoteTaskRunnerWorkItem taskItem = null; + for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { - taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { + log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); + return; + } - if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; - } + pendingTasksExecutionLoop(); + } + catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); + } + finally { + log.info("PendingTaskExecution loop exited."); + } + } + ); + } + } - if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. 
Not assigning.", - taskId, - taskItem.getState() - ); - return; - } + private void pendingTasksExecutionLoop() + { + while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { + // Find one pending task to run and a worker to run on + HttpRemoteTaskRunnerWorkItem taskItem = null; + ImmutableWorkerInfo immutableWorker = null; - if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); - } - immutableWorker = findWorkerToRunTask(taskItem.getTask()); - - if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; - } else if (workersWithUnacknowledgedTask.putIfAbsent( - immutableWorker.getWorker().getHost(), - taskId - ) != null) { - // there was a race and someone else took this worker slot, try again - continue; - } - } + synchronized (statusLock) { + Iterator<String> iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { + String taskId = iter.next(); + HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + + if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown or was picked up earlier and no more pending + iter.remove(); Review comment: > Do you mean the other threads can pick up even after the ti state is set to PENDING_WORKER_ASSIGN but will ignore it in the below if clause? Or, do they really not pick up because somehow pendingTaskIds is updated before other threads start picking up? yes, other executor threads would see that but ignore because of the if clause you noted. > But can pendingTaskIds be updated after this line? Like as the below code: ... 
If we removed it here, we might have to add it back in case the task couldn't run for whatever reason. Adding it back would be complex, because we would only find out at some later time that it needs to be added back, and restoring it to its old position in the list would be difficult. Instead of all that, we remove it from `pendingTaskIds` only when the task is known to be no longer in the `PENDING` or `PENDING_WORKER_ASSIGN` HttpRemoteTaskRunnerWorkItem.State; the removal is done by whichever pendingTasksExecutor thread notices that. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org