kfaraz commented on code in PR #16889:
URL: https://github.com/apache/druid/pull/16889#discussion_r1777026242
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java:
##########
@@ -29,14 +34,17 @@ public class WorkerTaskRunnerConfig
@JsonProperty
private double parallelIndexTaskSlotRatio = 1;
+ @JsonProperty
+ private Map<String, @Min(value = 0, message = "Task slot ratio for must be
at least 0.") @Max(value = 1, message = "Task slot ratio cannot be greater than
1") Double> taskSlotRatiosPerWorker = new HashMap<>();
Review Comment:
Maybe get rid of the Min/Max annotations here (as they somewhat compromise
readability) and move the validation to the constructor (you would need to add
a new constructor), to adhere to the style used in the rest of the Druid
codebase.
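A rough sketch of what constructor-based validation could look like. The class name `TaskSlotRatioConfigSketch`, the exception type, and the error message are placeholders, not existing Druid code, and the Jackson annotations the real config constructor would need are left out so that the snippet stands on its own:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for WorkerTaskRunnerConfig; not the actual Druid class.
class TaskSlotRatioConfigSketch
{
  private final Map<String, Double> taskSlotRatiosPerWorker;

  // Assumption: the real constructor would also carry the Jackson creator
  // and property annotations so that the config can still be deserialized.
  TaskSlotRatioConfigSketch(Map<String, Double> taskSlotRatiosPerWorker)
  {
    final Map<String, Double> ratios = taskSlotRatiosPerWorker == null
                                       ? Collections.<String, Double>emptyMap()
                                       : taskSlotRatiosPerWorker;

    for (Map.Entry<String, Double> entry : ratios.entrySet()) {
      final Double ratio = entry.getValue();
      // Written as a negated range check so that null and NaN are rejected too.
      if (ratio == null || !(ratio >= 0 && ratio <= 1)) {
        throw new IllegalArgumentException(
            String.format(
                "Task slot ratio for task type[%s] must be between 0 and 1, but was [%s].",
                entry.getKey(),
                ratio
            )
        );
      }
    }

    // Defensive copy so that later changes to the caller's map have no effect.
    this.taskSlotRatiosPerWorker = Collections.unmodifiableMap(new HashMap<>(ratios));
  }

  Map<String, Double> getTaskSlotRatiosPerWorker()
  {
    return taskSlotRatiosPerWorker;
  }
}
```

With this shape, the field declaration stays a plain `Map<String, Double>` and the error message can name the offending task type.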
##########
processing/src/main/java/org/apache/druid/guice/JsonConfigurator.java:
##########
@@ -206,6 +206,32 @@ public Message apply(String input)
return config;
}
+ // To have a detailed error message instead of noSuchFieldException if
problematic field is in the superclass
Review Comment:
This is a core class that is used in a lot of places. Let's not change it as
a part of this PR.
If this change is required, it can be in a separate PR.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double
parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig
config)
+ {
+ if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) &&
config.getParallelIndexTaskSlotRatio() != 1) {
+ return
getWorkerParallelIndexCapacity(config.getParallelIndexTaskSlotRatio()) -
getCurrParallelIndexCapacityUsed()
+ >= task.getTaskResource().getRequiredCapacity();
+ }
+
+ final Integer limit = getLimitForTask(task.getType(),
config.getTaskSlotRatiosPerWorker());
+
+ if (limit == null) {
+ return true; // No limit specified, so task can run
+ }
+
+ return limit -
getCurrCapacityUsedByTaskType().getOrDefault(task.getType(), 0) >=
task.getTaskResource()
+
.getRequiredCapacity();
+ }
+
+ private Integer getLimitForTask(
+ String taskType,
+ Map<String, Double> ratiosMap
+ )
+ {
+ Double ratioLimit = ratiosMap.get(taskType);
Review Comment:
```suggestion
Double taskSlotRatio = ratiosMap.get(taskType);
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -160,6 +180,12 @@ public int getCurrParallelIndexCapacityUsed()
return currParallelIndexCapacityUsed;
}
+ @JsonProperty("currCapacityUsedByTaskType")
+ public Map<String, Integer> getCurrCapacityUsedByTaskType()
Review Comment:
Please mark this method as `@Nullable`.
##########
indexing-service/src/test/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfoTest.java:
##########
@@ -225,11 +239,14 @@ public void test_canRunTask()
new Worker("http", "testWorker2", "192.0.0.1", 10, "v1",
WorkerConfig.DEFAULT_CATEGORY),
6,
0,
+ Collections.emptyMap(),
ImmutableSet.of("grp1", "grp2"),
ImmutableSet.of("task1", "task2"),
DateTimes.of("2015-01-01T01:01:02Z")
);
+ WorkerTaskRunnerConfig config = mock(WorkerTaskRunnerConfig.class);
Review Comment:
Please use a concrete config object instead of a mock.
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -195,21 +221,24 @@ public boolean isValidVersion(String minVersion)
return worker.getVersion().compareTo(minVersion) >= 0;
}
- public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio)
+ /**
+ * Determines if a specific task can be executed on the worker based on
+ * various capacity, custom limits, and availability conditions.
+ * <p>
+ * returns true only if:
Review Comment:
```suggestion
* Returns true only if:
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -195,21 +221,24 @@ public boolean isValidVersion(String minVersion)
return worker.getVersion().compareTo(minVersion) >= 0;
}
- public boolean canRunTask(Task task, double parallelIndexTaskSlotRatio)
+ /**
+ * Determines if a specific task can be executed on the worker based on
+ * various capacity, custom limits, and availability conditions.
+ * <p>
+ * returns true only if:
+ * <ul>
+ * <li>The worker has sufficient capacity to handle the task.</li>
+ * <li>The task is of the parallel index type and can operate within the
permitted ratio of slots designated for that task type.</li>
+ * <li>The the task can run under custom-defined limits for its type,
+ * such as a maximum number of tasks allowed or a ratio of slots the task
type can occupy.</li>
Review Comment:
```suggestion
* <li>Task slot ratio for this task type has not been exhausted on this
worker.</li>
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double
parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig
config)
+ {
+ if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) &&
config.getParallelIndexTaskSlotRatio() != 1) {
+ return
getWorkerParallelIndexCapacity(config.getParallelIndexTaskSlotRatio()) -
getCurrParallelIndexCapacityUsed()
+ >= task.getTaskResource().getRequiredCapacity();
+ }
+
+ final Integer limit = getLimitForTask(task.getType(),
config.getTaskSlotRatiosPerWorker());
+
+ if (limit == null) {
+ return true; // No limit specified, so task can run
+ }
+
+ return limit -
getCurrCapacityUsedByTaskType().getOrDefault(task.getType(), 0) >=
task.getTaskResource()
+
.getRequiredCapacity();
+ }
+
+ private Integer getLimitForTask(
+ String taskType,
+ Map<String, Double> ratiosMap
+ )
+ {
+ Double ratioLimit = ratiosMap.get(taskType);
+
+ if (ratioLimit == null) {
+ return null;
+ }
+
+ return calculateTaskCapacityFromRatio(ratioLimit, worker.getCapacity());
Review Comment:
Method `calculateTaskCapacityFromRatio` is not required; it can be inlined.
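For illustration, the inlined version might look roughly like the sketch below. The body of `calculateTaskCapacityFromRatio` is not visible in this hunk, so the `floor(ratio * capacity)` computation is an assumption, as are the wrapper class `MaxCapacitySketch` and the `workerCapacity` parameter, which stands in for `worker.getCapacity()`:

```java
import java.util.Map;

// Hypothetical wrapper so that the sketch compiles on its own.
class MaxCapacitySketch
{
  // Returns null when no ratio is configured for the given task type.
  static Integer getMaxCapacityForTaskType(
      String taskType,
      Map<String, Double> ratiosMap,
      int workerCapacity
  )
  {
    final Double taskSlotRatio = ratiosMap.get(taskType);
    if (taskSlotRatio == null) {
      return null;
    }

    // Inlined in place of the helper. ASSUMPTION: the helper simply
    // rounds ratio * capacity down to a whole number of task slots.
    return (int) Math.floor(taskSlotRatio * workerCapacity);
  }
}
```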
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double
parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig
config)
+ {
+ if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) &&
config.getParallelIndexTaskSlotRatio() != 1) {
Review Comment:
```suggestion
if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) &&
config.getParallelIndexTaskSlotRatio() < 1) {
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/ImmutableWorkerInfo.java:
##########
@@ -225,6 +254,43 @@ private int getWorkerParallelIndexCapacity(double
parallelIndexTaskSlotRatio)
return workerParallelIndexCapacity;
}
+ private boolean canRunTaskBasedOnLimit(Task task, WorkerTaskRunnerConfig
config)
+ {
+ if (task.getType().equals(ParallelIndexSupervisorTask.TYPE) &&
config.getParallelIndexTaskSlotRatio() != 1) {
+ return
getWorkerParallelIndexCapacity(config.getParallelIndexTaskSlotRatio()) -
getCurrParallelIndexCapacityUsed()
+ >= task.getTaskResource().getRequiredCapacity();
+ }
+
+ final Integer limit = getLimitForTask(task.getType(),
config.getTaskSlotRatiosPerWorker());
+
+ if (limit == null) {
+ return true; // No limit specified, so task can run
+ }
+
+ return limit -
getCurrCapacityUsedByTaskType().getOrDefault(task.getType(), 0) >=
task.getTaskResource()
+
.getRequiredCapacity();
+ }
+
+ private Integer getLimitForTask(
Review Comment:
```suggestion
private Integer getMaxCapacityForTaskType(
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/WorkerTaskRunnerConfig.java:
##########
@@ -29,14 +34,17 @@ public class WorkerTaskRunnerConfig
@JsonProperty
private double parallelIndexTaskSlotRatio = 1;
+ @JsonProperty
+ private Map<String, @Min(value = 0, message = "Task slot ratio for must be
at least 0.") @Max(value = 1, message = "Task slot ratio cannot be greater than
1") Double> taskSlotRatiosPerWorker = new HashMap<>();
Review Comment:
Please also add a unit test for the validation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]