gianm commented on code in PR #18936:
URL: https://github.com/apache/druid/pull/18936#discussion_r2722096099
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -230,32 +240,77 @@ int computeOptimalTaskCount(CostMetrics metrics)
}
   /**
-   * Generates valid task counts based on partitions-per-task ratios.
+   * Generates valid task counts based on partitions-per-task ratios and lag-driven PPT relaxation.
    * This enables gradual scaling and avoids large jumps.
    * Limits the range of task counts considered to avoid excessive computation.
    *
    * @return sorted list of valid task counts within bounds
    */
-  static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount)
+  static int[] computeValidTaskCounts(
+      int partitionCount,
+      int currentTaskCount,
+      double aggregateLag,
+      int taskCountMax
+  )
   {
-    if (partitionCount <= 0) {
+    if (partitionCount <= 0 || currentTaskCount <= 0 || taskCountMax <= 0) {
       return new int[]{};
     }
     Set<Integer> result = new HashSet<>();
     final int currentPartitionsPerTask = partitionCount / currentTaskCount;
+    final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease(
+        aggregateLag,
+        partitionCount,
+        currentTaskCount,
+        taskCountMax
+    );
+    final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease;
+
     // Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa.
-    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK);
+    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - effectiveMaxIncrease);
     final int maxPartitionsPerTask = Math.min(
         partitionCount,
         currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
     );
     for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) {
       final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
-      result.add(taskCount);
+      if (taskCount <= taskCountMax) {
+        result.add(taskCount);
+      }
     }
     return result.stream().mapToInt(Integer::intValue).sorted().toArray();
   }
+
+  /**
+   * Computes extra allowed increase in partitions-per-task in scenarios when the average
+   * per-partition lag is relatively high.
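For readers following along, the candidate generation in the diff can be sketched as a standalone snippet. This is a simplified illustration only: the lag-driven relaxation (`computeExtraMaxPartitionsPerTaskIncrease`) is omitted, and the `MAX_*` constants here are placeholder values, not the PR's actual constants.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ValidTaskCounts
{
  // Illustrative bounds; the PR's actual constants may differ.
  static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
  static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = 2;

  // Simplified sketch of the candidate generation in the diff, with the
  // lag-driven relaxation omitted.
  static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount, int taskCountMax)
  {
    if (partitionCount <= 0 || currentTaskCount <= 0 || taskCountMax <= 0) {
      return new int[]{};
    }
    final Set<Integer> result = new HashSet<>();
    final int currentPartitionsPerTask = partitionCount / currentTaskCount;
    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK);
    final int maxPartitionsPerTask = Math.min(partitionCount, currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK);
    for (int ppt = maxPartitionsPerTask; ppt >= minPartitionsPerTask; ppt--) {
      // Ceiling division: the task count needed so no task reads more than ppt partitions.
      final int taskCount = (partitionCount + ppt - 1) / ppt;
      if (taskCount <= taskCountMax) {
        result.add(taskCount);
      }
    }
    return result.stream().mapToInt(Integer::intValue).sorted().toArray();
  }

  public static void main(String[] args)
  {
    // 12 partitions, currently 3 tasks (4 partitions/task): candidate ratios of
    // 2..6 partitions per task yield task counts {2, 3, 4, 6}.
    System.out.println(Arrays.toString(computeValidTaskCounts(12, 3, 10)));
  }
}
```

Note how the `taskCountMax` guard simply drops candidates rather than clamping them, so the returned array can be empty if every candidate exceeds the cap.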
Review Comment:
Ideally nothing would need to be configured, although I also feel that it may be useful for now to allow the lag threshold to be configured. The reason is that the threshold is expressed in number of messages, but in practice, the processing time of a message can vary quite a lot. Messages can be 100 bytes and processed very quickly, or they can be 10 KB and processed much more slowly. The difference between these two extremes is 100x, so one number does not fit all. A lag of 100,000 messages could represent less than a second of processing, or it could represent a minute of processing.

Ultimately, I think we want the decision-making to be based on the system's own assessment of how many messages it can process per unit of time. But that doesn't need to be done now. For now, we can probably set the initial thresholds assuming some medium rate of processing, such as 5,000 or 10,000 messages per second per task.
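The 100x spread can be made concrete with a tiny back-of-the-envelope helper (hypothetical, not part of the PR; the method name and parameters are made up for illustration):

```java
public class LagEstimate
{
  // Hypothetical helper: estimate how long the current tasks would need to
  // work off a message-count lag, given an assumed per-task throughput.
  static double estimatedLagSeconds(double aggregateLagMessages, int taskCount, double messagesPerSecondPerTask)
  {
    return aggregateLagMessages / (taskCount * messagesPerSecondPerTask);
  }

  public static void main(String[] args)
  {
    // The same 100,000-message lag on a single task:
    System.out.println(estimatedLagSeconds(100_000, 1, 200_000)); // tiny messages: 0.5 s of work
    System.out.println(estimatedLagSeconds(100_000, 1, 2_000));   // large messages: 50 s of work
  }
}
```

At the 5,000-10,000 messages/sec/task rate suggested above, that same 100,000-message lag on one task corresponds to roughly 10-20 seconds of work, which seems like a reasonable middle ground for an initial threshold.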
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]