pjain1 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r582669862
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +323,127 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method will do lag points collection and check dynamic scale
action is necessary or not.
+ */
+ @Override
+ public void handle()
+ {
+ if (autoScalerConfig == null) {
+ log.warn("autoScalerConfig is null but dynamic allocation notice is
submitted, how can it be ?");
+ } else {
+ try {
+ long nowTime = System.currentTimeMillis();
+ if (spec.isSuspended()) {
+ log.info("Skipping DynamicAllocationTasksNotice execution because
[%s] supervisor is suspended",
+ dataSource
+ );
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]",
pendingCompletionTaskGroups,
+ dataSource
+ );
+ for (CopyOnWriteArrayList<TaskGroup> list :
pendingCompletionTaskGroups.values()) {
+ if (!list.isEmpty()) {
+ log.info(
+ "Skipping DynamicAllocationTasksNotice execution for
datasource [%s] because following tasks are pending [%s]",
+ dataSource, pendingCompletionTaskGroups
+ );
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime <
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+ log.info(
+ "DynamicAllocationTasksNotice submitted again in [%d]
millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+ nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+ );
+ return;
+ }
+ final Integer desriedTaskCount = scaleAction.call();
+ boolean allocationSuccess = changeTaskCount(desriedTaskCount);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+ }
+ }
+ }
+ }
+
+ /**
+ * This method determines how to do scale actions based on collected lag
points.
+ * If scale action is triggered :
+ * First of all, call gracefulShutdownInternal() which will change the
state of current datasource ingest tasks from reading to publishing.
+ * Secondly, clear all the stateful data structures:
activelyReadingTaskGroups, partitionGroups, partitionOffsets,
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt in
the next 'RunNotice'.
+ * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and
sync it to MetadataStorage.
+ * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next
RunNotice will create scaled number of ingest tasks without resubmitting the
supervisor.
+ * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
+ * @return Boolean flag indicating if scale action was executed or not. If
true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next
'changeTaskCount'.
+ * If false, it will do 'changeTaskCount' again after
'scaleActionPeriodMillis' millis.
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ private boolean changeTaskCount(Integer desiredActiveTaskCount) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ int currentActiveTaskCount;
+ Collection<TaskGroup> activeTaskGroups =
activelyReadingTaskGroups.values();
+ currentActiveTaskCount = activeTaskGroups.size();
+
+ if (desiredActiveTaskCount == -1 || desiredActiveTaskCount ==
currentActiveTaskCount) {
+ return false;
+ } else {
+ log.info(
+ "Starting scale action, current active task count is [%d] and
desired task count is [%d] for dataSource [%s].",
+ currentActiveTaskCount, desiredActiveTaskCount, dataSource
+ );
+ gracefulShutdownInternal();
+ changeTaskCountInIOConfig(desiredActiveTaskCount);
+ clearAllocationInfo();
+ log.info("Changed taskCount to [%s] for dataSource [%s].",
desiredActiveTaskCount, dataSource);
+ return true;
+ }
+ }
+
+ private void changeTaskCountInIOConfig(int desiredActiveTaskCount)
+ {
+ ioConfig.setTaskCount(desiredActiveTaskCount);
+ try {
+ Optional<SupervisorManager> supervisorManager =
taskMaster.getSupervisorManager();
+ if (supervisorManager.isPresent()) {
+ MetadataSupervisorManager metadataSupervisorManager =
supervisorManager.get().getMetadataSupervisorManager();
+ metadataSupervisorManager.insert(dataSource, spec);
+ } else {
+ log.error("supervisorManager is null in taskMaster, skipping scale
action for dataSource [%s].", dataSource);
+ }
+ }
+ catch (Exception e) {
+ log.error("supervisorManager is null in taskMaster, skipping scale
action for dataSource [%s].", dataSource);
Review comment:
```suggestion
log.error("Failed to sync taskCount to MetaStorage for dataSource
[%s].", dataSource);
```
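You probably copied the log message from the `else` branch above by mistake: this `catch` block handles an exception thrown while syncing the spec to metadata storage, not a missing supervisorManager.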
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]