nozjkoitop commented on code in PR #16889:
URL: https://github.com/apache/druid/pull/16889#discussion_r1778188074


##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double 
parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
 
+  private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig 
config)
+  {
+    if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) && 
config.getParallelIndexTaskSlotRatio() != 1) {

Review Comment:
   Nice catch, thanks



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double 
parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
 
+  private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig 
config)
+  {
+    if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) && 
config.getParallelIndexTaskSlotRatio() != 1) {
+      return 
getWorkerParallelIndexCapacity(config.getParallelIndexTaskSlotRatio()) - 
getCurrParallelIndexCapacityUsed()
+             >= task.getTaskResource().getRequiredCapacity();
+    }
+
+    final Integer limit = getLimitForTask(task.getType(), 
config.getTaskSlotRatiosPerWorker());
+
+    if (limit == null) {
+      return true; // No limit specified, so task can run
+    }
+
+    return limit - 
getCurrCapacityUsedByTaskType().getOrDefault(task.getType(), 0) >= 
task.getTaskResource()
+                                                                               
           .getRequiredCapacity();
+  }
+
+  private Integer getLimitForTask(
+      String taskType,
+      Map<String, Double> ratiosMap
+  )
+  {
+    Double ratioLimit = ratiosMap.get(taskType);
+
+    if (ratioLimit == null) {
+      return null;
+    }
+
+    return calculateTaskCapacityFromRatio(ratioLimit, worker.getCapacity());

Review Comment:
   Done



##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double 
parallelIndexTaskSlotRatio)
     return workerParallelIndexCapacity;
   }
 
+  private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig 
config)
+  {
+    if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) && 
config.getParallelIndexTaskSlotRatio() != 1) {
+      return 
getWorkerParallelIndexCapacity(config.getParallelIndexTaskSlotRatio()) - 
getCurrParallelIndexCapacityUsed()
+             >= task.getTaskResource().getRequiredCapacity();
+    }
+
+    final Integer limit = getLimitForTask(task.getType(), 
config.getTaskSlotRatiosPerWorker());
+
+    if (limit == null) {
+      return true; // No limit specified, so task can run
+    }
+
+    return limit - 
getCurrCapacityUsedByTaskType().getOrDefault(task.getType(), 0) >= 
task.getTaskResource()
+                                                                               
           .getRequiredCapacity();
+  }
+
+  private Integer getLimitForTask(
+      String taskType,
+      Map<String, Double> ratiosMap
+  )
+  {
+    Double ratioLimit = ratiosMap.get(taskType);

Review Comment:
   Renamed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to