This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 40d0dc9e0e Use separate executor to handle task updates in TaskQueue (#14533)
40d0dc9e0e is described below

commit 40d0dc9e0e02c89f91762a400ecdd84b950907ca
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 7 20:43:12 2023 +0530

    Use separate executor to handle task updates in TaskQueue (#14533)
    
    Description:
    `TaskQueue.notifyStatus` is often a heavy call, as it performs the following operations:
    - Update task status in metadata DB
    - Update task locks in metadata DB
    - Request (synchronously) the task runner to shutdown the completed task
    - Clean up in-memory data structures
    
    Because this method can be slow, it can block worker sync and slow down task runners.
    
    Main changes:
    - Run task completion callbacks on a dedicated executor that handles task completion updates
    - Add new config `druid.indexer.queue.taskCompleteHandlerNumThreads` (default 5)
    - Add metrics to monitor the number of queued and processed task status updates
    - Other code paths can still invoke `notifyStatus` directly; these need not be moved to
    the new executor, as they are intentionally synchronous.
    
    Other changes:
    - Add new metrics `task/status/queue/count`, `task/status/updated/count`
    - Add `TaskCountStatsProvider.getStats()`, which deprecates the other `getXXXTaskCount` methods.
    - Use `CoordinatorRunStats` to collect and report metrics. This class is used as-is
    for now but will later be renamed and repurposed for use across all Druid services.
---
 .../org/apache/druid/indexing/overlord/Stats.java  |  40 +++-----
 .../apache/druid/indexing/overlord/TaskMaster.java |  12 +++
 .../apache/druid/indexing/overlord/TaskQueue.java  | 108 +++++++++++++++------
 .../indexing/overlord/config/TaskQueueConfig.java  |  12 ++-
 .../indexing/overlord/TaskLockConfigTest.java      |   2 +-
 .../indexing/overlord/TaskQueueScaleTest.java      |   2 +-
 .../druid/indexing/overlord/TaskQueueTest.java     |  68 +++++++------
 .../druid/indexing/overlord/http/OverlordTest.java |   5 +-
 .../balancer/RandomBalancerStrategy.java           |   4 +-
 .../coordinator/stats/CoordinatorRunStats.java     |  20 ++++
 .../server/metrics/TaskCountStatsMonitor.java      |  25 +++++
 .../server/metrics/TaskCountStatsProvider.java     |  16 +++
 .../server/metrics/TaskSlotCountStatsProvider.java |   5 +
 .../coordinator/CoordinatorRunStatsTest.java       |  22 +++++
 .../server/metrics/TaskCountStatsMonitorTest.java  |  21 +++-
 15 files changed, 268 insertions(+), 94 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
 b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/Stats.java
similarity index 51%
copy from 
server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
copy to 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/Stats.java
index a96f8ce062..4d62111e62 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/Stats.java
@@ -17,34 +17,20 @@
  * under the License.
  */
 
-package org.apache.druid.server.metrics;
+package org.apache.druid.indexing.overlord;
 
-import java.util.Map;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
 
-public interface TaskCountStatsProvider
+/**
+ * Task-level stats emitted as metrics.
+ */
+public class Stats
 {
-  /**
-   * Return the number of successful tasks for each datasource during emission 
period.
-   */
-  Map<String, Long> getSuccessfulTaskCount();
-
-  /**
-   * Return the number of failed tasks for each datasource during emission 
period.
-   */
-  Map<String, Long> getFailedTaskCount();
-
-  /**
-   * Return the number of current running tasks for each datasource.
-   */
-  Map<String, Long> getRunningTaskCount();
-
-  /**
-   * Return the number of current pending tasks for each datasource.
-   */
-  Map<String, Long> getPendingTaskCount();
-
-  /**
-   * Return the number of current waiting tasks for each datasource.
-   */
-  Map<String, Long> getWaitingTaskCount();
+  public static class TaskQueue
+  {
+    public static final CoordinatorStat STATUS_UPDATES_IN_QUEUE
+        = CoordinatorStat.toDebugAndEmit("queuedStatusUpdates", 
"task/status/queue/count");
+    public static final CoordinatorStat HANDLED_STATUS_UPDATES
+        = CoordinatorStat.toDebugAndEmit("handledStatusUpdates", 
"task/status/updated/count");
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 1eab403585..e4d26b3757 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.server.DruidNode;
 import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.metrics.TaskCountStatsProvider;
 import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
 
@@ -362,6 +363,17 @@ public class TaskMaster implements TaskCountStatsProvider, 
TaskSlotCountStatsPro
     }
   }
 
+  @Override
+  public CoordinatorRunStats getStats()
+  {
+    Optional<TaskQueue> taskQueue = getTaskQueue();
+    if (taskQueue.isPresent()) {
+      return taskQueue.get().getQueueStats();
+    } else {
+      return CoordinatorRunStats.empty();
+    }
+  }
+
   private void gracefulStopLeaderLifecycle()
   {
     try {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 0beccce3dd..89ccfc606d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -46,6 +46,7 @@ import 
org.apache.druid.indexing.overlord.config.TaskLockConfig;
 import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
 import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -53,6 +54,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.utils.CollectionUtils;
 
 import java.util.ArrayList;
@@ -71,6 +73,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -97,7 +100,10 @@ public class TaskQueue
   @GuardedBy("giant")
   private final Map<String, ListenableFuture<TaskStatus>> taskFutures = new 
HashMap<>();
 
-  // Tasks that are in the process of being cleaned up by notifyStatus. 
Prevents manageInternal from re-launching them.
+  /**
+   * Tasks that have recently completed and are being cleaned up. These tasks
+   * should not be relaunched by task management.
+   */
   @GuardedBy("giant")
   private final Set<String> recentlyCompletedTasks = new HashSet<>();
 
@@ -124,6 +130,12 @@ public class TaskQueue
           .setNameFormat("TaskQueue-StorageSync").build()
   );
 
+  /**
+   * Dedicated executor for task completion callbacks, to ensure that task 
runner
+   * and worker sync operations are not blocked.
+   */
+  private final ExecutorService taskCompleteCallbackExecutor;
+
   private volatile boolean active = false;
 
   private static final EmittingLogger log = new 
EmittingLogger(TaskQueue.class);
@@ -135,6 +147,9 @@ public class TaskQueue
   @GuardedBy("totalFailedTaskCount")
   private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
 
+  private final AtomicInteger statusUpdatesInQueue = new AtomicInteger();
+  private final AtomicInteger handledStatusUpdates = new AtomicInteger();
+
   public TaskQueue(
       TaskLockConfig lockConfig,
       TaskQueueConfig config,
@@ -154,6 +169,10 @@ public class TaskQueue
     this.taskActionClientFactory = 
Preconditions.checkNotNull(taskActionClientFactory, "taskActionClientFactory");
     this.taskLockbox = Preconditions.checkNotNull(taskLockbox, "taskLockbox");
     this.emitter = Preconditions.checkNotNull(emitter, "emitter");
+    this.taskCompleteCallbackExecutor = Execs.multiThreaded(
+        config.getTaskCompleteHandlerNumThreads(),
+        "TaskQueue-OnComplete-%d"
+    );
   }
 
   @VisibleForTesting
@@ -421,7 +440,8 @@ public class TaskQueue
             continue;
           }
         }
-        taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
+        attachCallbacks(task, runnerTaskFuture);
+        taskFutures.put(task.getId(), runnerTaskFuture);
       } else if (isTaskPending(task)) {
         // if the taskFutures contain this task and this task is pending, also 
let the taskRunner
         // to run it to guarantee it will be assigned to run
@@ -587,12 +607,22 @@ public class TaskQueue
   }
 
   /**
-   * Notify this queue that some task has an updated status. If this update is 
valid, the status will be persisted in
-   * the task storage facility. If the status is a completed status, the task 
will be unlocked and no further
-   * updates will be accepted.
-   *
-   * @param task       task to update
-   * @param taskStatus new task status
+   * Notifies this queue that the given task has an updated status. If this 
update
+   * is valid and task is now complete, the following operations are performed:
+   * <ul>
+   * <li>Add task to {@link #recentlyCompletedTasks} to prevent re-launching 
them</li>
+   * <li>Persist new status in the metadata storage to safeguard against 
crashes
+   * and leader re-elections</li>
+   * <li>Request {@link #taskRunner} to shutdown task (synchronously)</li>
+   * <li>Remove all locks for task from metadata storage</li>
+   * <li>Remove task entry from {@link #tasks} and {@link #taskFutures}</li>
+   * <li>Remove task from {@link #recentlyCompletedTasks}</li>
+   * <li>Request task management</li>
+   * </ul>
+   * <p>
+   * Since this operation involves DB updates and synchronous remote calls, it
+   * must be invoked on a dedicated executor so that task runner and worker 
sync
+   * is not blocked.
    *
    * @throws NullPointerException     if task or status is null
    * @throws IllegalArgumentException if the task ID does not match the status 
ID
@@ -661,7 +691,7 @@ public class TaskQueue
       if (removeTaskInternal(task.getId())) {
         taskFutures.remove(task.getId());
       } else {
-        log.warn("Unknown task completed: %s", task.getId());
+        log.warn("Unknown task[%s] completed", task.getId());
       }
 
       recentlyCompletedTasks.remove(task.getId());
@@ -673,14 +703,17 @@ public class TaskQueue
   }
 
   /**
-   * Attach success and failure handlers to a task status future, such that 
when it completes, we perform the
-   * appropriate updates.
-   *
-   * @param statusFuture a task status future
-   *
-   * @return the same future, for convenience
+   * Attaches callbacks to the task status future to update application state 
when
+   * the task completes. Submits a job to handle the status on the dedicated
+   * {@link #taskCompleteCallbackExecutor}.
+   * <p>
+   * The {@code onSuccess} and {@code onFailure} methods will however run on 
the
+   * executor providing the {@code statusFuture} itself (typically the worker 
sync executor).
+   * This has been done in order to track metrics for in-flight status updates
+   * immediately. Thus, care must be taken to ensure that the success/failure
+   * methods remain lightweight enough to keep the sync executor unblocked.
    */
