himanshug commented on a change in pull request #8697: HRTR: make pending task
execution handling to go through all tasks on not finding worker slots
URL: https://github.com/apache/incubator-druid/pull/8697#discussion_r356801301
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java
##########
@@ -1021,92 +1043,164 @@ public void unregisterListener(String listenerId)
HttpRemoteTaskRunnerWorkItem.State.PENDING
);
tasks.put(task.getId(), taskRunnerWorkItem);
- addPendingTaskToExecutor(task.getId());
+ pendingTaskIds.add(task.getId());
+
+ statusLock.notifyAll();
+
return taskRunnerWorkItem.getResult();
}
}
}
- private void addPendingTaskToExecutor(final String taskId)
+ private void startPendingTaskHandling()
{
- pendingTasksExec.execute(
- () -> {
- while (!Thread.interrupted() && lifecycleLock.awaitStarted(1,
TimeUnit.MILLISECONDS)) {
- ImmutableWorkerInfo immutableWorker;
- HttpRemoteTaskRunnerWorkItem taskItem = null;
+ for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) {
+ pendingTasksExec.submit(
+ () -> {
try {
- synchronized (statusLock) {
- taskItem = tasks.get(taskId);
+ if (!lifecycleLock.awaitStarted()) {
+ log.makeAlert("Lifecycle not started, PendingTaskExecution
loop will not run.").emit();
+ return;
+ }
- if (taskItem == null) {
- log.info(
- "Task[%s] work item not found. Probably user asked to
shutdown before. Not assigning.",
- taskId
- );
- return;
- }
+ pendingTasksExecutionLoop();
+ }
+ catch (Throwable t) {
+ log.makeAlert(t, "Error while waiting for lifecycle start.
PendingTaskExecution loop will not run")
+ .emit();
+ }
+ finally {
+ log.info("PendingTaskExecution loop exited.");
+ }
+ }
+ );
+ }
+ }
- if (taskItem.getState() !=
HttpRemoteTaskRunnerWorkItem.State.PENDING) {
- log.info(
- "Task[%s] is in state[%s]. Probably some worker already
reported it. Not assigning.",
- taskId,
- taskItem.getState()
- );
- return;
- }
+ private void pendingTasksExecutionLoop()
+ {
+ while (!Thread.interrupted() && lifecycleLock.awaitStarted(1,
TimeUnit.MILLISECONDS)) {
+ try {
+ // Find one pending task to run and a worker to run on
+ HttpRemoteTaskRunnerWorkItem taskItem = null;
+ ImmutableWorkerInfo immutableWorker = null;
- if (taskItem.getTask() == null) {
- throw new ISE("WTF! couldn't find Task instance for
taskId[%s].", taskId);
- }
- immutableWorker = findWorkerToRunTask(taskItem.getTask());
-
- if (immutableWorker == null) {
- // no free worker, wait for some worker to become free
-
statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis());
- continue;
- } else if (workersWithUnacknowledgedTask.putIfAbsent(
- immutableWorker.getWorker().getHost(),
- taskId
- ) != null) {
- // there was a race and someone else took this worker slot,
try again
- continue;
- }
- }
+ synchronized (statusLock) {
+ Iterator<String> iter = pendingTaskIds.iterator();
+ while (iter.hasNext()) {
+ String taskId = iter.next();
+ HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId);
+
+ if (ti == null || !ti.getState().isPending()) {
+ // happens if the task was shutdown or was picked up earlier and
no more pending
+ iter.remove();
+ continue;
+ }
- try {
- // this will send HTTP request to worker for assigning task
and hence kept
- // outside the synchronized block.
- if (runTaskOnWorker(taskItem,
immutableWorker.getWorker().getHost())) {
- return;
- }
- }
- finally {
-
workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost());
- synchronized (statusLock) {
- statusLock.notifyAll();
- }
- }
+ if (ti.getState() ==
HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+ // picked up by another pending task executor thread.
+ continue;
}
- catch (InterruptedException ex) {
- log.info("Got InterruptedException while assigning task[%s].",
taskId);
- Thread.currentThread().interrupt();
- return;
+ if (ti.getTask() == null) {
+ // this is not supposed to happen except for a bug, we want to
mark this task failed but
+ // taskComplete(..) can not be called while holding statusLock.
See the javadoc on that
+ // method.
+ // so this will get marked failed afterwards outside of current
synchronized block.
+ taskItem = ti;
+ break;
}
- catch (Throwable th) {
- log.makeAlert(th, "Exception while trying to assign task")
- .addData("taskId", taskId)
- .emit();
- if (taskItem != null) {
- taskComplete(taskItem, null, TaskStatus.failure(taskId));
- }
+ immutableWorker = findWorkerToRunTask(ti.getTask());
+ if (immutableWorker == null) {
+ continue;
+ }
+
+ String prevUnackedTaskId =
workersWithUnacknowledgedTask.putIfAbsent(
+ immutableWorker.getWorker().getHost(),
+ taskId
+ );
+ if (prevUnackedTaskId != null) {
+ log.makeAlert(
+ "Found worker[%s] with unacked task[%s] but still was
identified to run task[%s].",
+ immutableWorker.getWorker().getHost(),
+ prevUnackedTaskId,
+ taskId
+ ).emit();
+ }
+
+
ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN);
+ taskItem = ti;
+ break;
+ }
+
+ if (taskItem == null) {
+ // Either no pending task is found or no suitable worker is found
for any of the pending tasks.
+ // statusLock.notifyAll() is called whenever a new task shows up
or if there is a possibility for a task
+ // to successfully get worker to run, for example when a new
worker shows up, a task slot opens up
+ // because some task completed etc.
+ statusLock.wait(TimeUnit.MINUTES.toMillis(1));
+ continue;
+ }
+ }
+
+ String taskId = taskItem.getTaskId();
+
+ if (taskItem.getTask() == null) {
+ log.makeAlert("No Task obj found in TaskItem for taskID[%s].
Failed.", taskId).emit();
+ taskComplete(taskItem, null, TaskStatus.failure(taskId));
+ continue;
+ }
+
+ try {
+ if (immutableWorker == null) {
+ throw new ISE("NULL immutableWorker");
+ }
- return;
+ // this will send HTTP request to worker for assigning task
+ if (!runTaskOnWorker(taskItem,
immutableWorker.getWorker().getHost())) {
+ if (taskItem.getState() ==
HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) {
+ taskItem.revertStateFromPendingWorkerAssignToPending();
}
}
}
- );
+ catch (InterruptedException ex) {
+ log.info("Got InterruptedException while assigning task[%s].",
taskId);
+ throw ex;
+ }
+ catch (Throwable th) {
+ log.makeAlert(th, "Exception while trying to assign task")
+ .addData("taskId", taskId)
+ .emit();
+
+ taskComplete(taskItem, null, TaskStatus.failure(taskId));
Review comment:
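Yes, taskComplete(..) has to be called outside of statusLock, because it must not
be invoked while holding that lock (see the javadoc on taskComplete(..)). I have
added comments explaining this; please take a look.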
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]