jtuglu1 commented on code in PR #18851:
URL: https://github.com/apache/druid/pull/18851#discussion_r2629428583
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -1197,109 +1159,82 @@ private void startPendingTaskHandling()
}
}
- private void pendingTasksExecutionLoop()
+ @VisibleForTesting
+ void pendingTasksExecutionLoop()
{
while (!Thread.interrupted() && lifecycleLock.awaitStarted(1,
TimeUnit.MILLISECONDS)) {
try {
- // Find one pending task to run and a worker to run on
- HttpRemoteTaskRunnerWorkItem taskItem = null;
- ImmutableWorkerInfo immutableWorker = null;
-
- synchronized (statusLock) {
- Iterator<String> iter = pendingTaskIds.iterator();
- while (iter.hasNext()) {
- String taskId = iter.next();
- HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
-
- if (ti == null || !ti.getState().isPending()) {
- // happens if the task was shutdown, failed or observed running
by a worker
- iter.remove();
- continue;
- }
-
- if (ti.getState() ==
HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
- // picked up by another pending task executor thread which is in
the process of trying to
- // run it on a worker, skip to next.
- continue;
- }
-
- if (ti.getTask() == null) {
- // this is not supposed to happen except for a bug, we want to
mark this task failed but
- // taskComplete(..) can not be called while holding statusLock.
See the javadoc on that
- // method.
- // so this will get marked failed afterwards outside of current
synchronized block.
- taskItem = ti;
- break;
- }
+ final PendingTaskQueueItem taskItem = pendingTasks.poll(1,
TimeUnit.MINUTES);
+ if (taskItem == null) {
+ log.info("Found no available tasks. Waiting for tasks to assign.");
+ continue;
+ }
+ final String taskId = taskItem.getTask().getId();
- immutableWorker = findWorkerToRunTask(ti.getTask());
- if (immutableWorker == null) {
- continue;
- }
+ ImmutableWorkerInfo workerToAssign;
+ int workerFetchRetries = 0;
- String prevUnackedTaskId =
workersWithUnacknowledgedTask.putIfAbsent(
- immutableWorker.getWorker().getHost(),
- taskId
- );
- if (prevUnackedTaskId != null) {
- log.makeAlert(
- "Found worker[%s] with unacked task[%s] but still was
identified to run task[%s].",
- immutableWorker.getWorker().getHost(),
- prevUnackedTaskId,
- taskId
- ).emit();
+ synchronized (workerStateLock) {
+ do {
+ workerToAssign = findWorkerToRunTask(taskItem.getTask());
+ if (workerToAssign == null) {
+ log.warn("No workers available to run task[%s]. Waiting",
taskId);
+ workerStateLock.wait(FIND_WORKER_BACKOFF_DELAY_MILLIS); // yield
the lock and wait a bit
}
+ } while (workerToAssign == null && workerFetchRetries++ <
FIND_WORKER_BACKOFF_RETRIES);
- // set state to PENDING_WORKER_ASSIGN before releasing the lock so
that this task item is not picked
- // up by another task execution thread.
- // note that we can't simply delete this task item from
pendingTaskIds or else we would have to add it
- // back if this thread couldn't run this task for any reason,
which we will know at some later time
- // and also we will need to add it back to its old position in the
list. that becomes complex quickly.
- // Instead we keep the PENDING_WORKER_ASSIGN to notify other task
execution threads not to pick this one up.
- // And, it is automatically removed by any of the task execution
threads when they notice that
- // ti.getState().isPending() is false (at the beginning of this
loop)
-
ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN);
- taskItem = ti;
- break;
- }
-
- if (taskItem == null) {
- // Either no pending task is found or no suitable worker is found
for any of the pending tasks.
- // statusLock.notifyAll() is called whenever a new task shows up
or if there is a possibility for a task
- // to successfully get worker to run, for example when a new
worker shows up, a task slot opens up
- // because some task completed etc.
- statusLock.wait(TimeUnit.MINUTES.toMillis(1));
+ // Exhausted worker assignment retries, let's backoff a bit
+ if (workerToAssign == null) {
+ log.warn("Failed to find workers available to run task[%s].
Sending to back of queue", taskId);
+ pendingTasks.put(taskItem.withFreshSequenceNumber());
continue;
}
- }
- String taskId = taskItem.getTaskId();
+ // Mark this worker as unassignable while task is being assigned
+ workers.compute(
+ workerToAssign.getWorker().getHost(), (key, entry) -> {
+ Preconditions.checkState(
+ entry != null,
+ "Expected selected worker[%s] to be available",
+ entry.getWorker().getHost()
+ );
+ Preconditions.checkState(
+ entry.getState() == WorkerHolder.State.READY,
+ "Expected worker[%s] state to be READY, got [%s]",
+ entry.getWorker().getHost(),
+ entry.getState()
+ );
- if (taskItem.getTask() == null) {
- log.makeAlert("No Task obj found in TaskItem for taskID[%s].
Failed.", taskId).emit();
- // taskComplete(..) must be called outside of statusLock, see
comments on method.
- taskComplete(
- taskItem,
- null,
- TaskStatus.failure(
- taskId,
- "No payload found for this task. "
- + "See overlord logs and middleManager/indexer logs for more
details."
- )
+ entry.setState(WorkerHolder.State.PENDING_ASSIGN);
+ return entry;
+ }
);
- continue;
- }
- if (immutableWorker == null) {
- throw new ISE("Unexpected state: null immutableWorker");
+ // Mark this task as pending worker assign
+ tasks.compute(
+ taskId,
+ (key, entry) -> {
+ Preconditions.checkState(entry != null, "Expected task[%s] to
be in tasks set", taskId);
+ Preconditions.checkState(
+ entry.getState() ==
HttpRemoteTaskRunnerWorkItem.State.PENDING,
+ "Expected task[%s] state to be PENDING, got state[%s]",
+ taskId,
+ entry.getState()
+ );
+
+
entry.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN);
+ return entry;
+ }
+ );
}
+ final String workerHost = workerToAssign.getWorker().getHost();
try {
- // this will send HTTP request to worker for assigning task
- if (!runTaskOnWorker(taskItem,
immutableWorker.getWorker().getHost())) {
- if (taskItem.getState() ==
HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
- taskItem.revertStateFromPendingWorkerAssignToPending();
- }
+ if (!runTaskOnWorker(taskId, workerHost)) {
Review Comment:
A quick note, @kfaraz: I was tempted to place this call inside the
`.compute()` above to avoid the race. However, it could end up holding the
lock for a while (and `ConcurrentHashMap` locks the whole hash bin, which may
contain other keys, not just this one), so I opted to release the lock first,
similar to what the existing runner does.
Additionally, if something like a `removeWorker` callback occurs in the
meantime, I'd prefer to update the state right away rather than wait for an
assignment timeout, etc.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]