kfaraz commented on code in PR #16889:
URL: https://github.com/apache/druid/pull/16889#discussion_r1773405124
##########
docs/configuration/index.md:
##########
@@ -1135,6 +1135,7 @@ The following configs only apply if the Overlord is running in remote mode. For
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
|`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
+|`druid.indexer.runner.taskSlotLimits`| A map where each key is a task type, and the corresponding value represents the limit on the number of task slots that a task of that type can occupy on a worker. The key is a `String` that specifies the task type. The value can either be a Double or Integer. A `Double` in the range [0, 1], representing a ratio of the available task slots that tasks of this type can occupy. For example, a value of 0.5 means that tasks of this type can occupy up to 50% of the task slots on a worker. A value of 0 means that tasks of this type can occupy no slots. A value of 1.0 means no restriction, allowing tasks of this type to occupy all available slots. An `Integer` that is greater than or equal to 0, representing an absolute limit on the number of task slots that tasks of this type can occupy. For example, a value of 5 means that tasks of this type can occupy up to 5 task slots on a worker. `taskSlotLimits = {"index_parallel": 0.5, "query_controller": 3}`. In this example 'index_parallel' tasks can occupy up to 50% of task slots and 'query_controller' can occupy up to 3 task slots |Empty map|
Review Comment:
I don't think it is intuitive to put absolute limits and ratios in the same
property.
It would be better to have two separate properties.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java:
##########
@@ -29,6 +34,10 @@ public class WorkerTaskRunnerConfig
@JsonProperty
private double parallelIndexTaskSlotRatio = 1;
+ @JsonProperty
+ @JsonDeserialize(using = TaskSlotLimitsDeserializer.class)
+ private Map<String, Number> taskSlotLimits = new HashMap<>();
Review Comment:
Let's not try to merge task slot ratios and absolute limits into one field.
They are best kept separate.
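A minimal sketch of what the split could look like, assuming two hypothetical properties, `taskSlotRatios` (`Map<String, Double>`) and `taskSlotLimits` (`Map<String, Integer>`). Neither the names nor the validation come from the PR. Jackson annotations are left out so the snippet compiles on its own; in `WorkerTaskRunnerConfig` each field would carry `@JsonProperty`, and plain typed maps like these should bind without a custom deserializer.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of WorkerTaskRunnerConfig with ratios and absolute limits kept in
 * separate maps. In Druid the fields would be annotated with @JsonProperty.
 */
class SplitTaskSlotConfig
{
  private final Map<String, Double> taskSlotRatios;
  private final Map<String, Integer> taskSlotLimits;

  SplitTaskSlotConfig(Map<String, Double> taskSlotRatios, Map<String, Integer> taskSlotLimits)
  {
    Map<String, Double> ratios = new HashMap<>();
    if (taskSlotRatios != null) {
      ratios.putAll(taskSlotRatios);
    }
    Map<String, Integer> limits = new HashMap<>();
    if (taskSlotLimits != null) {
      limits.putAll(taskSlotLimits);
    }
    // Each property has one simple validation rule instead of a per-type branch.
    ratios.forEach((type, ratio) -> {
      if (ratio == null || ratio < 0 || ratio > 1) {
        throw new IllegalArgumentException("Ratio for task type [" + type + "] must be in [0, 1]");
      }
    });
    limits.forEach((type, limit) -> {
      if (limit == null || limit < 0) {
        throw new IllegalArgumentException("Limit for task type [" + type + "] must be >= 0");
      }
    });
    this.taskSlotRatios = Collections.unmodifiableMap(ratios);
    this.taskSlotLimits = Collections.unmodifiableMap(limits);
  }

  /** Fraction of a worker's task slots, in [0, 1], that each task type may occupy. Absent types are unrestricted. */
  Map<String, Double> getTaskSlotRatios()
  {
    return taskSlotRatios;
  }

  /** Maximum number of a worker's task slots that each task type may occupy. Absent types are unrestricted. */
  Map<String, Integer> getTaskSlotLimits()
  {
    return taskSlotLimits;
  }
}
```

In the combined map, `1` and `1.0` would mean "one slot" and "all slots" respectively; separate typed maps remove that ambiguity.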
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ /**
+ * Determines if a specific task can be executed on the worker based on
+ * various capacity, custom limits, and availability conditions.
+ * <p>
+ * This method checks:
+ * <ul>
+ * <li>Whether the worker has sufficient capacity to handle the task.</li>
+ * <li>Whether the task can run under custom-defined limits for its type,
+ * such as a maximum number of tasks allowed or a ratio of slots the task type can occupy.</li>
+ * <li>Whether the availability group of the task is currently available.</li>
+ * </ul>
+ *
+ * @param task The {@link Task} to be executed. The task contains details such as required capacity
+ * and its type.
+ * @param taskLimits A map containing custom limits for different task types. The key is a string
+ * representing the task type, and the value is a {@link Number} which can be:
+ * <ul>
+ * <li>A {@code Double} representing a ratio of available slots the task type can use (0 to 1).</li>
+ * <li>An {@code Integer} representing an absolute limit of slots the task type can occupy.</li>
+ * </ul>
+ * If the task type is not present in this map, the task can use all available slots.
+ * @return {@code true} if the task can run, meaning the worker has sufficient capacity,
+ * the task type does not exceed custom limits, and the task's availability group is available.
+ * Returns {@code false} otherwise.
Review Comment:
This is not really needed; it seems too verbose.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -42,9 +46,11 @@
@PublicApi
public class ImmutableWorkerInfo
{
+ private static final Logger logger = new Logger(ImmutableWorkerInfo.class);
Review Comment:
We should not be doing any logging in this class.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ /**
+ * Determines if a specific task can be executed on the worker based on
+ * various capacity, custom limits, and availability conditions.
+ * <p>
+ * This method checks:
+ * <ul>
+ * <li>Whether the worker has sufficient capacity to handle the task.</li>
+ * <li>Whether the task can run under custom-defined limits for its type,
+ * such as a maximum number of tasks allowed or a ratio of slots the task type can occupy.</li>
+ * <li>Whether the availability group of the task is currently available.</li>
+ * </ul>
+ *
+ * @param task The {@link Task} to be executed. The task contains details such as required capacity
+ * and its type.
+ * @param taskLimits A map containing custom limits for different task types. The key is a string
+ * representing the task type, and the value is a {@link Number} which can be:
+ * <ul>
+ * <li>A {@code Double} representing a ratio of available slots the task type can use (0 to 1).</li>
+ * <li>An {@code Integer} representing an absolute limit of slots the task type can occupy.</li>
+ * </ul>
+ * If the task type is not present in this map, the task can use all available slots.
+ * @return {@code true} if the task can run, meaning the worker has sufficient capacity,
+ * the task type does not exceed custom limits, and the task's availability group is available.
+ * Returns {@code false} otherwise.
+ */
+ public boolean canRunTask(Task task, Map<String, Number> taskLimits)
Review Comment:
We don't need to have two separate `canRunTask` methods. You can just pass
`WorkerTaskRunnerConfig` instead of `parallelIndexTaskSlotRatio` into the
existing `canRunTask` method.
Once we do that, some of the new private methods will go away, which will
simplify the change set.
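A rough sketch of a single `canRunTask` that receives the runner config instead of a bare ratio. `WorkerSlotState` and `TaskSlotSettings` are hypothetical stand-ins for `ImmutableWorkerInfo` and `WorkerTaskRunnerConfig`; the task is reduced to its type and required capacity, the existing availability-group and parallel-index checks are omitted, and rounding a ratio down with `Math.floor` is an assumption.

```java
import java.util.Map;

/** Hypothetical stand-in for the relevant parts of WorkerTaskRunnerConfig. */
class TaskSlotSettings
{
  final Map<String, Double> taskSlotRatios;  // task type -> fraction of worker slots
  final Map<String, Integer> taskSlotLimits; // task type -> absolute number of slots

  TaskSlotSettings(Map<String, Double> taskSlotRatios, Map<String, Integer> taskSlotLimits)
  {
    this.taskSlotRatios = taskSlotRatios;
    this.taskSlotLimits = taskSlotLimits;
  }
}

/** Hypothetical stand-in for the capacity bookkeeping in ImmutableWorkerInfo. */
class WorkerSlotState
{
  final int capacity;
  final int currCapacityUsed;
  final Map<String, Integer> currCapacityUsedByTaskType;

  WorkerSlotState(int capacity, int currCapacityUsed, Map<String, Integer> currCapacityUsedByTaskType)
  {
    this.capacity = capacity;
    this.currCapacityUsed = currCapacityUsed;
    this.currCapacityUsedByTaskType = currCapacityUsedByTaskType;
  }

  /** Returns true only if the worker has free capacity and the task type stays within its ratio and limit. */
  boolean canRunTask(String taskType, int requiredCapacity, TaskSlotSettings config)
  {
    if (currCapacityUsed + requiredCapacity > capacity) {
      return false;
    }
    // Slots this task type would occupy on the worker if the task were assigned here.
    int usedAfterAssignment = currCapacityUsedByTaskType.getOrDefault(taskType, 0) + requiredCapacity;

    Double ratio = config.taskSlotRatios.get(taskType);
    if (ratio != null && usedAfterAssignment > (int) Math.floor(ratio * capacity)) {
      return false;
    }
    Integer limit = config.taskSlotLimits.get(taskType);
    return limit == null || usedAfterAssignment <= limit;
  }
}
```

Because the ratio and the limit are checked independently, a type listed in both maps is bounded by whichever is stricter.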
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -260,12 +399,24 @@ public boolean equals(Object o)
: that.blacklistedUntil != null);
}
+ public Map<String, Integer> incrementTypeSpecificCapacity(String type, int capacityToAdd)
+ {
+ Map<String, Integer> result = new HashMap<>(typeSpecificCapacityMap);
+ if (result.containsKey(type)) {
+ result.put(type, result.get(type) + capacityToAdd);
+ } else {
+ result.put(type, capacityToAdd);
+ }
Review Comment:
This could be simplified with `result.merge()`.
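A sketch of the `merge()` version. It is written as a static helper over an explicit map so that it compiles on its own; in `ImmutableWorkerInfo` the body would read the existing `typeSpecificCapacityMap` field instead of a parameter.

```java
import java.util.HashMap;
import java.util.Map;

class TypeSpecificCapacity
{
  /**
   * Returns a copy of {@code current} with {@code capacityToAdd} added to the entry for {@code type}.
   */
  static Map<String, Integer> incrementTypeSpecificCapacity(
      Map<String, Integer> current,
      String type,
      int capacityToAdd
  )
  {
    Map<String, Integer> result = new HashMap<>(current);
    // merge() stores capacityToAdd when the key is absent and otherwise sums it with the
    // existing value, replacing the containsKey/get/put branches.
    result.merge(type, capacityToAdd, Integer::sum);
    return result;
  }
}
```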
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ /**
+ * Determines if a specific task can be executed on the worker based on
+ * various capacity, custom limits, and availability conditions.
+ * <p>
+ * This method checks:
Review Comment:
```suggestion
* This method returns true only if:
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java:
##########
@@ -48,4 +57,46 @@ public double getParallelIndexTaskSlotRatio()
{
return parallelIndexTaskSlotRatio;
}
+
+ /**
+ * The `taskSlotLimits` configuration is a map where each key is a task type,
+ * and the corresponding value represents the limit on the number of task slots
+ * that a task of that type can occupy on a worker.
+ * <p>
+ * The key is a `String` that specifies the task type.
+ * The value can either be a Double or Integer:
+ * <p>
+ * 1. A `Double` in the range [0, 1], representing a ratio of the available task slots
+ * that tasks of this type can occupy. For example, a value of 0.5 means that tasks
+ * of this type can occupy up to 50% of the task slots on a worker.
+ * A value of 0 means that tasks of this type can occupy no slots (i.e., they are effectively disabled).
+ * A value of 1.0 means no restriction, allowing tasks of this type to occupy all available slots.
+ * <p>
+ * 2. An `Integer` that is greater than or equal to 0, representing an absolute limit
+ * on the number of task slots that tasks of this type can occupy. For example, a value of 5
+ * means that tasks of this type can occupy up to 5 task slots on a worker.
+ * <p>
+ * If a task type is not present in the `taskSlotLimits` map, there is no restriction
+ * on the number of task slots it can occupy, meaning it can use all available slots.
+ * <p>
+ * Example:
+ * <p>
+ * taskSlotLimits = {
+ * "index_parallel": 0.5, // 'index_parallel' can occupy up to 50% of task slots
+ * "query_controller": 3 // 'query_controller' can occupy up to 3 task slots
+ * }
+ * <p>
+ * This configuration allows for granular control over the allocation of task slots
+ * based on the specific needs of different task types, helping to prevent any one type
+ * of task from monopolizing worker resources and reducing the risk of deadlocks.
+ *
+ * @return A map where the key is the task type (`String`), and the value is either a `Double` (0 to 1)
+ * representing the ratio of task slots available for that type, or an `Integer` (>= 0)
+ * representing the absolute limit of task slots for that type. If a task type is absent,
+ * it is not limited in terms of the number of task slots it can occupy.
+ */
Review Comment:
Please try to simplify and shorten this javadoc retaining only the necessary
information in a concise manner.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/util/TaskSlotLimitsDeserializer.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.util;
+
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.JsonDeserializer;
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+public class TaskSlotLimitsDeserializer extends JsonDeserializer<Map<String, Number>>
Review Comment:
This should not be needed once task slot ratio and task slot limits are
separated.
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java:
##########
@@ -201,6 +211,7 @@ public void testEqualsAndSerde()
),
3,
0,
+ new HashMap<>(),
Review Comment:
Use `Collections.emptyMap()` instead.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -57,6 +63,7 @@ public ImmutableWorkerInfo(
@JsonProperty("worker") Worker worker,
@JsonProperty("currCapacityUsed") int currCapacityUsed,
@JsonProperty("currParallelIndexCapacityUsed") int currParallelIndexCapacityUsed,
+ @JsonProperty("currTypeSpecificCapacityUsed") @Nullable Map<String, Integer> typeSpecificCapacityMap,
Review Comment:
Please use the same name for the argument, the JSON serialized field and the
class member for readability.
Also, this name is a little ambiguous, use something else like
`currCapacityUsedByTaskType`.
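A sketch of the consistent naming, assuming `currCapacityUsedByTaskType` is adopted for the constructor argument, the member, and the getter. Jackson annotations appear only in comments so the snippet compiles without Jackson; treating a missing (null) value as an empty map mirrors the `@Nullable` on the original argument but is otherwise an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical excerpt of ImmutableWorkerInfo showing one name used everywhere. */
class WorkerInfoExcerpt
{
  private final Map<String, Integer> currCapacityUsedByTaskType;

  // In Druid: @JsonProperty("currCapacityUsedByTaskType") @Nullable Map<String, Integer> currCapacityUsedByTaskType
  WorkerInfoExcerpt(Map<String, Integer> currCapacityUsedByTaskType)
  {
    if (currCapacityUsedByTaskType == null) {
      // Assumed handling: the field was absent from the serialized JSON.
      this.currCapacityUsedByTaskType = Collections.emptyMap();
    } else {
      // Defensive, read-only copy so the info object stays immutable.
      this.currCapacityUsedByTaskType = Collections.unmodifiableMap(new HashMap<>(currCapacityUsedByTaskType));
    }
  }

  // In Druid: @JsonProperty("currCapacityUsedByTaskType")
  Map<String, Integer> getCurrCapacityUsedByTaskType()
  {
    return currCapacityUsedByTaskType;
  }
}
```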
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +253,114 @@ private int getWorkerParallelIndexCapacity(double parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ /**
+ * Determines if a specific task can be executed on the worker based on
+ * various capacity, custom limits, and availability conditions.
+ * <p>
+ * This method checks:
+ * <ul>
+ * <li>Whether the worker has sufficient capacity to handle the task.</li>
Review Comment:
```suggestion
* <li>The worker has sufficient capacity to handle the task.</li>
```
##########
docs/configuration/index.md:
##########
@@ -1135,6 +1135,7 @@ The following configs only apply if the Overlord is running in remote mode. For
|`druid.indexer.runner.taskAssignmentTimeout`|How long to wait after a task has been assigned to a Middle Manager before throwing an error.|`PT5M`|
|`druid.indexer.runner.minWorkerVersion`|The minimum Middle Manager version to send tasks to. The version number is a string. This affects the expected behavior during certain operations like comparison against `druid.worker.version`. Specifically, the version comparison follows dictionary order. Use ISO8601 date format for the version to accommodate date comparisons. |"0"|
| `druid.indexer.runner.parallelIndexTaskSlotRatio`| The ratio of task slots available for parallel indexing supervisor tasks per worker. The specified value must be in the range `[0, 1]`. |1|
+|`druid.indexer.runner.taskSlotLimits`| A map where each key is a task type, and the corresponding value represents the limit on the number of task slots that a task of that type can occupy on a worker. The key is a `String` that specifies the task type. The value can either be a Double or Integer. A `Double` in the range [0, 1], representing a ratio of the available task slots that tasks of this type can occupy. An `Integer` that is greater than or equal to 0, representing an absolute limit on the number of task slots that tasks of this type can occupy.|Empty map|
Review Comment:
I think the limit on compaction tasks (or kill tasks for that matter) should
not be a concern.
This is a runtime property, typically controlled by an admin.
So, if an admin wants to restrict the number of concurrent compaction tasks,
it is fair to honor that irrespective of the value of `compactionTaskSlotRatio`
or `maxCompactionTaskSlots` set in the coordinator dynamic configs.
We just need to call it out clearly in the release notes and the docs of the
new property.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ZkWorker.java:
##########
@@ -167,6 +167,29 @@ private int getCurrParallelIndexCapacityUsed(Map<String, TaskAnnouncement> tasks
return currParallelIndexCapacityUsed;
}
+ @JsonProperty("currTypeSpecificCapacityUsed")
+ public Map<String, Integer> getCurrTypeSpecificCapacityUsed()
Review Comment:
Yes, the ZK-based task runner is deprecated, and we should not support the new
feature with ZK.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.