gianm commented on code in PR #18936:
URL: https://github.com/apache/druid/pull/18936#discussion_r2720649222


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -57,6 +57,11 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler
 
   private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
   private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
+  private static final int LAG_STEP = 100_000;

Review Comment:
   There are a lot of magic-looking constants here. Please add javadocs to these 
constants describing what they do, and consider whether any of them should be 
configurable to allow for experimentation without changing the code. (Of 
course, ideally, users would not need to configure anything.)
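
As a rough illustration of what documented constants might look like, here is a sketch for the two partitions-per-task bounds. The descriptions are inferred only from how `computeValidTaskCounts` uses these constants in this PR, the class name `DocumentedConstantsSketch` is hypothetical, and `LAG_STEP` is left out because its use is not visible in this hunk.

```java
// Hypothetical holder class, for illustration only; not part of the PR.
final class DocumentedConstantsSketch
{
  /**
   * Largest amount by which partitions-per-task may drop below its current
   * value when candidate task counts are generated, before any lag-driven
   * extra is added. Fewer partitions per task means more tasks, so this
   * bounds how far a single evaluation can scale up.
   */
  static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;

  /**
   * Largest amount by which partitions-per-task may rise above its current
   * value when candidate task counts are generated. More partitions per task
   * means fewer tasks, so this bounds how far a single evaluation can scale
   * down. Defined as twice the scale-up bound.
   */
  static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;

  private DocumentedConstantsSketch()
  {
    // Constants only; no instances.
  }
}
```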



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -230,32 +240,77 @@ int computeOptimalTaskCount(CostMetrics metrics)
   }
 
   /**
-   * Generates valid task counts based on partitions-per-task ratios.
+   * Generates valid task counts based on partitions-per-task ratios and lag-driven PPT relaxation.
    * This enables gradual scaling and avoids large jumps.
    * Limits the range of task counts considered to avoid excessive computation.
    *
    * @return sorted list of valid task counts within bounds
    */
-  static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount)
+  static int[] computeValidTaskCounts(
+      int partitionCount,
+      int currentTaskCount,
+      double aggregateLag,
+      int taskCountMax
+  )
   {
-    if (partitionCount <= 0) {
+    if (partitionCount <= 0 || currentTaskCount <= 0 || taskCountMax <= 0) {
       return new int[]{};
     }
 
     Set<Integer> result = new HashSet<>();
     final int currentPartitionsPerTask = partitionCount / currentTaskCount;
+    final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease(
+        aggregateLag,
+        partitionCount,
+        currentTaskCount,
+        taskCountMax
+    );
+    final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease;
+
     // Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa.
-    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK);
+    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - effectiveMaxIncrease);
     final int maxPartitionsPerTask = Math.min(
         partitionCount,
         currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
     );
 
     for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) {
       final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
-      result.add(taskCount);
+      if (taskCount <= taskCountMax) {
+        result.add(taskCount);
+      }
+    }
+    return result.stream().mapToInt(Integer::intValue).sorted().toArray();

Review Comment:
   You could use an `IntSet` for `result` to avoid the boxing and stream usage. (It 
has a method that directly returns an `int[]`.)
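
For comparison, here is a standard-library-only sketch that also avoids boxing and streams. It is an alternative to the `IntSet` approach, not that approach itself. It relies on the observation that `ceil(partitionCount / partitionsPerTask)` never decreases as `partitionsPerTask` shrinks, so the loop already yields candidates in ascending order and duplicates are always adjacent. The class name is hypothetical, and the signature is simplified as an assumption: the lag-driven extra increase is passed in as a plain `int` rather than computed from the aggregate lag.

```java
import java.util.Arrays;

// Hypothetical class for illustration; not the PR's implementation.
final class ValidTaskCountsSketch
{
  private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
  private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;

  // Assumption: extraIncrease stands in for the lag-driven value the PR computes elsewhere.
  static int[] computeValidTaskCounts(
      int partitionCount,
      int currentTaskCount,
      int extraIncrease,
      int taskCountMax
  )
  {
    if (partitionCount <= 0 || currentTaskCount <= 0 || taskCountMax <= 0) {
      return new int[]{};
    }

    final int currentPartitionsPerTask = partitionCount / currentTaskCount;
    final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease;
    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - effectiveMaxIncrease);
    final int maxPartitionsPerTask = Math.min(
        partitionCount,
        currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
    );

    // At most one candidate per partitions-per-task value in the range.
    final int[] buffer = new int[Math.max(0, maxPartitionsPerTask - minPartitionsPerTask + 1)];
    int size = 0;

    for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) {
      final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
      if (taskCount > taskCountMax) {
        // Task counts only grow from here on, so nothing later can fit.
        break;
      }
      // Values arrive in ascending order, so a duplicate can only equal the last one stored.
      if (size == 0 || buffer[size - 1] != taskCount) {
        buffer[size++] = taskCount;
      }
    }
    return Arrays.copyOf(buffer, size);
  }
}
```

Whether this is preferable to a primitive set is a judgment call: it drops the set entirely but depends on the loop direction staying as it is.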



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -3461,6 +3461,7 @@ void maybeScaleDuringTaskRollover()
       if (rolloverTaskCount > 0) {
         log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
         changeTaskCountInIOConfig(rolloverTaskCount);
+        autoScalerConfig.setTaskCountStart(rolloverTaskCount);

Review Comment:
   What's this needed for? I recall in the discussion on #18745 we ended up 
deciding that `taskCountStart` would be immutable, and the autoscaler would 
change only `taskCount`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

