This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 284d048e0be Fix TaskQueryTool.getAllActiveTasks() (#18854)
284d048e0be is described below
commit 284d048e0be7741b2fc3229857a4212d6cf63394
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Dec 18 21:39:15 2025 +0530
Fix TaskQueryTool.getAllActiveTasks() (#18854)
---
.../druid/indexing/overlord/TaskQueryTool.java | 22 ++++++++----------
.../apache/druid/indexing/overlord/TaskQueue.java | 1 -
.../compact/OverlordCompactionSchedulerTest.java | 2 +-
.../druid/indexing/overlord/TaskLifecycleTest.java | 2 +-
.../overlord/http/OverlordResourceTest.java | 27 +++++++++++++++++-----
.../druid/indexing/overlord/http/OverlordTest.java | 2 +-
6 files changed, 33 insertions(+), 23 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
index 77f9d25f607..9be83c597ff 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
@@ -29,14 +29,12 @@ import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.TaskLock;
import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@@ -60,20 +58,16 @@ import java.util.stream.Stream;
*/
public class TaskQueryTool
{
- private static final Logger log = new Logger(TaskQueryTool.class);
-
private final TaskStorage storage;
private final GlobalTaskLockbox taskLockbox;
private final TaskMaster taskMaster;
private final Supplier<WorkerBehaviorConfig> workerBehaviorConfigSupplier;
- private final ProvisioningStrategy provisioningStrategy;
@Inject
public TaskQueryTool(
TaskStorage storage,
GlobalTaskLockbox taskLockbox,
TaskMaster taskMaster,
- ProvisioningStrategy provisioningStrategy,
Supplier<WorkerBehaviorConfig> workerBehaviorConfigSupplier
)
{
@@ -81,7 +75,6 @@ public class TaskQueryTool
this.taskLockbox = taskLockbox;
this.taskMaster = taskMaster;
this.workerBehaviorConfigSupplier = workerBehaviorConfigSupplier;
- this.provisioningStrategy = provisioningStrategy;
}
/**
@@ -155,19 +148,22 @@ public class TaskQueryTool
return storage.getTaskInfo(taskId);
}
+ /**
+ * Retrieves all active tasks from the {@link TaskQueue} if available,
+ * otherwise from the metadata store.
+ */
public List<TaskStatusPlus> getAllActiveTasks()
{
final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
if (taskQueue.isPresent()) {
- // Serve active task statuses from memory
+ // Serve active task statuses from the TaskQueue memory
final List<TaskStatusPlus> taskStatusPlusList = new ArrayList<>();
final List<TaskInfo> activeTasks = taskQueue.get().getTaskInfos();
for (TaskInfo taskInfo : activeTasks) {
final Task task = taskInfo.getTask();
- final Optional<TaskStatus> statusOptional =
taskQueue.get().getTaskStatus(task.getId());
- if (statusOptional.isPresent()) {
- final TaskStatus status = statusOptional.get();
+ final TaskStatus status = taskInfo.getStatus();
+ if (status.isRunnable()) {
taskStatusPlusList.add(
new TaskStatusPlus(
task.getId(),
@@ -220,7 +216,7 @@ public class TaskQueryTool
// This way, we can use the snapshot from taskStorage as the source of
truth for the set of tasks to process
// and use the snapshot from taskRunner as a reference for potential task
state updates happened
// after the first snapshotting.
- Stream<TaskStatusPlus> taskStatusPlusStream = getTaskStatusPlusStream(
+ Stream<TaskStatusPlus> taskStatusPlusStream =
retrieveTaskStatusesFromMetadataStore(
state,
dataSource,
createdTimeDuration,
@@ -286,7 +282,7 @@ public class TaskQueryTool
return taskStatuses;
}
- private Stream<TaskStatusPlus> getTaskStatusPlusStream(
+ private Stream<TaskStatusPlus> retrieveTaskStatusesFromMetadataStore(
TaskStateLookup state,
@Nullable String dataSource,
Duration createdTimeDuration,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 6d056ca66d3..82f231e2ef6 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -1044,7 +1044,6 @@ public class TaskQueue
}
/**
- * Returns the list of currently active tasks for the given datasource.
* List of all active and completed task infos currently being managed by
this TaskQueue.
*/
public List<TaskInfo> getTaskInfos()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
index d154c32ce3e..b15f2f2b9b6 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
@@ -215,7 +215,7 @@ public class OverlordCompactionSchedulerTest
scheduler = new OverlordCompactionScheduler(
taskMaster,
taskLockbox,
- new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, () ->
defaultWorkerConfig),
+ new TaskQueryTool(taskStorage, taskLockbox, taskMaster, () ->
defaultWorkerConfig),
segmentsMetadataManager,
new SegmentsMetadataManagerConfig(null, null, null),
() ->
DruidCompactionConfig.empty().withClusterConfig(compactionConfig.get()),
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index fd4673669e0..43434f77d3d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -437,7 +437,7 @@ public class TaskLifecycleTest extends
InitializedNullHandlingTest
TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
- tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null);
+ tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null);
return taskStorage;
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index e52255e5c70..288ae1bf387 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -52,7 +52,6 @@ import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
-import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.Intervals;
@@ -109,7 +108,6 @@ public class OverlordResourceTest
private TaskStorage taskStorage;
private GlobalTaskLockbox taskLockbox;
private JacksonConfigManager configManager;
- private ProvisioningStrategy provisioningStrategy;
private AuthConfig authConfig;
private TaskQueryTool taskQueryTool;
private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
@@ -128,7 +126,6 @@ public class OverlordResourceTest
taskRunner = EasyMock.createMock(TaskRunner.class);
taskQueue = EasyMock.createStrictMock(TaskQueue.class);
configManager = EasyMock.createMock(JacksonConfigManager.class);
- provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
authConfig = EasyMock.createMock(AuthConfig.class);
overlord = EasyMock.createStrictMock(DruidOverlord.class);
taskMaster = EasyMock.createStrictMock(TaskMaster.class);
@@ -138,7 +135,6 @@ public class OverlordResourceTest
taskStorage,
taskLockbox,
taskMaster,
- provisioningStrategy,
() -> configManager.watch(WorkerBehaviorConfig.CONFIG_KEY,
WorkerBehaviorConfig.class).get()
);
indexerMetadataStorageAdapter =
EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
@@ -226,8 +222,7 @@ public class OverlordResourceTest
workerTaskRunnerQueryAdapter,
authConfig,
configManager,
- auditManager,
- provisioningStrategy
+ auditManager
);
}
@@ -353,6 +348,26 @@ public class OverlordResourceTest
Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId());
}
+ @Test
+ public void
test_getAllActiveTasks_withTaskQueryTool_returnsRunningTasksOnly()
+ {
+
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
+ EasyMock.expect(taskQueue.getTaskInfos()).andReturn(
+ List.of(
+ new TaskInfo(DateTimes.nowUtc(), TaskStatus.success("s"), new
NoopTask("s", null, null, 1L, 0L, null)),
+ new TaskInfo(DateTimes.nowUtc(), TaskStatus.failure("f", ""), new
NoopTask("f", null, null, 1L, 0L, null)),
+ new TaskInfo(DateTimes.nowUtc(), TaskStatus.running("r1"), new
NoopTask("r1", null, null, 1L, 0L, null)),
+ new TaskInfo(DateTimes.nowUtc(), TaskStatus.running("r2"), new
NoopTask("r2", null, null, 1L, 0L, null))
+ )
+ );
+
+ replayAll();
+
+ final List<TaskStatusPlus> activeTasks = taskQueryTool.getAllActiveTasks();
+ Assert.assertEquals(2, activeTasks.size());
+ Assert.assertTrue(activeTasks.stream().allMatch(status ->
status.getStatusCode().equals(TaskState.RUNNING)));
+ }
+
@Test
public void testGetTasks()
{
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 77c032d08b4..9852f897056 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -283,7 +283,7 @@ public class OverlordTest
Assert.assertEquals(Optional.absent(), overlord.getRedirectLocation());
final TaskQueryTool taskQueryTool
- = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null);
+ = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null);
final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter
= new WorkerTaskRunnerQueryAdapter(taskMaster, null);
// Test Overlord resource stuff
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]