capistrant commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r529982981



##########
File path: 
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
##########
@@ -377,6 +377,11 @@ protected boolean 
useExclusiveStartSequenceNumberForNonFirstSequence()
     return true;
   }
 
+  @Override
+  protected void collectLag(ArrayList<Long> lags)
+  {

Review comment:
       Add a comment stating why this is not implemented.

##########
File path: 
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
##########
@@ -85,7 +88,9 @@ public KinesisSupervisorIOConfig(
         completionTimeout,
         lateMessageRejectionPeriod,
         earlyMessageRejectionPeriod,
+        dynamicAllocationTasksProperties,
         lateMessageRejectionStartDateTime
+

Review comment:
       nit: remove the empty line.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -518,20 +684,52 @@ public SeekableStreamSupervisor(
     this.useExclusiveStartingSequence = useExclusiveStartingSequence;
     this.dataSource = spec.getDataSchema().getDataSource();
     this.ioConfig = spec.getIoConfig();
+    this.dynamicAllocationTasksProperties = 
ioConfig.getDynamicAllocationTasksProperties();
+    log.info("Get dynamicAllocationTasksProperties from IOConfig : " + 
dynamicAllocationTasksProperties);
+
+    if (dynamicAllocationTasksProperties != null && 
!dynamicAllocationTasksProperties.isEmpty() && 
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
 false)))) {
+      log.info("EnableDynamicAllocationTasks for datasource " + dataSource);
+      this.enableDynamicAllocationTasks = true;
+    } else {
+      log.info("Disable Dynamic Allocate Tasks");
+      this.enableDynamicAllocationTasks = false;
+    }
+    int taskCountMax = 0;
+    if (enableDynamicAllocationTasks) {
+      this.metricsCollectionIntervalMillis = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionIntervalMillis",
 10000)));
+      this.metricsCollectionRangeMillis = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionRangeMillis",
 6 * 10 * 1000)));
+      int slots = (int) (metricsCollectionRangeMillis / 
metricsCollectionIntervalMillis) + 1;
+      log.info(" The interval of metrics collection is " + 
metricsCollectionIntervalMillis + ", " + metricsCollectionRangeMillis + " 
timeRange will collect " + slots + " data points at most.");
+      this.queue = new CircularFifoQueue<>(slots);
+      taskCountMax = 
Integer.parseInt(String.valueOf(this.dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
 8)));

Review comment:
       What is the reasoning behind this default of 8?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
##########
@@ -46,6 +48,7 @@
   private final Optional<Duration> lateMessageRejectionPeriod;
   private final Optional<Duration> earlyMessageRejectionPeriod;
   private final Optional<DateTime> lateMessageRejectionStartDateTime;
+  private final Map<String, Object> dynamicAllocationTasksProperties;

Review comment:
       As your comment below says, this could be null. Should we annotate it as
`@Nullable`?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.info("PendingCompletionTaskGroups is : " + 
pendingCompletionTaskGroups);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[" + pendingCompletionTaskGroups + "]");
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+          log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime - 
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" + 
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+          return;
+        }
+        if (!queue.isAtFullCapacity()) {
+          log.info("Metrics collection is not at full capacity, skip to check 
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+          return;
+        }
+        List<Long> lags = collectTotalLags();
+        boolean allocationSuccess = dynamicAllocate(lags);
+        if (allocationSuccess) {
+          dynamicTriggerLastRunTime = nowTime;
+          queue.clear();
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+      }
+      finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private boolean dynamicAllocate(List<Long> lags) throws 
InterruptedException, ExecutionException, TimeoutException
+  {
+    // if supervisor is not suspended, ensure required tasks are running
+    // if suspended, ensure tasks have been requested to gracefully stop
+    log.info("[%s] supervisor is running, start to check dynamic allocate task 
logic", dataSource);
+    long scaleOutThreshold = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutThreshold",
 5000000)));
+    long scaleInThreshold = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInThreshold",
 1000000)));
+    double triggerSaleOutThresholdFrequency = 
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleOutThresholdFrequency",
 0.3)));
+    double triggerSaleInThresholdFrequency = 
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleInThresholdFrequency",
 0.8)));
+    int taskCountMax = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
 8)));
+    int taskCountMin = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMin",
 1)));
+    int scaleInStep = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInStep",
 1)));
+    int scaleOutStep = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutStep",
 2)));
