jtuglu-netflix commented on code in PR #17988:
URL: https://github.com/apache/druid/pull/17988#discussion_r2175466168
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java:
##########
@@ -200,84 +209,141 @@ private Runnable computeAndCollectLag()
* @param lags the lag metrics of Stream(Kafka/Kinesis)
* @return Integer. target number of tasksCount, -1 means skip scale action.
*/
- private int computeDesiredTaskCount(List<Long> lags)
+ @VisibleForTesting
+ int computeDesiredTaskCount(List<Long> lags)
{
- // if supervisor is not suspended, ensure required tasks are running
- // if suspended, ensure tasks have been requested to gracefully stop
log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+ final int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+ final int partitionCount = supervisor.getPartitionCount();
+ if (partitionCount <= 0) {
+ log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
+ return -1;
+ }
+
+ // Cache the factorization in an immutable list for quick lookup later
+ // Partition counts *can* change externally without a new instance of this class being created
+ if (partitionFactors.isEmpty() || partitionCount != partitionFactors.get(partitionFactors.size() - 1)) {
Review Comment:
I don't really like the idea of doing repeated work in a method that can be
called at an arbitrary, user-specified period (we might call this four times
a minute, etc.).
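A minimal sketch of the caching approach in the hunk, assuming `partitionFactors` holds the divisors of the partition count in ascending order (inferred from the check that its last element equals `partitionCount`). The class `PartitionFactorCache` and the methods `getFactors`/`computeFactors` are hypothetical names for illustration, not Druid APIs. In steady state each periodic call costs one comparison; the O(sqrt n) factorization runs only when the partition count actually changes.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper: caches the sorted divisors of the last-seen partition count.
class PartitionFactorCache
{
  // Ascending divisors of the cached partition count; the last element is the count itself.
  private List<Integer> partitionFactors = Collections.emptyList();

  /**
   * Returns the ascending divisors of partitionCount, recomputing them only when
   * the partition count differs from the one the cache was built for.
   */
  public List<Integer> getFactors(int partitionCount)
  {
    if (partitionCount <= 0) {
      throw new IllegalArgumentException("partitionCount must be positive: " + partitionCount);
    }
    // Same staleness check as the hunk: an empty cache, or a last factor that no longer matches.
    if (partitionFactors.isEmpty()
        || partitionCount != partitionFactors.get(partitionFactors.size() - 1)) {
      partitionFactors = computeFactors(partitionCount);
    }
    return partitionFactors;
  }

  // Trial division up to sqrt(n): each small divisor i pairs with the large divisor n / i.
  static List<Integer> computeFactors(int n)
  {
    List<Integer> small = new ArrayList<>();
    List<Integer> large = new ArrayList<>();
    for (int i = 1; (long) i * i <= n; i++) {
      if (n % i == 0) {
        small.add(i);
        if (i != n / i) {
          large.add(n / i);
        }
      }
    }
    // Large divisors were collected in descending order; reverse and append to keep ascending order.
    Collections.reverse(large);
    small.addAll(large);
    return Collections.unmodifiableList(small);
  }
}
```

For example, `getFactors(12)` returns `[1, 2, 3, 4, 6, 12]`; a second call with 12 returns the same cached list without refactoring, while a call with 7 rebuilds the cache as `[1, 7]`.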