jtuglu1 commented on code in PR #18851:
URL: https://github.com/apache/druid/pull/18851#discussion_r2648669766


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java:
##########
@@ -1521,206 +1456,215 @@ public void taskAddedOrUpdated(final TaskAnnouncement 
announcement, final Worker
     final Worker worker = workerHolder.getWorker();
 
     log.debug(
-        "Worker[%s] wrote [%s] status for task [%s] on [%s]",
+        "Worker[%s] wrote status[%s] for task[%s] on [%s]",
         worker.getHost(),
         announcement.getTaskStatus().getStatusCode(),
         taskId,
         announcement.getTaskLocation()
     );
 
-    HttpRemoteTaskRunnerWorkItem taskItem;
-    boolean shouldShutdownTask = false;
-    boolean isTaskCompleted = false;
-
-    synchronized (statusLock) {
-      taskItem = tasks.get(taskId);
-      if (taskItem == null) {
-        // Try to find information about it in the TaskStorage
-        Optional<TaskStatus> knownStatusInStorage = 
taskStorage.getStatus(taskId);
-
-        if (knownStatusInStorage.isPresent()) {
-          switch (knownStatusInStorage.get().getStatusCode()) {
-            case RUNNING:
-              taskItem = new HttpRemoteTaskRunnerWorkItem(
-                  taskId,
-                  worker,
-                  TaskLocation.unknown(),
-                  null,
-                  announcement.getTaskType(),
-                  HttpRemoteTaskRunnerWorkItem.State.RUNNING
-              );
-              tasks.put(taskId, taskItem);
-              final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
-              metricBuilder.setDimension(DruidMetrics.TASK_ID, taskId);
-              emitter.emit(metricBuilder.setMetric(TASK_DISCOVERED_COUNT, 1L));
-              break;
-            case SUCCESS:
-            case FAILED:
-              if (!announcement.getTaskStatus().isComplete()) {
-                log.info(
-                    "Worker[%s] reported status for completed, known from 
taskStorage, task[%s]. Ignored.",
-                    worker.getHost(),
-                    taskId
-                );
-              }
-              break;
-            default:
-              log.makeAlert(
-                  "Found unrecognized state[%s] of task[%s] in taskStorage. 
Notification[%s] from worker[%s] is ignored.",
-                  knownStatusInStorage.get().getStatusCode(),
-                  taskId,
-                  announcement,
-                  worker.getHost()
-              ).emit();
-          }
-        } else {
-          log.warn(
-              "Worker[%s] reported status[%s] for unknown task[%s]. Ignored.",
-              worker.getHost(),
-              announcement.getStatus(),
-              taskId
-          );
-        }
-      }
-
-      if (taskItem == null) {
-        if (!announcement.getTaskStatus().isComplete()) {
-          shouldShutdownTask = true;
-        }
-      } else {
-        switch (announcement.getTaskStatus().getStatusCode()) {
-          case RUNNING:
-            switch (taskItem.getState()) {
-              case PENDING:
-              case PENDING_WORKER_ASSIGN:
-                taskItem.setWorker(worker);
-                taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
-                log.info("Task[%s] started RUNNING on worker[%s].", taskId, 
worker.getHost());
-
-                final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
-                IndexTaskUtils.setTaskDimensions(metricBuilder, 
taskItem.getTask());
-                emitter.emit(metricBuilder.setMetric(
-                    "task/pending/time",
-                    new Duration(taskItem.getCreatedTime(), 
DateTimes.nowUtc()).getMillis())
-                );
+    final AtomicBoolean shouldShutdownTask = new AtomicBoolean(false);
+    final AtomicBoolean isTaskCompleted = new AtomicBoolean(false);
 
-                // fall through
-              case RUNNING:
-                if (worker.getHost().equals(taskItem.getWorker().getHost())) {
-                  if 
(!announcement.getTaskLocation().equals(taskItem.getLocation())) {
+    tasks.compute(
+        taskId,
+        (key, taskEntry) -> {
+          if (taskEntry == null) {
+            // Try to find information about it in the TaskStorage
+            Optional<TaskStatus> knownStatusInStorage = 
taskStorage.getStatus(taskId);
+
+            if (knownStatusInStorage.isPresent()) {
+              switch (knownStatusInStorage.get().getStatusCode()) {
+                case RUNNING:
+                  taskEntry = new HttpRemoteTaskRunnerWorkItem(
+                      taskId,
+                      worker,
+                      TaskLocation.unknown(),
+                      null,
+                      announcement.getTaskType(),
+                      HttpRemoteTaskRunnerWorkItem.State.RUNNING
+                  );
+                  final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
+                  metricBuilder.setDimension(DruidMetrics.TASK_ID, taskId);
+                  emitter.emit(metricBuilder.setMetric(TASK_DISCOVERED_COUNT, 
1L));
+                  break;
+                case SUCCESS:
+                case FAILED:
+                  if (!announcement.getTaskStatus().isComplete()) {
                     log.info(
-                        "Task[%s] location changed on worker[%s]. new 
location[%s].",
-                        taskId,
+                        "Worker[%s] reported status for completed, known from 
taskStorage, task[%s]. Ignored.",
                         worker.getHost(),
-                        announcement.getTaskLocation()
+                        taskId
                     );
-                    taskItem.setLocation(announcement.getTaskLocation());
-                    TaskRunnerUtils.notifyLocationChanged(listeners, taskId, 
announcement.getTaskLocation());
                   }
-                } else {
-                  log.warn(
-                      "Found worker[%s] running task[%s] which is being run by 
another worker[%s]. Notification ignored.",
-                      worker.getHost(),
+                  break;
+                default:
+                  log.makeAlert(
+                      "Found unrecognized state[%s] of task[%s] in 
taskStorage. Notification[%s] from worker[%s] is ignored.",
+                      knownStatusInStorage.get().getStatusCode(),
                       taskId,
-                      taskItem.getWorker().getHost()
-                  );
-                  shouldShutdownTask = true;
-                }
-                break;
-              case COMPLETE:
-                log.warn(
-                    "Worker[%s] reported status for completed task[%s]. 
Ignored.",
-                    worker.getHost(),
-                    taskId
-                );
-                shouldShutdownTask = true;
-                break;
-              default:
-                log.makeAlert(
-                    "Found unrecognized state[%s] of task[%s]. 
Notification[%s] from worker[%s] is ignored.",
-                    taskItem.getState(),
-                    taskId,
-                    announcement,
-                    worker.getHost()
-                ).emit();
+                      announcement,
+                      worker.getHost()
+                  ).emit();
+              }
+            } else {
+              log.warn(
+                  "Worker[%s] reported status[%s] for unknown task[%s]. 
Ignored.",
+                  worker.getHost(),
+                  announcement.getStatus(),
+                  taskId
+              );
             }
-            break;
-          case FAILED:
-          case SUCCESS:
-            switch (taskItem.getState()) {
-              case PENDING:
-              case PENDING_WORKER_ASSIGN:
-                taskItem.setWorker(worker);
-                taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
-                log.info("Task[%s] finished on worker[%s].", taskId, 
worker.getHost());
-                // fall through
+          }
+
+          if (taskEntry == null) {
+            if (!announcement.getTaskStatus().isComplete()) {
+              shouldShutdownTask.set(true);
+            }
+          } else {
+            switch (announcement.getTaskStatus().getStatusCode()) {
               case RUNNING:
-                if (worker.getHost().equals(taskItem.getWorker().getHost())) {
-                  if 
(!announcement.getTaskLocation().equals(taskItem.getLocation())) {
-                    log.info(
-                        "Task[%s] location changed on worker[%s]. new 
location[%s].",
-                        taskId,
-                        worker.getHost(),
-                        announcement.getTaskLocation()
+                switch (taskEntry.getState()) {
+                  case PENDING:
+                  case PENDING_WORKER_ASSIGN:
+                    taskEntry.setWorker(worker);
+                    
taskEntry.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING);
+                    log.info("Task[%s] started RUNNING on worker[%s].", 
taskId, worker.getHost());
+
+                    final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
+                    IndexTaskUtils.setTaskDimensions(metricBuilder, 
taskEntry.getTask());
+                    emitter.emit(metricBuilder.setMetric(

Review Comment:
   Let's leave this as-is for now. Metric creation + emission is low-overhead and 
asynchronous, and listener notification doesn't add much overhead either (the 
top-level submission uses a direct executor, but the execution spills into an 
async thread pool).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to