nozjkoitop commented on code in PR #16889:
URL: https://github.com/apache/druid/pull/16889#discussion_r1778188074
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig config)
+ {
+ if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) && config.getParallelIndexTaskSlotRatio() != 1) {
Review Comment:
Nice catch, thanks
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig config)
+ {
+ if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) && config.getParallelIndexTaskSlotRatio() != 1) {
+ return getWorkerParallelIndexCapacity(config.getParallelIndexTaskSlotRatio()) - getCurrParallelIndexCapacityUsed()
+ >= task.getTaskResource().getRequiredCapacity();
+ }
+
+ final Integer limit = getLimitForTask(task.getType(), config.getTaskSlotRatiosPerWorker());
+
+ if (limit == null) {
+ return true; // No limit specified, so task can run
+ }
+
+ return limit - getCurrCapacityUsedByTaskType().getOrDefault(task.getType(), 0) >= task.getTaskResource()
+ .getRequiredCapacity();
+ }
+
+ private Integer getLimitForTask(
+ String taskType,
+ Map<String, Double> ratiosMap
+ )
+ {
+ Double ratioLimit = ratiosMap.get(taskType);
+
+ if (ratioLimit == null) {
+ return null;
+ }
+
+ return calculateTaskCapacityFromRatio(ratioLimit, worker.getCapacity());
Review Comment:
Done
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig config)
+ {
+ if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) && config.getParallelIndexTaskSlotRatio() != 1) {
+ return getWorkerParallelIndexCapacity(config.getParallelIndexTaskSlotRatio()) - getCurrParallelIndexCapacityUsed()
+ >= task.getTaskResource().getRequiredCapacity();
+ }
+
+ final Integer limit = getLimitForTask(task.getType(), config.getTaskSlotRatiosPerWorker());
+
+ if (limit == null) {
+ return true; // No limit specified, so task can run
+ }
+
+ return limit - getCurrCapacityUsedByTaskType().getOrDefault(task.getType(), 0) >= task.getTaskResource()
+ .getRequiredCapacity();
+ }
+
+ private Integer getLimitForTask(
+ String taskType,
+ Map<String, Double> ratiosMap
+ )
+ {
+ Double ratioLimit = ratiosMap.get(taskType);
Review Comment:
Renamed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]