-  private ListenableFuture<TaskStatus> attachCallbacks(final Task task, final 
ListenableFuture<TaskStatus> statusFuture)
+  private void attachCallbacks(final Task task, final 
ListenableFuture<TaskStatus> statusFuture)
   {
     final ServiceMetricEvent.Builder metricBuilder = new 
ServiceMetricEvent.Builder();
     IndexTaskUtils.setTaskDimensions(metricBuilder, task);
@@ -692,8 +725,9 @@ public class TaskQueue
           @Override
           public void onSuccess(final TaskStatus status)
           {
-            log.info("Received %s status for task: %s", 
status.getStatusCode(), status.getId());
-            handleStatus(status);
+            log.info("Received status[%s] for task[%s].", 
status.getStatusCode(), status.getId());
+            statusUpdatesInQueue.incrementAndGet();
+            taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
           }
 
           @Override
@@ -704,9 +738,12 @@ public class TaskQueue
                .addData("type", task.getType())
                .addData("dataSource", task.getDataSource())
                .emit();
-            handleStatus(
-                TaskStatus.failure(task.getId(), "Failed to run this task. See 
overlord logs for more details.")
+            statusUpdatesInQueue.incrementAndGet();
+            TaskStatus status = TaskStatus.failure(
+                task.getId(),
+                "Failed to run task. See overlord logs for more details."
             );
+            taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
           }
 
           private void handleStatus(final TaskStatus status)
@@ -715,7 +752,7 @@ public class TaskQueue
               // If we're not supposed to be running anymore, don't do 
anything. Somewhat racey if the flag gets set
               // after we check and before we commit the database transaction, 
but better than nothing.
               if (!active) {
-                log.info("Abandoning task due to shutdown: %s", task.getId());
+                log.info("Abandoning task [%s] due to shutdown.", 
task.getId());
                 return;
               }
 
@@ -727,10 +764,8 @@ public class TaskQueue
                 emitter.emit(metricBuilder.build("task/run/time", 
status.getDuration()));
 
                 log.info(
-                    "Task %s: %s (%d run duration)",
-                    status.getStatusCode(),
-                    task.getId(),
-                    status.getDuration()
+                    "Completed task[%s] with status[%s] in [%d]ms.",
+                    task.getId(), status.getStatusCode(), status.getDuration()
                 );
 
                 if (status.isSuccess()) {
@@ -746,10 +781,15 @@ public class TaskQueue
                  .addData("statusCode", status.getStatusCode())
                  .emit();
             }
+            finally {
+              statusUpdatesInQueue.decrementAndGet();
+              handledStatusUpdates.incrementAndGet();
+            }
           }
-        }
+        },
+        // Use direct executor to track metrics for in-flight updates 
immediately
+        Execs.directExecutor()
     );
