capistrant commented on code in PR #18936:
URL: https://github.com/apache/druid/pull/18936#discussion_r2721573924
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -230,32 +240,77 @@ int computeOptimalTaskCount(CostMetrics metrics)
}
/**
- * Generates valid task counts based on partitions-per-task ratios.
+ * Generates valid task counts based on partitions-per-task ratios and lag-driven PPT relaxation.
* This enables gradual scaling and avoids large jumps.
* Limits the range of task counts considered to avoid excessive computation.
*
* @return sorted list of valid task counts within bounds
*/
- static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount)
+ static int[] computeValidTaskCounts(
+ int partitionCount,
+ int currentTaskCount,
+ double aggregateLag,
+ int taskCountMax
+ )
{
- if (partitionCount <= 0) {
+ if (partitionCount <= 0 || currentTaskCount <= 0 || taskCountMax <= 0) {
return new int[]{};
}
Set<Integer> result = new HashSet<>();
final int currentPartitionsPerTask = partitionCount / currentTaskCount;
+ final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease(
+ aggregateLag,
+ partitionCount,
+ currentTaskCount,
+ taskCountMax
+ );
+ final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease;
+
// Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa.
- final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK);
+ final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - effectiveMaxIncrease);
final int maxPartitionsPerTask = Math.min(
partitionCount,
currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
);
for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) {
final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
- result.add(taskCount);
+ if (taskCount <= taskCountMax) {
+ result.add(taskCount);
+ }
+ }
+ return result.stream().mapToInt(Integer::intValue).sorted().toArray();
+ }
+
+ /**
+ * Computes extra allowed increase in partitions-per-task in scenarios when the average
+ * per-partition lag is relatively high.
Review Comment:
relatively high compared to what? I think I'm failing to grasp how the
constants for lag thresholds can be so generally applicable that we don't need
to open them up to configuration. I think this idea of allowing for a burst in
PPT scale factor is reasonable, but does this specific implementation come from
tuning for too specific a case? I think this goes along with Gian's question on
the constants above.
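For readers following the PPT discussion, here is a standalone sketch of the enumeration this hunk modifies. It is not the PR's code: the constant values are illustrative placeholders, the class name is made up, and the lag-driven `extraIncrease` term under debate is assumed to be zero, so only the base ceiling-division enumeration is shown.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ValidTaskCounts
{
  // Illustrative placeholder values; the real constants live in CostBasedAutoScaler.
  static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
  static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = 2;

  // Simplified sketch of computeValidTaskCounts with the lag-driven
  // extraIncrease term assumed to be zero.
  static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount, int taskCountMax)
  {
    if (partitionCount <= 0 || currentTaskCount <= 0 || taskCountMax <= 0) {
      return new int[]{};
    }
    final Set<Integer> result = new HashSet<>();
    final int currentPartitionsPerTask = partitionCount / currentTaskCount;
    final int minPpt = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK);
    final int maxPpt = Math.min(partitionCount, currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK);
    for (int ppt = maxPpt; ppt >= minPpt; ppt--) {
      // Ceiling division: the number of tasks needed to cover all partitions at this PPT.
      final int taskCount = (partitionCount + ppt - 1) / ppt;
      if (taskCount <= taskCountMax) {
        result.add(taskCount);
      }
    }
    return result.stream().mapToInt(Integer::intValue).sorted().toArray();
  }

  public static void main(String[] args)
  {
    // 30 partitions across 10 tasks -> current PPT = 3, so PPT range is [1, 5].
    System.out.println(Arrays.toString(computeValidTaskCounts(30, 10, 100)));
    // → [6, 8, 10, 15, 30]
  }
}
```

This makes the "gradual scaling" claim in the javadoc concrete: the candidate counts cluster around the current task count, and the PPT bounds (plus any lag-driven relaxation of the increase side) are what limit how far a single scaling decision can jump.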
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java:
##########
@@ -106,10 +110,14 @@ public CostResult computeCost(CostMetrics metrics, int proposedTaskCount, CostBa
/**
- * Estimates the idle ratio for a given task count using a capacity-based linear model.
+ * Estimates the idle ratio for a proposed task count.
+ * Includes lag-based adjustment to eliminate high lag and
Review Comment:
I think here again I am wondering how generally applicable a constant for
per-partition lag is in the real world. Same/similar questions as for the PPT
scale limits computed in `CostBasedAutoScaler`.
Also, in addition to the above, I think adding in this lag consideration
does add some complexity here. Mainly, it starts us down the path of making the
cost function harder to easily and quickly understand for a newcomer, IMO. If
this added complexity is considered a negative or "cost", the positive of
improved behavior should outweigh it. So I guess that raises the question: how
did we, or how are we going to, measure the improvement that this additional
logic/computation provides?
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java:
##########
@@ -29,7 +29,11 @@
public class WeightedCostFunction
{
private static final Logger log = new Logger(WeightedCostFunction.class);
-
+ private static final double HIHG_LAG_SCALE_FACTOR = 100_000.0;
Review Comment:
is this supposed to be `HIGH` or am I unfamiliar with what `HIHG` may stand
for in math terms? I tried to google it in case it is an abbreviation I don't
know, but didn't find anything so I tend to think maybe just misspelled?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]