+    int beyond = 0;
+    int within = 0;
+    int metricsCount = lags.size();
+    for (Long lag : lags) {
+      if (lag >= scaleOutThreshold) {
+        beyond++;
+      }
+      if (lag <= scaleInThreshold) {
+        within++;
+      }
+    }
+    double beyondProportion = beyond * 1.0 / metricsCount;
+    double withinProportion = within * 1.0 / metricsCount;
+    log.info("triggerSaleOutThresholdFrequency is [ " + 
triggerSaleOutThresholdFrequency + " ] and triggerSaleInThresholdFrequency is [ 
" + triggerSaleInThresholdFrequency + " ]");
+    log.info("beyondProportion is [ " + beyondProportion + " ] and 
withinProportion is [ " + withinProportion + " ]");
+
+    int currentActiveTaskCount;
+    int desireActiveTaskCount;
+    Collection<TaskGroup> activeTaskGroups = 
activelyReadingTaskGroups.values();
+    currentActiveTaskCount = activeTaskGroups.size();
+
+    if (beyondProportion >= triggerSaleOutThresholdFrequency) {
+      // Do Scale out
+      int taskCount = currentActiveTaskCount + scaleOutStep;
+      if (currentActiveTaskCount == taskCountMax) {
+        log.info("CurrentActiveTaskCount reach task count Max limit, skip to 
scale out tasks");
+        return false;
+      } else {
+        desireActiveTaskCount = Math.min(taskCount, taskCountMax);
+      }
+      log.info("Start to scale out tasks , current active task number [ " + 
currentActiveTaskCount + " ] and desire task number is [ " + 
desireActiveTaskCount + " ] ");
+      gracefulShutdownInternal();
+      // clear everything
+      clearAllocationInfos();
+      log.info("Set Task Count : " + desireActiveTaskCount);
+      setTaskCount(desireActiveTaskCount);
+      return true;
+    }
+
+    if (withinProportion >= triggerSaleInThresholdFrequency) {
+      // Do Scale in
+      int taskCount = currentActiveTaskCount - scaleInStep;
+      if (currentActiveTaskCount == taskCountMin) {
+        log.info("CurrentActiveTaskCount reach task count Min limit, skip to 
scale in tasks");
+        return false;
+      } else {
+        desireActiveTaskCount = Math.max(taskCount, taskCountMin);
+      }
+      log.info("Start to scale in tasks , current active task number [ " + 
currentActiveTaskCount + " ] and desire task number is [ " + 
desireActiveTaskCount + " ] ");
+      gracefulShutdownInternal();
+      // clear everything
+      clearAllocationInfos();
+      log.info("Set Task Count : " + desireActiveTaskCount);
+      setTaskCount(desireActiveTaskCount);
+      return true;
+    }
+    return false;
+  }
+
+  private void setTaskCount(int desireActiveTaskCount)

Review comment:
       I think this method deserves a more specific name, as it actually
re-submits the supervisor. Perhaps `submitSupervisorWithTaskCount` or
something of that sort?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -518,20 +684,52 @@ public SeekableStreamSupervisor(
     this.useExclusiveStartingSequence = useExclusiveStartingSequence;
     this.dataSource = spec.getDataSchema().getDataSource();
     this.ioConfig = spec.getIoConfig();
+    this.dynamicAllocationTasksProperties = 
ioConfig.getDynamicAllocationTasksProperties();
+    log.info("Get dynamicAllocationTasksProperties from IOConfig : " + 
dynamicAllocationTasksProperties);
+
+    if (dynamicAllocationTasksProperties != null && 
!dynamicAllocationTasksProperties.isEmpty() && 
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
 false)))) {
+      log.info("EnableDynamicAllocationTasks for datasource " + dataSource);
+      this.enableDynamicAllocationTasks = true;
+    } else {
+      log.info("Disable Dynamic Allocate Tasks");
+      this.enableDynamicAllocationTasks = false;
+    }
+    int taskCountMax = 0;
+    if (enableDynamicAllocationTasks) {
+      this.metricsCollectionIntervalMillis = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionIntervalMillis",
 10000)));

Review comment:
       Wondering if it would be better to make all of these defaults final
