bsyk commented on code in PR #17988:
URL: https://github.com/apache/druid/pull/17988#discussion_r2176201417
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
* @param lags the lag metrics of Stream(Kafka/Kinesis)
* @return Integer. target number of tasksCount, -1 means skip scale action.
*/
- private int computeDesiredTaskCount(List<Long> lags)
+ @VisibleForTesting
+ int computeDesiredTaskCount(List<Long> lags)
{
- // if supervisor is not suspended, ensure required tasks are running
- // if suspended, ensure tasks have been requested to gracefully stop
log.debug("Computing desired task count for [%s], based on following lags
: [%s]", dataSource, lags);
+ final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+ final int partitionCount = supervisor.getPartitionCount();
+ if (partitionCount <= 0) {
+ log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+ return -1;
+ }
+
+ // Cache the factorization in an immutable list for quick lookup later
+ // Partition counts *can* change externally without a new instance of this
class being created
+ if (partitionFactors.isEmpty() || partitionCount !=
partitionFactors.get(partitionFactors.size() - 1)) {
+ log.debug("(Re)computing partitionCount factorization for
partitionCount=[%d]", partitionCount);
+ partitionFactors = factorize(partitionCount);
+ }
+
+ Preconditions.checkState(!partitionFactors.isEmpty(), "partitionFactors
should not be empty");
+
+ final int desiredActiveTaskCount = computeDesiredTaskCountHelper(lags,
currentActiveTaskCount);
+ return applyMinMaxChecks(desiredActiveTaskCount, currentActiveTaskCount,
partitionCount);
+ }
+
+ private int computeDesiredTaskCountHelper(final List<Long> lags, final int
currentActiveTaskCount)
+ {
int beyond = 0;
int within = 0;
- int metricsCount = lags.size();
- for (Long lag : lags) {
+ final int metricsCount = lags.size();
+ for (final Long lag : lags) {
if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
beyond++;
}
if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
within++;
}
}
- double beyondProportion = beyond * 1.0 / metricsCount;
- double withinProportion = within * 1.0 / metricsCount;
+ final double beyondProportion = beyond * 1.0 / metricsCount;
+ final double withinProportion = within * 1.0 / metricsCount;
- log.debug("Calculated beyondProportion is [%s] and withinProportion is
[%s] for dataSource [%s].", beyondProportion,
+ log.debug(
+ "Calculated beyondProportion is [%s] and withinProportion is [%s] for
dataSource [%s].", beyondProportion,
withinProportion, dataSource
);
- int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
- int desiredActiveTaskCount;
- int partitionCount = supervisor.getPartitionCount();
- if (partitionCount <= 0) {
- log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+ if (beyondProportion >=
lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
+ return currentActiveTaskCount +
lagBasedAutoScalerConfig.getScaleOutStep();
+ } else if (withinProportion >=
lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
+ return currentActiveTaskCount -
lagBasedAutoScalerConfig.getScaleInStep();
+ }
+
+ return currentActiveTaskCount;
+ }
+
+
+ private int applyMinMaxChecks(int desiredActiveTaskCount, final int
currentActiveTaskCount, final int partitionCount)
+ {
+ // for now, only attempt to scale to nearest factor for scale up
Review Comment:
> This is because you risk scaling down more than you need to
If we scale to the next-largest partition-count factor _above_ the desired task
count, we won't scale down too far. It would also keep some indexers from being
assigned more partitions than others.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]