kfaraz commented on code in PR #18060:
URL: https://github.com/apache/druid/pull/18060#discussion_r2119928627
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -443,8 +447,13 @@ private void startPendingTaskOnRunner(TaskEntry entry,
ListenableFuture<TaskStat
e.getMessage()
);
}
+ statusUpdatesInQueue.incrementAndGet();
Review Comment:
This should be incremented just before adding items to the
`taskCompleteCallbackExecutor`, not here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -775,56 +825,15 @@ public void onFailure(final Throwable t)
.addData("type", task.getType())
.addData("dataSource", task.getDataSource())
.emit();
- statusUpdatesInQueue.incrementAndGet();
TaskStatus status = TaskStatus.failure(
task.getId(),
"Failed to run task. See overlord logs for more details."
);
- taskCompleteCallbackExecutor.execute(() -> handleStatus(status));
- }
-
- private void handleStatus(final TaskStatus status)
- {
- try {
- // If we're not supposed to be running anymore, don't do
anything. Somewhat racey if the flag gets set
- // after we check and before we commit the database transaction,
but better than nothing.
- if (!active) {
- log.info("Abandoning task [%s] due to shutdown.",
task.getId());
- return;
- }
-
- updateTaskEntry(
- task.getId(),
- entry -> notifyStatus(entry, status, "notified status change
from task")
- );
-
- // Emit event and log, if the task is done
- if (status.isComplete()) {
- IndexTaskUtils.setTaskStatusDimensions(metricBuilder, status);
- emitter.emit(metricBuilder.setMetric("task/run/time",
status.getDuration()));
-
- log.info(
- "Completed task[%s] with status[%s] in [%d]ms.",
- task.getId(), status.getStatusCode(), status.getDuration()
- );
-
- if (status.isSuccess()) {
- Counters.incrementAndGetLong(totalSuccessfulTaskCount,
getMetricKey(task));
- } else {
- Counters.incrementAndGetLong(totalFailedTaskCount,
getMetricKey(task));
- }
- }
- }
- catch (Exception e) {
- log.makeAlert(e, "Failed to handle task status")
- .addData("task", task.getId())
- .addData("statusCode", status.getStatusCode())
- .emit();
- }
- finally {
- statusUpdatesInQueue.decrementAndGet();
- handledStatusUpdates.incrementAndGet();
- }
+ taskCompleteCallbackExecutor.execute(() -> handleStatusWrapper(
Review Comment:
This lambda should first call `handleStatusWrapper` and then update the
status update counters.
Alternatively, since both the success and failure paths need the same
accounting, you can keep the old `handleStatus` method here and have it invoke
the top-level `handleStatusWrapper`.
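A rough sketch of the accounting I have in mind, not code from the PR. The class name, `submitStatusUpdate`, `runDemo` and the empty `handleCompletedStatus` placeholder are all hypothetical; only the two counter names come from the diff. The in-queue counter is incremented just before the work is handed to the executor, and both counters are updated in a `finally` once the handler has run.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the relevant parts of TaskQueue; not code from the PR.
class StatusUpdateAccountingSketch
{
  final AtomicLong statusUpdatesInQueue = new AtomicLong();
  final AtomicLong handledStatusUpdates = new AtomicLong();

  // Placeholder for the real status handling; it does no accounting itself.
  void handleCompletedStatus(String taskId)
  {
    // DB updates, task runner shutdown, metric emission etc. would go here.
  }

  void submitStatusUpdate(ExecutorService callbackExecutor, String taskId)
  {
    // Count the update as queued just before handing it to the executor.
    statusUpdatesInQueue.incrementAndGet();
    callbackExecutor.execute(() -> {
      try {
        handleCompletedStatus(taskId);
      }
      finally {
        // Update the counters only after the handler has returned or thrown.
        statusUpdatesInQueue.decrementAndGet();
        handledStatusUpdates.incrementAndGet();
      }
    });
  }

  // Submits n updates, waits for the executor to drain, returns {inQueue, handled}.
  static long[] runDemo(int n)
  {
    StatusUpdateAccountingSketch sketch = new StatusUpdateAccountingSketch();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    for (int i = 0; i < n; i++) {
      sketch.submitStatusUpdate(executor, "task-" + i);
    }
    executor.shutdown();
    try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new long[]{sketch.statusUpdatesInQueue.get(), sketch.handledStatusUpdates.get()};
  }

  public static void main(String[] args)
  {
    long[] counts = runDemo(2);
    System.out.println("inQueue=" + counts[0] + ", handled=" + counts[1]);
  }
}
```

With this shape the handler itself stays free of counter updates, so synchronous callers do not need to touch the counters at all.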
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -730,13 +724,66 @@ private void notifyStatus(final TaskEntry entry, final
TaskStatus taskStatus, St
taskRunner.shutdown(task.getId(), reasonFormat, args);
}
catch (Throwable e) {
- // If task runner shutdown fails, continue with the task shutdown
routine. We'll come back and try to
- // shut it down again later in manageInternalPostCritical, once it's
removed from the "tasks" map.
+ // If task runner shutdown fails, continue with the task shutdown
routine.
log.warn(e, "TaskRunner failed to cleanup task after completion: %s",
task.getId());
}
removeTaskLock(task);
requestManagement();
+
+ // Emit event and log
+ final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ IndexTaskUtils.setTaskStatusDimensions(builder, taskStatus);
+ emitter.emit(builder.setMetric("task/run/time", taskStatus.getDuration()));
+
+ log.info(
+ "Completed task[%s] with status[%s] in [%d]ms.",
+ task.getId(), taskStatus.getStatusCode(), taskStatus.getDuration()
+ );
+
+ if (taskStatus.isSuccess()) {
+ Counters.incrementAndGetLong(totalSuccessfulTaskCount,
getMetricKey(task));
+ } else {
+ Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
+ }
+ }
+
+ /**
+ * Handles updating a task's status and incrementing/decrementing the status
update accounting metrics
+ */
+ private void handleStatusWrapper(
Review Comment:
Please avoid the suffix `Wrapper`, as it is confusing. Maybe just use
`handleStatus` or `handleCompletedStatus`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -663,24 +648,33 @@ public void shutdownWithSuccess(final String taskId,
String reasonFormat, Object
* <li>Request {@link #taskRunner} to shutdown task (synchronously)</li>
* <li>Remove all locks for task from metadata storage</li>
* <li>Request task management</li>
+ * <li>Perform Task accounting metrics/logs</li>
* </ul>
* This method does not remove the task from {@link #activeTasks} to avoid
* race conditions with {@link #syncFromStorage()}.
* <p>
- * Since this operation involves DB updates and synchronous remote calls, it
- * must be invoked on a dedicated executor so that task runner and worker
sync
- * is not blocked.
+ * Since this operation is intended to be performed under one of activeTasks
hash segment locks, involves DB updates
+ * and synchronous remote calls, it must be invoked on a dedicated executor
so that task runner and worker sync
+ * are not blocked.
*
* @throws NullPointerException if task or status is null
* @throws IllegalArgumentException if the task ID does not match the status
ID
* @throws IllegalStateException if this queue is currently shut down
* @see #removeTaskInternal
*/
- private void notifyStatus(final TaskEntry entry, final TaskStatus
taskStatus, String reasonFormat, Object... args)
+ private void notifyCompletedStatus(
+ final TaskEntry entry,
+ final TaskStatus taskStatus,
+ String reasonFormat,
+ Object... args
+ )
{
// Don't do anything if the task has no entry in activeTasks
if (entry == null) {
return;
+ } else if (entry.isComplete) {
+ // A callback() or shutdown() beat us to updating the status and has
already cleaned up this task
Review Comment:
Let's log an info here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -1058,15 +1067,17 @@ private void validateTaskPayload(Task task)
String payload = passwordRedactingMapper.writeValueAsString(task);
if (config.getMaxTaskPayloadSize() != null &&
config.getMaxTaskPayloadSize().getBytesInInt() < payload.length()) {
throw InvalidInput.exception(
- "Task[%s] has payload of size[%d] but max allowed size is
[%d]. " +
- "Reduce the size of the task payload or increase
'druid.indexer.queue.maxTaskPayloadSize'.",
- task.getId(), payload.length(), config.getMaxTaskPayloadSize()
- );
+ "Task[%s] has payload of size[%d] but max allowed size is [%d]. " +
+ "Reduce the size of the task payload or increase
'druid.indexer.queue.maxTaskPayloadSize'.",
+ task.getId(), payload.length(), config.getMaxTaskPayloadSize()
+ );
} else if (payload.length() > TASK_SIZE_WARNING_THRESHOLD) {
log.warn(
- "Task[%s] of datasource[%s] has payload size[%d] larger than the
recommended maximum[%d]. " +
- "Large task payloads may cause stability issues in the
Overlord and may fail while persisting to the metadata store." +
- "Such tasks may be rejected by the Overlord in future Druid
versions.",
+ "Task[%s] of datasource[%s] has payload size[%d] larger than the
recommended maximum[%d]. "
+ +
+ "Large task payloads may cause stability issues in the Overlord
and may fail while persisting to the metadata store."
+ +
+ "Such tasks may be rejected by the Overlord in future Druid
versions.",
Review Comment:
There doesn't seem to be any change here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -730,13 +724,66 @@ private void notifyStatus(final TaskEntry entry, final
TaskStatus taskStatus, St
taskRunner.shutdown(task.getId(), reasonFormat, args);
}
catch (Throwable e) {
- // If task runner shutdown fails, continue with the task shutdown
routine. We'll come back and try to
- // shut it down again later in manageInternalPostCritical, once it's
removed from the "tasks" map.
+ // If task runner shutdown fails, continue with the task shutdown
routine.
log.warn(e, "TaskRunner failed to cleanup task after completion: %s",
task.getId());
}
removeTaskLock(task);
requestManagement();
+
+ // Emit event and log
+ final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ IndexTaskUtils.setTaskStatusDimensions(builder, taskStatus);
+ emitter.emit(builder.setMetric("task/run/time", taskStatus.getDuration()));
+
+ log.info(
+ "Completed task[%s] with status[%s] in [%d]ms.",
+ task.getId(), taskStatus.getStatusCode(), taskStatus.getDuration()
+ );
+
+ if (taskStatus.isSuccess()) {
+ Counters.incrementAndGetLong(totalSuccessfulTaskCount,
getMetricKey(task));
+ } else {
+ Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
+ }
+ }
+
+ /**
+ * Handles updating a task's status and incrementing/decrementing the status
update accounting metrics
+ */
+ private void handleStatusWrapper(
+ final String taskId,
+ final TaskStatus status,
+ final String reasonFormat,
+ final Object... args
+ )
+ {
+ statusUpdatesInQueue.incrementAndGet();
+ startStopLock.readLock().lock();
+ try {
+ // If we're not supposed to be running anymore, don't do anything.
Somewhat racey if the flag gets set
+ // after we check and before we commit the database transaction, but
better than nothing.
+ if (!active) {
+ log.info("Abandoning task [%s] due to shutdown.", taskId);
+ return;
+ }
+
+ updateTaskEntry(
+ taskId,
+ entry -> notifyCompletedStatus(entry, status, reasonFormat, args)
+ );
+ }
+ catch (Exception e) {
+ log.makeAlert(e, "Failed to handle task status")
+ .addData("task", taskId)
+ .addData("statusCode", status.getStatusCode())
+ .emit();
+ }
+ finally {
+ startStopLock.readLock().unlock();
+ statusUpdatesInQueue.decrementAndGet();
+ handledStatusUpdates.incrementAndGet();
Review Comment:
These two counter updates should not be done here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -730,13 +724,66 @@ private void notifyStatus(final TaskEntry entry, final
TaskStatus taskStatus, St
taskRunner.shutdown(task.getId(), reasonFormat, args);
}
catch (Throwable e) {
- // If task runner shutdown fails, continue with the task shutdown
routine. We'll come back and try to
- // shut it down again later in manageInternalPostCritical, once it's
removed from the "tasks" map.
+ // If task runner shutdown fails, continue with the task shutdown
routine.
log.warn(e, "TaskRunner failed to cleanup task after completion: %s",
task.getId());
}
removeTaskLock(task);
requestManagement();
+
+ // Emit event and log
+ final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
+ IndexTaskUtils.setTaskStatusDimensions(builder, taskStatus);
+ emitter.emit(builder.setMetric("task/run/time", taskStatus.getDuration()));
+
+ log.info(
+ "Completed task[%s] with status[%s] in [%d]ms.",
+ task.getId(), taskStatus.getStatusCode(), taskStatus.getDuration()
+ );
+
+ if (taskStatus.isSuccess()) {
+ Counters.incrementAndGetLong(totalSuccessfulTaskCount,
getMetricKey(task));
+ } else {
+ Counters.incrementAndGetLong(totalFailedTaskCount, getMetricKey(task));
+ }
+ }
+
+ /**
+ * Handles updating a task's status and incrementing/decrementing the status
update accounting metrics
+ */
+ private void handleStatusWrapper(
+ final String taskId,
+ final TaskStatus status,
+ final String reasonFormat,
+ final Object... args
+ )
+ {
+ statusUpdatesInQueue.incrementAndGet();
Review Comment:
This should not be called here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -443,8 +447,13 @@ private void startPendingTaskOnRunner(TaskEntry entry,
ListenableFuture<TaskStat
e.getMessage()
);
}
+ statusUpdatesInQueue.incrementAndGet();
+
TaskStatus taskStatus = TaskStatus.failure(task.getId(),
errorMessage);
- notifyStatus(entry, taskStatus, taskStatus.getErrorMsg());
+ notifyCompletedStatus(entry, taskStatus, taskStatus.getErrorMsg());
+
+ statusUpdatesInQueue.decrementAndGet();
+ handledStatusUpdates.incrementAndGet();
Review Comment:
These two are not needed here.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -775,56 +825,15 @@ public void onFailure(final Throwable t)
.addData("type", task.getType())
.addData("dataSource", task.getDataSource())
.emit();
- statusUpdatesInQueue.incrementAndGet();
Review Comment:
We should continue to do this here.
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -443,9 +461,8 @@ public void
testKilledTasksEmitRuntimeMetricWithHttpRemote() throws InterruptedE
// Verify that metrics are emitted on receiving announcement
serviceEmitter.verifyEmitted("task/run/time", 1);
- CoordinatorRunStats stats = taskQueue.getQueueStats();
- Assert.assertEquals(0L,
stats.get(Stats.TaskQueue.STATUS_UPDATES_IN_QUEUE));
- Assert.assertEquals(1L, stats.get(Stats.TaskQueue.HANDLED_STATUS_UPDATES));
+ verifyTaskStatusCounts(taskQueue, 0, 1);
+ verifyStatusUpdateCounts(taskQueue, 0, 2);
Review Comment:
To make this more readable, please invoke the verify methods separately for
each metric like:
```
verifyMetric(metricName, expectedValue);
```
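A minimal sketch of such a helper, under assumptions: a plain `Map<String, Long>` stands in for the stats object returned by `taskQueue.getQueueStats()`, and the metric names used in `main` are made up for illustration. In the real test this would presumably use `Assert.assertEquals` instead of throwing directly.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; the Map is a stand-in for the real stats object.
class VerifyMetricSketch
{
  // Verifies exactly one metric per call so that each expectation reads on its own line.
  static void verifyMetric(Map<String, Long> stats, String metricName, long expectedValue)
  {
    // A metric that was never recorded is treated as zero in this sketch.
    long actual = stats.getOrDefault(metricName, 0L);
    if (actual != expectedValue) {
      throw new AssertionError(
          "Metric[" + metricName + "] expected[" + expectedValue + "] but was[" + actual + "]"
      );
    }
  }

  public static void main(String[] args)
  {
    Map<String, Long> stats = new HashMap<>();
    stats.put("statusUpdatesInQueue", 0L);  // illustrative metric name
    stats.put("handledStatusUpdates", 2L);  // illustrative metric name

    verifyMetric(stats, "statusUpdatesInQueue", 0L);
    verifyMetric(stats, "handledStatusUpdates", 2L);
    System.out.println("all metrics verified");
  }
}
```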
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueConcurrencyTest.java:
##########
@@ -77,7 +80,20 @@ public void setUpIngestionTestBase() throws IOException
@Override
public ListenableFuture<TaskStatus> run(Task task)
{
- return Futures.immediateFuture(TaskStatus.success(task.getId()));
+ // Skip the initialization; we just need to simulate a delayed
future
+ Preconditions.checkArgument(task instanceof NoopTask, "task must
be an instance of NoopTask");
+ final SettableFuture<TaskStatus> future = SettableFuture.create();
+
+ taskExecutor.submit(() -> {
+ try {
+ TaskStatus status = ((NoopTask) task).runTask(null);
+ future.set(status);
+ }
+ catch (Exception e) {
+ future.setException(e);
+ }
+ });
+ return future;
Review Comment:
Why is this needed exactly?
If it is needed to test some specific behaviour, we should add targeted test
cases where we just return an incomplete future here and decide in the test
case when to complete that future.
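Something along these lines, as a sketch only. To keep it self-contained it uses the JDK's `CompletableFuture` and a `String` status in place of Guava's `SettableFuture<TaskStatus>`; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical test runner: run() returns an incomplete future and the test
// case decides when to complete it.
class ControllableRunnerSketch
{
  private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

  // Returns a future that stays incomplete until complete() is called for this task.
  CompletableFuture<String> run(String taskId)
  {
    return pending.computeIfAbsent(taskId, id -> new CompletableFuture<>());
  }

  // Called from the test case at the point where the task should finish.
  void complete(String taskId, String status)
  {
    CompletableFuture<String> future = pending.get(taskId);
    if (future == null) {
      throw new IllegalStateException("Task[" + taskId + "] was never run");
    }
    future.complete(status);
  }

  public static void main(String[] args)
  {
    ControllableRunnerSketch runner = new ControllableRunnerSketch();
    CompletableFuture<String> future = runner.run("task-1");

    // The test can assert on intermediate queue state here, while the task is still running.
    System.out.println("done before complete: " + future.isDone());

    runner.complete("task-1", "SUCCESS");
    System.out.println("status: " + future.join());
  }
}
```

This keeps the timing fully in the hands of the test case, with no extra executor involved.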
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -623,6 +640,25 @@ private HttpRemoteTaskRunner createHttpRemoteTaskRunner()
);
}
+ private static void verifyTaskStatusCounts(final TaskQueue taskQueue, int
successCount, int failureCount)
+ {
+ Assert.assertEquals(
+ successCount,
+
taskQueue.getSuccessfulTaskCount().values().stream().mapToLong(Long::longValue).sum()
+ );
+ Assert.assertEquals(
+ failureCount,
+
taskQueue.getFailedTaskCount().values().stream().mapToLong(Long::longValue).sum()
Review Comment:
It's better to keep these two assertions in separate methods as that would
make for more readable test cases.
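For example, roughly like the sketch below. It assumes the two getters return a map whose values are `Long` counts, as the stream expression in the diff suggests; the `String` key type, the class name and the method names are assumptions for illustration.

```java
import java.util.Map;

// Hypothetical split of the combined helper into one assertion method per count.
class TaskCountAssertionsSketch
{
  // Sums the per-key counts, mirroring the stream expression used in the diff.
  static long total(Map<String, Long> counts)
  {
    return counts.values().stream().mapToLong(Long::longValue).sum();
  }

  static void verifySuccessfulTaskCount(Map<String, Long> successfulTaskCount, long expected)
  {
    long actual = total(successfulTaskCount);
    if (actual != expected) {
      throw new AssertionError("Successful tasks: expected[" + expected + "] but was[" + actual + "]");
    }
  }

  static void verifyFailedTaskCount(Map<String, Long> failedTaskCount, long expected)
  {
    long actual = total(failedTaskCount);
    if (actual != expected) {
      throw new AssertionError("Failed tasks: expected[" + expected + "] but was[" + actual + "]");
    }
  }

  public static void main(String[] args)
  {
    // In the real test these maps would come from the task queue getters.
    verifySuccessfulTaskCount(Map.of("wiki", 2L, "koalas", 1L), 3L);
    verifyFailedTaskCount(Map.of("wiki", 1L), 1L);
    System.out.println("task counts verified");
  }
}
```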
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]