kfaraz commented on code in PR #18060:
URL: https://github.com/apache/druid/pull/18060#discussion_r2120040191


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -730,13 +724,66 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St
       taskRunner.shutdown(task.getId(), reasonFormat, args);
     }
     catch (Throwable e) {
-      // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
-      // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
+      // If task runner shutdown fails, continue with the task shutdown routine.
       log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
     }
 
     removeTaskLock(task);
     requestManagement();
+
+    // Emit event and log
+    final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+    IndexTaskUtils.setTaskStatusDimensions(builder, taskStatus);
+    emitter.emit(builder.setMetric("task/run/time", taskStatus.getDuration()));
+
+    log.info(
+        "Completed task[%s] with status[%s] in [%d]ms.",
+        task.getId(), taskStatus.getStatusCode(), taskStatus.getDuration()
+    );
+
+    if (taskStatus.isSuccess()) {
+      Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
+    } else {
+      Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
+    }
+  }
+
+  /**
+   * Handles updating a task's status and incrementing/decrementing the status update accounting metrics
+   */
+  private void handleStatusWrapper(
+      final String taskId,
+      final TaskStatus status,
+      final String reasonFormat,
+      final Object... args
+  )
+  {
+    statusUpdatesInQueue.incrementAndGet();
+    startStopLock.readLock().lock();
+    try {
+      // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set
+      // after we check and before we commit the database transaction, but better than nothing.
+      if (!active) {
+        log.info("Abandoning task [%s] due to shutdown.", taskId);
+        return;
+      }
+
+      updateTaskEntry(
+          taskId,
+          entry -> notifyCompletedStatus(entry, status, reasonFormat, args)
+      );
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Failed to handle task status")
+         .addData("task", taskId)
+         .addData("statusCode", status.getStatusCode())
+         .emit();
+    }
+    finally {
+      startStopLock.readLock().unlock();
+      statusUpdatesInQueue.decrementAndGet();
+      handledStatusUpdates.incrementAndGet();

Review Comment:
   Yes, in the current code, decrementing here was fine since this method was invoked only for the callbacks.
   But now, since we are also calling it from new places (which run on their own separate threads), we
   should move the decrement to a different method that is invoked only by the callbacks.
   
   Please let me know if that makes sense.
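   A minimal sketch of the split being suggested, assuming the callback work runs on a single-threaded
   executor. All names here (`StatusUpdateAccountingSketch`, `handleStatus`, `onCallbackStatus`,
   `appliedUpdates`) are hypothetical and are not the actual `TaskQueue` API; only `statusUpdatesInQueue`
   and `handledStatusUpdates` mirror the counters in the diff. The shared handler does no queue accounting,
   so any thread can call it, while the callback-only entry point owns both the increment and the decrement.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not the real TaskQueue: queue accounting lives only in the callback path.
class StatusUpdateAccountingSketch
{
  // Callback status updates that have been submitted but not yet handled.
  final AtomicInteger statusUpdatesInQueue = new AtomicInteger();
  // Callback status updates that have been fully handled.
  final AtomicLong handledStatusUpdates = new AtomicLong();
  // Status updates applied by the shared handler, from any caller.
  final AtomicLong appliedUpdates = new AtomicLong();

  // Assumption: callback work is executed on its own single-threaded executor.
  private final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();

  // Shared handler: safe to call from any thread and touches no queue metrics.
  void handleStatus(String taskId, String statusCode)
  {
    // Placeholder for the real work (updating the task entry, shutting down the task, etc.).
    appliedUpdates.incrementAndGet();
  }

  // Invoked only by task status callbacks: owns both the increment and the decrement.
  void onCallbackStatus(String taskId, String statusCode)
  {
    statusUpdatesInQueue.incrementAndGet();
    callbackExecutor.execute(() -> {
      try {
        handleStatus(taskId, statusCode);
      }
      finally {
        statusUpdatesInQueue.decrementAndGet();
        handledStatusUpdates.incrementAndGet();
      }
    });
  }

  // Lets already-submitted callback work finish, then stops the executor.
  void shutdown() throws InterruptedException
  {
    callbackExecutor.shutdown();
    callbackExecutor.awaitTermination(10, TimeUnit.SECONDS);
  }

  public static void main(String[] args) throws InterruptedException
  {
    StatusUpdateAccountingSketch sketch = new StatusUpdateAccountingSketch();
    sketch.onCallbackStatus("task-1", "SUCCESS"); // callback path: counted in the queue metrics
    sketch.handleStatus("task-2", "FAILED");      // direct call from another path: not counted
    sketch.shutdown();
    // prints "0 1 2"
    System.out.println(
        sketch.statusUpdatesInQueue.get() + " "
        + sketch.handledStatusUpdates.get() + " "
        + sketch.appliedUpdates.get()
    );
  }
}
```

   With this split, a direct call to `handleStatus` still applies the update but leaves
   `statusUpdatesInQueue` and `handledStatusUpdates` untouched, so those metrics only reflect callback
   updates.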


