zhangyue19921010 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r585332382



##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +323,127 @@ public void handle()
     }
   }
 
+  // change taskCount without resubmitting.
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    Callable<Integer> scaleAction;
+
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    {
+      this.scaleAction = scaleAction;
+    }
+
+    /**
+     * This method will do lag points collection and check whether a dynamic scale 
action is necessary or not.
+     */
+    @Override
+    public void handle()
+    {
+      if (autoScalerConfig == null) {
+        log.warn("autoScalerConfig is null but dynamic allocation notice is 
submitted, how can it be ?");
+      } else {
+        try {
+          long nowTime = System.currentTimeMillis();
+          if (spec.isSuspended()) {
+            log.info("Skipping DynamicAllocationTasksNotice execution because 
[%s] supervisor is suspended",
+                    dataSource
+            );
+            return;
+          }
+          log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", 
pendingCompletionTaskGroups,
+                  dataSource
+          );
+          for (CopyOnWriteArrayList<TaskGroup> list : 
pendingCompletionTaskGroups.values()) {
+            if (!list.isEmpty()) {
+              log.info(
+                      "Skipping DynamicAllocationTasksNotice execution for 
datasource [%s] because following tasks are pending [%s]",
+                      dataSource, pendingCompletionTaskGroups
+              );
+              return;
+            }
+          }
+          if (nowTime - dynamicTriggerLastRunTime < 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+            log.info(
+                    "DynamicAllocationTasksNotice submitted again in [%d] 
millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+                    nowTime - dynamicTriggerLastRunTime, 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+            );
+            return;
+          }
+          final Integer desriedTaskCount = scaleAction.call();
+          boolean allocationSuccess = changeTaskCount(desriedTaskCount);
+          if (allocationSuccess) {
+            dynamicTriggerLastRunTime = nowTime;
+          }
+        }
+        catch (Exception ex) {
+          log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+        }
+      }
+    }
+  }
+
+  /**
+   * This method determines how to do scale actions based on collected lag 
points.
+   * If scale action is triggered :
+   *    First of all, call gracefulShutdownInternal() which will change the 
state of current datasource ingest tasks from reading to publishing.
+   *    Secondly, clear all the stateful data structures: 
activelyReadingTaskGroups, partitionGroups, partitionOffsets, 
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt in 
the next 'RunNotice'.
+   *    Finally, change the taskCount in SeekableStreamSupervisorIOConfig and 
sync it to MetadataStorage.
+   * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next 
RunNotice will create scaled number of ingest tasks without resubmitting the 
supervisor.
+   * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
+   * @return Boolean flag indicating if scale action was executed or not. If 
true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 
'changeTaskCount'.
+   *         If false, it will do 'changeTaskCount' again after 
'scaleActionPeriodMillis' millis.
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws TimeoutException
+   */
+  private boolean changeTaskCount(Integer desiredActiveTaskCount) throws 
InterruptedException, ExecutionException, TimeoutException
+  {
+    int currentActiveTaskCount;
+    Collection<TaskGroup> activeTaskGroups = 
activelyReadingTaskGroups.values();
+    currentActiveTaskCount = activeTaskGroups.size();
+
+    if (desiredActiveTaskCount == -1 || desiredActiveTaskCount == 
currentActiveTaskCount) {

Review comment:
       Thanks && changed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to