pjain1 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r580202114
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +324,114 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method will do lags points collection and check dynamic scale action is necessary or not.
+ */
+ @Override
+ public void handle()
+ {
+ try {
+ long nowTime = System.currentTimeMillis();
+ // Only queue is full and over minTriggerDynamicFrequency can trigger scale out/in
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic allocate task logic", dataSource);
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) {
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action [%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+ return;
+ }
+ }
+ if (autoScalerConfig != null && nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerDynamicFrequencyMillis()) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [%s]. Defined minTriggerDynamicFrequency is [%s] for dataSource [%s], CALM DOWN NOW !", nowTime - dynamicTriggerLastRunTime, autoScalerConfig.getMinTriggerDynamicFrequencyMillis(), dataSource);
+ return;
+ }
+
+ Integer desriedTaskCount = scaleAction.call();
+ boolean allocationSuccess = dynamicAllocate(desriedTaskCount);
+
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+ }
+ }
Review comment:
```suggestion
if (autoScalerConfig == null) {
  log.warn("autoScalerConfig is null but dynamic allocation notice is submitted, how can it be ?");
} else {
  try {
    long nowTime = System.currentTimeMillis();
    if (spec.isSuspended()) {
      log.info(
          "Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended",
          dataSource
      );
      return;
    }
    log.debug(
        "PendingCompletionTaskGroups is [%s] for dataSource [%s]",
        pendingCompletionTaskGroups,
        dataSource
    );
    for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
      if (!list.isEmpty()) {
        log.info(
            "Skipping DynamicAllocationTasksNotice execution for datasource [%s] because following tasks are pending [%s]",
            dataSource,
            pendingCompletionTaskGroups
        );
        return;
      }
    }
    if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerDynamicFrequencyMillis()) {
      log.info(
          "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
          nowTime - dynamicTriggerLastRunTime,
          autoScalerConfig.getMinTriggerDynamicFrequencyMillis(),
          dataSource
      );
      return;
    }
    Integer desriedTaskCount = scaleAction.call();
    boolean allocationSuccess = dynamicAllocate(desriedTaskCount);
    if (allocationSuccess) {
      dynamicTriggerLastRunTime = nowTime;
    }
  } catch (Exception ex) {
    log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
  }
}
```
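
For reference, the order of the guards in the suggestion can be exercised in isolation with something like the sketch below. This is not Druid code: `ScaleGateSketch`, `maybeScale`, and the boolean `suspended` / `handoffPending` parameters are hypothetical stand-ins for `spec.isSuspended()`, the `pendingCompletionTaskGroups` check, and the `scaleAction.call()` + `dynamicAllocate(...)` pair. The clock is passed in so the cooldown can be tested without sleeping.

```java
import java.util.concurrent.Callable;

// Hypothetical, simplified stand-in for the guards in DynamicAllocationTasksNotice.handle().
class ScaleGateSketch
{
  private final long minTriggerFrequencyMillis; // plays the role of getMinTriggerDynamicFrequencyMillis()
  private long lastRunTimeMillis = 0L;          // plays the role of dynamicTriggerLastRunTime

  ScaleGateSketch(long minTriggerFrequencyMillis)
  {
    this.minTriggerFrequencyMillis = minTriggerFrequencyMillis;
  }

  /**
   * Returns true only if the scale action ran and reported success.
   * The action here is assumed to combine computing the desired count and applying it.
   */
  boolean maybeScale(long nowMillis, boolean suspended, boolean handoffPending, Callable<Boolean> scaleAction)
  {
    try {
      if (suspended) {
        return false; // supervisor suspended: skip
      }
      if (handoffPending) {
        return false; // tasks still pending completion: skip
      }
      if (nowMillis - lastRunTimeMillis < minTriggerFrequencyMillis) {
        return false; // still inside the cooldown window: skip
      }
      boolean success = scaleAction.call();
      if (success) {
        lastRunTimeMillis = nowMillis; // only a successful scale resets the cooldown
      }
      return success;
    }
    catch (Exception ex) {
      // The real code logs a warning here; the sketch just reports "did not scale".
      return false;
    }
  }
}
```

With a 1000 ms minimum frequency, a successful call at `nowMillis = 5000` should cause a call at `5500` to be skipped, while a call at `6000` goes through again. A failed action leaves the last-run time untouched, so the next notice is not throttled.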