kfaraz commented on code in PR #18060:
URL: https://github.com/apache/druid/pull/18060#discussion_r2120102975


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -775,56 +825,15 @@ public void onFailure(final Throwable t)
                .addData("type", task.getType())
                .addData("dataSource", task.getDataSource())
                .emit();
-            statusUpdatesInQueue.incrementAndGet();
             TaskStatus status = TaskStatus.failure(
                 task.getId(),
                 "Failed to run task. See overlord logs for more details."
             );
-            taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
-          }
-
-          private void handleStatus(final TaskStatus status)
-          {
-            try {
-              // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set
-              // after we check and before we commit the database transaction, but better than nothing.
-              if (!active) {
-                log.info("Abandoning task [%s] due to shutdown.", task.getId());
-                return;
-              }
-
-              updateTaskEntry(
-                  task.getId(),
-                  entry -> notifyStatus(entry, status, "notified status change from task")
-              );
-
-              // Emit event and log, if the task is done
-              if (status.isComplete()) {
-                IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
-                emitter.emit(metricBuilder.setMetric("task/run/time", status.getDuration()));
-
-                log.info(
-                    "Completed task[%s] with status[%s] in [%d]ms.",
-                    task.getId(), status.getStatusCode(), status.getDuration()
-                );
-
-                if (status.isSuccess()) {
-                  Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
-                } else {
-                  Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
-                }
-              }
-            }
-            catch (Exception e) {
-              log.makeAlert(e, "Failed to handle task status")
-                 .addData("task", task.getId())
-                 .addData("statusCode", status.getStatusCode())
-                 .emit();
-            }
-            finally {
-              statusUpdatesInQueue.decrementAndGet();
-              handledStatusUpdates.incrementAndGet();
-            }
+            taskCompleteCallbackExecutor.execute(() -> handleStatusWrapper(

Review Comment:
   No, I mean we should increment and decrement the statusUpdatesInQueue count only for the callbacks. It's up to you how you do that.
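
   For illustration only (this is not code from the PR), here is a minimal sketch of one way to keep the counting confined to the callback path. The class `CallbackStatusCounter` and the method `handleStatusFromCallback` are hypothetical names; only `statusUpdatesInQueue` and `handledStatusUpdates` come from the diff above, and the real `TaskQueue` handler takes a `TaskStatus` rather than a `Runnable`.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper: status handling that only touches the counters
// when the update arrives through the callback executor.
class CallbackStatusCounter
{
  private final AtomicInteger statusUpdatesInQueue = new AtomicInteger();
  private final AtomicLong handledStatusUpdates = new AtomicLong();
  private final Executor callbackExecutor;

  CallbackStatusCounter(Executor callbackExecutor)
  {
    this.callbackExecutor = callbackExecutor;
  }

  // Direct (non-callback) path: runs the handler without changing any counter.
  void handleStatus(Runnable handler)
  {
    handler.run();
  }

  // Callback path: increment before queueing, decrement once the handler finishes.
  void handleStatusFromCallback(Runnable handler)
  {
    statusUpdatesInQueue.incrementAndGet();
    try {
      callbackExecutor.execute(() -> {
        try {
          handleStatus(handler);
        }
        finally {
          statusUpdatesInQueue.decrementAndGet();
          handledStatusUpdates.incrementAndGet();
        }
      });
    }
    catch (RuntimeException e) {
      // The executor refused the work, so the update never entered the queue.
      statusUpdatesInQueue.decrementAndGet();
      throw e;
    }
  }

  int getStatusUpdatesInQueue()
  {
    return statusUpdatesInQueue.get();
  }

  long getHandledStatusUpdates()
  {
    return handledStatusUpdates.get();
  }
}
```

   With this shape, both `onSuccess` and `onFailure` would go through the callback variant, while any other caller invokes the plain handler and leaves the counters untouched.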



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


