This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 40d0dc9e0e Use separate executor to handle task updates in TaskQueue
(#14533)
40d0dc9e0e is described below
commit 40d0dc9e0e02c89f91762a400ecdd84b950907ca
Author: Kashif Faraz <[email protected]>
AuthorDate: Fri Jul 7 20:43:12 2023 +0530
Use separate executor to handle task updates in TaskQueue (#14533)
Description:
`TaskQueue.notifyStatus` is often a heavy call as it performs the following
operations:
- Update task status in metadata DB
- Update task locks in metadata DB
- Request (synchronously) the task runner to shutdown the completed task
- Clean up in-memory data structures
This method can often be slow and can cause worker sync / task runners to
slow down.
Main changes:
- Run task completion callbacks in a separate executor to handle task
completion updates
- Add new config `druid.indexer.queue.taskCompleteHandlerNumThreads`
- Add metrics to monitor number of processed and queued items
- There are still other paths that can invoke `notifyStatus`, but those
need not be moved to the new executor as they are synchronous on purpose.
Other changes:
- Add new metrics `task/status/queue/count`, `task/status/updated/count`
- Add `TaskCountStatsProvider.getStats()` which deprecates the other
`getXXXTaskCount` methods.
- Use `CoordinatorRunStats` to collect and report metrics. This class has
been used as is for now but will later be renamed and repurposed for use
across all Druid services.
---
.../org/apache/druid/indexing/overlord/Stats.java | 40 +++-----
.../apache/druid/indexing/overlord/TaskMaster.java | 12 +++
.../apache/druid/indexing/overlord/TaskQueue.java | 108 +++++++++++++++------
.../indexing/overlord/config/TaskQueueConfig.java | 12 ++-
.../indexing/overlord/TaskLockConfigTest.java | 2 +-
.../indexing/overlord/TaskQueueScaleTest.java | 2 +-
.../druid/indexing/overlord/TaskQueueTest.java | 68 +++++++------
.../druid/indexing/overlord/http/OverlordTest.java | 5 +-
.../balancer/RandomBalancerStrategy.java | 4 +-
.../coordinator/stats/CoordinatorRunStats.java | 20 ++++
.../server/metrics/TaskCountStatsMonitor.java | 25 +++++
.../server/metrics/TaskCountStatsProvider.java | 16 +++
.../server/metrics/TaskSlotCountStatsProvider.java | 5 +
.../coordinator/CoordinatorRunStatsTest.java | 22 +++++
.../server/metrics/TaskCountStatsMonitorTest.java | 21 +++-
15 files changed, 268 insertions(+), 94 deletions(-)
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/Stats.java
similarity index 51%
copy from
server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
copy to
indexing-service/src/main/java/org/apache/druid/indexing/overlord/Stats.java
index a96f8ce062..4d62111e62 100644
---
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/Stats.java
@@ -17,34 +17,20 @@
* under the License.
*/
-package org.apache.druid.server.metrics;
+package org.apache.druid.indexing.overlord;
-import java.util.Map;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
-public interface TaskCountStatsProvider
+/**
+ * Task-level stats emitted as metrics.
+ */
+public class Stats
{
- /**
- * Return the number of successful tasks for each datasource during emission
period.
- */
- Map<String, Long> getSuccessfulTaskCount();
-
- /**
- * Return the number of failed tasks for each datasource during emission
period.
- */
- Map<String, Long> getFailedTaskCount();
-
- /**
- * Return the number of current running tasks for each datasource.
- */
- Map<String, Long> getRunningTaskCount();
-
- /**
- * Return the number of current pending tasks for each datasource.
- */
- Map<String, Long> getPendingTaskCount();
-
- /**
- * Return the number of current waiting tasks for each datasource.
- */
- Map<String, Long> getWaitingTaskCount();
+ public static class TaskQueue
+ {
+ public static final CoordinatorStat STATUS_UPDATES_IN_QUEUE
+ = CoordinatorStat.toDebugAndEmit("queuedStatusUpdates",
"task/status/queue/count");
+ public static final CoordinatorStat HANDLED_STATUS_UPDATES
+ = CoordinatorStat.toDebugAndEmit("handledStatusUpdates",
"task/status/updated/count");
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
index 1eab403585..e4d26b3757 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskMaster.java
@@ -43,6 +43,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.CoordinatorOverlordServiceConfig;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.metrics.TaskCountStatsProvider;
import org.apache.druid.server.metrics.TaskSlotCountStatsProvider;
@@ -362,6 +363,17 @@ public class TaskMaster implements TaskCountStatsProvider,
TaskSlotCountStatsPro
}
}
+ @Override
+ public CoordinatorRunStats getStats()
+ {
+ Optional<TaskQueue> taskQueue = getTaskQueue();
+ if (taskQueue.isPresent()) {
+ return taskQueue.get().getQueueStats();
+ } else {
+ return CoordinatorRunStats.empty();
+ }
+ }
+
private void gracefulStopLeaderLifecycle()
{
try {
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 0beccce3dd..89ccfc606d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -46,6 +46,7 @@ import
org.apache.druid.indexing.overlord.config.TaskLockConfig;
import org.apache.druid.indexing.overlord.config.TaskQueueConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -53,6 +54,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.utils.CollectionUtils;
import java.util.ArrayList;
@@ -71,6 +73,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -97,7 +100,10 @@ public class TaskQueue
@GuardedBy("giant")
private final Map<String, ListenableFuture<TaskStatus>> taskFutures = new
HashMap<>();
- // Tasks that are in the process of being cleaned up by notifyStatus.
Prevents manageInternal from re-launching them.
+ /**
+ * Tasks that have recently completed and are being cleaned up. These tasks
+ * should not be relaunched by task management.
+ */
@GuardedBy("giant")
private final Set<String> recentlyCompletedTasks = new HashSet<>();
@@ -124,6 +130,12 @@ public class TaskQueue
.setNameFormat("TaskQueue-StorageSync").build()
);
+ /**
+ * Dedicated executor for task completion callbacks, to ensure that task
runner
+ * and worker sync operations are not blocked.
+ */
+ private final ExecutorService taskCompleteCallbackExecutor;
+
private volatile boolean active = false;
private static final EmittingLogger log = new
EmittingLogger(TaskQueue.class);
@@ -135,6 +147,9 @@ public class TaskQueue
@GuardedBy("totalFailedTaskCount")
private Map<String, Long> prevTotalFailedTaskCount = new HashMap<>();
+ private final AtomicInteger statusUpdatesInQueue = new AtomicInteger();
+ private final AtomicInteger handledStatusUpdates = new AtomicInteger();
+
public TaskQueue(
TaskLockConfig lockConfig,
TaskQueueConfig config,
@@ -154,6 +169,10 @@ public class TaskQueue
this.taskActionClientFactory =
Preconditions.checkNotNull(taskActionClientFactory, "taskActionClientFactory");
this.taskLockbox = Preconditions.checkNotNull(taskLockbox, "taskLockbox");
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
+ this.taskCompleteCallbackExecutor = Execs.multiThreaded(
+ config.getTaskCompleteHandlerNumThreads(),
+ "TaskQueue-OnComplete-%d"
+ );
}
@VisibleForTesting
@@ -421,7 +440,8 @@ public class TaskQueue
continue;
}
}
- taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
+ attachCallbacks(task, runnerTaskFuture);
+ taskFutures.put(task.getId(), runnerTaskFuture);
} else if (isTaskPending(task)) {
// if the taskFutures contain this task and this task is pending, also
let the taskRunner
// to run it to guarantee it will be assigned to run
@@ -587,12 +607,22 @@ public class TaskQueue
}
/**
- * Notify this queue that some task has an updated status. If this update is
valid, the status will be persisted in
- * the task storage facility. If the status is a completed status, the task
will be unlocked and no further
- * updates will be accepted.
- *
- * @param task task to update
- * @param taskStatus new task status
+ * Notifies this queue that the given task has an updated status. If this
update
+ * is valid and task is now complete, the following operations are performed:
+ * <ul>
+ * <li>Add task to {@link #recentlyCompletedTasks} to prevent re-launching
them</li>
+ * <li>Persist new status in the metadata storage to safeguard against
crashes
+ * and leader re-elections</li>
+ * <li>Request {@link #taskRunner} to shutdown task (synchronously)</li>
+ * <li>Remove all locks for task from metadata storage</li>
+ * <li>Remove task entry from {@link #tasks} and {@link #taskFutures}</li>
+ * <li>Remove task from {@link #recentlyCompletedTasks}</li>
+ * <li>Request task management</li>
+ * </ul>
+ * <p>
+ * Since this operation involves DB updates and synchronous remote calls, it
+ * must be invoked on a dedicated executor so that task runner and worker
sync
+ * is not blocked.
*
* @throws NullPointerException if task or status is null
* @throws IllegalArgumentException if the task ID does not match the status
ID
@@ -661,7 +691,7 @@ public class TaskQueue
if (removeTaskInternal(task.getId())) {
taskFutures.remove(task.getId());
} else {
- log.warn("Unknown task completed: %s", task.getId());
+ log.warn("Unknown task[%s] completed", task.getId());
}
recentlyCompletedTasks.remove(task.getId());
@@ -673,14 +703,17 @@ public class TaskQueue
}
/**
- * Attach success and failure handlers to a task status future, such that
when it completes, we perform the
- * appropriate updates.
- *
- * @param statusFuture a task status future
- *
- * @return the same future, for convenience
+ * Attaches callbacks to the task status future to update application state
when
+ * the task completes. Submits a job to handle the status on the dedicated
+ * {@link #taskCompleteCallbackExecutor}.
+ * <p>
+ * The {@code onSuccess} and {@code onFailure} methods will however run on
the
+ * executor providing the {@code statusFuture} itself (typically the worker
sync executor).
+ * This has been done in order to track metrics for in-flight status updates
+ * immediately. Thus, care must be taken to ensure that the success/failure
+ * methods remain lightweight enough to keep the sync executor unblocked.
*/
- private ListenableFuture<TaskStatus> attachCallbacks(final Task task, final
ListenableFuture<TaskStatus> statusFuture)
+ private void attachCallbacks(final Task task, final
ListenableFuture<TaskStatus> statusFuture)
{
final ServiceMetricEvent.Builder metricBuilder = new
ServiceMetricEvent.Builder();
IndexTaskUtils.setTaskDimensions(metricBuilder, task);
@@ -692,8 +725,9 @@ public class TaskQueue
@Override
public void onSuccess(final TaskStatus status)
{
- log.info("Received %s status for task: %s",
status.getStatusCode(), status.getId());
- handleStatus(status);
+ log.info("Received status[%s] for task[%s].",
status.getStatusCode(), status.getId());
+ statusUpdatesInQueue.incrementAndGet();
+ taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
}
@Override
@@ -704,9 +738,12 @@ public class TaskQueue
.addData("type", task.getType())
.addData("dataSource", task.getDataSource())
.emit();
- handleStatus(
- TaskStatus.failure(task.getId(), "Failed to run this task. See
overlord logs for more details.")
+ statusUpdatesInQueue.incrementAndGet();
+ TaskStatus status = TaskStatus.failure(
+ task.getId(),
+ "Failed to run task. See overlord logs for more details."
);
+ taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
}
private void handleStatus(final TaskStatus status)
@@ -715,7 +752,7 @@ public class TaskQueue
// If we're not supposed to be running anymore, don't do
anything. Somewhat racey if the flag gets set
// after we check and before we commit the database transaction,
but better than nothing.
if (!active) {
- log.info("Abandoning task due to shutdown: %s", task.getId());
+ log.info("Abandoning task [%s] due to shutdown.",
task.getId());
return;
}
@@ -727,10 +764,8 @@ public class TaskQueue
emitter.emit(metricBuilder.build("task/run/time",
status.getDuration()));
log.info(
- "Task %s: %s (%d run duration)",
- status.getStatusCode(),
- task.getId(),
- status.getDuration()
+ "Completed task[%s] with status[%s] in [%d]ms.",
+ task.getId(), status.getStatusCode(), status.getDuration()
);
if (status.isSuccess()) {
@@ -746,10 +781,15 @@ public class TaskQueue
.addData("statusCode", status.getStatusCode())
.emit();
}
+ finally {
+ statusUpdatesInQueue.decrementAndGet();
+ handledStatusUpdates.incrementAndGet();
+ }
}
- }
+ },
+ // Use direct executor to track metrics for in-flight updates
immediately
+ Execs.directExecutor()
);
- return statusFuture;
}
/**
@@ -893,6 +933,20 @@ public class TaskQueue
}
}
+ public CoordinatorRunStats getQueueStats()
+ {
+ final int queuedUpdates = statusUpdatesInQueue.get();
+ final int handledUpdates = handledStatusUpdates.getAndSet(0);
+ if (queuedUpdates > 0) {
+ log.info("There are [%d] task status updates in queue, handled [%d]",
queuedUpdates, handledUpdates);
+ }
+
+ final CoordinatorRunStats stats = new CoordinatorRunStats();
+ stats.add(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE, queuedUpdates);
+ stats.add(Stats.TaskQueue.HANDLED_STATUS_UPDATES, handledUpdates);
+ return stats;
+ }
+
@VisibleForTesting
List<Task> getTasks()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java
index 2b03814b3a..1e7bb1ffa2 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/TaskQueueConfig.java
@@ -39,15 +39,20 @@ public class TaskQueueConfig
@JsonProperty
private Duration storageSyncRate;
+ @JsonProperty
+ private int taskCompleteHandlerNumThreads;
+
@JsonCreator
public TaskQueueConfig(
@JsonProperty("maxSize") final Integer maxSize,
@JsonProperty("startDelay") final Period startDelay,
@JsonProperty("restartDelay") final Period restartDelay,
- @JsonProperty("storageSyncRate") final Period storageSyncRate
+ @JsonProperty("storageSyncRate") final Period storageSyncRate,
+ @JsonProperty("taskCompleteHandlerNumThreads") final Integer
taskCompleteHandlerNumThreads
)
{
this.maxSize = Configs.valueOrDefault(maxSize, Integer.MAX_VALUE);
+ this.taskCompleteHandlerNumThreads =
Configs.valueOrDefault(taskCompleteHandlerNumThreads, 5);
this.startDelay = defaultDuration(startDelay, "PT1M");
this.restartDelay = defaultDuration(restartDelay, "PT30S");
this.storageSyncRate = defaultDuration(storageSyncRate, "PT1M");
@@ -58,6 +63,11 @@ public class TaskQueueConfig
return maxSize;
}
+ public int getTaskCompleteHandlerNumThreads()
+ {
+ return taskCompleteHandlerNumThreads;
+ }
+
public Duration getStartDelay()
{
return startDelay;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
index b158eaca77..364b492148 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLockConfigTest.java
@@ -108,7 +108,7 @@ public class TaskLockConfigTest
} else {
lockConfig = new TaskLockConfig();
}
- final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null,
null);
+ final TaskQueueConfig queueConfig = new TaskQueueConfig(null, null, null,
null, null);
final TaskRunner taskRunner =
EasyMock.createNiceMock(RemoteTaskRunner.class);
final TaskActionClientFactory actionClientFactory =
EasyMock.createNiceMock(LocalTaskActionClientFactory.class);
final TaskLockbox lockbox = new TaskLockbox(taskStorage, new
TestIndexerMetadataStorageCoordinator());
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
index d305b0d6c9..2d2937c814 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueScaleTest.java
@@ -116,7 +116,7 @@ public class TaskQueueScaleTest
taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, Period.millis(1), null, null),
+ new TaskQueueConfig(null, Period.millis(1), null, null, null),
new DefaultTaskConfig(),
taskStorage,
taskRunner,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 6388cdd573..dccc7eba70 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -28,6 +28,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.druid.common.guava.DSuppliers;
import org.apache.druid.discovery.DruidNodeDiscoveryProvider;
import org.apache.druid.discovery.WorkerNodeService;
+import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
@@ -64,6 +65,7 @@ import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.TestHelper;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
import org.apache.druid.server.initialization.IndexerZkConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
import org.apache.druid.server.metrics.NoopServiceEmitter;
@@ -81,7 +83,6 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
public class TaskQueueTest extends IngestionTestBase
{
@@ -102,7 +103,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskActionClientFactory actionClientFactory =
createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null),
+ new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
@@ -147,7 +148,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskActionClientFactory actionClientFactory =
createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null),
+ new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
@@ -187,7 +188,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskActionClientFactory actionClientFactory =
createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null),
+ new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
@@ -212,7 +213,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskActionClientFactory actionClientFactory =
createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null),
+ new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig()
{
@Override
@@ -247,7 +248,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskActionClientFactory actionClientFactory =
createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null),
+ new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
@@ -279,7 +280,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskActionClientFactory actionClientFactory =
createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null),
+ new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig()
{
@Override
@@ -312,7 +313,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskActionClientFactory actionClientFactory =
createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null),
+ new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
@@ -342,7 +343,7 @@ public class TaskQueueTest extends IngestionTestBase
final TaskActionClientFactory actionClientFactory =
createActionClientFactory();
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null),
+ new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
new SimpleTaskRunner(actionClientFactory),
@@ -379,7 +380,9 @@ public class TaskQueueTest extends IngestionTestBase
final HttpRemoteTaskRunner taskRunner =
createHttpRemoteTaskRunner(ImmutableList.of("t1"));
final StubServiceEmitter metricsVerifier = new
StubServiceEmitter("druid/overlord", "testHost");
WorkerHolder workerHolder = EasyMock.createMock(WorkerHolder.class);
- EasyMock.expect(workerHolder.getWorker()).andReturn(new Worker("http",
"worker", "127.0.0.1", 1, "v1", WorkerConfig.DEFAULT_CATEGORY)).anyTimes();
+ EasyMock.expect(workerHolder.getWorker())
+ .andReturn(new Worker("http", "worker", "127.0.0.1", 1, "v1",
WorkerConfig.DEFAULT_CATEGORY))
+ .anyTimes();
workerHolder.incrementContinuouslyFailedTasksCount();
EasyMock.expectLastCall();
workerHolder.setLastCompletedTaskTime(EasyMock.anyObject());
@@ -387,7 +390,7 @@ public class TaskQueueTest extends IngestionTestBase
EasyMock.replay(workerHolder);
final TaskQueue taskQueue = new TaskQueue(
new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null),
+ new TaskQueueConfig(null, null, null, null, null),
new DefaultTaskConfig(),
getTaskStorage(),
taskRunner,
@@ -399,35 +402,36 @@ public class TaskQueueTest extends IngestionTestBase
final Task task = new TestTask(
"t1",
Intervals.of("2021-01-01/P1D"),
- ImmutableMap.of(
- Tasks.FORCE_TIME_CHUNK_LOCK_KEY,
- false
- )
+ ImmutableMap.of(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, false)
);
taskQueue.add(task);
taskQueue.manageInternal();
- taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
- task,
- TaskStatus.running(task.getId()),
- TaskLocation.create("worker", 1, 2)
- ), workerHolder);
- while (!taskRunner.getRunningTasks()
- .stream()
- .map(TaskRunnerWorkItem::getTaskId)
- .collect(Collectors.toList())
- .contains(task.getId())) {
+
+ // Announce the task and wait for it to start running
+ final String taskId = task.getId();
+ final TaskLocation taskLocation = TaskLocation.create("worker", 1, 2);
+ taskRunner.taskAddedOrUpdated(
+ TaskAnnouncement.create(task, TaskStatus.running(taskId),
taskLocation),
+ workerHolder
+ );
+ while (taskRunner.getRunnerTaskState(taskId) != RunnerTaskState.RUNNING) {
Thread.sleep(100);
}
- taskQueue.shutdown(task.getId(), "shutdown");
- taskRunner.taskAddedOrUpdated(TaskAnnouncement.create(
- task,
- TaskStatus.failure(task.getId(), "shutdown"),
- TaskLocation.create("worker", 1, 2)
- ), workerHolder);
+
+ // Kill the task, send announcement and wait for TaskQueue to handle update
+ taskQueue.shutdown(taskId, "shutdown");
+ taskRunner.taskAddedOrUpdated(
+ TaskAnnouncement.create(task, TaskStatus.failure(taskId, "shutdown"),
taskLocation),
+ workerHolder
+ );
taskQueue.manageInternal();
+ Thread.sleep(100);
- metricsVerifier.getEvents();
+ // Verify that metrics are emitted on receiving announcement
metricsVerifier.verifyEmitted("task/run/time", 1);
+ CoordinatorRunStats stats = taskQueue.getQueueStats();
+ Assert.assertEquals(0L,
stats.get(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE));
+ Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES));
}
private HttpRemoteTaskRunner createHttpRemoteTaskRunner(List<String>
runningTasks)
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 9738f8b933..0b9c77ef66 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -215,7 +215,7 @@ public class OverlordTest
taskMaster = new TaskMaster(
new TaskLockConfig(),
- new TaskQueueConfig(null, new Period(1), null, new Period(10)),
+ new TaskQueueConfig(null, new Period(1), null, new Period(10), null),
new DefaultTaskConfig(),
taskLockbox,
taskStorage,
@@ -329,9 +329,12 @@ public class OverlordTest
response = overlordResource.getCompleteTasks(1, req);
Assert.assertEquals(1, (((List) response.getEntity()).size()));
+ Assert.assertEquals(1, taskMaster.getStats().rowCount());
taskMaster.stop();
Assert.assertFalse(taskMaster.isLeader());
+ Assert.assertEquals(0, taskMaster.getStats().rowCount());
+
EasyMock.verify(taskActionClientFactory);
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
index cccc2518e8..fc0da34310 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/balancer/RandomBalancerStrategy.java
@@ -38,8 +38,6 @@ import java.util.stream.Collectors;
*/
public class RandomBalancerStrategy implements BalancerStrategy
{
- private static final CoordinatorRunStats EMPTY_STATS = new
CoordinatorRunStats();
-
@Override
public Iterator<ServerHolder> findServersToLoadSegment(
DataSegment segmentToLoad,
@@ -77,6 +75,6 @@ public class RandomBalancerStrategy implements
BalancerStrategy
@Override
public CoordinatorRunStats getAndResetStats()
{
- return EMPTY_STATS;
+ return CoordinatorRunStats.empty();
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
index c67d84c681..6d0e1b8ae8 100644
---
a/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
+++
b/server/src/main/java/org/apache/druid/server/coordinator/stats/CoordinatorRunStats.java
@@ -40,10 +40,30 @@ import java.util.concurrent.atomic.AtomicInteger;
@ThreadSafe
public class CoordinatorRunStats
{
+ private static final CoordinatorRunStats EMPTY_INSTANCE = new
CoordinatorRunStats()
+ {
+ @Override
+ public void add(CoordinatorStat stat, RowKey rowKey, long value)
+ {
+ throw new UnsupportedOperationException("Cannot add stats to empty
CoordinatorRunStats instance");
+ }
+
+ @Override
+ public void updateMax(CoordinatorStat stat, RowKey rowKey, long value)
+ {
+ throw new UnsupportedOperationException("Cannot add stats to empty
CoordinatorRunStats instance");
+ }
+ };
+
private final ConcurrentHashMap<RowKey,
Object2LongOpenHashMap<CoordinatorStat>>
allStats = new ConcurrentHashMap<>();
private final Map<Dimension, String> debugDimensions = new HashMap<>();
+ public static CoordinatorRunStats empty()
+ {
+ return EMPTY_INSTANCE;
+ }
+
public CoordinatorRunStats()
{
this(null);
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
index 6c3394e977..f9cdaee75c 100644
---
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
+++
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsMonitor.java
@@ -23,6 +23,9 @@ import com.google.inject.Inject;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.AbstractMonitor;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
+import org.apache.druid.server.coordinator.stats.Dimension;
import java.util.Map;
@@ -46,6 +49,15 @@ public class TaskCountStatsMonitor extends AbstractMonitor
emit(emitter, "task/running/count", statsProvider.getRunningTaskCount());
emit(emitter, "task/pending/count", statsProvider.getPendingTaskCount());
emit(emitter, "task/waiting/count", statsProvider.getWaitingTaskCount());
+
+ CoordinatorRunStats stats = statsProvider.getStats();
+ if (stats != null) {
+ stats.forEachStat(
+ (stat, dimensions, statValue)
+ -> emit(emitter, stat, dimensions.getValues(), statValue)
+ );
+ }
+
return true;
}
@@ -60,4 +72,17 @@ public class TaskCountStatsMonitor extends AbstractMonitor
}
}
+ private void emit(ServiceEmitter emitter, CoordinatorStat stat,
Map<Dimension, String> dimensionValues, long value)
+ {
+ if (!stat.shouldEmit()) {
+ return;
+ }
+
+ ServiceMetricEvent.Builder eventBuilder = new ServiceMetricEvent.Builder();
+ dimensionValues.forEach(
+ (dim, dimValue) -> eventBuilder.setDimension(dim.reportedName(),
dimValue)
+ );
+ emitter.emit(eventBuilder.build(stat.getMetricName(), value));
+ }
+
}
diff --git
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
index a96f8ce062..1b96047fbd 100644
---
a/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
+++
b/server/src/main/java/org/apache/druid/server/metrics/TaskCountStatsProvider.java
@@ -19,6 +19,8 @@
package org.apache.druid.server.metrics;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+
import java.util.Map;
public interface TaskCountStatsProvider
@@ -26,25 +28,39 @@ public interface TaskCountStatsProvider
/**
* Return the number of successful tasks for each datasource during emission period.
*/
+ @Deprecated
Map<String, Long> getSuccessfulTaskCount();
/**
* Return the number of failed tasks for each datasource during emission period.
*/
+ @Deprecated
Map<String, Long> getFailedTaskCount();
/**
* Return the number of current running tasks for each datasource.
*/
+ @Deprecated
Map<String, Long> getRunningTaskCount();
/**
* Return the number of current pending tasks for each datasource.
*/
+ @Deprecated
Map<String, Long> getPendingTaskCount();
/**
* Return the number of current waiting tasks for each datasource.
*/
+ @Deprecated
Map<String, Long> getWaitingTaskCount();
+
+ /**
+ * Collects all task level stats. This method deprecates the other task stats
+ * methods such as {@link #getPendingTaskCount()}, {@link #getWaitingTaskCount()}
+ * and will replace them in a future release.
+ *
+ * @return All task stats collected since the previous invocation of this method.
+ */
+ CoordinatorRunStats getStats();
}
diff --git a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
index e7a7249b10..51b876ce5a 100644
--- a/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
+++ b/server/src/main/java/org/apache/druid/server/metrics/TaskSlotCountStatsProvider.java
@@ -23,6 +23,11 @@ import javax.annotation.Nullable;
import java.util.Map;
+/**
+ * This is deprecated and will be merged into {@link TaskCountStatsProvider} in
+ * a future release.
+ */
+@Deprecated
public interface TaskSlotCountStatsProvider
{
/**
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
index 1bf111a4f4..6b4a16f140 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/CoordinatorRunStatsTest.java
@@ -199,6 +199,28 @@ public class CoordinatorRunStatsTest
Assert.assertEquals(expectedTable, debugStats.buildStatsTable());
}
+ @Test
+ public void testAddToEmptyThrowsException()
+ {
+ CoordinatorRunStats runStats = CoordinatorRunStats.empty();
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> runStats.add(Stat.ERROR_1, 10)
+ );
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> runStats.add(Stat.ERROR_1, Key.DUTY_1, 10)
+ );
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> runStats.addToSegmentStat(Stat.ERROR_1, "t", "ds", 10)
+ );
+ Assert.assertThrows(
+ UnsupportedOperationException.class,
+ () -> runStats.updateMax(Stat.INFO_1, Key.TIER_1, 10)
+ );
+ }
+
/**
* Dimension keys for reporting stats.
*/
diff --git a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
index c6a47d56c4..46479484f8 100644
--- a/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
+++ b/server/src/test/java/org/apache/druid/server/metrics/TaskCountStatsMonitorTest.java
@@ -21,6 +21,8 @@ package org.apache.druid.server.metrics;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
+import org.apache.druid.server.coordinator.stats.CoordinatorRunStats;
+import org.apache.druid.server.coordinator.stats.CoordinatorStat;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -65,6 +67,15 @@ public class TaskCountStatsMonitorTest
{
return ImmutableMap.of("d1", 1L);
}
+
+ @Override
+ public CoordinatorRunStats getStats()
+ {
+ final CoordinatorRunStats stats = new CoordinatorRunStats();
+ stats.add(Stat.INFO_1, 10);
+ stats.addToSegmentStat(Stat.DEBUG_1, "hot", "wiki", 20);
+ return stats;
+ }
};
}
@@ -74,11 +85,19 @@ public class TaskCountStatsMonitorTest
final TaskCountStatsMonitor monitor = new TaskCountStatsMonitor(statsProvider);
final StubServiceEmitter emitter = new StubServiceEmitter("service", "host");
monitor.doMonitor(emitter);
- Assert.assertEquals(5, emitter.getEvents().size());
+ Assert.assertEquals(7, emitter.getEvents().size());
emitter.verifyValue("task/success/count", 1L);
emitter.verifyValue("task/failed/count", 1L);
emitter.verifyValue("task/running/count", 1L);
emitter.verifyValue("task/pending/count", 1L);
emitter.verifyValue("task/waiting/count", 1L);
+ emitter.verifyValue(Stat.INFO_1.getMetricName(), 10L);
+ emitter.verifyValue(Stat.DEBUG_1.getMetricName(), ImmutableMap.of("tier", "hot", "dataSource", "wiki"), 20L);
+ }
+
+ private static class Stat
+ {
+ static final CoordinatorStat INFO_1 = CoordinatorStat.toLogAndEmit("i1", "info/1", CoordinatorStat.Level.INFO);
+ static final CoordinatorStat DEBUG_1 = CoordinatorStat.toDebugAndEmit("d1", "debug/1");
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]