constants declared at the top of the class for easy reference?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.info("PendingCompletionTaskGroups is : " + 
pendingCompletionTaskGroups);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[" + pendingCompletionTaskGroups + "]");
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+          log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime - 
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" + 
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+          return;
+        }
+        if (!queue.isAtFullCapacity()) {
+          log.info("Metrics collection is not at full capacity, skip to check 
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+          return;
+        }
+        List<Long> lags = collectTotalLags();
+        boolean allocationSuccess = dynamicAllocate(lags);
+        if (allocationSuccess) {
+          dynamicTriggerLastRunTime = nowTime;
+          queue.clear();
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+      }
+      finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private boolean dynamicAllocate(List<Long> lags) throws 
InterruptedException, ExecutionException, TimeoutException
+  {
+    // if supervisor is not suspended, ensure required tasks are running
+    // if suspended, ensure tasks have been requested to gracefully stop
+    log.info("[%s] supervisor is running, start to check dynamic allocate task 
logic", dataSource);
+    long scaleOutThreshold = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutThreshold",
 5000000)));
+    long scaleInThreshold = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInThreshold",
 1000000)));
+    double triggerSaleOutThresholdFrequency = 
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleOutThresholdFrequency",
 0.3)));
+    double triggerSaleInThresholdFrequency = 
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleInThresholdFrequency",
 0.8)));
+    int taskCountMax = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
 8)));
+    int taskCountMin = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMin",
 1)));
+    int scaleInStep = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInStep",
 1)));
+    int scaleOutStep = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutStep",
 2)));
+    int beyond = 0;
+    int within = 0;
+    int metricsCount = lags.size();
+    for (Long lag : lags) {
+      if (lag >= scaleOutThreshold) {
+        beyond++;
+      }
+      if (lag <= scaleInThreshold) {
+        within++;
+      }
+    }
+    double beyondProportion = beyond * 1.0 / metricsCount;
+    double withinProportion = within * 1.0 / metricsCount;
+    log.info("triggerSaleOutThresholdFrequency is [ " + 
triggerSaleOutThresholdFrequency + " ] and triggerSaleInThresholdFrequency is [ 
" + triggerSaleInThresholdFrequency + " ]");
+    log.info("beyondProportion is [ " + beyondProportion + " ] and 
withinProportion is [ " + withinProportion + " ]");
+
+    int currentActiveTaskCount;
+    int desireActiveTaskCount;
+    Collection<TaskGroup> activeTaskGroups = 
activelyReadingTaskGroups.values();
+    currentActiveTaskCount = activeTaskGroups.size();
+
+    if (beyondProportion >= triggerSaleOutThresholdFrequency) {
+      // Do Scale out
+      int taskCount = currentActiveTaskCount + scaleOutStep;
+      if (currentActiveTaskCount == taskCountMax) {
+        log.info("CurrentActiveTaskCount reach task count Max limit, skip to 
scale out tasks");
+        return false;
+      } else {
+        desireActiveTaskCount = Math.min(taskCount, taskCountMax);
+      }
+      log.info("Start to scale out tasks , current active task number [ " + 
currentActiveTaskCount + " ] and desire task number is [ " + 
desireActiveTaskCount + " ] ");
+      gracefulShutdownInternal();
+      // clear everything
+      clearAllocationInfos();
+      log.info("Set Task Count : " + desireActiveTaskCount);
+      setTaskCount(desireActiveTaskCount);
+      return true;
+    }
+
+    if (withinProportion >= triggerSaleInThresholdFrequency) {
+      // Do Scale in
+      int taskCount = currentActiveTaskCount - scaleInStep;
+      if (currentActiveTaskCount == taskCountMin) {
+        log.info("CurrentActiveTaskCount reach task count Min limit, skip to 
scale in tasks");
+        return false;
+      } else {
+        desireActiveTaskCount = Math.max(taskCount, taskCountMin);
+      }
+      log.info("Start to scale in tasks , current active task number [ " + 
currentActiveTaskCount + " ] and desire task number is [ " + 
desireActiveTaskCount + " ] ");
+      gracefulShutdownInternal();
+      // clear everything
+      clearAllocationInfos();
+      log.info("Set Task Count : " + desireActiveTaskCount);
+      setTaskCount(desireActiveTaskCount);
+      return true;
+    }
+    return false;
+  }
+
+  private void setTaskCount(int desireActiveTaskCount)

Review comment:
       What are the consequences of a failure at this point? We have already called
`gracefulShutdownInternal`, so I assume we would be left with no active supervisor
for the datasource?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -791,6 +1016,38 @@ public void tryInit()
     }
   }
 
