zhangyue19921010 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r531680563
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
}
}
+ // same as submit supervisor logic
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ @Override
+ public void handle()
+ {
+ lock.lock();
+ try {
+ long nowTime = System.currentTimeMillis();
+ long minTriggerDynamicFrequency =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
1200000)));
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.info("PendingCompletionTaskGroups is : " +
pendingCompletionTaskGroups);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[" + pendingCompletionTaskGroups + "]");
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+ log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime -
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" +
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+ return;
+ }
+ if (!queue.isAtFullCapacity()) {
+ log.info("Metrics collection is not at full capacity, skip to check
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+ return;
+ }
+ List<Long> lags = collectTotalLags();
+ boolean allocationSuccess = dynamicAllocate(lags);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ queue.clear();
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private boolean dynamicAllocate(List<Long> lags) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ // if supervisor is not suspended, ensure required tasks are running
+ // if suspended, ensure tasks have been requested to gracefully stop
+ log.info("[%s] supervisor is running, start to check dynamic allocate task
logic", dataSource);
+ long scaleOutThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutThreshold",
5000000)));
+ long scaleInThreshold =
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInThreshold",
1000000)));
+ double triggerSaleOutThresholdFrequency =
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleOutThresholdFrequency",
0.3)));
+ double triggerSaleInThresholdFrequency =
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleInThresholdFrequency",
0.8)));
+ int taskCountMax =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
8)));
+ int taskCountMin =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMin",
1)));
+ int scaleInStep =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInStep",
1)));
+ int scaleOutStep =
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutStep",
2)));
+ int beyond = 0;
+ int within = 0;
+ int metricsCount = lags.size();
+ for (Long lag : lags) {
+ if (lag >= scaleOutThreshold) {
+ beyond++;
+ }
+ if (lag <= scaleInThreshold) {
+ within++;
+ }
+ }
+ double beyondProportion = beyond * 1.0 / metricsCount;
+ double withinProportion = within * 1.0 / metricsCount;
+ log.info("triggerSaleOutThresholdFrequency is [ " +
triggerSaleOutThresholdFrequency + " ] and triggerSaleInThresholdFrequency is [
" + triggerSaleInThresholdFrequency + " ]");
+ log.info("beyondProportion is [ " + beyondProportion + " ] and
withinProportion is [ " + withinProportion + " ]");
+
+ int currentActiveTaskCount;
+ int desireActiveTaskCount;
+ Collection<TaskGroup> activeTaskGroups =
activelyReadingTaskGroups.values();
+ currentActiveTaskCount = activeTaskGroups.size();
+
+ if (beyondProportion >= triggerSaleOutThresholdFrequency) {
+ // Do Scale out
+ int taskCount = currentActiveTaskCount + scaleOutStep;
+ if (currentActiveTaskCount == taskCountMax) {
+ log.info("CurrentActiveTaskCount reach task count Max limit, skip to
scale out tasks");
+ return false;
+ } else {
+ desireActiveTaskCount = Math.min(taskCount, taskCountMax);
+ }
+ log.info("Start to scale out tasks , current active task number [ " +
currentActiveTaskCount + " ] and desire task number is [ " +
desireActiveTaskCount + " ] ");
+ gracefulShutdownInternal();
+ // clear everything
+ clearAllocationInfos();
+ log.info("Set Task Count : " + desireActiveTaskCount);
+ setTaskCount(desireActiveTaskCount);
+ return true;
+ }
+
+ if (withinProportion >= triggerSaleInThresholdFrequency) {
+ // Do Scale in
+ int taskCount = currentActiveTaskCount - scaleInStep;
+ if (currentActiveTaskCount == taskCountMin) {
+ log.info("CurrentActiveTaskCount reach task count Min limit, skip to
scale in tasks");
+ return false;
+ } else {
+ desireActiveTaskCount = Math.max(taskCount, taskCountMin);
+ }
+ log.info("Start to scale in tasks , current active task number [ " +
currentActiveTaskCount + " ] and desire task number is [ " +
desireActiveTaskCount + " ] ");
+ gracefulShutdownInternal();
+ // clear everything
+ clearAllocationInfos();
+ log.info("Set Task Count : " + desireActiveTaskCount);
+ setTaskCount(desireActiveTaskCount);
+ return true;
+ }
+ return false;
+ }
+
+ private void setTaskCount(int desireActiveTaskCount)
Review comment:
1. Renamed to `changeTaskCountInIOConfig`.
2. The method `gracefulShutdownInternal` does not suspend the supervisor. It
makes the ingestion tasks stop reading and start publishing their data, so the
supervisor stays active whether or not the call fails. Moreover, if the call
fails, the current scale action is canceled and another scale action will be
attempted in the next `dynamicCheckPeriod`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]