kfaraz commented on code in PR #17988:
URL: https://github.com/apache/druid/pull/17988#discussion_r2175558914


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream(Kafka/Kinesis)
    * @return Integer. target number of tasksCount, -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug("Computing desired task count for [%s], based on following lags 
: [%s]", dataSource, lags);
+    final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    final int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      return -1;
+    }
+
+    // Cache the factorization in an immutable list for quick lookup later
+    // Partition counts *can* change externally without a new instance of this 
class being created
+    if (partitionFactors.isEmpty() || partitionCount != 
partitionFactors.get(partitionFactors.size() - 1)) {
+      log.debug("(Re)computing partitionCount factorization for 
partitionCount=[%d]", partitionCount);
+      partitionFactors = factorize(partitionCount);
+    }
+
+    Preconditions.checkState(!partitionFactors.isEmpty(), "partitionFactors 
should not be empty");
+
+    final int desiredActiveTaskCount = computeDesiredTaskCountHelper(lags, 
currentActiveTaskCount);
+    return applyMinMaxChecks(desiredActiveTaskCount, currentActiveTaskCount, 
partitionCount);
+  }
+
+  private int computeDesiredTaskCountHelper(final List<Long> lags, final int 
currentActiveTaskCount)
+  {
     int beyond = 0;
     int within = 0;
-    int metricsCount = lags.size();
-    for (Long lag : lags) {
+    final int metricsCount = lags.size();
+    for (final Long lag : lags) {
       if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
         beyond++;
       }
       if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
         within++;
       }
     }
-    double beyondProportion = beyond * 1.0 / metricsCount;
-    double withinProportion = within * 1.0 / metricsCount;
+    final double beyondProportion = beyond * 1.0 / metricsCount;
+    final double withinProportion = within * 1.0 / metricsCount;
 
-    log.debug("Calculated beyondProportion is [%s] and withinProportion is 
[%s] for dataSource [%s].", beyondProportion,
+    log.debug(
+        "Calculated beyondProportion is [%s] and withinProportion is [%s] for 
dataSource [%s].", beyondProportion,
         withinProportion, dataSource
     );
 
-    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
-    int desiredActiveTaskCount;
-    int partitionCount = supervisor.getPartitionCount();
-    if (partitionCount <= 0) {
-      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+    if (beyondProportion >= 
lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
+      return currentActiveTaskCount + 
lagBasedAutoScalerConfig.getScaleOutStep();
+    } else if (withinProportion >= 
lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
+      return currentActiveTaskCount - 
lagBasedAutoScalerConfig.getScaleInStep();
+    }
+
+    return currentActiveTaskCount;
+  }
+
+
+  private int applyMinMaxChecks(int desiredActiveTaskCount, final int 
currentActiveTaskCount, final int partitionCount)
+  {
+    // for now, only attempt to scale to nearest factor for scale up

Review Comment:
   Yes, I agree that sometimes the scale down (or even scale up) can be too 
aggressive, which is why I mentioned that at every evaluation, we would need to 
check if the increase or decrease in lag is enough to justify changing the task 
count.
   
   The change can be especially drastic if someone decides to have a prime 
number of partitions! 😅 
   
   But having a mix of factors and fixed scale in/scale out step size is 
probably not the way to go either.
   Having an unequal number of partitions assigned to tasks is known to cause 
issues, which is why the factor-based task count was considered in the first 
place.
   
   @uds5501 is also working in this area to identify the causes of increase in 
lag when task count changes.
   In conjunction to that, it would also be nice to have some setup where we 
can compare multiple strategies and determine if one is empirically better than 
the other. It is difficult to gauge just by the theoretical distribution of 
partitions if one distribution would really perform better or worse than 
another in practice.
   
   Until then, I don't think we should rush the change in this patch since 
there are multiple factors at play here.
   
   @cryptoe , what are your thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to