+  private Runnable collectAndcollectLags()
+  {
+    return new Runnable() {
+      @Override
+      public void run()

Review comment:
       Logs should provide context about which supervisor they refer to. As
in other places, let's assess which of these can be changed to debug level to reduce chattiness.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -3561,4 +3843,6 @@ protected void emitLag()
    * sequences. In Kafka, start offsets are always inclusive.
    */
   protected abstract boolean 
useExclusiveStartSequenceNumberForNonFirstSequence();
+
+  protected abstract void collectLag(ArrayList<Long> lags);

Review comment:
       Javadoc, please.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -652,6 +857,11 @@ public void stop(boolean stopGracefully)
       try {
         scheduledExec.shutdownNow(); // stop recurring executions
         reportingExec.shutdownNow();
+        log.info("Shut Down allocationExec now");

Review comment:
       I don't think this log or the one below is needed, since there aren't
logs for the other Execs.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -3526,6 +3789,25 @@ protected void emitLag()
     }
   }
 
+
+  protected void computeLags(Map<PartitionIdType, Long> partitionLags, 
ArrayList<Long> lags)

Review comment:
       Pretty straightforward method, but a short Javadoc would be nice, since
we are updating an important lag-related object.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -791,6 +1016,38 @@ public void tryInit()
     }
   }
 
+  private Runnable collectAndcollectLags()

Review comment:
       Is this supposed to be `collectAndComputeLags()`? As far as I can tell,
the log on line 982 suggests that is the name you meant to use.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()

Review comment:
       A Javadoc would be helpful, as this is a complex/important method override.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.info("PendingCompletionTaskGroups is : " + 
pendingCompletionTaskGroups);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[" + pendingCompletionTaskGroups + "]");
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+          log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime - 
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" + 
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+          return;
+        }
+        if (!queue.isAtFullCapacity()) {
+          log.info("Metrics collection is not at full capacity, skip to check 
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+          return;
+        }
+        List<Long> lags = collectTotalLags();
+        boolean allocationSuccess = dynamicAllocate(lags);
+        if (allocationSuccess) {
+          dynamicTriggerLastRunTime = nowTime;
+          queue.clear();
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+      }
+      finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private boolean dynamicAllocate(List<Long> lags) throws 
InterruptedException, ExecutionException, TimeoutException

Review comment:
       A Javadoc would be helpful, as this is an important/complex method.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java
##########
@@ -113,12 +119,23 @@ public Integer getReplicas()
     return replicas;
   }
 
+  @JsonProperty

Review comment:
       Should this be annotated as `@Nullable` if the value can be null, as your
comment in the constructor suggests?

##########
File path: 
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
##########
@@ -824,12 +827,32 @@ private static SeekableStreamSupervisorIOConfig 
getIOConfig()
         false,
         new Period("PT30M"),
         null,
-        null, null
+        null, getProperties(), null
     )
     {
     };
   }
 
+  private static Map<String, Object> getProperties()
+  {
+    HashMap<String, Object> dynamicAllocationTasksProperties = new HashMap<>();

Review comment:
       We need to document all of these new configs in kafka-ingestion.md, in
the `KafkaSupervisorIOConfig` section.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -495,6 +655,12 @@ boolean isValidTaskGroup(int taskGroupId, @Nullable 
TaskGroup taskGroup)
   private volatile boolean stopped = false;
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
+  private final boolean enableDynamicAllocationTasks;
+  private volatile long metricsCollectionIntervalMillis;
+  private volatile long metricsCollectionRangeMillis;
+  private volatile long dynamicCheckStartDelayMillis;
+  private volatile long dynamicCheckPeriod;
+  private volatile CircularFifoQueue<Long> queue;

Review comment:
       I think renaming this to something more descriptive would improve
readability.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -518,20 +684,52 @@ public SeekableStreamSupervisor(
     this.useExclusiveStartingSequence = useExclusiveStartingSequence;
     this.dataSource = spec.getDataSchema().getDataSource();
     this.ioConfig = spec.getIoConfig();
+    this.dynamicAllocationTasksProperties = 
ioConfig.getDynamicAllocationTasksProperties();
+    log.info("Get dynamicAllocationTasksProperties from IOConfig : " + 
dynamicAllocationTasksProperties);
+
+    if (dynamicAllocationTasksProperties != null && 
!dynamicAllocationTasksProperties.isEmpty() && 
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
 false)))) {
+      log.info("EnableDynamicAllocationTasks for datasource " + dataSource);
+      this.enableDynamicAllocationTasks = true;
+    } else {
+      log.info("Disable Dynamic Allocate Tasks");
+      this.enableDynamicAllocationTasks = false;
+    }
+    int taskCountMax = 0;
+    if (enableDynamicAllocationTasks) {
+      this.metricsCollectionIntervalMillis = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionIntervalMillis",
 10000)));
