zhangyue19921010 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r580789978
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -519,20 +635,42 @@ public SeekableStreamSupervisor(
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
+ this.autoScalerConfig = ioConfig.getAutoscalerConfig();
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
this.supervisorId = supervisorId;
this.exec =
Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId));
this.scheduledExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Scheduler-%d");
this.reportingExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Reporting-%d");
+
this.stateManager = new SeekableStreamSupervisorStateManager(
spec.getSupervisorStateManagerConfig(),
spec.isSuspended()
);
- int workerThreads = (this.tuningConfig.getWorkerThreads() != null
- ? this.tuningConfig.getWorkerThreads()
- : Math.min(10, this.ioConfig.getTaskCount()));
+ int workerThreads;
+ int chatThreads;
+ if (autoScalerConfig != null &&
autoScalerConfig.getEnableTaskAutoscaler()) {
+ log.info("enableTaskAutoscaler for datasource [%s]", dataSource);
Review comment:
Thanks && Changed.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -519,20 +635,42 @@ public SeekableStreamSupervisor(
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
+ this.autoScalerConfig = ioConfig.getAutoscalerConfig();
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
this.supervisorId = supervisorId;
this.exec =
Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId));
this.scheduledExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Scheduler-%d");
this.reportingExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Reporting-%d");
+
this.stateManager = new SeekableStreamSupervisorStateManager(
spec.getSupervisorStateManagerConfig(),
spec.isSuspended()
);
- int workerThreads = (this.tuningConfig.getWorkerThreads() != null
- ? this.tuningConfig.getWorkerThreads()
- : Math.min(10, this.ioConfig.getTaskCount()));
+ int workerThreads;
+ int chatThreads;
+ if (autoScalerConfig != null &&
autoScalerConfig.getEnableTaskAutoscaler()) {
+ log.info("enableTaskAutoscaler for datasource [%s]", dataSource);
+
+ workerThreads = (this.tuningConfig.getWorkerThreads() != null
+ ? this.tuningConfig.getWorkerThreads()
+ : Math.min(10, autoScalerConfig.getTaskCountMax()));
+
+ chatThreads = (this.tuningConfig.getChatThreads() != null
+ ? this.tuningConfig.getChatThreads()
+ : Math.min(10, autoScalerConfig.getTaskCountMax() *
this.ioConfig.getReplicas()));
+ } else {
+ log.info("Disable dynamic allocate tasks for [%s]", dataSource);
Review comment:
Thanks && Changed.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +324,114 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method collects lag points and checks whether a dynamic scale
action is necessary.
+ */
+ @Override
+ public void handle()
+ {
+ try {
+ long nowTime = System.currentTimeMillis();
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].",
pendingCompletionTaskGroups, dataSource);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+ return;
+ }
+ }
+ if (autoScalerConfig != null && nowTime - dynamicTriggerLastRunTime <
autoScalerConfig.getMinTriggerDynamicFrequencyMillis()) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [%s]. Defined
minTriggerDynamicFrequency is [%s] for dataSource [%s], CALM DOWN NOW !",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerDynamicFrequencyMillis(), dataSource);
+ return;
+ }
+
+ Integer desriedTaskCount = scaleAction.call();
+ boolean allocationSuccess = dynamicAllocate(desriedTaskCount);
+
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+ }
+ }
Review comment:
Thanks && Changed.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +324,114 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method collects lag points and checks whether a dynamic scale
action is necessary.
+ */
+ @Override
+ public void handle()
+ {
+ try {
+ long nowTime = System.currentTimeMillis();
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].",
pendingCompletionTaskGroups, dataSource);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+ return;
+ }
+ }
+ if (autoScalerConfig != null && nowTime - dynamicTriggerLastRunTime <
autoScalerConfig.getMinTriggerDynamicFrequencyMillis()) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [%s]. Defined
minTriggerDynamicFrequency is [%s] for dataSource [%s], CALM DOWN NOW !",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerDynamicFrequencyMillis(), dataSource);
+ return;
+ }
+
+ Integer desriedTaskCount = scaleAction.call();
+ boolean allocationSuccess = dynamicAllocate(desriedTaskCount);
+
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+ }
+ }
+ }
+
+ /**
+ * This method determines how to do scale actions based on collected lag
points.
+ * If scale action is triggered :
+ * First of all, call gracefulShutdownInternal() which will change the
state of current datasource ingest tasks from reading to publishing.
+ * Secondly, clear all the stateful data structures:
activelyReadingTaskGroups, partitionGroups, partitionOffsets,
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt
in the next 'RunNotice'.
+ * Finally, change taskCount in SeekableStreamSupervisorIOConfig and sync
it to MetaStorage.
+ * After changed taskCount in SeekableStreamSupervisorIOConfig, next
RunNotice will create scaled number of ingest tasks without resubmitting
supervisors.
+ * @param desiredActiveTaskCount desired taskCount compute from autoscaler
+ * @return Boolean flag, do scale action successfully or not. If true , it
will take at least 'minTriggerDynamicFrequency' before next 'dynamicAllocate'.
+ * If false, it will do 'dynamicAllocate' again after
'dynamicCheckPeriod'.
Review comment:
Thanks && Changed.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +324,114 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method collects lag points and checks whether a dynamic scale
action is necessary.
+ */
+ @Override
+ public void handle()
+ {
+ try {
+ long nowTime = System.currentTimeMillis();
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].",
pendingCompletionTaskGroups, dataSource);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+ return;
+ }
+ }
+ if (autoScalerConfig != null && nowTime - dynamicTriggerLastRunTime <
autoScalerConfig.getMinTriggerDynamicFrequencyMillis()) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [%s]. Defined
minTriggerDynamicFrequency is [%s] for dataSource [%s], CALM DOWN NOW !",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerDynamicFrequencyMillis(), dataSource);
+ return;
+ }
+
+ Integer desriedTaskCount = scaleAction.call();
+ boolean allocationSuccess = dynamicAllocate(desriedTaskCount);
+
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+ }
+ }
+ }
+
+ /**
+ * This method determines how to do scale actions based on collected lag
points.
+ * If scale action is triggered :
+ * First of all, call gracefulShutdownInternal() which will change the
state of current datasource ingest tasks from reading to publishing.
+ * Secondly, clear all the stateful data structures:
activelyReadingTaskGroups, partitionGroups, partitionOffsets,
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt
in the next 'RunNotice'.
+ * Finally, change taskCount in SeekableStreamSupervisorIOConfig and sync
it to MetaStorage.
+ * After changed taskCount in SeekableStreamSupervisorIOConfig, next
RunNotice will create scaled number of ingest tasks without resubmitting
supervisors.
+ * @param desiredActiveTaskCount desired taskCount compute from autoscaler
+ * @return Boolean flag, do scale action successfully or not. If true , it
will take at least 'minTriggerDynamicFrequency' before next 'dynamicAllocate'.
+ * If false, it will do 'dynamicAllocate' again after
'dynamicCheckPeriod'.
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ private boolean dynamicAllocate(Integer desiredActiveTaskCount) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ int currentActiveTaskCount;
+ Collection<TaskGroup> activeTaskGroups =
activelyReadingTaskGroups.values();
+ currentActiveTaskCount = activeTaskGroups.size();
+
+ if (desiredActiveTaskCount == -1 || desiredActiveTaskCount ==
currentActiveTaskCount) {
+ return false;
+ } else {
+ log.debug("Start to scale action tasks, current active task number [%s]
and desired task number is [%s] for dataSource [%s].", currentActiveTaskCount,
desiredActiveTaskCount, dataSource);
Review comment:
Thanks && Changed.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +324,114 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method collects lag points and checks whether a dynamic scale
action is necessary.
+ */
+ @Override
+ public void handle()
+ {
+ try {
+ long nowTime = System.currentTimeMillis();
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].",
pendingCompletionTaskGroups, dataSource);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+ return;
+ }
+ }
+ if (autoScalerConfig != null && nowTime - dynamicTriggerLastRunTime <
autoScalerConfig.getMinTriggerDynamicFrequencyMillis()) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [%s]. Defined
minTriggerDynamicFrequency is [%s] for dataSource [%s], CALM DOWN NOW !",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerDynamicFrequencyMillis(), dataSource);
+ return;
+ }
+
+ Integer desriedTaskCount = scaleAction.call();
+ boolean allocationSuccess = dynamicAllocate(desriedTaskCount);
+
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+ }
+ }
+ }
+
+ /**
+ * This method determines how to do scale actions based on collected lag
points.
+ * If scale action is triggered :
+ * First of all, call gracefulShutdownInternal() which will change the
state of current datasource ingest tasks from reading to publishing.
+ * Secondly, clear all the stateful data structures:
activelyReadingTaskGroups, partitionGroups, partitionOffsets,
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt
in the next 'RunNotice'.
+ * Finally, change taskCount in SeekableStreamSupervisorIOConfig and sync
it to MetaStorage.
+ * After changed taskCount in SeekableStreamSupervisorIOConfig, next
RunNotice will create scaled number of ingest tasks without resubmitting
supervisors.
+ * @param desiredActiveTaskCount desired taskCount compute from autoscaler
+ * @return Boolean flag, do scale action successfully or not. If true , it
will take at least 'minTriggerDynamicFrequency' before next 'dynamicAllocate'.
+ * If false, it will do 'dynamicAllocate' again after
'dynamicCheckPeriod'.
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ private boolean dynamicAllocate(Integer desiredActiveTaskCount) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ int currentActiveTaskCount;
+ Collection<TaskGroup> activeTaskGroups =
activelyReadingTaskGroups.values();
+ currentActiveTaskCount = activeTaskGroups.size();
+
+ if (desiredActiveTaskCount == -1 || desiredActiveTaskCount ==
currentActiveTaskCount) {
+ return false;
+ } else {
+ log.debug("Start to scale action tasks, current active task number [%s]
and desired task number is [%s] for dataSource [%s].", currentActiveTaskCount,
desiredActiveTaskCount, dataSource);
+ gracefulShutdownInternal();
+ changeTaskCountInIOConfig(desiredActiveTaskCount);
+ // clear everything
+ clearAllocationInfos();
+ log.info("Changed taskCount to [%s] for dataSource [%s].",
desiredActiveTaskCount, dataSource);
+ return true;
+ }
+ }
+
+ private void changeTaskCountInIOConfig(int desiredActiveTaskCount)
+ {
+ ioConfig.setTaskCount(desiredActiveTaskCount);
+ try {
+ Optional<SupervisorManager> supervisorManager =
taskMaster.getSupervisorManager();
+ if (supervisorManager.isPresent()) {
+ MetadataSupervisorManager metadataSupervisorManager =
supervisorManager.get().getMetadataSupervisorManager();
+ metadataSupervisorManager.insert(dataSource, spec);
+ } else {
+ log.warn("supervisorManager is null in taskMaster, skip to do scale
action for dataSource [%s].", dataSource);
Review comment:
Thanks && Changed.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org