kfaraz commented on code in PR #17988:
URL: https://github.com/apache/druid/pull/17988#discussion_r2095875080


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream(Kafka/Kinesis)
    * @return Integer. target number of tasksCount, -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+    final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    final int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      return -1;
+    }
+
+    // Cache the factorization in an immutable list for quick lookup later
+    // Partition counts *can* change externally without a new instance of this class being created
+    if (partitionFactors.isEmpty() || partitionCount != partitionFactors.get(partitionFactors.size() - 1)) {
+      log.debug("(Re)computing partitionCount factorization for partitionCount=[%d]", partitionCount);

Review Comment:
   Nit: this log can go into the `factorize` method itself.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream(Kafka/Kinesis)
    * @return Integer. target number of tasksCount, -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+    final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    final int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);

Review Comment:
   Maybe this should be an alert rather than a warn?
   
   ```suggestion
      log.warn("Supervisor[%s] has an invalid number of partitions[%d].", spec.getId(), partitionCount);
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream(Kafka/Kinesis)
    * @return Integer. target number of tasksCount, -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+    final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    final int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      return -1;
+    }
+
+    // Cache the factorization in an immutable list for quick lookup later
+    // Partition counts *can* change externally without a new instance of this class being created
+    if (partitionFactors.isEmpty() || partitionCount != partitionFactors.get(partitionFactors.size() - 1)) {
+      log.debug("(Re)computing partitionCount factorization for partitionCount=[%d]", partitionCount);
+      partitionFactors = factorize(partitionCount);
+    }
+
+    Preconditions.checkState(!partitionFactors.isEmpty(), "partitionFactors should not be empty");
+
+    final int desiredActiveTaskCount = computeDesiredTaskCountHelper(lags, currentActiveTaskCount);
+    return applyMinMaxChecks(desiredActiveTaskCount, currentActiveTaskCount, partitionCount);
+  }
+
+  private int computeDesiredTaskCountHelper(final List<Long> lags, final int currentActiveTaskCount)
+  {
     int beyond = 0;
     int within = 0;
-    int metricsCount = lags.size();
-    for (Long lag : lags) {
+    final int metricsCount = lags.size();
+    for (final Long lag : lags) {
       if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
         beyond++;
       }
       if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
         within++;
       }
     }
-    double beyondProportion = beyond * 1.0 / metricsCount;
-    double withinProportion = within * 1.0 / metricsCount;
+    final double beyondProportion = beyond * 1.0 / metricsCount;
+    final double withinProportion = within * 1.0 / metricsCount;
 
-    log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
+    log.debug(
+        "Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
         withinProportion, dataSource
     );
 
-    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
-    int desiredActiveTaskCount;
-    int partitionCount = supervisor.getPartitionCount();
-    if (partitionCount <= 0) {
-      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+    if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
+      return currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
+    } else if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
+      return currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
+    }
+
+    return currentActiveTaskCount;
+  }
+
+
+  private int applyMinMaxChecks(int desiredActiveTaskCount, final int currentActiveTaskCount, final int partitionCount)
+  {
+    // for now, only attempt to scale to nearest factor for scale up
+    if (lagBasedAutoScalerConfig.getUseNearestFactorScaling() && desiredActiveTaskCount > currentActiveTaskCount) {
+      desiredActiveTaskCount = nearestPartitionCountFactor(desiredActiveTaskCount).orElse(desiredActiveTaskCount);
+    }
+
+    if (desiredActiveTaskCount == currentActiveTaskCount) {
+      log.debug("No change in task count for dataSource [%s].", dataSource);
       return -1;
     }
 
-    if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
-      // Do Scale out
-      int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
-
-      int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
-      if (currentActiveTaskCount == actualTaskCountMax) {
-        log.debug("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
-            dataSource
-        );
-        emitter.emit(metricBuilder
-                         .setDimension(
-                             SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at max task count"
-                         )
-                         .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
-        return -1;
-      } else {
-        desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
-      }
-      return desiredActiveTaskCount;
+    final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+    final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+
+    if (currentActiveTaskCount == actualTaskCountMax && currentActiveTaskCount < desiredActiveTaskCount) {
+      log.debug(
+          "CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource[%s].",
+          dataSource
+      );
+      emitSkipMetric("Already at max task count", desiredActiveTaskCount);
+      return -1;
+    } else if (currentActiveTaskCount == actualTaskCountMin && currentActiveTaskCount > desiredActiveTaskCount) {
+      log.debug(
+          "CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource[%s].",
+          dataSource
+      );
+      emitSkipMetric("Already at min task count", desiredActiveTaskCount);
+      return -1;
     }
 
-    if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
-      // Do Scale in
-      int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
-      int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
-      if (currentActiveTaskCount == actualTaskCountMin) {
-        log.debug("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource[%s].",
-            dataSource
-        );
-        emitter.emit(metricBuilder
-                         .setDimension(
-                             SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at min task count"
-                         )
-                         .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
-        return -1;
+    desiredActiveTaskCount = Math.min(desiredActiveTaskCount, actualTaskCountMax);
+    desiredActiveTaskCount = Math.max(desiredActiveTaskCount, actualTaskCountMin);
+    return desiredActiveTaskCount;
+  }
+
+  private void emitSkipMetric(final String reason, final int taskCount)
+  {
+    emitter.emit(metricBuilder
+                     .setDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, reason)
+                     .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
+  }
+
+  /*
+    Finds the next largest factor of the topic's partition count that is ≥ `desiredTaskCount`.
+    If no such factor exists, return an empty optional.
+  */
+  private Optional<Integer> nearestPartitionCountFactor(final int desiredTaskCount)
+  {
+    int lo = 0;
+    int hi = partitionFactors.size();
+    while (lo < hi) {
+      final int mid = lo + (hi - lo) / 2;
+      if (partitionFactors.get(mid) >= desiredTaskCount) {
+        hi = mid;
       } else {
-        desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin);
+        lo = mid + 1;
       }
-      return desiredActiveTaskCount;
     }
-    return -1;
+
+    final Optional<Integer> factor = lo < partitionFactors.size()
+                                     ? Optional.of(partitionFactors.get(lo))
+                                     : Optional.empty();
+    log.debug("Given desiredTaskCount=[%d], found nearest task count=[%s]", desiredTaskCount, factor);
+    return factor;
   }
 
   public LagBasedAutoScalerConfig getAutoScalerConfig()
   {
     return lagBasedAutoScalerConfig;
   }
+
+  private static ImmutableList<Integer> factorize(final int partitionCount)
+  {
+    final ImmutableList.Builder<Integer> factors = new ImmutableList.Builder<>();
+    for (int i = 1; i <= partitionCount; ++i) {

Review Comment:
   I feel it's cleaner to have this method just take care of the factorization.
   Further filtering can be done by the caller as needed.
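
   The split suggested here could look roughly like this — a hypothetical sketch, not the PR's code: `factorize` only enumerates factors, and the caller applies whatever filtering it needs, e.g. restricting candidates to the configured task-count bounds.

   ```java
   import java.util.List;
   import java.util.stream.Collectors;
   import java.util.stream.IntStream;

   public class CallerFiltering
   {
     // Pure factorization: every factor of n in ascending order; empty for n <= 0.
     public static List<Integer> factorize(final int n)
     {
       return IntStream.rangeClosed(1, Math.max(n, 0))
                       .filter(i -> n % i == 0)
                       .boxed()
                       .collect(Collectors.toList());
     }

     // Caller-side filtering: keep only factors usable as task counts under the bounds.
     public static List<Integer> candidateTaskCounts(final int partitionCount, final int min, final int max)
     {
       return factorize(partitionCount).stream()
                                       .filter(f -> f >= min && f <= max)
                                       .collect(Collectors.toList());
     }
   }
   ```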



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -51,6 +55,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
   private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
   private final ServiceEmitter emitter;
   private final ServiceMetricEvent.Builder metricBuilder;
+  private ImmutableList<Integer> partitionFactors;

Review Comment:
   As suggested in the other comment, this need not be cached and can be a 
local variable instead.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java:
##########
@@ -100,6 +102,7 @@ public LagBasedAutoScalerConfig(
     this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
     this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis
         != null ? minTriggerScaleActionFrequencyMillis : 600000;
+    this.useNearestFactorScaling = useNearestFactorScaling != null ? useNearestFactorScaling : false;

Review Comment:
   Nit: Probably more readable using `Configs.valueOrDefault()`
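
   For illustration, the suggested form would read `this.useNearestFactorScaling = Configs.valueOrDefault(useNearestFactorScaling, false);`. A minimal stand-in for the helper's semantics (the real class is `org.apache.druid.common.config.Configs`; this sketch only mirrors its null-fallback behavior):

   ```java
   public class ConfigsSketch
   {
     // Stand-in for Configs.valueOrDefault: fall back to the default when the
     // user-supplied value is null, mirroring the ternary in the constructor.
     public static <T> T valueOrDefault(T value, T defaultValue)
     {
       return value != null ? value : defaultValue;
     }

     public static void main(String[] args)
     {
       Boolean fromJson = null;  // e.g. the property was absent in the supervisor spec
       boolean useNearestFactorScaling = valueOrDefault(fromJson, false);
       System.out.println(useNearestFactorScaling);  // prints "false"
     }
   }
   ```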



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -69,13 +74,15 @@ public LagBasedAutoScaler(
         .getLagCollectionIntervalMillis()) + 1;
     this.lagMetricsQueue = new CircularFifoQueue<>(slots);
     this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d");
-    this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
+    this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId)
+                                                            + "-Computation-%d");

Review Comment:
   Nit: Doesn't seem necessary since the previous line is also nearly as long.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream(Kafka/Kinesis)
    * @return Integer. target number of tasksCount, -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+    final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    final int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      return -1;
+    }
+
+    // Cache the factorization in an immutable list for quick lookup later
+    // Partition counts *can* change externally without a new instance of this class being created
+    if (partitionFactors.isEmpty() || partitionCount != partitionFactors.get(partitionFactors.size() - 1)) {
+      log.debug("(Re)computing partitionCount factorization for partitionCount=[%d]", partitionCount);
+      partitionFactors = factorize(partitionCount);
+    }
+
+    Preconditions.checkState(!partitionFactors.isEmpty(), "partitionFactors should not be empty");
+
+    final int desiredActiveTaskCount = computeDesiredTaskCountHelper(lags, currentActiveTaskCount);
+    return applyMinMaxChecks(desiredActiveTaskCount, currentActiveTaskCount, partitionCount);
+  }
+
+  private int computeDesiredTaskCountHelper(final List<Long> lags, final int currentActiveTaskCount)
+  {
     int beyond = 0;
     int within = 0;
-    int metricsCount = lags.size();
-    for (Long lag : lags) {
+    final int metricsCount = lags.size();
+    for (final Long lag : lags) {
       if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
         beyond++;
       }
       if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
         within++;
       }
     }
-    double beyondProportion = beyond * 1.0 / metricsCount;
-    double withinProportion = within * 1.0 / metricsCount;
+    final double beyondProportion = beyond * 1.0 / metricsCount;
+    final double withinProportion = within * 1.0 / metricsCount;
 
-    log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
+    log.debug(
+        "Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
         withinProportion, dataSource
     );
 
-    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
-    int desiredActiveTaskCount;
-    int partitionCount = supervisor.getPartitionCount();
-    if (partitionCount <= 0) {
-      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+    if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
+      return currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
+    } else if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
+      return currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
+    }
+
+    return currentActiveTaskCount;
+  }
+
+
+  private int applyMinMaxChecks(int desiredActiveTaskCount, final int currentActiveTaskCount, final int partitionCount)
+  {
+    // for now, only attempt to scale to nearest factor for scale up
+    if (lagBasedAutoScalerConfig.getUseNearestFactorScaling() && desiredActiveTaskCount > currentActiveTaskCount) {
+      desiredActiveTaskCount = nearestPartitionCountFactor(desiredActiveTaskCount).orElse(desiredActiveTaskCount);
+    }
+
+    if (desiredActiveTaskCount == currentActiveTaskCount) {
+      log.debug("No change in task count for dataSource [%s].", dataSource);
       return -1;
     }
 
-    if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
-      // Do Scale out
-      int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
-
-      int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
-      if (currentActiveTaskCount == actualTaskCountMax) {
-        log.debug("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
-            dataSource
-        );
-        emitter.emit(metricBuilder
-                         .setDimension(
-                             SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at max task count"
-                         )
-                         .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
-        return -1;
-      } else {
-        desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
-      }
-      return desiredActiveTaskCount;
+    final int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+    final int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
+
+    if (currentActiveTaskCount == actualTaskCountMax && currentActiveTaskCount < desiredActiveTaskCount) {
+      log.debug(
+          "CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource[%s].",
+          dataSource
+      );
+      emitSkipMetric("Already at max task count", desiredActiveTaskCount);
+      return -1;
+    } else if (currentActiveTaskCount == actualTaskCountMin && currentActiveTaskCount > desiredActiveTaskCount) {
+      log.debug(
+          "CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource[%s].",
+          dataSource
+      );
+      emitSkipMetric("Already at min task count", desiredActiveTaskCount);
+      return -1;
     }
 
-    if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
-      // Do Scale in
-      int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
-      int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
-      if (currentActiveTaskCount == actualTaskCountMin) {
-        log.debug("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource[%s].",
-            dataSource
-        );
-        emitter.emit(metricBuilder
-                         .setDimension(
-                             SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
-                             "Already at min task count"
-                         )
-                         .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
-        return -1;
+    desiredActiveTaskCount = Math.min(desiredActiveTaskCount, actualTaskCountMax);
+    desiredActiveTaskCount = Math.max(desiredActiveTaskCount, actualTaskCountMin);
+    return desiredActiveTaskCount;
+  }
+
+  private void emitSkipMetric(final String reason, final int taskCount)
+  {
+    emitter.emit(metricBuilder
+                     .setDimension(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION, reason)
+                     .setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
+  }
+
+  /*
+    Finds the next largest factor of the topic's partition count that is ≥ `desiredTaskCount`.
+    If no such factor exists, return an empty optional.
+  */
+  private Optional<Integer> nearestPartitionCountFactor(final int desiredTaskCount)
+  {
+    int lo = 0;
+    int hi = partitionFactors.size();
+    while (lo < hi) {
+      final int mid = lo + (hi - lo) / 2;
+      if (partitionFactors.get(mid) >= desiredTaskCount) {
+        hi = mid;
       } else {
-        desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin);
+        lo = mid + 1;
       }
-      return desiredActiveTaskCount;
     }
-    return -1;
+
+    final Optional<Integer> factor = lo < partitionFactors.size()
+                                     ? Optional.of(partitionFactors.get(lo))
+                                     : Optional.empty();
+    log.debug("Given desiredTaskCount=[%d], found nearest task count=[%s]", desiredTaskCount, factor);
+    return factor;
   }
 
   public LagBasedAutoScalerConfig getAutoScalerConfig()
   {
     return lagBasedAutoScalerConfig;
   }
+
+  private static ImmutableList<Integer> factorize(final int partitionCount)
+  {
+    final ImmutableList.Builder<Integer> factors = new ImmutableList.Builder<>();
+    for (int i = 1; i <= partitionCount; ++i) {
+      if (partitionCount % i == 0) {

Review Comment:
   I haven't tried it out, but how does `%` operator behave with 0 and negative 
numbers?
   For completeness, probably best to have a condition at the top of this 
method to just return an empty in those cases.
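
   For reference, Java's `%` keeps the sign of the dividend (`-7 % 3 == -1`, `7 % -3 == 1`) and `x % 0` throws `ArithmeticException`; in this loop `i` starts at 1, so a zero divisor never occurs, but a guard still makes the `<= 0` case explicit. A sketch of the guarded helper, mirroring the PR's loop but using a plain `List` rather than the actual Druid method:

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   public class FactorizeSketch
   {
     // The remainder's sign follows the dividend, so negative partitionCount values
     // would simply never match `partitionCount % i == 0` for positive i anyway;
     // the explicit guard documents the contract instead of relying on that.
     public static List<Integer> factorize(final int partitionCount)
     {
       if (partitionCount <= 0) {
         return Collections.emptyList();
       }
       final List<Integer> factors = new ArrayList<>();
       for (int i = 1; i <= partitionCount; ++i) {
         if (partitionCount % i == 0) {
           factors.add(i);
         }
       }
       return Collections.unmodifiableList(factors);
     }
   }
   ```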



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream(Kafka/Kinesis)
    * @return Integer. target number of tasksCount, -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+    final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    final int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      return -1;
+    }
+
+    // Cache the factorization in an immutable list for quick lookup later
+    // Partition counts *can* change externally without a new instance of this class being created
+    if (partitionFactors.isEmpty() || partitionCount != partitionFactors.get(partitionFactors.size() - 1)) {
+      log.debug("(Re)computing partitionCount factorization for partitionCount=[%d]", partitionCount);
+      partitionFactors = factorize(partitionCount);
+    }
+
+    Preconditions.checkState(!partitionFactors.isEmpty(), "partitionFactors should not be empty");
+
+    final int desiredActiveTaskCount = computeDesiredTaskCountHelper(lags, currentActiveTaskCount);
+    return applyMinMaxChecks(desiredActiveTaskCount, currentActiveTaskCount, partitionCount);
+  }
+
+  private int computeDesiredTaskCountHelper(final List<Long> lags, final int currentActiveTaskCount)
+  {
     int beyond = 0;
     int within = 0;
-    int metricsCount = lags.size();
-    for (Long lag : lags) {
+    final int metricsCount = lags.size();
+    for (final Long lag : lags) {
       if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
         beyond++;
       }
       if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
         within++;
       }
     }
-    double beyondProportion = beyond * 1.0 / metricsCount;
-    double withinProportion = within * 1.0 / metricsCount;
+    final double beyondProportion = beyond * 1.0 / metricsCount;
+    final double withinProportion = within * 1.0 / metricsCount;
 
-    log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
+    log.debug(
+        "Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
         withinProportion, dataSource
     );
 
-    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
-    int desiredActiveTaskCount;
-    int partitionCount = supervisor.getPartitionCount();
-    if (partitionCount <= 0) {
-      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+    if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
+      return currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
+    } else if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
+      return currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
+    }
+
+    return currentActiveTaskCount;
+  }
+
+
+  private int applyMinMaxChecks(int desiredActiveTaskCount, final int currentActiveTaskCount, final int partitionCount)
+  {
+    // for now, only attempt to scale to nearest factor for scale up

Review Comment:
   We should do it for scale in too. It is okay since this is an experimental 
config anyway.
   The reason we need the nearest factor scaling is to ensure that the task 
count is _always_ a factor of the number of partitions.
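
   Extending the scheme to scale-in would mean picking the largest factor of the partition count that is ≤ the desired task count, the mirror image of the existing ceiling lookup. A hypothetical helper along those lines, not the PR's code:

   ```java
   import java.util.List;
   import java.util.Optional;

   public class NearestFactorSketch
   {
     // factors must be sorted ascending, e.g. the factorization of 12: [1, 2, 3, 4, 6, 12].
     // Returns the largest factor <= desired, or empty if every factor exceeds it.
     public static Optional<Integer> floorFactor(final List<Integer> factors, final int desired)
     {
       int lo = 0;
       int hi = factors.size();
       // binary search for the first factor strictly greater than desired;
       // the answer, if any, sits immediately before it
       while (lo < hi) {
         final int mid = lo + (hi - lo) / 2;
         if (factors.get(mid) > desired) {
           hi = mid;
         } else {
           lo = mid + 1;
         }
       }
       return lo > 0 ? Optional.of(factors.get(lo - 1)) : Optional.empty();
     }
   }
   ```

   With 12 partitions, a scale-in targeting 5 tasks would land on 4, keeping the task count a factor of the partition count.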



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream(Kafka/Kinesis)
    * @return Integer. target number of tasksCount, -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+    final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    final int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      return -1;
+    }
+
+    // Cache the factorization in an immutable list for quick lookup later
+    // Partition counts *can* change externally without a new instance of this class being created
+    if (partitionFactors.isEmpty() || partitionCount != partitionFactors.get(partitionFactors.size() - 1)) {
+      log.debug("(Re)computing partitionCount factorization for partitionCount=[%d]", partitionCount);
+      partitionFactors = factorize(partitionCount);
+    }
+
+    Preconditions.checkState(!partitionFactors.isEmpty(), "partitionFactors should not be empty");
+
+    final int desiredActiveTaskCount = computeDesiredTaskCountHelper(lags, currentActiveTaskCount);
+    return applyMinMaxChecks(desiredActiveTaskCount, currentActiveTaskCount, partitionCount);
+  }
+
+  private int computeDesiredTaskCountHelper(final List<Long> lags, final int currentActiveTaskCount)
+  {
     int beyond = 0;
     int within = 0;
-    int metricsCount = lags.size();
-    for (Long lag : lags) {
+    final int metricsCount = lags.size();
+    for (final Long lag : lags) {
       if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
         beyond++;
       }
       if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
         within++;
       }
     }
-    double beyondProportion = beyond * 1.0 / metricsCount;
-    double withinProportion = within * 1.0 / metricsCount;
+    final double beyondProportion = beyond * 1.0 / metricsCount;
+    final double withinProportion = within * 1.0 / metricsCount;
 
-    log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
+    log.debug(
+        "Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,

Review Comment:
   ```suggestion
        "Calculated beyondProportion[%s] and withinProportion[%s] for supervisor[%s].", beyondProportion,
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream(Kafka/Kinesis)
    * @return Integer. target number of tasksCount, -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+    final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    final int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      return -1;
+    }
+
+    // Cache the factorization in an immutable list for quick lookup later
+    // Partition counts *can* change externally without a new instance of this class being created
+    if (partitionFactors.isEmpty() || partitionCount != partitionFactors.get(partitionFactors.size() - 1)) {
+      log.debug("(Re)computing partitionCount factorization for partitionCount=[%d]", partitionCount);
+      partitionFactors = factorize(partitionCount);
+    }
+
+    Preconditions.checkState(!partitionFactors.isEmpty(), "partitionFactors should not be empty");
+
+    final int desiredActiveTaskCount = computeDesiredTaskCountHelper(lags, currentActiveTaskCount);
+    return applyMinMaxChecks(desiredActiveTaskCount, currentActiveTaskCount, partitionCount);
+  }
+
+  private int computeDesiredTaskCountHelper(final List<Long> lags, final int currentActiveTaskCount)
+  {
     int beyond = 0;
     int within = 0;
-    int metricsCount = lags.size();
-    for (Long lag : lags) {
+    final int metricsCount = lags.size();
+    for (final Long lag : lags) {
       if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
         beyond++;
       }
       if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
         within++;
       }
     }
-    double beyondProportion = beyond * 1.0 / metricsCount;
-    double withinProportion = within * 1.0 / metricsCount;
+    final double beyondProportion = beyond * 1.0 / metricsCount;
+    final double withinProportion = within * 1.0 / metricsCount;
 
-    log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
+    log.debug(
+        "Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
         withinProportion, dataSource

Review Comment:
   ```suggestion
           withinProportion, spec.getId()
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
    * @param lags the lag metrics of Stream(Kafka/Kinesis)
    * @return Integer. target number of tasksCount, -1 means skip scale action.
    */
-  private int computeDesiredTaskCount(List<Long> lags)
+  @VisibleForTesting
+  int computeDesiredTaskCount(List<Long> lags)
   {
-    // if supervisor is not suspended, ensure required tasks are running
-    // if suspended, ensure tasks have been requested to gracefully stop
     log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+    final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    final int partitionCount = supervisor.getPartitionCount();
+    if (partitionCount <= 0) {
+      log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+      return -1;
+    }
+
+    // Cache the factorization in an immutable list for quick lookup later
+    // Partition counts *can* change externally without a new instance of this class being created
+    if (partitionFactors.isEmpty() || partitionCount != partitionFactors.get(partitionFactors.size() - 1)) {

Review Comment:
   If the computation is fast enough, do we even need to cache it?
   Even if the partition count were 1k, the computation would finish in a few µs.
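
   To illustrate why caching buys little: even the naive O(n) loop over 1k partitions is trivial, and an O(√n) enumeration (a hypothetical alternative, not the PR's code) stays cheap for any realistic partition count, so the factor list could be recomputed locally on every evaluation.

   ```java
   import java.util.ArrayList;
   import java.util.Collections;
   import java.util.List;

   public class FastFactorize
   {
     // Enumerate factors in O(sqrt(n)): each i <= sqrt(n) that divides n
     // contributes both i and its cofactor n / i.
     public static List<Integer> factorize(final int n)
     {
       final List<Integer> small = new ArrayList<>();
       final List<Integer> large = new ArrayList<>();
       for (int i = 1; (long) i * i <= n; ++i) {
         if (n % i == 0) {
           small.add(i);            // factor at or below sqrt(n)
           if (i != n / i) {
             large.add(n / i);      // paired factor above sqrt(n)
           }
         }
       }
       Collections.reverse(large);  // restore ascending order
       small.addAll(large);
       return small;
     }
   }
   ```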



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

