kfaraz commented on code in PR #18060:
URL: https://github.com/apache/druid/pull/18060#discussion_r2119984489
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -730,13 +724,66 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St
taskRunner.shutdown(task.getId(), reasonFormat, args);
}
catch (Throwable e) {
- // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
- // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
+ // If task runner shutdown fails, continue with the task shutdown routine.
log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
}
removeTaskLock(task);
requestManagement();
+
+ // Emit event and log
+ final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ IndexTaskUtils.setTaskStatusDimensions(builder, taskStatus);
+ emitter.emit(builder.setMetric("task/run/time", taskStatus.getDuration()));
+
+ log.info(
+ "Completed task[%s] with status[%s] in [%d]ms.",
+ task.getId(), taskStatus.getStatusCode(), taskStatus.getDuration()
+ );
+
+ if (taskStatus.isSuccess()) {
+ Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
+ } else {
+ Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
+ }
+ }
+
+ /**
+ * Handles updating a task's status and incrementing/decrementing the status update accounting metrics
+ */
+ private void handleStatusWrapper(
+ final String taskId,
+ final TaskStatus status,
+ final String reasonFormat,
+ final Object... args
+ )
+ {
+ statusUpdatesInQueue.incrementAndGet();
+ startStopLock.readLock().lock();
+ try {
+ // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set
+ // after we check and before we commit the database transaction, but better than nothing.
+ if (!active) {
+ log.info("Abandoning task [%s] due to shutdown.", taskId);
+ return;
+ }
+
+ updateTaskEntry(
+ taskId,
+ entry -> notifyCompletedStatus(entry, status, reasonFormat, args)
+ );
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to handle task status")
+ .addData("task", taskId)
+ .addData("statusCode", status.getStatusCode())
+ .emit();
+ }
+ finally {
+ startStopLock.readLock().unlock();
+ statusUpdatesInQueue.decrementAndGet();
+ handledStatusUpdates.incrementAndGet();
Review Comment:
Incrementing the count just before calling `notifyStatus` and decrementing
it right after doesn't add any real value.
In that case, the value of `statusUpdatesInQueue` will just represent the
number of threads currently performing `notifyStatus`, which is not a useful
metric. We want to know how many status updates are in queue and are yet to be
handled.
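A minimal sketch of the accounting the comment asks for, assuming status updates are handed off to a worker executor before being processed. The class `StatusUpdateQueueSketch`, the method `submitStatusUpdate`, and the single-thread executor are hypothetical illustrations, not Druid's API; only the counter names mirror the diff. The counter is incremented when an update is enqueued and decremented only after its handler finishes, so it measures updates still waiting or in progress rather than just threads currently inside the handler.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: not Druid's TaskQueue; the executor-based hand-off is an assumption.
class StatusUpdateQueueSketch
{
  // Updates accepted but not yet fully handled (waiting in the executor queue or running).
  private final AtomicLong statusUpdatesInQueue = new AtomicLong();
  // Updates whose handling has finished, successfully or not.
  private final AtomicLong handledStatusUpdates = new AtomicLong();
  // A single worker, so later updates genuinely wait behind earlier ones.
  private final ExecutorService statusUpdateExecutor = Executors.newSingleThreadExecutor();

  void submitStatusUpdate(Runnable handler)
  {
    // Count the update as queued BEFORE handing it off, so the gauge
    // includes time spent waiting for the worker.
    statusUpdatesInQueue.incrementAndGet();
    try {
      statusUpdateExecutor.execute(() -> {
        try {
          handler.run();
        }
        finally {
          // The update leaves the queue only once handling is complete.
          statusUpdatesInQueue.decrementAndGet();
          handledStatusUpdates.incrementAndGet();
        }
      });
    }
    catch (RejectedExecutionException e) {
      // The update was never enqueued, so undo the increment.
      statusUpdatesInQueue.decrementAndGet();
      throw e;
    }
  }

  long getStatusUpdatesInQueue()
  {
    return statusUpdatesInQueue.get();
  }

  long getHandledStatusUpdates()
  {
    return handledStatusUpdates.get();
  }

  // Stops accepting updates and waits for queued ones to finish.
  boolean shutdownAndWait()
  {
    statusUpdateExecutor.shutdown();
    try {
      return statusUpdateExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  private static void awaitQuietly(CountDownLatch latch)
  {
    try {
      latch.await();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args)
  {
    StatusUpdateQueueSketch queue = new StatusUpdateQueueSketch();
    CountDownLatch release = new CountDownLatch(1);
    // The first update blocks the only worker; the other two wait behind it.
    queue.submitStatusUpdate(() -> awaitQuietly(release));
    queue.submitStatusUpdate(() -> {});
    queue.submitStatusUpdate(() -> {});
    System.out.println("queued=" + queue.getStatusUpdatesInQueue()); // prints queued=3
    release.countDown();
    queue.shutdownAndWait();
    // prints queued=0 handled=3
    System.out.println("queued=" + queue.getStatusUpdatesInQueue() + " handled=" + queue.getHandledStatusUpdates());
  }
}
```

With the pattern in the diff, where the increment and decrement both happen on the handling thread, the same scenario would report at most one in-flight update even though two more are waiting, which is the gap the comment points out.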
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]