Fly-Style commented on code in PR #18936:
URL: https://github.com/apache/druid/pull/18936#discussion_r2721909828


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java:
##########
@@ -230,32 +240,77 @@ int computeOptimalTaskCount(CostMetrics metrics)
   }
 
   /**
-   * Generates valid task counts based on partitions-per-task ratios.
+   * Generates valid task counts based on partitions-per-task ratios and lag-driven PPT relaxation.
    * This enables gradual scaling and avoids large jumps.
    * Limits the range of task counts considered to avoid excessive computation.
    *
    * @return sorted list of valid task counts within bounds
    */
-  static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount)
+  static int[] computeValidTaskCounts(
+      int partitionCount,
+      int currentTaskCount,
+      double aggregateLag,
+      int taskCountMax
+  )
   {
-    if (partitionCount <= 0) {
+    if (partitionCount <= 0 || currentTaskCount <= 0 || taskCountMax <= 0) {
       return new int[]{};
     }
 
     Set<Integer> result = new HashSet<>();
     final int currentPartitionsPerTask = partitionCount / currentTaskCount;
+    final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease(
+        aggregateLag,
+        partitionCount,
+        currentTaskCount,
+        taskCountMax
+    );
+    final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease;
+
     // Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa.
-    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK);
+    final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - effectiveMaxIncrease);
     final int maxPartitionsPerTask = Math.min(
         partitionCount,
         currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
     );
 
     for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) {
       final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
-      result.add(taskCount);
+      if (taskCount <= taskCountMax) {
+        result.add(taskCount);
+      }
+    }
+    return result.stream().mapToInt(Integer::intValue).sorted().toArray();
+  }
+
+  /**
+   * Computes extra allowed increase in partitions-per-task in scenarios when the average
+   * per-partition lag is relatively high.

Review Comment:
   Eventually, we may make these constants configurable, but for now they are based only on my own observations of what works well as defaults.
   
   On further thought, I'll make some of the constants configurable (such as the lag threshold) in this PR.