+      this.metricsCollectionRangeMillis = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionRangeMillis",
 6 * 10 * 1000)));
+      int slots = (int) (metricsCollectionRangeMillis / 
metricsCollectionIntervalMillis) + 1;
+      log.info(" The interval of metrics collection is " + 
metricsCollectionIntervalMillis + ", " + metricsCollectionRangeMillis + " 
timeRange will collect " + slots + " data points at most.");
+      this.queue = new CircularFifoQueue<>(slots);
+      taskCountMax = 
Integer.parseInt(String.valueOf(this.dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
 8)));
+      this.dynamicCheckStartDelayMillis = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("dynamicCheckStartDelayMillis",
 300000)));
+      this.dynamicCheckPeriod = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("dynamicCheckPeriod",
 600000)));
+      this.metricsCollectionRangeMillis = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("metricsCollectionRangeMillis",
 600000)));
+    }
+
     this.tuningConfig = spec.getTuningConfig();
     this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
     this.supervisorId = supervisorId;
     this.exec = Execs.singleThreaded(supervisorId);
     this.scheduledExec = Execs.scheduledSingleThreaded(supervisorId + 
"-Scheduler-%d");
     this.reportingExec = Execs.scheduledSingleThreaded(supervisorId + 
"-Reporting-%d");
+    this.allocationExec = Execs.scheduledSingleThreaded(supervisorId + 
"-Allocation-%d");
+    this.lagComputationExec = Execs.scheduledSingleThreaded(supervisorId + 
"-Computation-%d");
     this.stateManager = new SeekableStreamSupervisorStateManager(
         spec.getSupervisorStateManagerConfig(),
         spec.isSuspended()
     );
 
-    int workerThreads = (this.tuningConfig.getWorkerThreads() != null
-                         ? this.tuningConfig.getWorkerThreads()
-                         : Math.min(10, this.ioConfig.getTaskCount()));
+    int workerThreads;
+    if (enableDynamicAllocationTasks) {
+      workerThreads = (this.tuningConfig.getWorkerThreads() != null
+              ? this.tuningConfig.getWorkerThreads()
+              : Math.min(10, taskCountMax));
+    } else {
+      workerThreads = (this.tuningConfig.getWorkerThreads() != null
+              ? this.tuningConfig.getWorkerThreads()
+              : Math.min(10, this.ioConfig.getTaskCount()));
+    }
 
     this.workerExec = 
MoreExecutors.listeningDecorator(Execs.multiThreaded(workerThreads, 
supervisorId + "-Worker-%d"));
     log.info("Created worker pool with [%d] threads for dataSource [%s]", 
workerThreads, this.dataSource);

Review comment:
       Same thought here about using debug level and about including context on which
supervisor it refers to.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -1137,6 +1394,20 @@ public void gracefulShutdownInternal() throws 
ExecutionException, InterruptedExc
   @VisibleForTesting
   public void resetInternal(DataSourceMetadata dataSourceMetadata)
   {
+    // clear queue for kafka lags
+    if (enableDynamicAllocationTasks && queue != null) {
+      try {
+        lock.lock();
+        queue.clear();
+      }
+      catch (Exception e) {
+        log.warn(e, "Error,when clear queue in rest action");

Review comment:
       What are the implications of this failing? We are catching the exception and carrying
on. Can anything negative come from that?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));

Review comment:
       Wondering if this default value should be a final constant declared at
the top of the class?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -791,6 +1016,38 @@ public void tryInit()
     }
   }
 
