abhishekrb19 commented on code in PR #19203:
URL: https://github.com/apache/druid/pull/19203#discussion_r2992890446


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -417,8 +418,14 @@ private void startPendingTasksOnRunner()
     log.info("Notified task runner to clean up [%,d] tasks with IDs[%s].", unknownTaskIds.size(), unknownTaskIds);
 
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final String queuedTaskId : List.copyOf(activeTasks.keySet())) {
+    // Copy tasks list, as notifyStatus may modify it. Sort by priority (highest first) so that
+    // higher-priority tasks are submitted to the runner before lower-priority ones.
+    final List<String> queuedTaskIds = new ArrayList<>(activeTasks.keySet());
+    queuedTaskIds.sort(Comparator.comparingInt(id -> {
+      final TaskEntry entry = activeTasks.get(id);
+      return entry != null ? -entry.getTask().getPriority() : Tasks.DEFAULT_TASK_PRIORITY;

Review Comment:
   Can `entry` ever be null here if the IDs and the lookup both come from the same snapshot?



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueue.java:
##########
@@ -417,8 +418,14 @@ private void startPendingTasksOnRunner()
     log.info("Notified task runner to clean up [%,d] tasks with IDs[%s].", unknownTaskIds.size(), unknownTaskIds);
 
     // Attain futures for all active tasks (assuming they are ready to run).
-    // Copy tasks list, as notifyStatus may modify it.
-    for (final String queuedTaskId : List.copyOf(activeTasks.keySet())) {
+    // Copy tasks list, as notifyStatus may modify it. Sort by priority (highest first) so that
+    // higher-priority tasks are submitted to the runner before lower-priority ones.
+    final List<String> queuedTaskIds = new ArrayList<>(activeTasks.keySet());
+    queuedTaskIds.sort(Comparator.comparingInt(id -> {
+      final TaskEntry entry = activeTasks.get(id);

Review Comment:
   The comparator looks up `activeTasks` directly, even though the ID list is copied above for the reason the comment gives: `Copy tasks list, as notifyStatus may modify it`.
   
   For stronger consistency, should we instead take a single snapshot of `activeTasks` and use it for both `queuedTaskIds` and this comparator?
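
A rough sketch of the snapshot idea, not a drop-in patch. `SnapshotPrioritySort`, `Entry`, and `idsByPriorityDescending` are hypothetical names standing in for the real `TaskQueue` / `TaskEntry` code, and the tie-break by ID is only there to make the example deterministic. Because both the ID list and the comparator read from the same immutable copy, the lookup cannot return null and the fallback branch becomes unnecessary:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class SnapshotPrioritySort
{
  // Hypothetical stand-in for TaskEntry; only the priority matters for this sketch.
  static final class Entry
  {
    final int priority;

    Entry(int priority)
    {
      this.priority = priority;
    }
  }

  // Copies the map once, then derives both the ID list and the sort keys from that copy.
  static List<String> idsByPriorityDescending(Map<String, Entry> activeTasks)
  {
    final Map<String, Entry> snapshot = Map.copyOf(activeTasks);
    final List<String> ids = new ArrayList<>(snapshot.keySet());

    // Every ID comes from the snapshot, so snapshot.get(id) is never null here.
    // reversed() yields highest priority first without negating the int.
    ids.sort(
        Comparator.comparingInt((String id) -> snapshot.get(id).priority)
                  .reversed()
                  .thenComparing(Comparator.<String>naturalOrder())
    );
    return ids;
  }

  public static void main(String[] args)
  {
    final Map<String, Entry> tasks = Map.of(
        "a", new Entry(10),
        "b", new Entry(100),
        "c", new Entry(50)
    );
    System.out.println(idsByPriorityDescending(tasks)); // prints [b, c, a]
  }
}
```

Using `reversed()` rather than negating the priority also sidesteps the overflow that negation would hit for `Integer.MIN_VALUE`, though that is a side benefit rather than the main point.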



##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskQueueTest.java:
##########
@@ -751,6 +753,41 @@ public void testTaskWaitingTimeMetricEmittedForMultipleTasks() throws Exception
     serviceEmitter.verifyEmitted("task/run/time", 3);
   }
 
+  @Test
+  public void testTaskSubmissionToTaskRunnerBasedOnPriority() throws Exception
+  {
+    final RecordingTaskRunner recordingRunner = new RecordingTaskRunner(serviceEmitter);
+    final TaskQueue priorityQueue = new TaskQueue(
+        new TaskLockConfig(),
+        new TaskQueueConfig(10, null, null, null, null, null),
+        new DefaultTaskConfig(),
+        getTaskStorage(),
+        recordingRunner,
+        actionClientFactory,
+        getLockbox(),
+        serviceEmitter,
+        getObjectMapper(),
+        new NoopTaskContextEnricher()
+    );
+    priorityQueue.setActive(true);
+
+    final NoopTask lowPriority = NoopTask.ofPriority(10);
+    final NoopTask highPriority = NoopTask.ofPriority(100);

Review Comment:
   It would be good to add several tasks at each of the high, medium, and low priorities and validate that all high-priority tasks are submitted before the medium- and low-priority ones (even if the order within the same priority group is non-deterministic).
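
One possible shape for such an assertion, as a sketch only. `PriorityOrderCheck` and `isSubmittedByPriority` are hypothetical helpers, and how the submitted IDs would be read back from `RecordingTaskRunner` is an assumption since that part of the test is not shown here. The check only requires that priorities never increase along the submission order, so any ordering within a priority group passes:

```java
import java.util.List;
import java.util.Map;

public class PriorityOrderCheck
{
  // Returns true if priorities along the submission order never increase.
  // Tasks that share a priority may appear in any relative order.
  static boolean isSubmittedByPriority(List<String> submittedIds, Map<String, Integer> priorityById)
  {
    int previous = Integer.MAX_VALUE;
    for (String id : submittedIds) {
      final int current = priorityById.get(id);
      if (current > previous) {
        return false;
      }
      previous = current;
    }
    return true;
  }

  public static void main(String[] args)
  {
    // Hypothetical task IDs: two high, two medium, two low priority.
    final Map<String, Integer> priorities = Map.of(
        "high1", 100, "high2", 100,
        "med1", 50, "med2", 50,
        "low1", 10, "low2", 10
    );

    // Order within each group differs from insertion order but is still valid.
    final List<String> valid = List.of("high2", "high1", "med1", "med2", "low2", "low1");
    // A low-priority task ahead of a high-priority one is a violation.
    final List<String> invalid = List.of("high1", "low1", "high2", "med1", "med2", "low2");

    System.out.println(isSubmittedByPriority(valid, priorities));   // prints true
    System.out.println(isSubmittedByPriority(invalid, priorities)); // prints false
  }
}
```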



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

