This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 9c55da6a27c Emit metrics on all task completions. (#18766)
9c55da6a27c is described below
commit 9c55da6a27c69d0136a4209d201730609e2a55db
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jan 8 07:55:48 2026 -0800
Emit metrics on all task completions. (#18766)
Previously, "emitTaskCompletionLogsAndMetrics" would emit the metrics
task/run/time, task/success/count, and task/failed/count only for tasks
that complete due to an attached runner callback (from attachCallbacks).
This patch causes metrics to be emitted whenever notifyStatus successfully
marks a task as completed.
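As a rough sketch of where this now happens (condensed from the TaskQueue diff
below; surrounding error handling, locking, and logging are omitted):

    // Inside TaskQueue#notifyStatus, after the task entry is marked completed:
    shutdownTaskOnRunner(task.getId(), reasonFormat, args);
    removeTaskLock(task);
    // Metrics and completion logs are now emitted here, for every completed task,
    // instead of only from the runner callback attached in attachCallbacks().
    emitTaskCompletionLogsAndMetrics(task, taskStatus);
    requestManagement();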
The prior behavior missed scenarios where the shutdown API is used on a
task that the runner is aware of, but that has not yet been added to the
queue. This can happen during Overlord startup, while the queue is still
initializing.
This patch also fixes a bug in TaskQueue#getTaskStatus, which derived the
status of active tasks from the taskRunner rather than from activeTasks.
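For reference, the corrected lookup (as it appears in the TaskQueue diff below)
is effectively:

    public Optional<TaskStatus> getTaskStatus(final String taskId)
    {
      // Prefer the in-memory entry while the task is active; otherwise fall back to TaskStorage.
      final TaskEntry activeTaskEntry = activeTasks.get(taskId);
      if (activeTaskEntry != null) {
        return Optional.of(activeTaskEntry.taskInfo.getStatus().withLocation(taskRunner.getTaskLocation(taskId)));
      } else {
        return taskStorage.getStatus(taskId);
      }
    }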
---
.../embedded/indexing/IngestionSmokeTest.java | 9 +-
.../apache/druid/indexing/overlord/TaskQueue.java | 46 ++++----
.../druid/indexing/overlord/TaskQueueTest.java | 128 ++++++++++-----------
.../testing/embedded/EmbeddedClusterApis.java | 85 ++++++++++++++
4 files changed, 175 insertions(+), 93 deletions(-)
diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index cb0e2052d31..fdc90cab49a 100644
--- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -340,9 +340,12 @@ public class IngestionSmokeTest extends EmbeddedClusterTestBase
);
final Optional<InputStream> streamOptional =
- overlord.bindings()
- .getInstance(TaskLogStreamer.class)
- .streamTaskLog(taskId, 0);
+ cluster.callApi().waitForResult(
+ () -> overlord.bindings()
+ .getInstance(TaskLogStreamer.class)
+ .streamTaskLog(taskId, 0),
+ Optional::isPresent
+ ).go();
Assertions.assertTrue(streamOptional.isPresent());
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 82f231e2ef6..5f64e18e942 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -36,7 +36,6 @@ import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.EntryAlreadyExists;
import org.apache.druid.error.InvalidInput;
-import org.apache.druid.indexer.RunnerTaskState;
import org.apache.druid.indexer.TaskInfo;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
@@ -46,6 +45,7 @@ import org.apache.druid.indexing.common.actions.TaskActionClientFactory;
import org.apache.druid.indexing.common.task.IndexTaskUtils;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskContextEnricher;
+import org.apache.druid.indexing.common.task.TaskMetrics;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.batch.MaxAllowedLocksExceededException;
import org.apache.druid.indexing.common.task.batch.parallel.SinglePhaseParallelIndexTaskRunner;
@@ -451,7 +451,6 @@ public class TaskQueue
}
final TaskStatus taskStatus = TaskStatus.failure(task.getId(), errorMessage);
notifyStatus(entry, taskStatus, taskStatus.getErrorMsg());
- emitTaskCompletionLogsAndMetrics(task, taskStatus);
return;
}
if (taskIsReady) {
@@ -751,8 +750,8 @@ public class TaskQueue
}
shutdownTaskOnRunner(task.getId(), reasonFormat, args);
-
removeTaskLock(task);
+ emitTaskCompletionLogsAndMetrics(task, taskStatus);
requestManagement();
log.info("Completed notifyStatus for task[%s] with status[%s]",
task.getId(), taskStatus);
@@ -813,9 +812,6 @@ public class TaskQueue
task.getId(),
entry -> notifyStatus(entry, status, "notified status change from task")
);
-
- // Emit event and log, if the task is done
- emitTaskCompletionLogsAndMetrics(task, status);
}
catch (Exception e) {
log.makeAlert(e, "Failed to handle task status")
@@ -966,14 +962,14 @@ public class TaskQueue
}
/**
- * Gets the current status of this task either from the {@link TaskRunner}
- * or from the {@link TaskStorage} (if not available with the TaskRunner).
+ * Gets the current status of this task either from {@link #activeTasks} and {@link #taskRunner}, if active,
+ * or otherwise from the {@link TaskStorage}.
*/
public Optional<TaskStatus> getTaskStatus(final String taskId)
{
- RunnerTaskState runnerTaskState = taskRunner.getRunnerTaskState(taskId);
- if (runnerTaskState != null && runnerTaskState != RunnerTaskState.NONE) {
- return Optional.of(TaskStatus.running(taskId).withLocation(taskRunner.getTaskLocation(taskId)));
+ final TaskEntry activeTaskEntry = activeTasks.get(taskId);
+ if (activeTaskEntry != null) {
+ return Optional.of(activeTaskEntry.taskInfo.getStatus().withLocation(taskRunner.getTaskLocation(taskId)));
} else {
return taskStorage.getStatus(taskId);
}
@@ -1074,24 +1070,22 @@ public class TaskQueue
private void emitTaskCompletionLogsAndMetrics(final Task task, final TaskStatus status)
{
- if (status.isComplete()) {
- final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
- IndexTaskUtils.setTaskDimensions(metricBuilder, task);
- IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
-
- emitter.emit(metricBuilder.setMetric("task/run/time", status.getDuration()));
+ final ServiceMetricEvent.Builder metricBuilder = ServiceMetricEvent.builder();
+ IndexTaskUtils.setTaskDimensions(metricBuilder, task);
+ IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
- if (status.isSuccess()) {
- Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
- } else {
- Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
- }
+ emitter.emit(metricBuilder.setMetric(TaskMetrics.RUN_DURATION, status.getDuration()));
- log.info(
- "Completed task[%s] with status[%s] in [%d]ms.",
- task.getId(), status, status.getDuration()
- );
+ if (status.isSuccess()) {
+ Counters.incrementAndGetLong(totalSuccessfulTaskCount, getMetricKey(task));
+ } else {
+ Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
}
+
+ log.info(
+ "Completed task[%s] with status[%s] in [%d]ms.",
+ task.getId(), status, status.getDuration()
+ );
}
private void validateTaskPayload(Task task)
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
index 4d0733dfade..388af3bfe91 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java
@@ -144,7 +144,7 @@ public class TaskQueueTest extends IngestionTestBase
taskQueue.setActive(true);
}
- @Test
+ @Test(timeout = 30_000)
public void testManageQueuedTasksReleaseLockWhenTaskIsNotReady() throws Exception
{
// task1 emulates a case when there is a task that was issued before task2 and acquired locks conflicting
@@ -164,16 +164,20 @@ public class TaskQueueTest extends IngestionTestBase
final TestTask task3 = new TestTask("t3", Intervals.of("2021-02-01/P1M"));
taskQueue.add(task3);
taskQueue.manageQueuedTasks();
+
+ // Wait for task3 to exit.
+ waitForTaskToExit(task3);
+
Assert.assertFalse(task2.isDone());
Assert.assertTrue(task3.isDone());
Assert.assertTrue(getLockbox().findLocksForTask(task2).isEmpty());
- // Shut down task1 and task3 and release their locks.
+ // Shut down task1 and release its locks.
shutdownTask(task1);
- taskQueue.shutdown(task3.getId(), "Emulating shutdown of task3");
// Now task2 should run.
taskQueue.manageQueuedTasks();
+ waitForTaskToExit(task2);
Assert.assertTrue(task2.isDone());
// Sleep to allow all metrics to be emitted
@@ -490,7 +494,7 @@ public class TaskQueueTest extends IngestionTestBase
Thread.sleep(100);
// Verify that metrics are emitted on receiving announcement
- serviceEmitter.verifyEmitted("task/run/time", Map.of(DruidMetrics.DESCRIPTION, "shutdown on runner"), 1);
+ serviceEmitter.verifyEmitted("task/run/time", Map.of(DruidMetrics.DESCRIPTION, "shutdown"), 1);
verifySuccessfulTaskCount(taskQueue, 0);
verifyFailedTaskCount(taskQueue, 1);
@@ -500,69 +504,53 @@ public class TaskQueueTest extends IngestionTestBase
}
@Test
- public void testGetTaskStatus()
+ public void testGetTaskStatus_successfulTask()
{
- final TaskRunner taskRunner = EasyMock.createMock(TaskRunner.class);
- final TaskStorage taskStorage = EasyMock.createMock(TaskStorage.class);
-
- final String newTask = "newTask";
- EasyMock.expect(taskRunner.getRunnerTaskState(newTask))
- .andReturn(null);
- EasyMock.expect(taskStorage.getStatus(newTask))
- .andReturn(Optional.of(TaskStatus.running(newTask)));
-
- final String waitingTask = "waitingTask";
- EasyMock.expect(taskRunner.getRunnerTaskState(waitingTask))
- .andReturn(RunnerTaskState.WAITING);
- EasyMock.expect(taskRunner.getTaskLocation(waitingTask))
- .andReturn(TaskLocation.unknown());
-
- final String pendingTask = "pendingTask";
- EasyMock.expect(taskRunner.getRunnerTaskState(pendingTask))
- .andReturn(RunnerTaskState.PENDING);
- EasyMock.expect(taskRunner.getTaskLocation(pendingTask))
- .andReturn(TaskLocation.unknown());
-
- final String runningTask = "runningTask";
- EasyMock.expect(taskRunner.getRunnerTaskState(runningTask))
- .andReturn(RunnerTaskState.RUNNING);
- EasyMock.expect(taskRunner.getTaskLocation(runningTask))
- .andReturn(TaskLocation.create("host", 8100, 8100));
-
- final String successfulTask = "successfulTask";
- EasyMock.expect(taskRunner.getRunnerTaskState(successfulTask))
- .andReturn(RunnerTaskState.NONE);
- EasyMock.expect(taskStorage.getStatus(successfulTask))
- .andReturn(Optional.of(TaskStatus.success(successfulTask)));
-
- final String failedTask = "failedTask";
- EasyMock.expect(taskRunner.getRunnerTaskState(failedTask))
- .andReturn(RunnerTaskState.NONE);
- EasyMock.expect(taskStorage.getStatus(failedTask))
- .andReturn(Optional.of(TaskStatus.failure(failedTask, failedTask)));
-
- EasyMock.replay(taskRunner, taskStorage);
+ final TestTask task = new TestTask("successfulTask", Intervals.of("2021-01-01/P1D"));
+ taskQueue.add(task);
- final TaskQueue taskQueue = new TaskQueue(
- new TaskLockConfig(),
- new TaskQueueConfig(null, null, null, null, null, null),
- new DefaultTaskConfig(),
- taskStorage,
- taskRunner,
- actionClientFactory,
- getLockbox(),
- serviceEmitter,
- getObjectMapper(),
- new NoopTaskContextEnricher()
- );
- taskQueue.setActive(true);
+ // Active task: status should come from activeTasks map
+ final Optional<TaskStatus> activeStatus = taskQueue.getTaskStatus(task.getId());
+ Assert.assertTrue(activeStatus.isPresent());
+ Assert.assertEquals(TaskState.RUNNING, activeStatus.get().getStatusCode());
+ Assert.assertEquals(task.getId(), activeStatus.get().getId());
- Assert.assertEquals(TaskStatus.running(newTask), taskQueue.getTaskStatus(newTask).get());
- Assert.assertEquals(TaskStatus.running(waitingTask), taskQueue.getTaskStatus(waitingTask).get());
- Assert.assertEquals(TaskStatus.running(pendingTask), taskQueue.getTaskStatus(pendingTask).get());
- Assert.assertEquals(TaskStatus.running(runningTask), taskQueue.getTaskStatus(runningTask).get());
- Assert.assertEquals(TaskStatus.success(successfulTask), taskQueue.getTaskStatus(successfulTask).get());
- Assert.assertEquals(TaskStatus.failure(failedTask, failedTask), taskQueue.getTaskStatus(failedTask).get());
+ // Run the task so it completes
+ taskQueue.manageQueuedTasks();
+ waitForTaskToExit(task);
+
+ final Optional<TaskStatus> completedStatus = taskQueue.getTaskStatus(task.getId());
+ Assert.assertTrue(completedStatus.isPresent());
+ Assert.assertEquals(TaskState.SUCCESS, completedStatus.get().getStatusCode());
+ }
+
+ @Test
+ public void testGetTaskStatus_failedTask()
+ {
+ final TestTask task = new TestTask("failedTask", Intervals.of("2021-01-01/P1D"))
+ {
+ @Override
+ public TaskStatus runTask(TaskToolbox toolbox)
+ {
+ super.done = true;
+ return TaskStatus.failure(getId(), "intentional failure");
+ }
+ };
+ taskQueue.add(task);
+ taskQueue.manageQueuedTasks();
+ waitForTaskToExit(task);
+
+ final Optional<TaskStatus> failedStatus = taskQueue.getTaskStatus(task.getId());
+ Assert.assertTrue(failedStatus.isPresent());
+ Assert.assertEquals(TaskState.FAILED, failedStatus.get().getStatusCode());
+ Assert.assertEquals("intentional failure", failedStatus.get().getErrorMsg());
+ }
+
+ @Test
+ public void testGetTaskStatus_unknownTask()
+ {
+ final Optional<TaskStatus> unknownStatus = taskQueue.getTaskStatus("unknownTask");
+ Assert.assertFalse(unknownStatus.isPresent());
}
@Test
@@ -785,6 +773,18 @@ public class TaskQueueTest extends IngestionTestBase
);
}
+ private void waitForTaskToExit(final Task task)
+ {
+ while (taskQueue.getActiveTasksForDatasource(task.getDataSource()).containsKey(task.getId())) {
+ try {
+ Thread.sleep(10);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
private static void verifySuccessfulTaskCount(final TaskQueue taskQueue, int successCount)
{
Assert.assertEquals(
diff --git a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
index d796d240987..a628ad26bbe 100644
--- a/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
+++ b/services/src/test/java/org/apache/druid/testing/embedded/EmbeddedClusterApis.java
@@ -20,6 +20,7 @@
package org.apache.druid.testing.embedded;
import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.broker.BrokerClient;
@@ -35,6 +36,7 @@ import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Stopwatch;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
@@ -59,6 +61,7 @@ import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
+import java.util.function.Predicate;
/**
* Contains various utility methods to interact with an {@link EmbeddedDruidCluster}.
@@ -353,6 +356,22 @@ public class EmbeddedClusterApis implements EmbeddedResource
);
}
+ /**
+ * Creates a waiter that can wait for a result to match a matcher. Make sure to call {@link ResultWaiter#go()}
+ * or else the waiter will not do anything.
+ *
+ * In general, you should prefer using {@link LatchableEmitter} rather than this method, because it doesn't need
+ * retry loops and is therefore both more responsive, and better at catching race conditions. Use this method
+ * when there is no metric to wait on, and you believe that adding one would be overkill.
+ */
+ public <T> ResultWaiter<T> waitForResult(
+ final ExceptionalSupplier<T> resultSupplier,
+ final Predicate<T> resultMatcher
+ )
+ {
+ return new ResultWaiter<>(resultSupplier, resultMatcher);
+ }
+
/**
* Returns a {@link Closeable} that deletes all the data for the given datasource
* on {@link Closeable#close()}.
@@ -483,9 +502,75 @@ public class EmbeddedClusterApis implements EmbeddedResource
return alignedIntervals;
}
+ /**
+ * Waiter returned by {@link #waitForResult}.
+ */
+ public static class ResultWaiter<T>
+ {
+ private final ExceptionalSupplier<T> resultSupplier;
+ private final Predicate<T> resultMatcher;
+ private long timeoutMillis = 10_000;
+ private long retryMillis = 250;
+
+ private ResultWaiter(ExceptionalSupplier<T> resultSupplier, Predicate<T> resultMatcher)
+ {
+ this.resultSupplier = resultSupplier;
+ this.resultMatcher = resultMatcher;
+ }
+
+ public ResultWaiter<T> withTimeoutMillis(final long timeoutMillis)
+ {
+ this.timeoutMillis = timeoutMillis;
+ return this;
+ }
+
+ public ResultWaiter<T> withRetryMillis(final long retryMillis)
+ {
+ this.retryMillis = retryMillis;
+ return this;
+ }
+
+ /**
+ * Start checking for the result and return it when it's available.
+ */
+ public T go()
+ {
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+
+ try {
+ T t = resultSupplier.get();
+ boolean matches;
+ while (!(matches = resultMatcher.test(t)) && stopwatch.millisElapsed() < timeoutMillis) {
+ Thread.sleep(retryMillis);
+ t = resultSupplier.get();
+ }
+
+ if (matches) {
+ return t;
+ } else {
+ throw new ISE("Condition not met after [%,d] ms. Final object was [%s].", stopwatch.millisElapsed(), t);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ catch (Throwable e) {
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
@FunctionalInterface
public interface TaskBuilder
{
Object build(String dataSource, String taskId);
}
+
+ @FunctionalInterface
+ public interface ExceptionalSupplier<T>
+ {
+ T get() throws Throwable;
+ }
}
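Usage note: the new waitForResult helper returns a ResultWaiter that, by default,
polls every 250 ms and times out after 10 seconds (both tunable via withRetryMillis
and withTimeoutMillis), and does nothing until go() is called. A sketch adapted
from the IngestionSmokeTest change above, with the default timeout spelled out:

    final Optional<InputStream> streamOptional =
        cluster.callApi().waitForResult(
            () -> overlord.bindings()
                          .getInstance(TaskLogStreamer.class)
                          .streamTaskLog(taskId, 0),
            Optional::isPresent
        ).withTimeoutMillis(10_000).go();
    Assertions.assertTrue(streamOptional.isPresent());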
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]