kfaraz commented on code in PR #16889:
URL: https://github.com/apache/druid/pull/16889#discussion_r1776262379
##########
docs/configuration/index.md:
##########
@@ -1135,6 +1135,8 @@ The following configs only apply if the Overlord is running in remote mode. For
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
|`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
+|`druid.indexer.runner.taskSlotLimits`|A map where each key is a task type (`String`), and the corresponding value represents the absolute limit on the number of task slots that tasks of this type can occupy on a worker. The value is an `Integer` that is greater than or equal to 0. For example, a value of 5 means that tasks of this type can occupy up to 5 task slots on a worker. If both absolute and ratio limits are specified for the same task type, the effective limit will be the smaller of the absolute limit and the limit derived from the corresponding ratio. `taskSlotLimits = {"index_parallel": 3, "query_controller": 5}`. In this example, parallel indexing tasks can occupy up to 3 task slots, and query controllers can occupy up to 5 task slots.|Empty map|
Review Comment:
Taking another look at this, I don't see this property being very useful or intuitive.
If someone wants to limit the number of tasks of a given type to 1 per worker, they can already use availability groups. I don't see why someone would want to limit the number of tasks of a certain type on a worker to exactly, say, 2.
I think the ratio is a more useful setting. So for the time being, we should just have `taskSlotRatiosPerWorker`. If, in the future, we feel the need for such a property, we can add it back.
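To make the comparison concrete, here is a minimal sketch of the effective-limit rule as the PR's docs describe it: the smaller of the absolute limit and the ratio-derived limit. `EffectiveSlotLimit` and `effectiveLimit` are hypothetical names, and rounding `ratio * capacity` down is an assumption, since the body of `calculateTaskCapacityFromRatio` is not shown in the diff. The output illustrates the point above: a ratio scales with worker size, while an absolute limit does not.

```java
import java.util.Map;

// Hypothetical helper mirroring the documented rule: effective limit =
// min(absolute limit, floor(ratio * workerCapacity)). Floor rounding is an assumption.
final class EffectiveSlotLimit
{
  private EffectiveSlotLimit() {}

  /** Returns the effective slot limit for taskType, or null if neither map constrains it. */
  static Integer effectiveLimit(
      String taskType,
      int workerCapacity,
      Map<String, Integer> absoluteLimits,
      Map<String, Double> ratios
  )
  {
    Integer absolute = absoluteLimits.get(taskType);
    Double ratio = ratios.get(taskType);
    Integer fromRatio = ratio == null ? null : (int) Math.floor(ratio * workerCapacity);
    if (absolute == null) {
      return fromRatio; // may be null when the type is unrestricted
    }
    if (fromRatio == null) {
      return absolute;
    }
    return Math.min(absolute, fromRatio);
  }

  public static void main(String[] args)
  {
    Map<String, Integer> limits = Map.of("index_parallel", 3);
    Map<String, Double> ratios = Map.of("index_parallel", 0.5);
    System.out.println(effectiveLimit("index_parallel", 4, limits, ratios));  // 2 (ratio wins)
    System.out.println(effectiveLimit("index_parallel", 10, limits, ratios)); // 3 (absolute wins)
    System.out.println(effectiveLimit("index_kafka", 10, limits, ratios));    // null (no limit)
  }
}
```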
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +270,79 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean hasSufficientWorkerCapacity(Task task)
+ {
+ int capacityRemaining = worker.getCapacity() - getCurrCapacityUsed();
+ int requiredCapacity = task.getTaskResource().getRequiredCapacity();
+ return capacityRemaining >= requiredCapacity;
+ }
+
+ private boolean isAvailabilityGroupAvailable(Task task)
+ {
+ return !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup());
+ }
+
+ private boolean canRunTaskBasedOnCustomLimit(Task task, Map<String, Integer> limitsMap, Map<String, Double> ratiosMap)
Review Comment:
This method will become simpler once we remove the absolute limits.
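For reference, a ratio-only version could reduce to one lookup and one comparison. This is a hypothetical sketch: the method name, the flattened parameters (used in place of `Task` and the worker's fields), and the floor rounding of `ratio * capacity` are all assumptions, not the PR's code.

```java
import java.util.Map;

// Hypothetical ratio-only variant of canRunTaskBasedOnCustomLimit.
// Parameters are flattened so the sketch does not depend on Druid's Task/Worker classes.
final class RatioOnlyLimit
{
  private RatioOnlyLimit() {}

  static boolean canRunTaskBasedOnRatio(
      String taskType,
      int requiredCapacity,
      int workerCapacity,
      Map<String, Integer> capacityUsedByTaskType,
      Map<String, Double> taskSlotRatiosPerWorker
  )
  {
    Double ratio = taskSlotRatiosPerWorker.get(taskType);
    if (ratio == null) {
      return true; // no ratio configured for this type, so no per-type limit
    }
    int limit = (int) Math.floor(ratio * workerCapacity); // assumed rounding
    int used = capacityUsedByTaskType.getOrDefault(taskType, 0);
    return used + requiredCapacity <= limit;
  }
}
```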
##########
docs/configuration/index.md:
##########
@@ -1135,6 +1135,8 @@ The following configs only apply if the Overlord is running in remote mode. For
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
|`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
+|`druid.indexer.runner.taskSlotLimits`|A map where each key is a task type (`String`), and the corresponding value represents the absolute limit on the number of task slots that tasks of this type can occupy on a worker. The value is an `Integer` that is greater than or equal to 0. For example, a value of 5 means that tasks of this type can occupy up to 5 task slots on a worker. If both absolute and ratio limits are specified for the same task type, the effective limit will be the smaller of the absolute limit and the limit derived from the corresponding ratio. `taskSlotLimits = {"index_parallel": 3, "query_controller": 5}`. In this example, parallel indexing tasks can occupy up to 3 task slots, and query controllers can occupy up to 5 task slots.|Empty map|
+|`druid.indexer.runner.taskSlotRatios`|A map where each key is a task type (`String`), and the corresponding value is a `Double` which should be in the range [0, 1], representing the ratio of available task slots that tasks of this type can occupy. This ratio defines the proportion of total task slots a task type can use, calculated as `ratio * totalSlots`. If both absolute and ratio limits are specified for the same task type, the effective limit will be the smaller of the absolute limit and the limit derived from the corresponding ratio. `taskSlotRatios = {"index_parallel": 0.5, "query_controller": 0.25}`. In this example, parallel indexing tasks can occupy up to 50% of the total task slots, and query controllers can occupy up to 25% of the total task slots.|Empty map|
Review Comment:
```suggestion
|`druid.indexer.runner.taskSlotRatiosPerWorker`|Map from task type to the ratio of task slots that the given task type can occupy on a single worker. Each value must be in the range [0, 1]. A value of 0 for a task type means that it can take up no task slot on a worker (i.e. tasks of this type would never run). A value of 1 means that the task type may take up all task slots of a worker if available. For `index_parallel` tasks, the `parallelIndexTaskSlotRatio` overrides the value specified in this map. |Empty map i.e. no limit on task slots for any task type|
```
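The precedence rule in the suggested text can be sketched as below. `SlotRatioResolver` and `effectiveRatio` are hypothetical names, and a `null` result stands for "no limit for this type"; how the PR actually wires the override may differ.

```java
import java.util.Map;

// Hypothetical resolver for the suggested precedence: for index_parallel tasks,
// parallelIndexTaskSlotRatio overrides any entry in taskSlotRatiosPerWorker.
final class SlotRatioResolver
{
  private SlotRatioResolver() {}

  /** Returns the ratio that applies to taskType, or null when the type is unrestricted. */
  static Double effectiveRatio(
      String taskType,
      double parallelIndexTaskSlotRatio,
      Map<String, Double> taskSlotRatiosPerWorker
  )
  {
    if ("index_parallel".equals(taskType)) {
      return parallelIndexTaskSlotRatio; // the dedicated property always wins
    }
    return taskSlotRatiosPerWorker.get(taskType);
  }
}
```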
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +270,79 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean hasSufficientWorkerCapacity(Task task)
+ {
+ int capacityRemaining = worker.getCapacity() - getCurrCapacityUsed();
+ int requiredCapacity = task.getTaskResource().getRequiredCapacity();
+ return capacityRemaining >= requiredCapacity;
+ }
+
+ private boolean isAvailabilityGroupAvailable(Task task)
+ {
+ return !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup());
+ }
+
+ private boolean canRunTaskBasedOnCustomLimit(Task task, Map<String, Integer> limitsMap, Map<String, Double> ratiosMap)
+ {
+ final Integer limit = getLimitForTask(task.getType(), limitsMap, ratiosMap);
+
+ if (limit == null) {
+ return true; // No limit specified, so task can run
+ }
+
+ int currentCapacityUsed = getCurrCapacityUsedByTaskType().getOrDefault(task.getType(), 0);
+ int requiredCapacity = task.getTaskResource().getRequiredCapacity();
+
+ return hasCapacityBasedOnLimit(limit, currentCapacityUsed, requiredCapacity);
+ }
+
+ private Integer getLimitForTask(
+ String taskType,
+ Map<String, Integer> limitsMap,
+ Map<String, Double> ratiosMap
+ )
+ {
+ Integer absoluteLimit = limitsMap.get(taskType);
+ Double ratioLimit = ratiosMap.get(taskType);
+
+ if (absoluteLimit == null && ratioLimit == null) {
+ return null;
+ }
+
+ // Validate the absolute limit if present
+ if (absoluteLimit != null) {
+ Preconditions.checkArgument(absoluteLimit >= 0, "Absolute limit for task %s must be non-negative.", taskType);
+ }
+
+ // Validate the ratio limit if present
+ if (ratioLimit != null) {
+ Preconditions.checkArgument(ratioLimit >= 0.0 && ratioLimit <= 1.0,
+ "Ratio for task %s must be between 0.0 and 1.0 inclusive.", taskType
+ );
+ }
+
+ final int totalCapacity = worker.getCapacity();
+
+ Integer ratioBasedLimit = ratioLimit != null ? calculateTaskCapacityFromRatio(ratioLimit, totalCapacity) : null;
+
+ if (absoluteLimit != null && ratioBasedLimit != null) {
+ return Math.min(absoluteLimit, ratioBasedLimit);
+ }
+
+ return absoluteLimit != null ? absoluteLimit : ratioBasedLimit;
+ }
+
+ private boolean hasCapacityBasedOnLimit(int limit, int currentCapacityUsed, int requiredCapacity)
Review Comment:
This method is not needed; it can be inlined.
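Inlined, the check collapses into the return statement of `canRunTaskBasedOnCustomLimit`. This sketch assumes that `hasCapacityBasedOnLimit` (whose body is not shown in the diff) returned whether the type's current usage plus the task's required capacity fits within the limit; `InlinedLimitCheck` is a hypothetical wrapper and the parameters are flattened for self-containment.

```java
// Hypothetical sketch of the inlined check. Assumes hasCapacityBasedOnLimit returned
// currentCapacityUsed + requiredCapacity <= limit; a null limit means "unrestricted".
final class InlinedLimitCheck
{
  private InlinedLimitCheck() {}

  static boolean canRunTaskBasedOnCustomLimit(Integer limit, int currentCapacityUsed, int requiredCapacity)
  {
    // Previously: return hasCapacityBasedOnLimit(limit, currentCapacityUsed, requiredCapacity);
    return limit == null || currentCapacityUsed + requiredCapacity <= limit;
  }
}
```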
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +270,79 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean hasSufficientWorkerCapacity(Task task)
+ {
+ int capacityRemaining = worker.getCapacity() - getCurrCapacityUsed();
+ int requiredCapacity = task.getTaskResource().getRequiredCapacity();
+ return capacityRemaining >= requiredCapacity;
+ }
+
+ private boolean isAvailabilityGroupAvailable(Task task)
+ {
+ return !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup());
+ }
Review Comment:
These two methods are not really needed. We can just continue with the
original code where these checks were performed inline in the method
`canRunTask`.
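Keeping the two checks inline would look roughly like the original expression with the new per-type check added. This is a simplified, hypothetical stand-in: the real method takes a `Task` and a `WorkerTaskRunnerConfig`, whereas here the task's required capacity and availability group are passed directly, and the parallel-index and per-type ratio checks are collapsed into one boolean parameter.

```java
import java.util.Set;

// Hypothetical, flattened stand-in for ImmutableWorkerInfo.canRunTask with the
// capacity and availability-group checks written inline, as in the original code.
final class InlineCanRunTask
{
  private InlineCanRunTask() {}

  static boolean canRunTask(
      int workerCapacity,
      int currCapacityUsed,
      Set<String> availabilityGroups,
      int requiredCapacity,
      String availabilityGroup,
      boolean withinTypeSpecificLimits // stands in for canRunParallelIndexTask + ratio check
  )
  {
    return workerCapacity - currCapacityUsed >= requiredCapacity
           && withinTypeSpecificLimits
           && !availabilityGroups.contains(availabilityGroup);
  }
}
```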
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java:
##########
@@ -48,4 +56,36 @@ public double getParallelIndexTaskSlotRatio()
{
return parallelIndexTaskSlotRatio;
}
+
+ /**
+ * Returns a map where each key is a task type (`String`), and the value is an `Integer`
+ * representing the absolute limit on the number of task slots that tasks of this type can occupy.
+ * <p>
+ * This absolute limit specifies the maximum number of task slots available to a specific task type.
+ * <p>
+ * If both an absolute limit and a ratio (from {@link #getTaskSlotRatios()}) are specified for the same task type,
+ * the effective limit will be the smaller of the two.
+ *
+ * @return A map of task types with their corresponding absolute slot limits.
+ */
+ public Map<String, Integer> getTaskSlotLimits()
+ {
+ return taskSlotLimits;
+ }
+
+ /**
+ * Returns a map where each key is a task type (`String`), and the value is a `Double` which should be in the
+ * range [0, 1], representing the ratio of available task slots that tasks of this type can occupy.
+ * <p>
+ * This ratio defines the proportion of total task slots a task type can use, calculated as `ratio * totalSlots`.
+ * <p>
+ * If both a ratio and an absolute limit (from {@link #getTaskSlotLimits()}) are specified for the same task type,
+ * the effective limit will be the smaller of the two.
+ *
+ * @return A map of task types with their corresponding slot ratios.
Review Comment:
```suggestion
* Map from task type to task slot ratio for that type per worker.
* A value of 0 for a task type indicates that the task type cannot occupy any slots.
* A value of 1 indicates that the task type may take up all available slots of a worker if available.
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -195,13 +222,31 @@ public boolean isValidVersion(String minVersion)
return worker.getVersion().compareTo(minVersion) >= 0;
}
- public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio)
+ /**
+ * Determines if a specific task can be executed on the worker based on
+ * various capacity, custom limits, and availability conditions.
+ * <p>
+ * Returns true only if:
+ * <ul>
+ * <li>The worker has sufficient capacity to handle the task.</li>
+ * <li>The task is of the parallel index type and can operate within the permitted ratio of slots designated for that task type.</li>
+ * <li>The task can run under custom-defined limits for its type,
+ * such as a maximum number of tasks allowed or a ratio of slots the task type can occupy.</li>
+ * <li>The availability group of the task is currently available.</li>
+ * </ul>
+ */
+ public boolean canRunTask(Task task, WorkerTaskRunnerConfig config)
{
- return (worker.getCapacity() - getCurrCapacityUsed() >= task.getTaskResource().getRequiredCapacity()
- && canRunParallelIndexTask(task, parallelIndexTaskSlotRatio)
- && !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup()));
+ return (hasSufficientWorkerCapacity(task)
+ && canRunParallelIndexTask(task, config.getParallelIndexTaskSlotRatio())
+ && canRunTaskBasedOnCustomLimit(task, config.getTaskSlotLimits(), config.getTaskSlotRatios())
+ && isAvailabilityGroupAvailable(task));
}
+
+ /**
+ * To be removed in future as it could be fully covered with the limits map.
+ */
Review Comment:
I don't think this is needed, especially since we have not deprecated the
old property yet.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +270,79 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean hasSufficientWorkerCapacity(Task task)
+ {
+ int capacityRemaining = worker.getCapacity() - getCurrCapacityUsed();
+ int requiredCapacity = task.getTaskResource().getRequiredCapacity();
+ return capacityRemaining >= requiredCapacity;
+ }
+
+ private boolean isAvailabilityGroupAvailable(Task task)
+ {
+ return !getAvailabilityGroups().contains(task.getTaskResource().getAvailabilityGroup());
+ }
+
+ private boolean canRunTaskBasedOnCustomLimit(Task task, Map<String, Integer> limitsMap, Map<String, Double> ratiosMap)
+ {
+ final Integer limit = getLimitForTask(task.getType(), limitsMap, ratiosMap);
+
+ if (limit == null) {
+ return true; // No limit specified, so task can run
+ }
+
+ int currentCapacityUsed = getCurrCapacityUsedByTaskType().getOrDefault(task.getType(), 0);
+ int requiredCapacity = task.getTaskResource().getRequiredCapacity();
+
+ return hasCapacityBasedOnLimit(limit, currentCapacityUsed, requiredCapacity);
+ }
+
+ private Integer getLimitForTask(
+ String taskType,
+ Map<String, Integer> limitsMap,
+ Map<String, Double> ratiosMap
+ )
+ {
+ Integer absoluteLimit = limitsMap.get(taskType);
+ Double ratioLimit = ratiosMap.get(taskType);
+
+ if (absoluteLimit == null && ratioLimit == null) {
+ return null;
+ }
+
+ // Validate the absolute limit if present
+ if (absoluteLimit != null) {
+ Preconditions.checkArgument(absoluteLimit >= 0, "Absolute limit for task %s must be non-negative.", taskType);
+ }
+
+ // Validate the ratio limit if present
+ if (ratioLimit != null) {
+ Preconditions.checkArgument(ratioLimit >= 0.0 && ratioLimit <= 1.0,
+ "Ratio for task %s must be between 0.0 and 1.0 inclusive.", taskType
Review Comment:
This validation should happen in `WorkerTaskRunnerConfig` itself.
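Moving the range check into the config means a bad value fails once, when the config is built, rather than on every `canRunTask` call. A minimal sketch follows; `TaskSlotRatioConfig` and its plain constructor are hypothetical stand-ins for whatever binding the real `WorkerTaskRunnerConfig` uses, and it throws `IllegalArgumentException` directly instead of using Guava's `Preconditions` so that it stays self-contained.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical config holder that validates task slot ratios at construction time.
final class TaskSlotRatioConfig
{
  private final Map<String, Double> taskSlotRatiosPerWorker;

  TaskSlotRatioConfig(Map<String, Double> taskSlotRatiosPerWorker)
  {
    Map<String, Double> ratios = new HashMap<>();
    if (taskSlotRatiosPerWorker != null) {
      ratios.putAll(taskSlotRatiosPerWorker); // null config means "no limits"
    }
    for (Map.Entry<String, Double> entry : ratios.entrySet()) {
      Double ratio = entry.getValue();
      // The negated range test also rejects NaN.
      if (ratio == null || !(ratio >= 0.0 && ratio <= 1.0)) {
        throw new IllegalArgumentException(String.format(
            "Ratio for task type [%s] must be in the range [0, 1], got [%s]",
            entry.getKey(),
            ratio
        ));
      }
    }
    this.taskSlotRatiosPerWorker = Collections.unmodifiableMap(ratios);
  }

  Map<String, Double> getTaskSlotRatiosPerWorker()
  {
    return taskSlotRatiosPerWorker;
  }
}
```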
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -260,12 +381,20 @@ public boolean equals(Object o)
: that.blacklistedUntil != null);
}
+ public Map<String, Integer> incrementTypeSpecificCapacity(String type, int capacityToAdd)
Review Comment:
```suggestion
public Map<String, Integer> incrementCurrCapacityUsedByTaskType(String taskType, int capacityToAdd)
```
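The new name matches the getter `getCurrCapacityUsedByTaskType()`. Since the method's body is not shown in the diff, here is only a guess at its shape: return a copy of the per-type usage map with `capacityToAdd` added for `taskType`, leaving the original untouched, which fits an immutable worker-info object. The static form and the `CapacityByTaskType` wrapper are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: copy-on-write increment of per-task-type capacity usage.
final class CapacityByTaskType
{
  private CapacityByTaskType() {}

  static Map<String, Integer> incrementCurrCapacityUsedByTaskType(
      Map<String, Integer> currCapacityUsedByTaskType,
      String taskType,
      int capacityToAdd
  )
  {
    Map<String, Integer> updated = new HashMap<>(currCapacityUsedByTaskType);
    updated.merge(taskType, capacityToAdd, Integer::sum); // insert or add to existing usage
    return Collections.unmodifiableMap(updated);
  }
}
```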
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.