jtuglu-netflix commented on code in PR #18060:
URL: https://github.com/apache/druid/pull/18060#discussion_r2120014907


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -775,56 +825,15 @@ public void onFailure(final Throwable t)
                .addData("type", task.getType())
                .addData("dataSource", task.getDataSource())
                .emit();
-            statusUpdatesInQueue.incrementAndGet();
             TaskStatus status = TaskStatus.failure(
                 task.getId(),
                 "Failed to run task. See overlord logs for more details."
             );
-            taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
-          }
-
-          private void handleStatus(final TaskStatus status)
-          {
-            try {
-              // If we're not supposed to be running anymore, don't do 
anything. Somewhat racey if the flag gets set
-              // after we check and before we commit the database transaction, 
but better than nothing.
-              if (!active) {
-                log.info("Abandoning task [%s] due to shutdown.", 
task.getId());
-                return;
-              }
-
-              updateTaskEntry(
-                  task.getId(),
-                  entry -> notifyStatus(entry, status, "notified status change 
from task")
-              );
-
-              // Emit event and log, if the task is done
-              if (status.isComplete()) {
-                IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
-                emitter.emit(metricBuilder.setMetric("task/run/time", 
status.getDuration()));
-
-                log.info(
-                    "Completed task[%s] with status[%s] in [%d]ms.",
-                    task.getId(), status.getStatusCode(), status.getDuration()
-                );
-
-                if (status.isSuccess()) {
-                  Counters.incrementAndGetLong(totalSuccessfulTaskCount, 
getMetricKey(task));
-                } else {
-                  Counters.incrementAndGetLong(totalFailedTaskCount, 
getMetricKey(task));
-                }
-              }
-            }
-            catch (Exception e) {
-              log.makeAlert(e, "Failed to handle task status")
-                 .addData("task", task.getId())
-                 .addData("statusCode", status.getStatusCode())
-                 .emit();
-            }
-            finally {
-              statusUpdatesInQueue.decrementAndGet();
-              handledStatusUpdates.incrementAndGet();
-            }
+            taskCompleteCallbackExecutor.execute(() -> handleStatusWrapper(

Review Comment:
   `handleStatus` and `handleStatusWrapper` are synonymous (they do the same 
   thing). The only difference is that the latter does not do metric/log 
   emission, and it acquires the read lock.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -775,56 +825,15 @@ public void onFailure(final Throwable t)
                .addData("type", task.getType())
                .addData("dataSource", task.getDataSource())
                .emit();
-            statusUpdatesInQueue.incrementAndGet();
             TaskStatus status = TaskStatus.failure(
                 task.getId(),
                 "Failed to run task. See overlord logs for more details."
             );
-            taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
-          }
-
-          private void handleStatus(final TaskStatus status)
-          {
-            try {
-              // If we're not supposed to be running anymore, don't do 
anything. Somewhat racey if the flag gets set
-              // after we check and before we commit the database transaction, 
but better than nothing.
-              if (!active) {
-                log.info("Abandoning task [%s] due to shutdown.", 
task.getId());
-                return;
-              }
-
-              updateTaskEntry(
-                  task.getId(),
-                  entry -> notifyStatus(entry, status, "notified status change 
from task")
-              );
-
-              // Emit event and log, if the task is done
-              if (status.isComplete()) {
-                IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
-                emitter.emit(metricBuilder.setMetric("task/run/time", 
status.getDuration()));
-
-                log.info(
-                    "Completed task[%s] with status[%s] in [%d]ms.",
-                    task.getId(), status.getStatusCode(), status.getDuration()
-                );
-
-                if (status.isSuccess()) {
-                  Counters.incrementAndGetLong(totalSuccessfulTaskCount, 
getMetricKey(task));
-                } else {
-                  Counters.incrementAndGetLong(totalFailedTaskCount, 
getMetricKey(task));
-                }
-              }
-            }
-            catch (Exception e) {
-              log.makeAlert(e, "Failed to handle task status")
-                 .addData("task", task.getId())
-                 .addData("statusCode", status.getStatusCode())
-                 .emit();
-            }
-            finally {
-              statusUpdatesInQueue.decrementAndGet();
-              handledStatusUpdates.incrementAndGet();
-            }
+            taskCompleteCallbackExecutor.execute(() -> handleStatusWrapper(

Review Comment:
   `handleStatus` and `handleStatusWrapper` are synonymous (they do the same 
   thing) – somewhat poor naming on my part. The only difference is that the 
   latter does not do metric/log emission, and it acquires the read lock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to