jtuglu1 commented on code in PR #18851:
URL: https://github.com/apache/druid/pull/18851#discussion_r2968068099
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -1521,206 +1480,212 @@ public void taskAddedOrUpdated(final TaskAnnouncement
announcement, final Worker
final Worker worker = workerHolder.getWorker();
log.debug(
- "Worker[%s] wrote [%s] status for task [%s] on [%s]",
+ "Worker[%s] wrote status[%s] for task[%s] on location[%s]",
worker.getHost(),
announcement.getTaskStatus().getStatusCode(),
taskId,
announcement.getTaskLocation()
);
- HttpRemoteTaskRunnerWorkItem taskItem;
- boolean shouldShutdownTask = false;
- boolean isTaskCompleted = false;
-
- synchronized (statusLock) {
- taskItem = tasks.get(taskId);
- if (taskItem == null) {
- // Try to find information about it in the TaskStorage
- Optional<TaskStatus> knownStatusInStorage =
taskStorage.getStatus(taskId);
-
- if (knownStatusInStorage.isPresent()) {
- switch (knownStatusInStorage.get().getStatusCode()) {
- case RUNNING:
- taskItem = new HttpRemoteTaskRunnerWorkItem(
- taskId,
- worker,
- TaskLocation.unknown(),
- null,
- announcement.getTaskType(),
- HttpRemoteTaskRunnerWorkItem.State.RUNNING
- );
- tasks.put(taskId, taskItem);
- final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
- metricBuilder.setDimension(DruidMetrics.TASK_ID, taskId);
- emitter.emit(metricBuilder.setMetric(TASK_DISCOVERED_COUNT, 1L));
- break;
- case SUCCESS:
- case FAILED:
- if (!announcement.getTaskStatus().isComplete()) {
- log.info(
- "Worker[%s] reported status for completed, known from
taskStorage, task[%s]. Ignored.",
- worker.getHost(),
- taskId
- );
- }
- break;
- default:
- log.makeAlert(
- "Found unrecognized state[%s] of task[%s] in taskStorage.
Notification[%s] from worker[%s] is ignored.",
- knownStatusInStorage.get().getStatusCode(),
- taskId,
- announcement,
- worker.getHost()
- ).emit();
- }
- } else {
- log.warn(
- "Worker[%s] reported status[%s] for unknown task[%s]. Ignored.",
- worker.getHost(),
- announcement.getStatus(),
- taskId
- );
- }
- }
-
- if (taskItem == null) {
- if (!announcement.getTaskStatus().isComplete()) {
- shouldShutdownTask = true;
- }
- } else {
- switch (announcement.getTaskStatus().getStatusCode()) {
- case RUNNING:
- switch (taskItem.getState()) {
- case PENDING:
- case PENDING_WORKER_ASSIGN:
- taskItem.setWorker(worker);
- taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
- log.info("Task[%s] started RUNNING on worker[%s].", taskId,
worker.getHost());
-
- final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
- IndexTaskUtils.setTaskDimensions(metricBuilder,
taskItem.getTask());
- emitter.emit(metricBuilder.setMetric(
- "task/pending/time",
- new Duration(taskItem.getCreatedTime(),
DateTimes.nowUtc()).getMillis())
- );
+ final AtomicBoolean shouldShutdownTask = new AtomicBoolean(false);
+ final AtomicBoolean isTaskCompleted = new AtomicBoolean(false);
- // fall through
- case RUNNING:
- if (worker.getHost().equals(taskItem.getWorker().getHost())) {
- if
(!announcement.getTaskLocation().equals(taskItem.getLocation())) {
+ tasks.compute(
+ taskId,
+ (key, taskEntry) -> {
+ if (taskEntry == null) {
+ // Try to find information about it in the TaskStorage
+ Optional<TaskStatus> knownStatusInStorage =
taskStorage.getStatus(taskId);
Review Comment:
> I see the old code did this under `statusLock`, and there's also a resolved
> conversation about keeping it here. It's fine to keep it here, I suppose, but
> please include a comment noting that this makes a metadata call and may cause
> contention on `tasks`.
Yes, I can add a comment. The key point is that, unlike the old
`synchronized (statusLock)` block, `tasks.compute` only locks a (hopefully
small) subset of the task keys, so work on other tasks can continue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]