kfaraz commented on code in PR #18851:
URL: https://github.com/apache/druid/pull/18851#discussion_r2631164807
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -1521,206 +1456,215 @@ public void taskAddedOrUpdated(final TaskAnnouncement announcement, final Worker
final Worker worker = workerHolder.getWorker();
log.debug(
- "Worker[%s] wrote [%s] status for task [%s] on [%s]",
+ "Worker[%s] wrote status[%s] for task[%s] on [%s]",
Review Comment:
```suggestion
"Worker[%s] wrote status[%s] for task[%s] on location[%s]",
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -186,39 +176,28 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer,
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
private final HttpClient httpClient;
- private final ObjectMapper smileMapper;
+ private final ObjectMapper objectMapper;
Review Comment:
Looking at the factory class, we are still passing the `@Smile`-annotated mapper
here. Let's retain the original name of this field to avoid ambiguity with the
other mapper (typically injected without an annotation or with `@Json`).
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -402,31 +296,54 @@ private ImmutableWorkerInfo findWorkerToRunTask(Task task)
}
private boolean runTaskOnWorker(
Review Comment:
Please add a javadoc to this method which clarifies what its return value or
exception means:
- true: successfully assigned
- false: failed to assign, retry later
- exception: failed to assign, mark task as completed
I wonder if this tri-state should be captured in the return value itself
rather than by throwing varying exceptions like `ISE` or `IllegalStateException`
(via `Preconditions`). You could return a value which has a `success` boolean, a
`retry` boolean and a String `failureReason`, which can be used for the
alert or the task completion message.
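A minimal sketch of such a return type, assuming a hypothetical `AssignmentResult` class (the class name and its factory methods are illustrative only, not part of the PR):

```java
import java.util.Objects;

/**
 * Hypothetical result of runTaskOnWorker(), replacing the
 * true / false / exception tri-state.
 */
final class AssignmentResult
{
  private final boolean success;
  private final boolean retry;
  private final String failureReason; // null when the assignment succeeded

  private AssignmentResult(boolean success, boolean retry, String failureReason)
  {
    this.success = success;
    this.retry = retry;
    this.failureReason = failureReason;
  }

  /** Task was successfully assigned to the worker. */
  static AssignmentResult assigned()
  {
    return new AssignmentResult(true, false, null);
  }

  /** Assignment failed; the task should be retried later. */
  static AssignmentResult retryLater(String reason)
  {
    return new AssignmentResult(false, true, Objects.requireNonNull(reason, "reason"));
  }

  /** Assignment failed; the task should be marked as completed with this reason. */
  static AssignmentResult failed(String reason)
  {
    return new AssignmentResult(false, false, Objects.requireNonNull(reason, "reason"));
  }

  boolean isSuccess()
  {
    return success;
  }

  boolean shouldRetry()
  {
    return retry;
  }

  String getFailureReason()
  {
    return failureReason;
  }
}
```

The caller can then branch on `isSuccess()` and `shouldRetry()` and pass `getFailureReason()` to the alert or task completion message, instead of catching different exception types.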
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -112,71 +107,66 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
- * A Remote TaskRunner to manage tasks on Middle Manager nodes using internal-discovery({@link DruidNodeDiscoveryProvider})
- * to discover them and Http.
- * Middle Managers manages list of assigned/completed tasks on disk and expose 3 HTTP endpoints
- * 1. POST request for assigning a task
- * 2. POST request for shutting down a task
- * 3. GET request for getting list of assigned, running, completed tasks on Middle Manager and its enable/disable status.
- * This endpoint is implemented to support long poll and holds the request till there is a change. This class
- * sends the next request immediately as the previous finishes to keep the state up-to-date.
- * <p>
- * ZK_CLEANUP_TODO : As of 0.11.1, it is required to cleanup task status paths from ZK which are created by the
- * workers to support deprecated RemoteTaskRunner. So a method "scheduleCompletedTaskStatusCleanupFromZk()" is added'
- * which should be removed in the release that removes RemoteTaskRunner legacy ZK updation WorkerTaskMonitor class.
+ * HTTP-based distributed task scheduler that manages assignment of tasks to slots on workers (MiddleManagers).
+ * State information:
+ * Task state machine is as follows:
+ * 1. PENDING – Task has been submitted to the scheduler.
+ * 2. PENDING_ASSIGN – Task has been assignment to a worker, but has not started running yet.
+ * 3. EXECUTING – Task is running on a worker.
+ * 4. COMPLETE – Task has completed (success/fail).
+ * Worker state machine is as follows:
+ * 1. READY – Worker is online and ready to receive new tasks.
+ * 2. PENDING_ASSIGN – A task has been submitted to this worker, but has not started running yet.
+ * 3. BLACKLISTED – Worker has too many failed tasks.
+ * 4. LAZY – Worker has no more task running and been marked as reapable by the worker auto-scaler.
Review Comment:
This is not needed here and should be moved to the respective enum values.
Just link them here using `@see WorkerHolder.State` and
`@see HttpRemoteTaskRunnerWorkItem.State`.
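A sketch of that layout, using hypothetical standalone enums (`WorkerState`, `TaskItemState`) and a placeholder class (`RunnerJavadocSketch`) in place of `WorkerHolder.State`, `HttpRemoteTaskRunnerWorkItem.State` and the runner itself; the per-constant descriptions are taken from the javadoc in the diff:

```java
/** Hypothetical stand-in for WorkerHolder.State. */
enum WorkerState
{
  /** Worker is online and ready to receive new tasks. */
  READY,
  /** A task has been submitted to this worker, but has not started running yet. */
  PENDING_ASSIGN,
  /** Worker has too many failed tasks. */
  BLACKLISTED,
  /** Worker has no more tasks running and has been marked as reapable by the worker auto-scaler. */
  LAZY
}

/** Hypothetical stand-in for HttpRemoteTaskRunnerWorkItem.State. */
enum TaskItemState
{
  /** Task has been submitted to the scheduler. */
  PENDING,
  /** Task has been assigned to a worker, but has not started running yet. */
  PENDING_ASSIGN,
  /** Task is running on a worker. */
  EXECUTING,
  /** Task has completed (success or failure). */
  COMPLETE
}

/**
 * HTTP-based distributed task scheduler that manages assignment of tasks to
 * slots on workers (MiddleManagers or Indexers).
 *
 * @see WorkerState
 * @see TaskItemState
 */
final class RunnerJavadocSketch
{
}
```

With this split, each state's meaning is documented once, next to the constant that code actually references.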
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
// because that is attached by TaskQueue to task result future. So, this method must not be called with "statusLock"
// held. See https://github.com/apache/druid/issues/6201
private void taskComplete(
- HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
- WorkerHolder workerHolder,
+ String taskId,
+ String workerHost,
TaskStatus taskStatus
)
{
- Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread must not hold statusLock.");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkNotNull(taskStatus, "taskStatus");
- if (workerHolder != null) {
- log.info(
- "Worker[%s] completed task[%s] with status[%s]",
- workerHolder.getWorker().getHost(),
- taskStatus.getId(),
- taskStatus.getStatusCode()
- );
- // Worker is done with this task
- workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
- }
+ Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current thread must not hold workerStateLock.");
+ Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread must not hold taskStateLock.");
- if (taskRunnerWorkItem.getResult().isDone()) {
- // This is not the first complete event.
- try {
- TaskState lastKnownState = taskRunnerWorkItem.getResult().get().getStatusCode();
- if (taskStatus.getStatusCode() != lastKnownState) {
- log.warn(
- "The state of the new task complete event is different from its last known state. "
- + "New state[%s], last known state[%s]",
- taskStatus.getStatusCode(),
- lastKnownState
- );
+ AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ Preconditions.checkState(taskEntry != null, "Expected task[%s] to exist", taskId);
+ if (taskEntry.getResult().isDone()) {
Review Comment:
Don't use a future here. Instead, keep an additional `TaskStatus resultStatus`
(or similarly named) field in `HttpRemoteTaskRunnerWorkItem`, which gets
updated when `taskEntry.setStatus()` is called.
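A minimal sketch of that idea, assuming a hypothetical `WorkItemSketch` and a simplified `Status` enum standing in for Druid's `TaskStatus`. Detecting a repeated completion event becomes a null check, so there is no `get()` call and no `InterruptedException`/`ExecutionException` to handle:

```java
import java.util.Objects;

/** Simplified stand-in for TaskStatus / TaskState. */
enum Status
{
  SUCCESS,
  FAILED
}

/** Hypothetical work item that stores its final status instead of a future. */
final class WorkItemSketch
{
  private final String taskId;

  // null until the task completes; volatile so that reads made outside
  // the map's compute() observe the latest write
  private volatile Status resultStatus;

  WorkItemSketch(String taskId)
  {
    this.taskId = Objects.requireNonNull(taskId, "taskId");
  }

  String getTaskId()
  {
    return taskId;
  }

  Status getResultStatus()
  {
    return resultStatus;
  }

  /**
   * Records the final status. Returns true only for the first completion event;
   * later events leave the recorded status unchanged. Assumed to be called only
   * inside tasks.compute(taskId, ...), which serializes updates for the same key.
   */
  boolean setStatus(Status status)
  {
    if (resultStatus != null) {
      return false;
    }
    resultStatus = Objects.requireNonNull(status, "status");
    return true;
  }
}
```

A `false` return from `setStatus()` is where the "already completed" warning would be logged, comparing the incoming status with `getResultStatus()`.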
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
// because that is attached by TaskQueue to task result future. So, this
method must not be called with "statusLock"
// held. See https://github.com/apache/druid/issues/6201
private void taskComplete(
- HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
- WorkerHolder workerHolder,
+ String taskId,
+ String workerHost,
TaskStatus taskStatus
)
{
- Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread
must not hold statusLock.");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkNotNull(taskStatus, "taskStatus");
- if (workerHolder != null) {
- log.info(
- "Worker[%s] completed task[%s] with status[%s]",
- workerHolder.getWorker().getHost(),
- taskStatus.getId(),
- taskStatus.getStatusCode()
- );
- // Worker is done with this task
- workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
- }
+ Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current
thread must not hold workerStateLock.");
+ Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread
must not hold taskStateLock.");
- if (taskRunnerWorkItem.getResult().isDone()) {
- // This is not the first complete event.
- try {
- TaskState lastKnownState =
taskRunnerWorkItem.getResult().get().getStatusCode();
- if (taskStatus.getStatusCode() != lastKnownState) {
- log.warn(
- "The state of the new task complete event is different from its
last known state. "
- + "New state[%s], last known state[%s]",
- taskStatus.getStatusCode(),
- lastKnownState
- );
+ AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ Preconditions.checkState(taskEntry != null, "Expected task[%s] to
exist", taskId);
+ if (taskEntry.getResult().isDone()) {
+ // This is not the first complete event.
+ try {
+ TaskState lastKnownState = taskEntry.getResult().get().getStatusCode();
+ if (taskStatus.getStatusCode() != lastKnownState) {
+ log.warn(
+ "The state of the new task complete event is different from its last known state. "
+ + "New state[%s], last known state[%s]",
+ taskStatus.getStatusCode(),
+ lastKnownState
+ );
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn(e, "Interrupted while getting the last known task status.");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e) {
+ // This case should not really happen.
+ log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
+ }
+ } else {
+ // Notify interested parties
Review Comment:
We are not notifying here, so this comment should be removed.
```suggestion
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -112,71 +107,55 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
- * A Remote TaskRunner to manage tasks on Middle Manager nodes using
internal-discovery({@link DruidNodeDiscoveryProvider})
- * to discover them and Http.
- * Middle Managers manages list of assigned/completed tasks on disk and expose
3 HTTP endpoints
- * 1. POST request for assigning a task
- * 2. POST request for shutting down a task
- * 3. GET request for getting list of assigned, running, completed tasks on
Middle Manager and its enable/disable status.
- * This endpoint is implemented to support long poll and holds the request
till there is a change. This class
- * sends the next request immediately as the previous finishes to keep the
state up-to-date.
- * <p>
- * ZK_CLEANUP_TODO : As of 0.11.1, it is required to cleanup task status paths
from ZK which are created by the
- * workers to support deprecated RemoteTaskRunner. So a method
"scheduleCompletedTaskStatusCleanupFromZk()" is added'
- * which should be removed in the release that removes RemoteTaskRunner legacy
ZK updation WorkerTaskMonitor class.
+ * TODO: add a more descriptive title
Review Comment:
Can this be reverted/updated now?
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -112,71 +107,66 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
- * A Remote TaskRunner to manage tasks on Middle Manager nodes using
internal-discovery({@link DruidNodeDiscoveryProvider})
- * to discover them and Http.
- * Middle Managers manages list of assigned/completed tasks on disk and expose
3 HTTP endpoints
- * 1. POST request for assigning a task
- * 2. POST request for shutting down a task
- * 3. GET request for getting list of assigned, running, completed tasks on
Middle Manager and its enable/disable status.
- * This endpoint is implemented to support long poll and holds the request
till there is a change. This class
- * sends the next request immediately as the previous finishes to keep the
state up-to-date.
- * <p>
- * ZK_CLEANUP_TODO : As of 0.11.1, it is required to cleanup task status paths
from ZK which are created by the
- * workers to support deprecated RemoteTaskRunner. So a method
"scheduleCompletedTaskStatusCleanupFromZk()" is added'
- * which should be removed in the release that removes RemoteTaskRunner legacy
ZK updation WorkerTaskMonitor class.
+ * HTTP-based distributed task scheduler that manages assignment of tasks to slots on workers (MiddleManagers).
Review Comment:
```suggestion
* HTTP-based distributed task scheduler that manages assignment of tasks to slots on workers (MiddleManagers or Indexers).
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
// because that is attached by TaskQueue to task result future. So, this
method must not be called with "statusLock"
// held. See https://github.com/apache/druid/issues/6201
private void taskComplete(
- HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
- WorkerHolder workerHolder,
+ String taskId,
+ String workerHost,
TaskStatus taskStatus
)
{
- Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread
must not hold statusLock.");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkNotNull(taskStatus, "taskStatus");
- if (workerHolder != null) {
- log.info(
- "Worker[%s] completed task[%s] with status[%s]",
- workerHolder.getWorker().getHost(),
- taskStatus.getId(),
- taskStatus.getStatusCode()
- );
- // Worker is done with this task
- workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
- }
+ Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current
thread must not hold workerStateLock.");
+ Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread
must not hold taskStateLock.");
- if (taskRunnerWorkItem.getResult().isDone()) {
- // This is not the first complete event.
- try {
- TaskState lastKnownState =
taskRunnerWorkItem.getResult().get().getStatusCode();
- if (taskStatus.getStatusCode() != lastKnownState) {
- log.warn(
- "The state of the new task complete event is different from its
last known state. "
- + "New state[%s], last known state[%s]",
- taskStatus.getStatusCode(),
- lastKnownState
- );
+ AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ Preconditions.checkState(taskEntry != null, "Expected task[%s] to
exist", taskId);
Review Comment:
If we find the entry to be null, just set some flag (similar to
`taskCompleted`) and return null rather than throwing an exception. We can throw
the exception outside the `compute()` call.
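A minimal sketch of that pattern with `ConcurrentHashMap.compute()`, using a hypothetical `TaskEntry` whose `resultStatus` is null until the task completes. Returning `null` from the remapping function for an absent key leaves the key absent, and the exception is thrown only after `compute()` returns. This also keeps the function short, which matches the `ConcurrentHashMap` javadoc's note that other updates may block while a computation is in progress:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical task entry; resultStatus is null until the task completes. */
final class TaskEntry
{
  String resultStatus; // mutated only inside tasks.compute()
}

final class TaskCompleteSketch
{
  final ConcurrentMap<String, TaskEntry> tasks = new ConcurrentHashMap<>();

  /** Returns true if this call recorded the first completion of the task. */
  boolean taskComplete(String taskId, String status)
  {
    final AtomicBoolean taskMissing = new AtomicBoolean(false);
    final AtomicBoolean taskCompleted = new AtomicBoolean(false);

    tasks.compute(
        taskId,
        (key, taskEntry) -> {
          if (taskEntry == null) {
            // Only record the condition here; returning null keeps the key absent.
            taskMissing.set(true);
            return null;
          }
          if (taskEntry.resultStatus == null) {
            taskEntry.resultStatus = status;
            taskCompleted.set(true);
          }
          return taskEntry;
        }
    );

    if (taskMissing.get()) {
      throw new IllegalStateException("Expected task[" + taskId + "] to exist");
    }
    return taskCompleted.get();
  }
}
```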
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
// because that is attached by TaskQueue to task result future. So, this
method must not be called with "statusLock"
// held. See https://github.com/apache/druid/issues/6201
private void taskComplete(
- HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
- WorkerHolder workerHolder,
+ String taskId,
+ String workerHost,
TaskStatus taskStatus
)
{
- Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread
must not hold statusLock.");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkNotNull(taskStatus, "taskStatus");
- if (workerHolder != null) {
- log.info(
- "Worker[%s] completed task[%s] with status[%s]",
- workerHolder.getWorker().getHost(),
- taskStatus.getId(),
- taskStatus.getStatusCode()
- );
- // Worker is done with this task
- workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
- }
+ Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current
thread must not hold workerStateLock.");
+ Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread
must not hold taskStateLock.");
- if (taskRunnerWorkItem.getResult().isDone()) {
- // This is not the first complete event.
- try {
- TaskState lastKnownState =
taskRunnerWorkItem.getResult().get().getStatusCode();
- if (taskStatus.getStatusCode() != lastKnownState) {
- log.warn(
- "The state of the new task complete event is different from its
last known state. "
- + "New state[%s], last known state[%s]",
- taskStatus.getStatusCode(),
- lastKnownState
- );
+ AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ Preconditions.checkState(taskEntry != null, "Expected task[%s] to
exist", taskId);
+ if (taskEntry.getResult().isDone()) {
+ // This is not the first complete event.
+ try {
+ TaskState lastKnownState =
taskEntry.getResult().get().getStatusCode();
+ if (taskStatus.getStatusCode() != lastKnownState) {
+ log.warn(
+ "The state of the new task complete event is different
from its last known state. "
+ + "New state[%s], last known state[%s]",
+ taskStatus.getStatusCode(),
+ lastKnownState
+ );
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn(e, "Interrupted while getting the last known task
status.");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e) {
+ // This case should not really happen.
Review Comment:
This try-catch will not be needed at all if we don't use a future.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
// because that is attached by TaskQueue to task result future. So, this
method must not be called with "statusLock"
// held. See https://github.com/apache/druid/issues/6201
private void taskComplete(
- HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
- WorkerHolder workerHolder,
+ String taskId,
+ String workerHost,
TaskStatus taskStatus
)
{
- Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread
must not hold statusLock.");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkNotNull(taskStatus, "taskStatus");
- if (workerHolder != null) {
- log.info(
- "Worker[%s] completed task[%s] with status[%s]",
- workerHolder.getWorker().getHost(),
- taskStatus.getId(),
- taskStatus.getStatusCode()
- );
- // Worker is done with this task
- workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
- }
+ Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current
thread must not hold workerStateLock.");
+ Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread
must not hold taskStateLock.");
- if (taskRunnerWorkItem.getResult().isDone()) {
- // This is not the first complete event.
- try {
- TaskState lastKnownState =
taskRunnerWorkItem.getResult().get().getStatusCode();
- if (taskStatus.getStatusCode() != lastKnownState) {
- log.warn(
- "The state of the new task complete event is different from its
last known state. "
- + "New state[%s], last known state[%s]",
- taskStatus.getStatusCode(),
- lastKnownState
- );
+ AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ Preconditions.checkState(taskEntry != null, "Expected task[%s] to
exist", taskId);
+ if (taskEntry.getResult().isDone()) {
+ // This is not the first complete event.
+ try {
+ TaskState lastKnownState =
taskEntry.getResult().get().getStatusCode();
+ if (taskStatus.getStatusCode() != lastKnownState) {
+ log.warn(
+ "The state of the new task complete event is different
from its last known state. "
+ + "New state[%s], last known state[%s]",
+ taskStatus.getStatusCode(),
+ lastKnownState
+ );
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn(e, "Interrupted while getting the last known task
status.");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e) {
+ // This case should not really happen.
+ log.warn(e, "Failed to get the last known task status. Ignoring
this failure.");
+ }
+ } else {
+ // Notify interested parties
+ taskEntry.setResult(taskStatus);
+ taskCompleted.set(true);
+ }
+
+ return taskEntry;
}
- }
- catch (InterruptedException e) {
- log.warn(e, "Interrupted while getting the last known task status.");
- Thread.currentThread().interrupt();
- }
- catch (ExecutionException e) {
- // This case should not really happen.
- log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
- }
- } else {
- // Notify interested parties
- taskRunnerWorkItem.setResult(taskStatus);
- TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+ );
- // Update success/failure counters, Blacklist node if there are too many failures.
- if (workerHolder != null) {
- blacklistWorkerIfNeeded(taskStatus, workerHolder);
+ if (workerHost != null) {
+ synchronized (workerStateLock) {
Review Comment:
It doesn't seem like we need to acquire this lock here.
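A sketch of the lock-free alternative, assuming the bookkeeping touches only this worker's entry (hypothetical `WorkerEntry` and `WorkerUpdateSketch`). `ConcurrentHashMap.computeIfPresent()` applies the update atomically for the key and returns `null` when the worker is no longer present, which also covers a worker that went away after its task finished. If the update must stay consistent with other state that is guarded by `workerStateLock`, the lock may still be needed:

```java
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical per-worker bookkeeping. */
final class WorkerEntry
{
  Instant lastCompletedTaskTime;
  int consecutiveFailures;
}

final class WorkerUpdateSketch
{
  final ConcurrentMap<String, WorkerEntry> workers = new ConcurrentHashMap<>();

  /** Updates bookkeeping for the worker; returns false if the worker is no longer known. */
  boolean onTaskComplete(String workerHost, boolean success)
  {
    // computeIfPresent() runs atomically for this key, so no extra synchronized
    // block is needed for an update that touches only this entry.
    WorkerEntry updated = workers.computeIfPresent(
        workerHost,
        (host, worker) -> {
          worker.lastCompletedTaskTime = Instant.now();
          worker.consecutiveFailures = success ? 0 : worker.consecutiveFailures + 1;
          return worker;
        }
    );
    // null means the worker went away after the task finished: an expected race,
    // so a caller would log it at info level rather than as a warning.
    return updated != null;
  }
}
```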
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
// because that is attached by TaskQueue to task result future. So, this
method must not be called with "statusLock"
// held. See https://github.com/apache/druid/issues/6201
private void taskComplete(
- HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
- WorkerHolder workerHolder,
+ String taskId,
+ String workerHost,
TaskStatus taskStatus
)
{
- Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread
must not hold statusLock.");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkNotNull(taskStatus, "taskStatus");
- if (workerHolder != null) {
- log.info(
- "Worker[%s] completed task[%s] with status[%s]",
- workerHolder.getWorker().getHost(),
- taskStatus.getId(),
- taskStatus.getStatusCode()
- );
- // Worker is done with this task
- workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
- }
+ Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current
thread must not hold workerStateLock.");
+ Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread
must not hold taskStateLock.");
- if (taskRunnerWorkItem.getResult().isDone()) {
- // This is not the first complete event.
- try {
- TaskState lastKnownState =
taskRunnerWorkItem.getResult().get().getStatusCode();
- if (taskStatus.getStatusCode() != lastKnownState) {
- log.warn(
- "The state of the new task complete event is different from its
last known state. "
- + "New state[%s], last known state[%s]",
- taskStatus.getStatusCode(),
- lastKnownState
- );
+ AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ Preconditions.checkState(taskEntry != null, "Expected task[%s] to
exist", taskId);
+ if (taskEntry.getResult().isDone()) {
+ // This is not the first complete event.
+ try {
+ TaskState lastKnownState =
taskEntry.getResult().get().getStatusCode();
+ if (taskStatus.getStatusCode() != lastKnownState) {
+ log.warn(
+ "The state of the new task complete event is different
from its last known state. "
+ + "New state[%s], last known state[%s]",
+ taskStatus.getStatusCode(),
+ lastKnownState
+ );
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn(e, "Interrupted while getting the last known task
status.");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e) {
+ // This case should not really happen.
+ log.warn(e, "Failed to get the last known task status. Ignoring
this failure.");
+ }
+ } else {
+ // Notify interested parties
+ taskEntry.setResult(taskStatus);
+ taskCompleted.set(true);
+ }
+
+ return taskEntry;
}
- }
- catch (InterruptedException e) {
- log.warn(e, "Interrupted while getting the last known task status.");
- Thread.currentThread().interrupt();
- }
- catch (ExecutionException e) {
- // This case should not really happen.
- log.warn(e, "Failed to get the last known task status. Ignoring this
failure.");
- }
- } else {
- // Notify interested parties
- taskRunnerWorkItem.setResult(taskStatus);
- TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(),
taskStatus);
+ );
- // Update success/failure counters, Blacklist node if there are too many
failures.
- if (workerHolder != null) {
- blacklistWorkerIfNeeded(taskStatus, workerHolder);
+ if (workerHost != null) {
+ synchronized (workerStateLock) {
+ workers.compute(
+ workerHost,
+ (key, workerHolder) -> {
+ if (workerHolder != null) {
+ log.info(
+ "Worker[%s] completed task[%s] with status[%s]",
+ workerHolder.getWorker().getHost(),
+ taskStatus.getId(),
+ taskStatus.getStatusCode()
+ );
+ // Worker is done with this task
+ workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
+ blacklistWorkerIfNeeded(taskStatus, workerHolder);
+ } else {
+ log.warn("Could not find worker[%s]", workerHost);
Review Comment:
Is this really a warning? It is possible that the task finished on the
worker and then the worker went away.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
// because that is attached by TaskQueue to task result future. So, this
method must not be called with "statusLock"
// held. See https://github.com/apache/druid/issues/6201
private void taskComplete(
- HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
- WorkerHolder workerHolder,
+ String taskId,
+ String workerHost,
TaskStatus taskStatus
)
{
- Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread
must not hold statusLock.");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkNotNull(taskStatus, "taskStatus");
- if (workerHolder != null) {
- log.info(
- "Worker[%s] completed task[%s] with status[%s]",
- workerHolder.getWorker().getHost(),
- taskStatus.getId(),
- taskStatus.getStatusCode()
- );
- // Worker is done with this task
- workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
- }
+ Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current
thread must not hold workerStateLock.");
+ Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread
must not hold taskStateLock.");
- if (taskRunnerWorkItem.getResult().isDone()) {
- // This is not the first complete event.
- try {
- TaskState lastKnownState =
taskRunnerWorkItem.getResult().get().getStatusCode();
- if (taskStatus.getStatusCode() != lastKnownState) {
- log.warn(
- "The state of the new task complete event is different from its
last known state. "
- + "New state[%s], last known state[%s]",
- taskStatus.getStatusCode(),
- lastKnownState
- );
+ AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ Preconditions.checkState(taskEntry != null, "Expected task[%s] to
exist", taskId);
+ if (taskEntry.getResult().isDone()) {
+ // This is not the first complete event.
+ try {
+ TaskState lastKnownState =
taskEntry.getResult().get().getStatusCode();
+ if (taskStatus.getStatusCode() != lastKnownState) {
+ log.warn(
+ "The state of the new task complete event is different
from its last known state. "
+ + "New state[%s], last known state[%s]",
+ taskStatus.getStatusCode(),
+ lastKnownState
+ );
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn(e, "Interrupted while getting the last known task
status.");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e) {
+ // This case should not really happen.
+ log.warn(e, "Failed to get the last known task status. Ignoring
this failure.");
+ }
+ } else {
+ // Notify interested parties
+ taskEntry.setResult(taskStatus);
+ taskCompleted.set(true);
+ }
+
+ return taskEntry;
}
- }
- catch (InterruptedException e) {
- log.warn(e, "Interrupted while getting the last known task status.");
- Thread.currentThread().interrupt();
- }
- catch (ExecutionException e) {
- // This case should not really happen.
- log.warn(e, "Failed to get the last known task status. Ignoring this
failure.");
- }
- } else {
- // Notify interested parties
- taskRunnerWorkItem.setResult(taskStatus);
- TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(),
taskStatus);
+ );
- // Update success/failure counters, Blacklist node if there are too many
failures.
- if (workerHolder != null) {
- blacklistWorkerIfNeeded(taskStatus, workerHolder);
+ if (workerHost != null) {
+ synchronized (workerStateLock) {
+ workers.compute(
+ workerHost,
+ (key, workerHolder) -> {
+ if (workerHolder != null) {
+ log.info(
+ "Worker[%s] completed task[%s] with status[%s]",
+ workerHolder.getWorker().getHost(),
+ taskStatus.getId(),
+ taskStatus.getStatusCode()
+ );
Review Comment:
Nit: This can be a one-liner, I feel.
```suggestion
log.info("Worker[%s] completed task[%s] with status[%s].", workerHost, taskId, taskStatus);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
// because that is attached by TaskQueue to task result future. So, this
method must not be called with "statusLock"
// held. See https://github.com/apache/druid/issues/6201
private void taskComplete(
- HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
- WorkerHolder workerHolder,
+ String taskId,
+ String workerHost,
TaskStatus taskStatus
)
{
- Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread
must not hold statusLock.");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkNotNull(taskStatus, "taskStatus");
- if (workerHolder != null) {
- log.info(
- "Worker[%s] completed task[%s] with status[%s]",
- workerHolder.getWorker().getHost(),
- taskStatus.getId(),
- taskStatus.getStatusCode()
- );
- // Worker is done with this task
- workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
- }
+ Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current
thread must not hold workerStateLock.");
+ Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread
must not hold taskStateLock.");
- if (taskRunnerWorkItem.getResult().isDone()) {
- // This is not the first complete event.
- try {
- TaskState lastKnownState =
taskRunnerWorkItem.getResult().get().getStatusCode();
- if (taskStatus.getStatusCode() != lastKnownState) {
- log.warn(
- "The state of the new task complete event is different from its
last known state. "
- + "New state[%s], last known state[%s]",
- taskStatus.getStatusCode(),
- lastKnownState
- );
+ AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ Preconditions.checkState(taskEntry != null, "Expected task[%s] to
exist", taskId);
+ if (taskEntry.getResult().isDone()) {
+ // This is not the first complete event.
+ try {
+ TaskState lastKnownState =
taskEntry.getResult().get().getStatusCode();
+ if (taskStatus.getStatusCode() != lastKnownState) {
+ log.warn(
+ "The state of the new task complete event is different
from its last known state. "
+ + "New state[%s], last known state[%s]",
Review Comment:
```suggestion
"Ignoring update to status[%s] as task[%s] has already completed with status[%s].",
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -564,20 +498,27 @@ public void nodeViewInitializedTimedOut()
log.info("Waiting for worker discovery...");
}
}
- log.info("[%s] Workers are discovered.", workers.size());
+ log.info("Discovered [%d] workers.", workers.size());
- // Wait till all worker state is sync'd so that we know which worker is running/completed what tasks or else
+ // Wait till all worker state is synced so that we know which worker is running/completed what tasks or else
// We would start assigning tasks which are pretty soon going to be reported by discovered workers.
- for (WorkerHolder worker : workers.values()) {
- log.info("Waiting for worker[%s] to sync state...", worker.getWorker().getHost());
- worker.waitForInitialization();
- }
- log.info("Workers have sync'd state successfully.");
+ workers.forEach((workerHost, workerEntry) -> {
+ try {
+ workerEntry.waitForInitialization();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ });
+ log.info("Workers have synced state successfully.");
Review Comment:
I don't think changing from the regular for loop to a `forEach` makes much
of a difference. It is still possible for new items to get added to the
`workers` map after we started the `forEach` iteration.
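Assuming `workers` is a `ConcurrentHashMap`, both loop forms use its weakly consistent iteration: neither throws `ConcurrentModificationException`, and a worker added after iteration starts may or may not be visited. If a fixed set of workers to wait on is wanted, one option (a hypothetical alternative, not something proposed in this PR) is to copy the values first. A minimal sketch with a hypothetical `Worker` type whose `waitForInitialization()` only records the call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

public class WorkerSyncSnapshot
{
  // Hypothetical stand-in for WorkerHolder.
  static final class Worker
  {
    final String host;
    boolean initialized;

    Worker(String host)
    {
      this.host = host;
    }

    void waitForInitialization() throws InterruptedException
    {
      // The real method blocks until the worker's state is synced; here it only marks completion.
      initialized = true;
    }
  }

  /** Waits on the workers present at call time; workers discovered later are not part of this wait. */
  static List<Worker> waitForDiscoveredWorkers(ConcurrentHashMap<String, Worker> workers)
      throws InterruptedException
  {
    // Copy the values so the set of workers being waited on is explicit and fixed.
    List<Worker> snapshot = new ArrayList<>(workers.values());
    for (Worker worker : snapshot) {
      worker.waitForInitialization();
    }
    return snapshot;
  }

  public static void main(String[] args) throws InterruptedException
  {
    ConcurrentHashMap<String, Worker> workers = new ConcurrentHashMap<>();
    workers.put("host1", new Worker("host1"));
    workers.put("host2", new Worker("host2"));
    List<Worker> waited = waitForDiscoveredWorkers(workers);
    workers.put("host3", new Worker("host3")); // discovered after the wait, so not covered by it
    System.out.println(waited.size()); // 2
  }
}
```

The sketch lets `InterruptedException` propagate from a plain loop; `Map.forEach` takes a `BiConsumer`, which cannot throw checked exceptions, which is why the diff needs a `try`/`catch` inside the lambda.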
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -466,60 +377,82 @@ private boolean runTaskOnWorker(
// because that is attached by TaskQueue to task result future. So, this method must not be called with "statusLock"
// held. See https://github.com/apache/druid/issues/6201
private void taskComplete(
- HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem,
- WorkerHolder workerHolder,
+ String taskId,
+ String workerHost,
TaskStatus taskStatus
)
{
- Preconditions.checkState(!Thread.holdsLock(statusLock), "Current thread must not hold statusLock.");
- Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
- Preconditions.checkNotNull(taskStatus, "taskStatus");
- if (workerHolder != null) {
- log.info(
- "Worker[%s] completed task[%s] with status[%s]",
- workerHolder.getWorker().getHost(),
- taskStatus.getId(),
- taskStatus.getStatusCode()
- );
- // Worker is done with this task
- workerHolder.setLastCompletedTaskTime(DateTimes.nowUtc());
- }
+ Preconditions.checkState(!Thread.holdsLock(workerStateLock), "Current thread must not hold workerStateLock.");
+ Preconditions.checkState(!Thread.holdsLock(taskStateLock), "Current thread must not hold taskStateLock.");
- if (taskRunnerWorkItem.getResult().isDone()) {
- // This is not the first complete event.
- try {
- TaskState lastKnownState = taskRunnerWorkItem.getResult().get().getStatusCode();
- if (taskStatus.getStatusCode() != lastKnownState) {
- log.warn(
- "The state of the new task complete event is different from its last known state. "
- + "New state[%s], last known state[%s]",
- taskStatus.getStatusCode(),
- lastKnownState
- );
+ AtomicBoolean taskCompleted = new AtomicBoolean(false);
+
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ Preconditions.checkState(taskEntry != null, "Expected task[%s] to exist", taskId);
+ if (taskEntry.getResult().isDone()) {
+ // This is not the first complete event.
+ try {
+ TaskState lastKnownState = taskEntry.getResult().get().getStatusCode();
+ if (taskStatus.getStatusCode() != lastKnownState) {
+ log.warn(
+ "The state of the new task complete event is different from its last known state. "
+ + "New state[%s], last known state[%s]",
+ taskStatus.getStatusCode(),
+ lastKnownState
+ );
+ }
+ }
+ catch (InterruptedException e) {
+ log.warn(e, "Interrupted while getting the last known task status.");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e) {
+ // This case should not really happen.
+ log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
+ }
+ } else {
+ // Notify interested parties
+ taskEntry.setResult(taskStatus);
+ taskCompleted.set(true);
+ }
+
+ return taskEntry;
}
- }
- catch (InterruptedException e) {
- log.warn(e, "Interrupted while getting the last known task status.");
- Thread.currentThread().interrupt();
- }
- catch (ExecutionException e) {
- // This case should not really happen.
- log.warn(e, "Failed to get the last known task status. Ignoring this failure.");
- }
- } else {
- // Notify interested parties
- taskRunnerWorkItem.setResult(taskStatus);
- TaskRunnerUtils.notifyStatusChanged(listeners, taskStatus.getId(), taskStatus);
+ );
- // Update success/failure counters, Blacklist node if there are too many failures.
- if (workerHolder != null) {
- blacklistWorkerIfNeeded(taskStatus, workerHolder);
+ if (workerHost != null) {
+ synchronized (workerStateLock) {
+ workers.compute(
+ workerHost,
+ (key, workerHolder) -> {
+ if (workerHolder != null) {
+ log.info(
+ "Worker[%s] completed task[%s] with status[%s]",
+ workerHolder.getWorker().getHost(),
+ taskStatus.getId(),
+ taskStatus.getStatusCode()
+ );
Review Comment:
Also, should we keep this log line outside the `compute`? It doesn't need to
be inside the compute block.
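The `ConcurrentHashMap.compute` Javadoc notes that other threads' updates to the map may block while the remapping function runs, so the computation should be short and simple. A minimal sketch of the suggested restructuring, assuming a hypothetical `WorkerState` that only tracks the last completed task time (`onTaskCompleted` is likewise an illustrative name): the remapping function does the state update, and the log line runs after `compute` returns.

```java
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;

public class LogOutsideCompute
{
  // Hypothetical stand-in for WorkerHolder; only the field touched on task completion is modeled.
  static final class WorkerState
  {
    final String host;
    Instant lastCompletedTaskTime;

    WorkerState(String host)
    {
      this.host = host;
    }
  }

  static final ConcurrentHashMap<String, WorkerState> WORKERS = new ConcurrentHashMap<>();

  /** Records the completion time on a known worker; returns false if the worker is unknown. */
  static boolean onTaskCompleted(String workerHost, String taskId, String statusCode)
  {
    // Keep the remapping function limited to the state update itself.
    WorkerState updated = WORKERS.compute(workerHost, (host, worker) -> {
      if (worker != null) {
        worker.lastCompletedTaskTime = Instant.now();
      }
      return worker; // returning null for an absent key leaves the map unchanged
    });
    if (updated == null) {
      return false;
    }
    // Logging happens after compute() returns, so it does not run while other updates may be blocked.
    System.out.printf("Worker[%s] completed task[%s] with status[%s]%n", updated.host, taskId, statusCode);
    return true;
  }

  public static void main(String[] args)
  {
    WORKERS.put("host1", new WorkerState("host1"));
    onTaskCompleted("host1", "task1", "SUCCESS"); // logs one line
    onTaskCompleted("unknown", "task2", "FAILED"); // logs nothing, returns false
  }
}
```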
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.