+  private Runnable collectAndcollectLags()
+  {
+    return new Runnable() {
+      @Override
+      public void run()
+      {
+        lock.lock();
+        try {
+          if (!spec.isSuspended()) {
+            ArrayList<Long> metricsInfo = new ArrayList<>(3);
+            collectLag(metricsInfo);
+            long totalLags = metricsInfo.size() < 3 ? 0 : metricsInfo.get(1);
+            queue.offer(totalLags > 0 ? totalLags : 0);
+            log.info("Current lag metric points : " + new ArrayList<>(queue));
+          } else {
+            log.info("[%s] supervisor is suspended, skip to collect kafka 
lags", dataSource);
+          }
+        }
+        catch (Exception e) {
+          log.error(e, "Error, When collect kafka lags");

Review comment:
       Should this be logged at warn level if we catch and move on?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -768,7 +978,22 @@ public void tryInit()
         );
 
         scheduleReporting(reportingExec);
-
+        if (enableDynamicAllocationTasks) {
+          log.info("Collect and compute lags at fixed rate of " + 
metricsCollectionIntervalMillis);

Review comment:
       Include a reference to the datasource in this log and in the one for the lag 
computation executor below. Should they be logged at debug level to reduce 
info-level chattiness?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.info("PendingCompletionTaskGroups is : " + 
pendingCompletionTaskGroups);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[" + pendingCompletionTaskGroups + "]");
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+          log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime - 
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" + 
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+          return;
+        }
+        if (!queue.isAtFullCapacity()) {
+          log.info("Metrics collection is not at full capacity, skip to check 
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+          return;
+        }
+        List<Long> lags = collectTotalLags();
+        boolean allocationSuccess = dynamicAllocate(lags);
+        if (allocationSuccess) {
+          dynamicTriggerLastRunTime = nowTime;
+          queue.clear();
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+      }
+      finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private boolean dynamicAllocate(List<Long> lags) throws 
InterruptedException, ExecutionException, TimeoutException

Review comment:
       Logs added in this method should provide context about which supervisor 
they refer to. I also think we should evaluate which logs should be changed to 
debug level, to limit the chattiness of info-level logging.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.info("PendingCompletionTaskGroups is : " + 
pendingCompletionTaskGroups);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[" + pendingCompletionTaskGroups + "]");
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+          log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime - 
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" + 
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+          return;
+        }
+        if (!queue.isAtFullCapacity()) {

Review comment:
       Does this mean we have not yet collected enough historical lag data to 
decide whether to scale in or scale out? If so, I think the log should be more 
descriptive, since it may not be obvious to a log reader why it matters that the 
queue is not full.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -495,6 +655,12 @@ boolean isValidTaskGroup(int taskGroupId, @Nullable 
TaskGroup taskGroup)
   private volatile boolean stopped = false;
   private volatile boolean lifecycleStarted = false;
   private final ServiceEmitter emitter;
+  private final boolean enableDynamicAllocationTasks;
+  private volatile long metricsCollectionIntervalMillis;
+  private volatile long metricsCollectionRangeMillis;
+  private volatile long dynamicCheckStartDelayMillis;
+  private volatile long dynamicCheckPeriod;
+  private volatile CircularFifoQueue<Long> queue;
 
   public SeekableStreamSupervisor(

Review comment:
       Logs in this constructor should identify the supervisor they refer to. 
I think we should also evaluate which of them can be debug level, to reduce 
chattiness in info-level logging.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.info("PendingCompletionTaskGroups is : " + 
pendingCompletionTaskGroups);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[" + pendingCompletionTaskGroups + "]");
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+          log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime - 
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" + 
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+          return;
+        }
+        if (!queue.isAtFullCapacity()) {
+          log.info("Metrics collection is not at full capacity, skip to check 
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+          return;
+        }
+        List<Long> lags = collectTotalLags();
+        boolean allocationSuccess = dynamicAllocate(lags);
+        if (allocationSuccess) {
+          dynamicTriggerLastRunTime = nowTime;
+          queue.clear();
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+      }
+      finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private boolean dynamicAllocate(List<Long> lags) throws 
InterruptedException, ExecutionException, TimeoutException
+  {
+    // if supervisor is not suspended, ensure required tasks are running
+    // if suspended, ensure tasks have been requested to gracefully stop
+    log.info("[%s] supervisor is running, start to check dynamic allocate task 
logic", dataSource);
+    long scaleOutThreshold = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutThreshold",
 5000000)));
+    long scaleInThreshold = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInThreshold",
 1000000)));
+    double triggerSaleOutThresholdFrequency = 
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleOutThresholdFrequency",
 0.3)));
+    double triggerSaleInThresholdFrequency = 
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleInThresholdFrequency",
 0.8)));
+    int taskCountMax = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMax",
 8)));
+    int taskCountMin = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("taskCountMin",
 1)));
