kfaraz commented on code in PR #18060:
URL: https://github.com/apache/druid/pull/18060#discussion_r2119928627


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -443,8 +447,13 @@ private void startPendingTaskOnRunner(TaskEntry entry, ListenableFuture<TaskStat
                   e.getMessage()
               );
             }
+            statusUpdatesInQueue.incrementAndGet();

Review Comment:
   This should be incremented just before adding items to the 
`taskCompleteCallbackExecutor`, not here.
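A minimal sketch of the accounting this comment asks for, assuming a single-threaded callback executor. The in-queue counter is bumped on the submitting thread immediately before `execute`. The submitted lambda then decrements it and increments the handled counter once the handler has run. `QueuedStatusUpdates`, `enqueue`, `drain` and `blockingHandler` are hypothetical names, not Druid APIs. Only the two counter names come from the diff.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; only the counter names mirror TaskQueue.
class QueuedStatusUpdates
{
  final AtomicLong statusUpdatesInQueue = new AtomicLong();
  final AtomicLong handledStatusUpdates = new AtomicLong();
  private final ExecutorService callbackExecutor = Executors.newSingleThreadExecutor();

  void enqueue(Runnable handler)
  {
    // Count the update as queued on the submitting thread, right before handing it off.
    statusUpdatesInQueue.incrementAndGet();
    callbackExecutor.execute(() -> {
      try {
        handler.run();
      }
      finally {
        // Runs after the handler, even if it threw: the update has left the queue.
        statusUpdatesInQueue.decrementAndGet();
        handledStatusUpdates.incrementAndGet();
      }
    });
  }

  // Stops accepting work and waits for queued updates to finish; true if all completed.
  boolean drain()
  {
    callbackExecutor.shutdown();
    try {
      return callbackExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Test helper: a handler that blocks until the latch is released.
  static Runnable blockingHandler(CountDownLatch latch)
  {
    return () -> {
      try {
        latch.await();
      }
      catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    };
  }
}
```

The increment happens before `execute`, so a gauge read while the executor is busy reports both the running update and the waiting one. A status handled synchronously on the calling thread never passes through the executor, so under this scheme it would not touch either counter.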



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -775,56 +825,15 @@ public void onFailure(final Throwable t)
                .addData("type", task.getType())
                .addData("dataSource", task.getDataSource())
                .emit();
-            statusUpdatesInQueue.incrementAndGet();
             TaskStatus status = TaskStatus.failure(
                 task.getId(),
                 "Failed to run task. See overlord logs for more details."
             );
-            taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
-          }
-
-          private void handleStatus(final TaskStatus status)
-          {
-            try {
-              // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set
-              // after we check and before we commit the database transaction, but better than nothing.
-              if (!active) {
-                log.info("Abandoning task [%s] due to shutdown.", task.getId());
-                return;
-              }
-
-              updateTaskEntry(
-                  task.getId(),
-                  entry -> notifyStatus(entry, status, "notified status change from task")
-              );
-
-              // Emit event and log, if the task is done
-              if (status.isComplete()) {
-                IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
-                emitter.emit(metricBuilder.setMetric("task/run/time", status.getDuration()));
-
-                log.info(
-                    "Completed task[%s] with status[%s] in [%d]ms.",
-                    task.getId(), status.getStatusCode(), status.getDuration()
-                );
-
-                if (status.isSuccess()) {
-                  Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
-                } else {
-                  Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
-                }
-              }
-            }
-            catch (Exception e) {
-              log.makeAlert(e, "Failed to handle task status")
-                 .addData("task", task.getId())
-                 .addData("statusCode", status.getStatusCode())
-                 .emit();
-            }
-            finally {
-              statusUpdatesInQueue.decrementAndGet();
-              handledStatusUpdates.incrementAndGet();
-            }
+            taskCompleteCallbackExecutor.execute(() -> handleStatusWrapper(

Review Comment:
   This lambda should first call `handleStatusWrapper` and then update the 
counts for updates etc.
   
   Or since you need to do the same thing for both success and failure, you can 
keep the old `handleStatus` method here which invokes the top-level 
`handleStatusWrapper`.
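One way to read the second suggestion, sketched with hypothetical stand-ins (`CallbackRouting`, `Status`) and a direct executor so the example is deterministic. Both `onSuccess` and `onFailure` enqueue the same local `handleStatus`. That helper calls the shared top-level handler first and only then updates the counters in a `finally` block. `handleCompletedStatus` is one of the names proposed in a later comment; everything else here is assumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of routing both callback paths through one local helper.
class CallbackRouting
{
  // Simplified stand-in for Druid's TaskStatus.
  static final class Status
  {
    final String taskId;
    final boolean success;

    Status(String taskId, boolean success)
    {
      this.taskId = taskId;
      this.success = success;
    }
  }

  final AtomicLong statusUpdatesInQueue = new AtomicLong();
  final AtomicLong handledStatusUpdates = new AtomicLong();
  final List<String> handled = new ArrayList<>();
  private final Executor callbackExecutor;

  CallbackRouting(Executor callbackExecutor)
  {
    this.callbackExecutor = callbackExecutor;
  }

  // Top-level handler shared by all callers; this sketch only records the outcome.
  private void handleCompletedStatus(Status status)
  {
    handled.add(status.taskId + (status.success ? ":SUCCESS" : ":FAILED"));
  }

  // Local helper: run the shared handler first, then update the counts.
  private void handleStatus(Status status)
  {
    try {
      handleCompletedStatus(status);
    }
    finally {
      statusUpdatesInQueue.decrementAndGet();
      handledStatusUpdates.incrementAndGet();
    }
  }

  void onSuccess(Status status)
  {
    statusUpdatesInQueue.incrementAndGet();
    callbackExecutor.execute(() -> handleStatus(status));
  }

  void onFailure(String taskId, Throwable t)
  {
    // The throwable would be alerted on before this point; here it is only converted to a failure.
    statusUpdatesInQueue.incrementAndGet();
    callbackExecutor.execute(() -> handleStatus(new Status(taskId, false)));
  }
}
```

Passing `Runnable::run` as the executor runs each handler inline. With a real thread pool, `handled` would need to be a thread-safe collection.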



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -730,13 +724,66 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St
       taskRunner.shutdown(task.getId(), reasonFormat, args);
     }
     catch (Throwable e) {
-      // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
-      // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
+      // If task runner shutdown fails, continue with the task shutdown routine.
       log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
     }
 
     removeTaskLock(task);
     requestManagement();
+
+    // Emit event and log
+    final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+    IndexTaskUtils.setTaskStatusDimensions(builder, taskStatus);
+    emitter.emit(builder.setMetric("task/run/time", taskStatus.getDuration()));
+
+    log.info(
+        "Completed task[%s] with status[%s] in [%d]ms.",
+        task.getId(), taskStatus.getStatusCode(), taskStatus.getDuration()
+    );
+
+    if (taskStatus.isSuccess()) {
+      Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
+    } else {
+      Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
+    }
+  }
+
+  /**
+   * Handles updating a task's status and incrementing/decrementing the status update accounting metrics
+   */
+  private void handleStatusWrapper(

Review Comment:
   Please avoid the suffix `wrapper` as it is confusing. Maybe just use 
`handleStatus` or `handleCompletedStatus`.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -663,24 +648,33 @@ public void shutdownWithSuccess(final String taskId, String reasonFormat, Object
    * <li>Request {@link #taskRunner} to shutdown task (synchronously)</li>
    * <li>Remove all locks for task from metadata storage</li>
    * <li>Request task management</li>
+   * <li>Perform Task accounting metrics/logs</li>
    * </ul>
    * This method does not remove the task from {@link #activeTasks} to avoid
    * race conditions with {@link #syncFromStorage()}.
    * <p>
-   * Since this operation involves DB updates and synchronous remote calls, it
-   * must be invoked on a dedicated executor so that task runner and worker sync
-   * is not blocked.
+   * Since this operation is intended to be performed under one of activeTasks hash segment locks, involves DB updates
+   * and synchronous remote calls, it must be invoked on a dedicated executor so that task runner and worker sync
+   * are not blocked.
    *
    * @throws NullPointerException     if task or status is null
    * @throws IllegalArgumentException if the task ID does not match the status ID
    * @throws IllegalStateException    if this queue is currently shut down
    * @see #removeTaskInternal
    */
-  private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, String reasonFormat, Object... args)
+  private void notifyCompletedStatus(
+      final TaskEntry entry,
+      final TaskStatus taskStatus,
+      String reasonFormat,
+      Object... args
+  )
   {
     // Don't do anything if the task has no entry in activeTasks
     if (entry == null) {
       return;
+    } else if (entry.isComplete) {
+      // A callback() or shutdown() beat us to updating the status and has already cleaned up this task

Review Comment:
   Let's log an info message here.
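A sketch of the guard with the requested info log. It assumes, as the javadoc states, that the caller already holds the entry's lock, so a plain boolean check is enough. `java.util.logging` stands in for Druid's logger, and `CompletionGuard`, `Entry` and `cleanups` are hypothetical. The return value exists only to make the behaviour easy to assert.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical sketch of the early-return guard in notifyCompletedStatus.
class CompletionGuard
{
  private static final Logger LOG = Logger.getLogger(CompletionGuard.class.getName());

  // Simplified stand-in for TaskQueue's TaskEntry.
  static final class Entry
  {
    final String taskId;
    boolean isComplete;

    Entry(String taskId)
    {
      this.taskId = taskId;
    }
  }

  final List<String> cleanups = new ArrayList<>();

  // Caller is assumed to hold the entry's lock. Returns true only if this call did the cleanup.
  boolean notifyCompletedStatus(Entry entry, String statusCode)
  {
    if (entry == null) {
      return false;
    } else if (entry.isComplete) {
      LOG.info(String.format(
          "Ignoring status[%s] for task[%s] as it has already been marked complete.",
          statusCode, entry.taskId
      ));
      return false;
    }
    entry.isComplete = true;
    cleanups.add(entry.taskId);
    return true;
  }
}
```

Calling it twice for the same entry cleans up once and logs once for the ignored duplicate.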



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1058,15 +1067,17 @@ private void validateTaskPayload(Task task)
       String payload = passwordRedactingMapper.writeValueAsString(task);
       if (config.getMaxTaskPayloadSize() != null && config.getMaxTaskPayloadSize().getBytesInInt() < payload.length()) {
         throw InvalidInput.exception(
-                "Task[%s] has payload of size[%d] but max allowed size is [%d]. " +
-                    "Reduce the size of the task payload or increase 'druid.indexer.queue.maxTaskPayloadSize'.",
-                task.getId(), payload.length(), config.getMaxTaskPayloadSize()
-            );
+            "Task[%s] has payload of size[%d] but max allowed size is [%d]. " +
+            "Reduce the size of the task payload or increase 'druid.indexer.queue.maxTaskPayloadSize'.",
+            task.getId(), payload.length(), config.getMaxTaskPayloadSize()
+        );
       } else if (payload.length() > TASK_SIZE_WARNING_THRESHOLD) {
         log.warn(
-            "Task[%s] of datasource[%s] has payload size[%d] larger than the recommended maximum[%d]. " +
-                "Large task payloads may cause stability issues in the Overlord and may fail while persisting to the metadata store." +
-                "Such tasks may be rejected by the Overlord in future Druid versions.",
+            "Task[%s] of datasource[%s] has payload size[%d] larger than the recommended maximum[%d]. "
+            +
+            "Large task payloads may cause stability issues in the Overlord and may fail while persisting to the metadata store."
+            +
+            "Such tasks may be rejected by the Overlord in future Druid versions.",

Review Comment:
   There doesn't seem to be any change here.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -730,13 +724,66 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St
       taskRunner.shutdown(task.getId(), reasonFormat, args);
     }
     catch (Throwable e) {
-      // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
-      // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
+      // If task runner shutdown fails, continue with the task shutdown routine.
       log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
     }
 
     removeTaskLock(task);
     requestManagement();
+
+    // Emit event and log
+    final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+    IndexTaskUtils.setTaskStatusDimensions(builder, taskStatus);
+    emitter.emit(builder.setMetric("task/run/time", taskStatus.getDuration()));
+
+    log.info(
+        "Completed task[%s] with status[%s] in [%d]ms.",
+        task.getId(), taskStatus.getStatusCode(), taskStatus.getDuration()
+    );
+
+    if (taskStatus.isSuccess()) {
+      Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
+    } else {
+      Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
+    }
+  }
+
+  /**
+   * Handles updating a task's status and incrementing/decrementing the status update accounting metrics
+   */
+  private void handleStatusWrapper(
+      final String taskId,
+      final TaskStatus status,
+      final String reasonFormat,
+      final Object... args
+  )
+  {
+    statusUpdatesInQueue.incrementAndGet();
+    startStopLock.readLock().lock();
+    try {
+      // If we're not supposed to be running anymore, don't do anything. Somewhat racey if the flag gets set
+      // after we check and before we commit the database transaction, but better than nothing.
+      if (!active) {
+        log.info("Abandoning task [%s] due to shutdown.", taskId);
+        return;
+      }
+
+      updateTaskEntry(
+          taskId,
+          entry -> notifyCompletedStatus(entry, status, reasonFormat, args)
+      );
+    }
+    catch (Exception e) {
+      log.makeAlert(e, "Failed to handle task status")
+         .addData("task", taskId)
+         .addData("statusCode", status.getStatusCode())
+         .emit();
+    }
+    finally {
+      startStopLock.readLock().unlock();
+      statusUpdatesInQueue.decrementAndGet();
+      handledStatusUpdates.incrementAndGet();

Review Comment:
   These two should not be done here.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -730,13 +724,66 @@ private void notifyStatus(final TaskEntry entry, final TaskStatus taskStatus, St
       taskRunner.shutdown(task.getId(), reasonFormat, args);
     }
     catch (Throwable e) {
-      // If task runner shutdown fails, continue with the task shutdown routine. We'll come back and try to
-      // shut it down again later in manageInternalPostCritical, once it's removed from the "tasks" map.
+      // If task runner shutdown fails, continue with the task shutdown routine.
       log.warn(e, "TaskRunner failed to cleanup task after completion: %s", task.getId());
     }
 
     removeTaskLock(task);
     requestManagement();
+
+    // Emit event and log
+    final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+    IndexTaskUtils.setTaskStatusDimensions(builder, taskStatus);
+    emitter.emit(builder.setMetric("task/run/time", taskStatus.getDuration()));
+
+    log.info(
+        "Completed task[%s] with status[%s] in [%d]ms.",
+        task.getId(), taskStatus.getStatusCode(), taskStatus.getDuration()
+    );
+
+    if (taskStatus.isSuccess()) {
+      Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
+    } else {
+      Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
+    }
+  }
+
+  /**
+   * Handles updating a task's status and incrementing/decrementing the status update accounting metrics
+   */
+  private void handleStatusWrapper(
+      final String taskId,
+      final TaskStatus status,
+      final String reasonFormat,
+      final Object... args
+  )
+  {
+    statusUpdatesInQueue.incrementAndGet();

Review Comment:
   This should not be called here.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -443,8 +447,13 @@ private void startPendingTaskOnRunner(TaskEntry entry, ListenableFuture<TaskStat
                   e.getMessage()
               );
             }
+            statusUpdatesInQueue.incrementAndGet();
+
             TaskStatus taskStatus = TaskStatus.failure(task.getId(), errorMessage);
-            notifyStatus(entry, taskStatus, taskStatus.getErrorMsg());
+            notifyCompletedStatus(entry, taskStatus, taskStatus.getErrorMsg());
+
+            statusUpdatesInQueue.decrementAndGet();
+            handledStatusUpdates.incrementAndGet();

Review Comment:
   These two are not needed here.



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -775,56 +825,15 @@ public void onFailure(final Throwable t)
                .addData("type", task.getType())
                .addData("dataSource", task.getDataSource())
                .emit();
-            statusUpdatesInQueue.incrementAndGet();

Review Comment:
   We should continue to do this here.



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -443,9 +461,8 @@ public void testKilledTasksEmitRuntimeMetricWithHttpRemote() throws InterruptedE
 
     // Verify that metrics are emitted on receiving announcement
     serviceEmitter.verifyEmitted("task/run/time", 1);
-    CoordinatorRunStats stats = taskQueue.getQueueStats();
-    Assert.assertEquals(0L, stats.get(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE));
-    Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES));
+    verifyTaskStatusCounts(taskQueue, 0, 1);
+    verifyStatusUpdateCounts(taskQueue, 0, 2);

Review Comment:
   To make this more readable, please invoke the verify methods separately for 
each metric like:
   
   ```
   verifyMetric(metricName, expectedValue);
   ```
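For illustration, such a helper might look like the following. The map-based stats, the metric name strings and `MetricAssertions` are assumptions made to keep the example self-contained. In the actual test the lookup would presumably go through `taskQueue.getQueueStats()` with the `Stats.TaskQueue` keys shown in the removed lines.

```java
import java.util.Map;

// Hypothetical helper: one assertion per metric.
class MetricAssertions
{
  static void verifyMetric(Map<String, Long> stats, String metricName, long expectedValue)
  {
    // In this sketch, a metric that was never recorded counts as zero.
    long actual = stats.getOrDefault(metricName, 0L);
    if (actual != expectedValue) {
      throw new AssertionError(
          String.format("Metric[%s]: expected[%d] but was[%d]", metricName, expectedValue, actual)
      );
    }
  }
}
```

Each expectation then reads as its own line, for example `verifyMetric(stats, "handledStatusUpdates", 2)`, and a failure message names the metric that did not match.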



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java:
##########
@@ -77,7 +80,20 @@ public void setUpIngestionTestBase() throws IOException
           @Override
           public ListenableFuture<TaskStatus> run(Task task)
           {
-            return Futures.immediateFuture(TaskStatus.success(task.getId()));
+            // Skip the initialization; we just need to simulate a delayed future
+            Preconditions.checkArgument(task instanceof NoopTask, "task must be an instance of NoopTask");
+            final SettableFuture<TaskStatus> future = SettableFuture.create();
+
+            taskExecutor.submit(() -> {
+              try {
+                TaskStatus status = ((NoopTask) task).runTask(null);
+                future.set(status);
+              }
+              catch (Exception e) {
+                future.setException(e);
+              }
+            });
+            return future;

Review Comment:
   Why is this needed exactly?
   
   If needed to test out some specific behaviour, we should add targeted test 
cases,
   where we just return an incomplete future here and decide in the test case 
when
   we want to complete that future.
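A minimal sketch of that approach. It uses the JDK's `CompletableFuture` as a stand-in for Guava's `SettableFuture` so it runs without dependencies. The runner hands back a pending future per task, and the test decides when to complete it. `ControllableRunner` and `complete` are hypothetical, and the task status is simplified to a string.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical test runner whose futures stay pending until the test completes them.
class ControllableRunner
{
  private final Map<String, CompletableFuture<String>> futures = new ConcurrentHashMap<>();

  // Analogous to the runner's run(task): returns an incomplete future for the task.
  CompletableFuture<String> run(String taskId)
  {
    return futures.computeIfAbsent(taskId, id -> new CompletableFuture<>());
  }

  // Called by the test at the exact point it wants the task to finish.
  void complete(String taskId, String statusCode)
  {
    CompletableFuture<String> future = futures.get(taskId);
    if (future == null) {
      throw new IllegalStateException("Unknown task " + taskId);
    }
    future.complete(statusCode);
  }
}
```

Between `run` and `complete`, the test can assert on whatever state should hold while the task is still running, with no sleeps or background threads involved.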



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -623,6 +640,25 @@ private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
     );
   }
 
+  private static void verifyTaskStatusCounts(final TaskQueue taskQueue, int successCount, int failureCount)
+  {
+    Assert.assertEquals(
+        successCount,
+        taskQueue.getSuccessfulTaskCount().values().stream().mapToLong(Long::longValue).sum()
+    );
+    Assert.assertEquals(
+        failureCount,
+        taskQueue.getFailedTaskCount().values().stream().mapToLong(Long::longValue).sum()

Review Comment:
   It's better to keep these two assertions in separate methods as that would 
make for more readable test cases.
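A sketch of the split. It assumes the counts are exposed as a map of per-key totals, which is how `getSuccessfulTaskCount()` and `getFailedTaskCount()` appear to be used in the diff. `TaskCountAssertions` and both method names are hypothetical.

```java
import java.util.Map;

// Hypothetical helpers: one method per task-status metric.
class TaskCountAssertions
{
  static void verifySuccessfulTaskCount(Map<?, Long> successfulTaskCounts, long expected)
  {
    assertSum("successful", successfulTaskCounts, expected);
  }

  static void verifyFailedTaskCount(Map<?, Long> failedTaskCounts, long expected)
  {
    assertSum("failed", failedTaskCounts, expected);
  }

  // Sums the per-key counts and compares the total with the expectation.
  private static void assertSum(String label, Map<?, Long> counts, long expected)
  {
    long actual = counts.values().stream().mapToLong(Long::longValue).sum();
    if (actual != expected) {
      throw new AssertionError(
          String.format("Expected %d %s tasks but found %d", expected, label, actual)
      );
    }
  }
}
```

A test would then call, for example, `verifySuccessfulTaskCount(taskQueue.getSuccessfulTaskCount(), 0)` and `verifyFailedTaskCount(taskQueue.getFailedTaskCount(), 1)` on separate lines.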


