jtuglu-netflix commented on code in PR #18060:
URL: https://github.com/apache/druid/pull/18060#discussion_r2119998563
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -730,13 +724,66 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St
taskRunner.shutdown(task.getId(), reasonFormat, args);
}
catch (Throwable e) {
- // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
- // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
+ // If task runner shutdown fails, continue with the task shutdown routine.
log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
}
removeTaskLock(task);
requestManagement();
+
+ // Emit event and log
+ final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ IndexTaskUtils.setTaskStatusDimensions(builder, taskStatus);
+ emitter.emit(builder.setMetric("task/run/time", taskStatus.getDuration()));
+
+ log.info(
+ "Completed task[%s] with status[%s] in [%d]ms.",
+ task.getId(), taskStatus.getStatusCode(), taskStatus.getDuration()
+ );
+
+ if (taskStatus.isSuccess()) {
+ Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
+ } else {
+ Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
+ }
+ }
+
+ /**
+ * Handles updating a task's status and incrementing/decrementing the status update accounting metrics
+ */
+ private void handleStatusWrapper(
+ final String taskId,
+ final TaskStatus status,
+ final String reasonFormat,
+ final Object... args
+ )
+ {
+ statusUpdatesInQueue.incrementAndGet();
+ startStopLock.readLock().lock();
+ try {
+ // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set
+ // after we check and before we commit the database transaction, but better than nothing.
+ if (!active) {
+ log.info("Abandoning task [%s] due to shutdown.", taskId);
+ return;
+ }
+
+ updateTaskEntry(
+ taskId,
+ entry -> notifyCompletedStatus(entry, status, reasonFormat, args)
+ );
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to handle task status")
+ .addData("task", taskId)
+ .addData("statusCode", status.getStatusCode())
+ .emit();
+ }
+ finally {
+ startStopLock.readLock().unlock();
+ statusUpdatesInQueue.decrementAndGet();
+ handledStatusUpdates.incrementAndGet();
Review Comment:
Note that the `master` code currently does the decrements in exactly the same
spot as this change does. I'm not sure where else the decrements could go,
since they mark the point where the callback for a `taskStatus` update
completes (and all blocking resources have been released). I agree that the
increment should be placed before `executor.submit()` if we want to measure
queueing.
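A minimal sketch of the placement discussed above, assuming status updates are handed to a plain `java.util.concurrent.ExecutorService`: the increment happens before `submit()`, so updates still waiting for a worker thread are counted, while the decrement stays in the callback's `finally`, as in both `master` and this diff. `StatusUpdateAccounting` and `submitStatusUpdate` are hypothetical names, not part of Druid's `TaskQueue` API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of status-update accounting; not Druid's actual TaskQueue code.
public class StatusUpdateAccounting
{
  // Updates submitted but not yet fully handled (waiting in the executor or running).
  private final AtomicInteger statusUpdatesInQueue = new AtomicInteger();
  // Updates whose callback has completed, whether or not the handler threw.
  private final AtomicLong handledStatusUpdates = new AtomicLong();
  private final ExecutorService executor;

  public StatusUpdateAccounting(ExecutorService executor)
  {
    this.executor = executor;
  }

  public void submitStatusUpdate(String taskId, Runnable handler)
  {
    // Increment *before* submit() so time spent waiting for a worker is visible.
    statusUpdatesInQueue.incrementAndGet();
    try {
      executor.submit(() -> {
        try {
          handler.run();
        }
        finally {
          // Callback finished and resources released: same spot as the diff's decrement.
          statusUpdatesInQueue.decrementAndGet();
          handledStatusUpdates.incrementAndGet();
        }
      });
    }
    catch (RejectedExecutionException e) {
      // The update never reached the executor, so undo the increment.
      statusUpdatesInQueue.decrementAndGet();
      throw e;
    }
  }

  public int getStatusUpdatesInQueue()
  {
    return statusUpdatesInQueue.get();
  }

  public long getHandledStatusUpdates()
  {
    return handledStatusUpdates.get();
  }

  public static void main(String[] args) throws InterruptedException
  {
    ExecutorService exec = Executors.newSingleThreadExecutor();
    StatusUpdateAccounting acct = new StatusUpdateAccounting(exec);
    CountDownLatch gate = new CountDownLatch(1);

    // The first update blocks the only worker; the second waits in the executor's queue.
    acct.submitStatusUpdate("task-1", () -> {
      try {
        gate.await();
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    acct.submitStatusUpdate("task-2", () -> {});
    System.out.println("queued=" + acct.getStatusUpdatesInQueue()); // queued=2

    gate.countDown();
    exec.shutdown();
    exec.awaitTermination(5, TimeUnit.SECONDS);
    // queued=0 handled=2
    System.out.println("queued=" + acct.getStatusUpdatesInQueue() + " handled=" + acct.getHandledStatusUpdates());
  }
}
```

With the increment inside the callback instead, the second update above would not be counted until a worker picked it up, so the gauge would miss exactly the queueing it is meant to expose.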
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]