+    int scaleInStep = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInStep",
 1)));
+    int scaleOutStep = 
Integer.parseInt(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutStep",
 2)));
+    int beyond = 0;
+    int within = 0;
+    int metricsCount = lags.size();
+    for (Long lag : lags) {
+      if (lag >= scaleOutThreshold) {
+        beyond++;
+      }
+      if (lag <= scaleInThreshold) {
+        within++;
+      }
+    }
+    double beyondProportion = beyond * 1.0 / metricsCount;
+    double withinProportion = within * 1.0 / metricsCount;
+    log.info("triggerSaleOutThresholdFrequency is [ " + 
triggerSaleOutThresholdFrequency + " ] and triggerSaleInThresholdFrequency is [ 
" + triggerSaleInThresholdFrequency + " ]");

Review comment:
       Same spelling issue as flagged on these config names: `triggerSale*` should likely be `triggerScale*`.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.info("PendingCompletionTaskGroups is : " + 
pendingCompletionTaskGroups);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[" + pendingCompletionTaskGroups + "]");
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+          log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime - 
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" + 
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+          return;
+        }
+        if (!queue.isAtFullCapacity()) {
+          log.info("Metrics collection is not at full capacity, skip to check 
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+          return;
+        }
+        List<Long> lags = collectTotalLags();
+        boolean allocationSuccess = dynamicAllocate(lags);
+        if (allocationSuccess) {
+          dynamicTriggerLastRunTime = nowTime;
+          queue.clear();
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+      }
+      finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private boolean dynamicAllocate(List<Long> lags) throws 
InterruptedException, ExecutionException, TimeoutException
+  {
+    // if supervisor is not suspended, ensure required tasks are running
+    // if suspended, ensure tasks have been requested to gracefully stop
+    log.info("[%s] supervisor is running, start to check dynamic allocate task 
logic", dataSource);
+    long scaleOutThreshold = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleOutThreshold",
 5000000)));
+    long scaleInThreshold = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("scaleInThreshold",
 1000000)));
+    double triggerSaleOutThresholdFrequency = 
Double.parseDouble(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("triggerSaleOutThresholdFrequency",
 0.3)));

Review comment:
       I think the variable name and config key for this config and the next one 
may be misspelled: `triggerSale*` --> `triggerScale*`?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()

Review comment:
       Also, the added logs should include context about which supervisor is being 
logged. I think we should also evaluate which logs should be changed to debug level, to 
limit the chattiness of info-level logging.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -791,6 +1016,38 @@ public void tryInit()
     }
   }
 
+  private Runnable collectAndcollectLags()

Review comment:
       Also, a Javadoc would be helpful here, if you don't mind.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -317,6 +322,157 @@ public void handle()
     }
   }
 
+  // same as submit supervisor logic
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    @Override
+    public void handle()
+    {
+      lock.lock();
+      try {
+        long nowTime = System.currentTimeMillis();
+        long minTriggerDynamicFrequency = 
Long.parseLong(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("minTriggerDynamicFrequencyMillis",
 1200000)));
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        // max(minTriggerDynamicFrequency, metricsCollectionRangeMillis)
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.info("PendingCompletionTaskGroups is : " + 
pendingCompletionTaskGroups);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[" + pendingCompletionTaskGroups + "]");
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
+          log.info("NowTime - dynamicTriggerLastRunTime is [" + (nowTime - 
dynamicTriggerLastRunTime) + "]. Defined minTriggerDynamicFrequency is [" + 
minTriggerDynamicFrequency + "] , CLAM DOWN NOW !");
+          return;
+        }
+        if (!queue.isAtFullCapacity()) {
+          log.info("Metrics collection is not at full capacity, skip to check 
dynamic allocate task : [" + queue.size() + " vs " + queue.maxSize() + "]");
+          return;
+        }
+        List<Long> lags = collectTotalLags();
+        boolean allocationSuccess = dynamicAllocate(lags);
+        if (allocationSuccess) {
+          dynamicTriggerLastRunTime = nowTime;
+          queue.clear();
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error, when parse DynamicAllocationTasksNotice");
+      }
+      finally {
+        lock.unlock();
+      }
+    }
+  }
+
+  private boolean dynamicAllocate(List<Long> lags) throws 
InterruptedException, ExecutionException, TimeoutException

Review comment:
       Should the config defaults be declared as final constants at the top of the 
class?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to