-    return statusFuture;
   }
 
   /**
@@ -893,6 +933,20 @@ public class TaskQueue
     }
   }
 
+  public CoordinatorRunStats getQueueStats()
+  {
+    final int queuedUpdates = statusUpdatesInQueue.get();
+    final int handledUpdates = handledStatusUpdates.getAndSet(0);
+    if (queuedUpdates > 0) {
+      log.info("There are [%d] task status updates in queue, handled [%d]", 
queuedUpdates, handledUpdates);
+    }
+
+    final CoordinatorRunStats stats = new CoordinatorRunStats();
+    stats.add(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE, queuedUpdates);
+    stats.add(Stats.TaskQueue.HANDLED_STATUS_UPDATES, handledUpdates);
+    return stats;
+  }
+
   @VisibleForTesting
   List<Task> getTasks()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java
index 2b03814b3a..1e7bb1ffa2 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java
@@ -39,15 +39,20 @@ public class TaskQueueConfig
   @JsonProperty
   private Duration storageSyncRate;
 
+  @JsonProperty
+  private int taskCompleteHandlerNumThreads;
+
   @JsonCreator
   public TaskQueueConfig(
       @JsonProperty("maxSize") final Integer maxSize,
       @JsonProperty("startDelay") final Period startDelay,
       @JsonProperty("restartDelay") final Period restartDelay,
-      @JsonProperty("storageSyncRate") final Period storageSyncRate
+      @JsonProperty("storageSyncRate") final Period storageSyncRate,
+      @JsonProperty("taskCompleteHandlerNumThreads") final Integer 
taskCompleteHandlerNumThreads
   )
   {
     this.maxSize = Configs.valueOrDefault(maxSize, Integer.MAX_VALUE);
+    this.taskCompleteHandlerNumThreads = 
Configs.valueOrDefault(taskCompleteHandlerNumThreads, 5);
     this.startDelay = defaultDuration(startDelay, "PT1M");
     this.restartDelay = defaultDuration(restartDelay, "PT30S");
     this.storageSyncRate = defaultDuration(storageSyncRate, "PT1M");
@@ -58,6 +63,11 @@ public class TaskQueueConfig
     return maxSize;
   }
 
+  public int getTaskCompleteHandlerNumThreads()
+  {
+    return taskCompleteHandlerNumThreads;
+  }
+
   public Duration getStartDelay()
   {
     return startDelay;
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
index b158eaca77..364b492148 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
@@ -108,7 +108,7 @@ public class TaskLockConfigTest
     } else {
       lockConfig = new TaskLockConfig();
     }
-    final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null, 
null);
+    final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null, 
null, null);
     final TaskRunner taskRunner = 
EasyMock.createNiceMock(RemoteTaskRunner.class);
     final TaskActionClientFactory actionClientFactory = 
EasyMock.createNiceMock(LocalTaskActionClientFactory.class);
     final TaskLockbox lockbox = new TaskLockbox(taskStorage, new 
TestIndexerMetadataStorageCoordinator());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
index d305b0d6c9..2d2937c814 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
@@ -116,7 +116,7 @@ public class TaskQueueScaleTest
 
     taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, Period.millis(1), null, null),
+        new TaskQueueConfig(null, Period.millis(1), null, null, null),
         new DefaultTaskConfig(),
         taskStorage,
         taskRunner,
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 6388cdd573..dccc7eba70 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.druid.common.guava.DSuppliers;
 import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
 import org.apache.druid.discovery.WorkerNodeService;
+import org.apache.druid.indexer.RunnerTaskState;
 import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
@@ -64,6 +65,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.apache.druid.metadata.EntryExistsException;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
 import org.apache.druid.server.initialization.IndexerZkConfig;
 import org.apache.druid.server.initialization.ZkPathsConfig;
 import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -81,7 +83,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
 
 public class TaskQueueTest extends IngestionTestBase
 {
@@ -102,7 +103,7 @@ public class TaskQueueTest extends IngestionTestBase
     final TaskActionClientFactory actionClientFactory = 
createActionClientFactory();
     final TaskQueue taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, null, null, null),
+        new TaskQueueConfig(null, null, null, null, null),
         new DefaultTaskConfig(),
         getTaskStorage(),
         new SimpleTaskRunner(actionClientFactory),
@@ -147,7 +148,7 @@ public class TaskQueueTest extends IngestionTestBase
     final TaskActionClientFactory actionClientFactory = 
createActionClientFactory();
     final TaskQueue taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, null, null, null),
+        new TaskQueueConfig(null, null, null, null, null),
         new DefaultTaskConfig(),
         getTaskStorage(),
         new SimpleTaskRunner(actionClientFactory),
@@ -187,7 +188,7 @@ public class TaskQueueTest extends IngestionTestBase
     final TaskActionClientFactory actionClientFactory = 
createActionClientFactory();
     final TaskQueue taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, null, null, null),
+        new TaskQueueConfig(null, null, null, null, null),
         new DefaultTaskConfig(),
         getTaskStorage(),
         new SimpleTaskRunner(actionClientFactory),
@@ -212,7 +213,7 @@ public class TaskQueueTest extends IngestionTestBase
     final TaskActionClientFactory actionClientFactory = 
createActionClientFactory();
     final TaskQueue taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, null, null, null),
+        new TaskQueueConfig(null, null, null, null, null),
         new DefaultTaskConfig()
         {
           @Override
@@ -247,7 +248,7 @@ public class TaskQueueTest extends IngestionTestBase
     final TaskActionClientFactory actionClientFactory = 
createActionClientFactory();
     final TaskQueue taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, null, null, null),
+        new TaskQueueConfig(null, null, null, null, null),
         new DefaultTaskConfig(),
         getTaskStorage(),
         new SimpleTaskRunner(actionClientFactory),
@@ -279,7 +280,7 @@ public class TaskQueueTest extends IngestionTestBase
     final TaskActionClientFactory actionClientFactory = 
createActionClientFactory();
     final TaskQueue taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, null, null, null),
+        new TaskQueueConfig(null, null, null, null, null),
         new DefaultTaskConfig()
         {
           @Override
@@ -312,7 +313,7 @@ public class TaskQueueTest extends IngestionTestBase
     final TaskActionClientFactory actionClientFactory = 
createActionClientFactory();
     final TaskQueue taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, null, null, null),
+        new TaskQueueConfig(null, null, null, null, null),
         new DefaultTaskConfig(),
         getTaskStorage(),
         new SimpleTaskRunner(actionClientFactory),
@@ -342,7 +343,7 @@ public class TaskQueueTest extends IngestionTestBase
     final TaskActionClientFactory actionClientFactory = 
createActionClientFactory();
     final TaskQueue taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, null, null, null),
+        new TaskQueueConfig(null, null, null, null, null),
         new DefaultTaskConfig(),
         getTaskStorage(),
         new SimpleTaskRunner(actionClientFactory),
@@ -379,7 +380,9 @@ public class TaskQueueTest extends IngestionTestBase
     final HttpRemoteTaskRunner taskRunner = 
createHttpRemoteTaskRunner(ImmutableList.of("t1"));
     final StubServiceEmitter metricsVerifier = new 
StubServiceEmitter("druid/overlord", "testHost");
     WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
-    EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http", 
"worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)).anyTimes();
+    EasyMock.expect(workerHolder.getWorker())
+            .andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1", 
WorkerConfig.DEFAULT_CATEGORY))
+            .anyTimes();
     workerHolder.incrementContinuouslyFailedTasksCount();
     EasyMock.expectLastCall();
     workerHolder.setLastCompletedTaskTime(EasyMock.anyObject());
@@ -387,7 +390,7 @@ public class TaskQueueTest extends IngestionTestBase
     EasyMock.replay(workerHolder);
     final TaskQueue taskQueue = new TaskQueue(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, null, null, null),
+        new TaskQueueConfig(null, null, null, null, null),
         new DefaultTaskConfig(),
         getTaskStorage(),
         taskRunner,
@@ -399,35 +402,36 @@ public class TaskQueueTest extends IngestionTestBase
     final Task task = new TestTask(
         "t1",
         Intervals.of("2021-01-01/P1D"),
-        ImmutableMap.of(
-            Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
-            false
-        )
+        ImmutableMap.of(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false)
     );
     taskQueue.add(task);
     taskQueue.manageInternal();
-    taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
-        task,
-        TaskStatus.running(task.getId()),
-        TaskLocation.create("worker", 1, 2)
-    ), workerHolder);
-    while (!taskRunner.getRunningTasks()
-        .stream()
-        .map(TaskRunnerWorkItem::getTaskId)
-        .collect(Collectors.toList())
-        .contains(task.getId())) {
+
+    // Announce the task and wait for it to start running
+    final String taskId = task.getId();
+    final TaskLocation taskLocation = TaskLocation.create("worker", 1, 2);
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(task, TaskStatus.running(taskId), 
taskLocation),
+        workerHolder
+    );
+    while (taskRunner.getRunnerTaskState(taskId) != RunnerTaskState.RUNNING) {
       Thread.sleep(100);
     }
-    taskQueue.shutdown(task.getId(), "shutdown");
-    taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
-        task,
-        TaskStatus.failure(task.getId(), "shutdown"),
-        TaskLocation.create("worker", 1, 2)
-    ), workerHolder);
+
+    // Kill the task, send announcement and wait for TaskQueue to handle update
+    taskQueue.shutdown(taskId, "shutdown");
+    taskRunner.taskAddedOrUpdated(
+        TaskAnnouncement.create(task, TaskStatus.failure(taskId, "shutdown"), 
taskLocation),
+        workerHolder
+    );
     taskQueue.manageInternal();
+    Thread.sleep(100);
 
-    metricsVerifier.getEvents();
+    // Verify that metrics are emitted on receiving announcement
     metricsVerifier.verifyEmitted("task/run/time", 1);
+    CoordinatorRunStats stats = taskQueue.getQueueStats();
+    Assert.assertEquals(0L, 
stats.get(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE));
+    Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES));
   }
 
   private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String> 
runningTasks)
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 9738f8b933..0b9c77ef66 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -215,7 +215,7 @@ public class OverlordTest
 
     taskMaster = new TaskMaster(
         new TaskLockConfig(),
-        new TaskQueueConfig(null, new Period(1), null, new Period(10)),
+        new TaskQueueConfig(null, new Period(1), null, new Period(10), null),
         new DefaultTaskConfig(),
         taskLockbox,
         taskStorage,
@@ -329,9 +329,12 @@ public class OverlordTest
 
     response = overlordResource.getCompleteTasks(1, req);
     Assert.assertEquals(1, (((List) response.getEntity()).size()));
+    Assert.assertEquals(1, taskMaster.getStats().rowCount());
 
     taskMaster.stop();
     Assert.assertFalse(taskMaster.isLeader());
+    Assert.assertEquals(0, taskMaster.getStats().rowCount());
+
     EasyMock.verify(taskActionClientFactory);
   }
 
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
index cccc2518e8..fc0da34310 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
@@ -38,8 +38,6 @@ import java.util.stream.Collectors;
  */
 public class RandomBalancerStrategy implements BalancerStrategy
 {
-  private static final CoordinatorRunStats EMPTY_STATS = new 
CoordinatorRunStats();
-
   @Override
   public Iterator<ServerHolder> findServersToLoadSegment(
       DataSegment segmentToLoad,
@@ -77,6 +75,6 @@ public class RandomBalancerStrategy implements 
BalancerStrategy
   @Override
   public CoordinatorRunStats getAndResetStats()
   {
-    return EMPTY_STATS;
+    return CoordinatorRunStats.empty();
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
index c67d84c681..6d0e1b8ae8 100644
--- 
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
@@ -40,10 +40,30 @@ import java.util.concurrent.atomic.AtomicInteger;
 @ThreadSafe
 public class CoordinatorRunStats
 {
+  private static final CoordinatorRunStats EMPTY_INSTANCE = new 
CoordinatorRunStats()
+  {
+    @Override
+    public void add(CoordinatorStat stat, RowKey rowKey, long value)
+    {
+      throw new UnsupportedOperationException("Cannot add stats to empty 
CoordinatorRunStats instance");
+    }
+
+    @Override
+    public void updateMax(CoordinatorStat stat, RowKey rowKey, long value)
+    {
+      throw new UnsupportedOperationException("Cannot add stats to empty 
CoordinatorRunStats instance");
+    }
+  };
+
   private final ConcurrentHashMap<RowKey, 
Object2LongOpenHashMap<CoordinatorStat>>
       allStats = new ConcurrentHashMap<>();
   private final Map<Dimension, String> debugDimensions = new HashMap<>();
 
+  public static CoordinatorRunStats empty()
+  {
+    return EMPTY_INSTANCE;
+  }
+
   public CoordinatorRunStats()
   {
     this(null);
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
 
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
index 6c3394e977..f9cdaee75c 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
@@ -23,6 +23,9 @@ import com.google.inject.Inject;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.apache.druid.server.coordinator.stats.Dimension;
 
 import java.util.Map;
 
@@ -46,6 +49,15 @@ public class TaskCountStatsMonitor extends AbstractMonitor
     emit(emitter, "task/running/count", statsProvider.getRunningTaskCount());
     emit(emitter, "task/pending/count", statsProvider.getPendingTaskCount());
     emit(emitter, "task/waiting/count", statsProvider.getWaitingTaskCount());
+
+    CoordinatorRunStats stats = statsProvider.getStats();
+    if (stats != null) {
+      stats.forEachStat(
+          (stat, dimensions, statValue)
+              -> emit(emitter, stat, dimensions.getValues(), statValue)
+      );
+    }
+
     return true;
   }
 
@@ -60,4 +72,17 @@ public class TaskCountStatsMonitor extends AbstractMonitor
     }
   }
 
+  private void emit(ServiceEmitter emitter, CoordinatorStat stat, 
Map<Dimension, String> dimensionValues, long value)
+  {
+    if (!stat.shouldEmit()) {
+      return;
+    }
+
+    ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
+    dimensionValues.forEach(
+        (dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(), 
dimValue)
+    );
+    emitter.emit(eventBuilder.build(stat.getMetricName(), value));
+  }
+
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
 
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
index a96f8ce062..1b96047fbd 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.server.metrics;
 
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+
 import java.util.Map;
 
 public interface TaskCountStatsProvider
@@ -26,25 +28,39 @@ public interface TaskCountStatsProvider
   /**
    * Return the number of successful tasks for each datasource during emission 
period.
    */
+  @Deprecated
   Map<String, Long> getSuccessfulTaskCount();
 
   /**
    * Return the number of failed tasks for each datasource during emission 
period.
    */
+  @Deprecated
   Map<String, Long> getFailedTaskCount();
 
   /**
    * Return the number of current running tasks for each datasource.
    */
+  @Deprecated
   Map<String, Long> getRunningTaskCount();
 
   /**
    * Return the number of current pending tasks for each datasource.
    */
+  @Deprecated
   Map<String, Long> getPendingTaskCount();
 
   /**
    * Return the number of current waiting tasks for each datasource.
    */
+  @Deprecated
   Map<String, Long> getWaitingTaskCount();
+
+  /**
+   * Collects all task level stats. This method deprecates the other task stats
+   * methods such as {@link #getPendingTaskCount()}, {@link 
#getWaitingTaskCount()}
+   * and will replace them in a future release.
+   *
+   * @return All task stats collected since the previous invocation of this 
method.
+   */
+  CoordinatorRunStats getStats();
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
 
b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
index e7a7249b10..51b876ce5a 100644
--- 
a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
+++ 
b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
@@ -23,6 +23,11 @@ import javax.annotation.Nullable;
 
 import java.util.Map;
 
+/**
+ * This is deprecated and will be merged into {@link TaskCountStatsProvider} in
+ * a future release.
+ */
+@Deprecated
 public interface TaskSlotCountStatsProvider
 {
   /**
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
index 1bf111a4f4..6b4a16f140 100644
--- 
a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
@@ -199,6 +199,28 @@ public class CoordinatorRunStatsTest
     Assert.assertEquals(expectedTable, debugStats.buildStatsTable());
   }
 
+  @Test
+  public void testAddToEmptyThrowsException()
+  {
+    CoordinatorRunStats runStats = CoordinatorRunStats.empty();
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> runStats.add(Stat.ERROR_1, 10)
+    );
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> runStats.add(Stat.ERROR_1, Key.DUTY_1, 10)
+    );
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> runStats.addToSegmentStat(Stat.ERROR_1, "t", "ds", 10)
+    );
+    Assert.assertThrows(
+        UnsupportedOperationException.class,
+        () -> runStats.updateMax(Stat.INFO_1, Key.TIER_1, 10)
+    );
+  }
+
   /**
    * Dimension keys for reporting stats.
    */
diff --git 
a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
 
b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
index c6a47d56c4..46479484f8 100644
--- 
a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.server.metrics;
 
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,6 +67,15 @@ public class TaskCountStatsMonitorTest
       {
         return ImmutableMap.of("d1", 1L);
       }
+
+      @Override
+      public CoordinatorRunStats getStats()
+      {
+        final CoordinatorRunStats stats = new CoordinatorRunStats();
+        stats.add(Stat.INFO_1, 10);
+        stats.addToSegmentStat(Stat.DEBUG_1, "hot", "wiki", 20);
+        return stats;
+      }
     };
   }
 
@@ -74,11 +85,19 @@ public class TaskCountStatsMonitorTest
     final TaskCountStatsMonitor monitor = new 
TaskCountStatsMonitor(statsProvider);
     final StubServiceEmitter emitter = new StubServiceEmitter("service", 
"host");
     monitor.doMonitor(emitter);
-    Assert.assertEquals(5, emitter.getEvents().size());
+    Assert.assertEquals(7, emitter.getEvents().size());
     emitter.verifyValue("task/success/count", 1L);
     emitter.verifyValue("task/failed/count", 1L);
     emitter.verifyValue("task/running/count", 1L);
     emitter.verifyValue("task/pending/count", 1L);
     emitter.verifyValue("task/waiting/count", 1L);
+    emitter.verifyValue(Stat.INFO_1.getMetricName(), 10L);
+    emitter.verifyValue(Stat.DEBUG_1.getMetricName(), ImmutableMap.of("tier", 
"hot", "dataSource", "wiki"), 20L);
+  }
+
+  private static class Stat
+  {
+    static final CoordinatorStat INFO_1 = CoordinatorStat.toLogAndEmit("i1", 
"info/1", CoordinatorStat.Level.INFO);
+    static final CoordinatorStat DEBUG_1 = 
CoordinatorStat.toDebugAndEmit("d1", "debug/1");
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to