This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 284d048e0be Fix TaskQueryTool.getAllActiveTasks() (#18854)
284d048e0be is described below

commit 284d048e0be7741b2fc3229857a4212d6cf63394
Author: Kashif Faraz <[email protected]>
AuthorDate: Thu Dec 18 21:39:15 2025 +0530

    Fix TaskQueryTool.getAllActiveTasks() (#18854)
---
 .../druid/indexing/overlord/TaskQueryTool.java     | 22 ++++++++----------
 .../apache/druid/indexing/overlord/TaskQueue.java  |  1 -
 .../compact/OverlordCompactionSchedulerTest.java   |  2 +-
 .../druid/indexing/overlord/TaskLifecycleTest.java |  2 +-
 .../overlord/http/OverlordResourceTest.java        | 27 +++++++++++++++++-----
 .../druid/indexing/overlord/http/OverlordTest.java |  2 +-
 6 files changed, 33 insertions(+), 23 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
index 77f9d25f607..9be83c597ff 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java
@@ -29,14 +29,12 @@ import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexer.TaskStatusPlus;
 import org.apache.druid.indexing.common.TaskLock;
 import org.apache.druid.indexing.common.task.Task;
-import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.http.TaskStateLookup;
 import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.metadata.LockFilterPolicy;
 import org.apache.druid.metadata.TaskLookup;
 import org.apache.druid.metadata.TaskLookup.TaskLookupType;
@@ -60,20 +58,16 @@ import java.util.stream.Stream;
  */
 public class TaskQueryTool
 {
-  private static final Logger log = new Logger(TaskQueryTool.class);
-
   private final TaskStorage storage;
   private final GlobalTaskLockbox taskLockbox;
   private final TaskMaster taskMaster;
   private final Supplier<WorkerBehaviorConfig> workerBehaviorConfigSupplier;
-  private final ProvisioningStrategy provisioningStrategy;
 
   @Inject
   public TaskQueryTool(
       TaskStorage storage,
       GlobalTaskLockbox taskLockbox,
       TaskMaster taskMaster,
-      ProvisioningStrategy provisioningStrategy,
       Supplier<WorkerBehaviorConfig> workerBehaviorConfigSupplier
   )
   {
@@ -81,7 +75,6 @@ public class TaskQueryTool
     this.taskLockbox = taskLockbox;
     this.taskMaster = taskMaster;
     this.workerBehaviorConfigSupplier = workerBehaviorConfigSupplier;
-    this.provisioningStrategy = provisioningStrategy;
   }
 
   /**
@@ -155,19 +148,22 @@ public class TaskQueryTool
     return storage.getTaskInfo(taskId);
   }
 
+  /**
+   * Retrieves all active tasks from the {@link TaskQueue} if available,
+   * otherwise from the metadata store.
+   */
   public List<TaskStatusPlus> getAllActiveTasks()
   {
     final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
     if (taskQueue.isPresent()) {
-      // Serve active task statuses from memory
+      // Serve active task statuses from the TaskQueue memory
       final List<TaskStatusPlus> taskStatusPlusList = new ArrayList<>();
       final List<TaskInfo> activeTasks = taskQueue.get().getTaskInfos();
 
       for (TaskInfo taskInfo : activeTasks) {
         final Task task = taskInfo.getTask();
-        final Optional<TaskStatus> statusOptional = 
taskQueue.get().getTaskStatus(task.getId());
-        if (statusOptional.isPresent()) {
-          final TaskStatus status = statusOptional.get();
+        final TaskStatus status = taskInfo.getStatus();
+        if (status.isRunnable()) {
           taskStatusPlusList.add(
               new TaskStatusPlus(
                   task.getId(),
@@ -220,7 +216,7 @@ public class TaskQueryTool
     // This way, we can use the snapshot from taskStorage as the source of 
truth for the set of tasks to process
     // and use the snapshot from taskRunner as a reference for potential task 
state updates happened
     // after the first snapshotting.
-    Stream<TaskStatusPlus> taskStatusPlusStream = getTaskStatusPlusStream(
+    Stream<TaskStatusPlus> taskStatusPlusStream = 
retrieveTaskStatusesFromMetadataStore(
         state,
         dataSource,
         createdTimeDuration,
@@ -286,7 +282,7 @@ public class TaskQueryTool
     return taskStatuses;
   }
 
-  private Stream<TaskStatusPlus> getTaskStatusPlusStream(
+  private Stream<TaskStatusPlus> retrieveTaskStatusesFromMetadataStore(
       TaskStateLookup state,
       @Nullable String dataSource,
       Duration createdTimeDuration,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
index 6d056ca66d3..82f231e2ef6 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java
@@ -1044,7 +1044,6 @@ public class TaskQueue
   }
 
   /**
-   * Returns the list of currently active tasks for the given datasource.
    * List of all active and completed task infos currently being managed by 
this TaskQueue.
    */
   public List<TaskInfo> getTaskInfos()
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
index d154c32ce3e..b15f2f2b9b6 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/OverlordCompactionSchedulerTest.java
@@ -215,7 +215,7 @@ public class OverlordCompactionSchedulerTest
     scheduler = new OverlordCompactionScheduler(
         taskMaster,
         taskLockbox,
-        new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, () -> 
defaultWorkerConfig),
+        new TaskQueryTool(taskStorage, taskLockbox, taskMaster, () -> 
defaultWorkerConfig),
         segmentsMetadataManager,
         new SegmentsMetadataManagerConfig(null, null, null),
         () -> 
DruidCompactionConfig.empty().withClusterConfig(compactionConfig.get()),
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index fd4673669e0..43434f77d3d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -437,7 +437,7 @@ public class TaskLifecycleTest extends 
InitializedNullHandlingTest
     TaskMaster taskMaster = EasyMock.createMock(TaskMaster.class);
     
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.absent()).anyTimes();
     EasyMock.replay(taskMaster);
-    tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null);
+    tsqa = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null);
     return taskStorage;
   }
 
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
index e52255e5c70..288ae1bf387 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java
@@ -52,7 +52,6 @@ import org.apache.druid.indexing.overlord.TaskRunner;
 import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
 import org.apache.druid.indexing.overlord.TaskStorage;
 import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter;
-import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
 import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.Intervals;
@@ -109,7 +108,6 @@ public class OverlordResourceTest
   private TaskStorage taskStorage;
   private GlobalTaskLockbox taskLockbox;
   private JacksonConfigManager configManager;
-  private ProvisioningStrategy provisioningStrategy;
   private AuthConfig authConfig;
   private TaskQueryTool taskQueryTool;
   private IndexerMetadataStorageAdapter indexerMetadataStorageAdapter;
@@ -128,7 +126,6 @@ public class OverlordResourceTest
     taskRunner = EasyMock.createMock(TaskRunner.class);
     taskQueue = EasyMock.createStrictMock(TaskQueue.class);
     configManager = EasyMock.createMock(JacksonConfigManager.class);
-    provisioningStrategy = EasyMock.createMock(ProvisioningStrategy.class);
     authConfig = EasyMock.createMock(AuthConfig.class);
     overlord = EasyMock.createStrictMock(DruidOverlord.class);
     taskMaster = EasyMock.createStrictMock(TaskMaster.class);
@@ -138,7 +135,6 @@ public class OverlordResourceTest
         taskStorage,
         taskLockbox,
         taskMaster,
-        provisioningStrategy,
         () -> configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, 
WorkerBehaviorConfig.class).get()
     );
     indexerMetadataStorageAdapter = 
EasyMock.createStrictMock(IndexerMetadataStorageAdapter.class);
@@ -226,8 +222,7 @@ public class OverlordResourceTest
         workerTaskRunnerQueryAdapter,
         authConfig,
         configManager,
-        auditManager,
-        provisioningStrategy
+        auditManager
     );
   }
 
@@ -353,6 +348,26 @@ public class OverlordResourceTest
     Assert.assertEquals(tasksIds.get(1), responseObjects.get(0).getId());
   }
 
+  @Test
+  public void 
test_getAllActiveTasks_withTaskQueryTool_returnsRunningTasksOnly()
+  {
+    
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue));
+    EasyMock.expect(taskQueue.getTaskInfos()).andReturn(
+        List.of(
+            new TaskInfo(DateTimes.nowUtc(), TaskStatus.success("s"), new 
NoopTask("s", null, null, 1L, 0L, null)),
+            new TaskInfo(DateTimes.nowUtc(), TaskStatus.failure("f", ""), new 
NoopTask("f", null, null, 1L, 0L, null)),
+            new TaskInfo(DateTimes.nowUtc(), TaskStatus.running("r1"), new 
NoopTask("r1", null, null, 1L, 0L, null)),
+            new TaskInfo(DateTimes.nowUtc(), TaskStatus.running("r2"), new 
NoopTask("r2", null, null, 1L, 0L, null))
+        )
+    );
+
+    replayAll();
+
+    final List<TaskStatusPlus> activeTasks = taskQueryTool.getAllActiveTasks();
+    Assert.assertEquals(2, activeTasks.size());
+    Assert.assertTrue(activeTasks.stream().allMatch(status -> 
status.getStatusCode().equals(TaskState.RUNNING)));
+  }
+
   @Test
   public void testGetTasks()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
index 77c032d08b4..9852f897056 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordTest.java
@@ -283,7 +283,7 @@ public class OverlordTest
     Assert.assertEquals(Optional.absent(), overlord.getRedirectLocation());
 
     final TaskQueryTool taskQueryTool
-        = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null, null);
+        = new TaskQueryTool(taskStorage, taskLockbox, taskMaster, null);
     final WorkerTaskRunnerQueryAdapter workerTaskRunnerQueryAdapter
         = new WorkerTaskRunnerQueryAdapter(taskMaster, null);
     // Test Overlord resource stuff


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to