gianm commented on code in PR #15724:
URL: https://github.com/apache/druid/pull/15724#discussion_r1496214158
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/http/OverlordResource.java:
##########
@@ -469,9 +469,15 @@ public Response getMultipleTaskStatuses(Set<String>
taskIds)
return Response.status(Response.Status.BAD_REQUEST).entity("No TaskIds
provided.").build();
}
+ final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
Map<String, TaskStatus> result =
Maps.newHashMapWithExpectedSize(taskIds.size());
for (String taskId : taskIds) {
- Optional<TaskStatus> optional =
taskStorageQueryAdapter.getStatus(taskId);
+ final Optional<TaskStatus> optional;
+ if (taskQueue.isPresent()) {
Review Comment:
If the task is not present in the `taskQueue`, we should then check the
`taskStorageQueryAdapter`. It might be already complete.
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/client/IndexerWorkerManagerClient.java:
##########
@@ -65,10 +65,13 @@ public Map<String, TaskStatus> statuses(Set<String> taskIds)
@Override
public TaskLocation location(String workerId)
{
- final TaskStatusResponse response =
FutureUtils.getUnchecked(overlordClient.taskStatus(workerId), true);
+ final TaskStatus response = FutureUtils.getUnchecked(
+ overlordClient.taskStatuses(ImmutableSet.of(workerId)),
+ true
+ ).get(workerId);
- if (response.getStatus() != null) {
- return response.getStatus().getLocation();
Review Comment:
We need the same change in `SpecificTaskServiceLocator`, to use
`taskStatuses` instead of `taskStatus`.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -894,7 +885,12 @@ public TaskLocation getTaskLocation(final String id)
@Override
public Optional<TaskStatus> getTaskStatus(String id)
{
- return taskStorage.getStatus(id);
+ final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
Review Comment:
If the task is not present in the `taskQueue`, we should then check the
`taskStorageQueryAdapter`. It might be already complete.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -671,7 +672,7 @@ private void notifyStatus(final Task task, final TaskStatus
taskStatus, String r
// Save status to metadata store first, so if we crash while doing the
rest of the shutdown, our successor
// remembers that this task has completed.
try {
- final Optional<TaskStatus> previousStatus =
taskStorage.getStatus(task.getId());
+ final Optional<TaskStatus> previousStatus = getTaskStatus(task.getId());
Review Comment:
This really should use the metadata store. The code block is only called
when a task completes, and we need to check to make sure the metadata store has
the correct status stored.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]