zhangyue19921010 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r585325687



##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(LagBasedAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+  private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String 
dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  )
+  {
+    this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+    this.dataSource = dataSource;
+    final int slots = (int) 
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / 
lagBasedAutoScalerConfig
+        .getLagCollectionIntervalMillis()) + 1;
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Allocation-%d");
+    this.lagComputationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Computation-%d");
+    this.spec = spec;
+    this.supervisor = supervisor;
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> {
+      LOCK.lock();
+      int desiredTaskCount = -1;
+      try {
+        desiredTaskCount = computeDesiredTaskCount(new 
ArrayList<>(lagMetricsQueue));
+
+        if (desiredTaskCount != -1) {
+          lagMetricsQueue.clear();
+        }
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while computing desired task count for [%s]", 
dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+      return desiredTaskCount;
+    };
+
+    lagComputationExec.scheduleAtFixedRate(
+        computeAndCollectLag(),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for 
tasks to start up
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + 
lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),
+        lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    log.info(
+        "LagBasedAutoScaler will collect lag every [%d] millis and will keep 
[%d] data points for the last [%d] millis for dataSource [%s]",
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 
lagMetricsQueue.size(),
+        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    allocationExec.shutdownNow();
+    lagComputationExec.shutdownNow();
+  }
+
+  @Override
+  public void reset()
+  {
+    // clear queue for kafka lags
+    if (lagMetricsQueue != null) {
+      try {
+        LOCK.lock();
+        lagMetricsQueue.clear();
+      }
+      catch (Exception e) {
+        log.warn(e, "Error,when clear queue in rest action");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * This method computes current consumer lag. Gets the total lag of all 
partitions and fill in the lagMetricsQueue
+   *
+   * @return a Runnbale object to compute and collect lag.
+   */
+  private Runnable computeAndCollectLag()
+  {
+    return () -> {
+      LOCK.lock();
+      try {
+        if (!spec.isSuspended()) {
+          LagStats lagStats = supervisor.computeLagStats();
+          if (lagStats == null) {
+            lagMetricsQueue.offer(0L);
+          } else {
+            long totalLags = lagStats.getTotalLag();
+            lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);

Review comment:
       Because we occasionally observe negative lag values in practice, 
similar to what is described in 
https://stackoverflow.com/questions/60847952/how-to-get-rid-of-negative-consumer-lag-in-kafka
   
   Negative lag values are meaningless and would poison our lag metrics, so 
we simply clamp them to zero here.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(LagBasedAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+  private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String 
dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  )
+  {
+    this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+    this.dataSource = dataSource;
+    final int slots = (int) 
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / 
lagBasedAutoScalerConfig
+        .getLagCollectionIntervalMillis()) + 1;
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Allocation-%d");
+    this.lagComputationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Computation-%d");
+    this.spec = spec;
+    this.supervisor = supervisor;
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> {
+      LOCK.lock();
+      int desiredTaskCount = -1;
+      try {
+        desiredTaskCount = computeDesiredTaskCount(new 
ArrayList<>(lagMetricsQueue));
+
+        if (desiredTaskCount != -1) {
+          lagMetricsQueue.clear();
+        }
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while computing desired task count for [%s]", 
dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+      return desiredTaskCount;
+    };
+
+    lagComputationExec.scheduleAtFixedRate(
+        computeAndCollectLag(),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for 
tasks to start up
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + 
lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),
+        lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    log.info(
+        "LagBasedAutoScaler will collect lag every [%d] millis and will keep 
[%d] data points for the last [%d] millis for dataSource [%s]",
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 
lagMetricsQueue.size(),
+        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    allocationExec.shutdownNow();
+    lagComputationExec.shutdownNow();
+  }
+
+  @Override
+  public void reset()
+  {
+    // clear queue for kafka lags
+    if (lagMetricsQueue != null) {
+      try {
+        LOCK.lock();
+        lagMetricsQueue.clear();
+      }
+      catch (Exception e) {
+        log.warn(e, "Error,when clear queue in rest action");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * This method computes current consumer lag. Gets the total lag of all 
partitions and fill in the lagMetricsQueue
+   *
+   * @return a Runnbale object to compute and collect lag.
+   */
+  private Runnable computeAndCollectLag()
+  {
+    return () -> {
+      LOCK.lock();
+      try {
+        if (!spec.isSuspended()) {
+          LagStats lagStats = supervisor.computeLagStats();
+          if (lagStats == null) {
+            lagMetricsQueue.offer(0L);
+          } else {
+            long totalLags = lagStats.getTotalLag();
+            lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+          }
+          log.debug("Current lags [%s] for dataSource [%s].", new 
ArrayList<>(lagMetricsQueue), dataSource);
+        } else {
+          log.warn("[%s] supervisor is suspended, skipping lag collection", 
dataSource);
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error while collecting lags");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    };
+  }
+
+  /**
+   * This method determines whether to do scale actions based on collected lag 
points.
+   * Current algorithm of scale is simple:
+   * First of all, compute the proportion of lag points higher/lower than 
scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold.
+   * Secondly, compare scaleOutThreshold/scaleInThreshold with 
triggerScaleOutThresholdFrequency/triggerScaleInThresholdFrequency. P.S. Scale 
out action has higher priority than scale in action.
+   * Finaly, if scaleOutThreshold/scaleInThreshold is higher than 
triggerScaleOutThresholdFrequency/triggerScaleInThresholdFrequency, scale 
out/in action would be triggered.
+   *
+   * @param lags the lag metrics of Stream(Kafka/Kinesis)
+   * @return Integer. target number of tasksCount, -1 means skip scale action.
+   */
+  private Integer computeDesiredTaskCount(List<Long> lags)

Review comment:
       Thanks && changed.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(LagBasedAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+  private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String 
dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  )
+  {
+    this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+    this.dataSource = dataSource;
+    final int slots = (int) 
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / 
lagBasedAutoScalerConfig
+        .getLagCollectionIntervalMillis()) + 1;
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Allocation-%d");
+    this.lagComputationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Computation-%d");
+    this.spec = spec;
+    this.supervisor = supervisor;
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> {
+      LOCK.lock();
+      int desiredTaskCount = -1;
+      try {
+        desiredTaskCount = computeDesiredTaskCount(new 
ArrayList<>(lagMetricsQueue));
+
+        if (desiredTaskCount != -1) {
+          lagMetricsQueue.clear();
+        }
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while computing desired task count for [%s]", 
dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+      return desiredTaskCount;
+    };
+
+    lagComputationExec.scheduleAtFixedRate(
+        computeAndCollectLag(),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for 
tasks to start up
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + 
lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),
+        lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    log.info(
+        "LagBasedAutoScaler will collect lag every [%d] millis and will keep 
[%d] data points for the last [%d] millis for dataSource [%s]",
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 
lagMetricsQueue.size(),
+        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    allocationExec.shutdownNow();
+    lagComputationExec.shutdownNow();
+  }
+
+  @Override
+  public void reset()
+  {
+    // clear queue for kafka lags
+    if (lagMetricsQueue != null) {
+      try {
+        LOCK.lock();
+        lagMetricsQueue.clear();
+      }
+      catch (Exception e) {
+        log.warn(e, "Error,when clear queue in rest action");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * This method computes current consumer lag. Gets the total lag of all 
partitions and fill in the lagMetricsQueue
+   *
+   * @return a Runnbale object to compute and collect lag.
+   */
+  private Runnable computeAndCollectLag()
+  {
+    return () -> {
+      LOCK.lock();
+      try {
+        if (!spec.isSuspended()) {
+          LagStats lagStats = supervisor.computeLagStats();
+          if (lagStats == null) {
+            lagMetricsQueue.offer(0L);
+          } else {
+            long totalLags = lagStats.getTotalLag();
+            lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+          }
+          log.debug("Current lags [%s] for dataSource [%s].", new 
ArrayList<>(lagMetricsQueue), dataSource);
+        } else {
+          log.warn("[%s] supervisor is suspended, skipping lag collection", 
dataSource);
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error while collecting lags");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    };
+  }
+
+  /**
+   * This method determines whether to do scale actions based on collected lag 
points.
+   * Current algorithm of scale is simple:
+   * First of all, compute the proportion of lag points higher/lower than 
scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold.
+   * Secondly, compare scaleOutThreshold/scaleInThreshold with 
triggerScaleOutThresholdFrequency/triggerScaleInThresholdFrequency. P.S. Scale 
out action has higher priority than scale in action.
+   * Finaly, if scaleOutThreshold/scaleInThreshold is higher than 
triggerScaleOutThresholdFrequency/triggerScaleInThresholdFrequency, scale 
out/in action would be triggered.
+   *
+   * @param lags the lag metrics of Stream(Kafka/Kinesis)
+   * @return Integer. target number of tasksCount, -1 means skip scale action.
+   */
+  private Integer computeDesiredTaskCount(List<Long> lags)
+  {
+    // if supervisor is not suspended, ensure required tasks are running
+    // if suspended, ensure tasks have been requested to gracefully stop
+    log.debug("Computing desired task count for [%s], based on following lags 
: [%s]", dataSource, lags);
+    int beyond = 0;
+    int within = 0;
+    int metricsCount = lags.size();
+    for (Long lag : lags) {
+      if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
+        beyond++;
+      }
+      if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
+        within++;
+      }
+    }
+    double beyondProportion = beyond * 1.0 / metricsCount;
+    double withinProportion = within * 1.0 / metricsCount;
+
+    log.debug("Calculated beyondProportion is [%s] and withinProportion is 
[%s] for dataSource [%s].", beyondProportion,
+        withinProportion, dataSource
+    );
+
+    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    if (currentActiveTaskCount < 0) {

Review comment:
       Thanks && removed.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +323,127 @@ public void handle()
     }
   }
 
+  // change taskCount without resubmitting.
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    Callable<Integer> scaleAction;
+
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    {
+      this.scaleAction = scaleAction;
+    }
+
+    /**
+     * This method will do lag points collection and check dynamic scale 
action is necessary or not.
+     */
+    @Override
+    public void handle()
+    {
+      if (autoScalerConfig == null) {
+        log.warn("autoScalerConfig is null but dynamic allocation notice is 
submitted, how can it be ?");
+      } else {
+        try {
+          long nowTime = System.currentTimeMillis();
+          if (spec.isSuspended()) {
+            log.info("Skipping DynamicAllocationTasksNotice execution because 
[%s] supervisor is suspended",
+                    dataSource
+            );
+            return;
+          }
+          log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", 
pendingCompletionTaskGroups,
+                  dataSource
+          );
+          for (CopyOnWriteArrayList<TaskGroup> list : 
pendingCompletionTaskGroups.values()) {
+            if (!list.isEmpty()) {
+              log.info(
+                      "Skipping DynamicAllocationTasksNotice execution for 
datasource [%s] because following tasks are pending [%s]",
+                      dataSource, pendingCompletionTaskGroups
+              );
+              return;
+            }
+          }
+          if (nowTime - dynamicTriggerLastRunTime < 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+            log.info(
+                    "DynamicAllocationTasksNotice submitted again in [%d] 
millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+                    nowTime - dynamicTriggerLastRunTime, 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+            );
+            return;
+          }
+          final Integer desriedTaskCount = scaleAction.call();
+          boolean allocationSuccess = changeTaskCount(desriedTaskCount);
+          if (allocationSuccess) {
+            dynamicTriggerLastRunTime = nowTime;
+          }
+        }
+        catch (Exception ex) {
+          log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+        }
+      }
+    }
+  }
+
+  /**
+   * This method determines how to do scale actions based on collected lag 
points.
+   * If scale action is triggered :
+   *    First of all, call gracefulShutdownInternal() which will change the 
state of current datasource ingest tasks from reading to publishing.
+   *    Secondly, clear all the stateful data structures: 
activelyReadingTaskGroups, partitionGroups, partitionOffsets, 
pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in 
the next 'RunNotice'.
+   *    Finally, change the taskCount in SeekableStreamSupervisorIOConfig and 
sync it to MetadataStorage.
+   * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next 
RunNotice will create scaled number of ingest tasks without resubmitting the 
supervisor.
+   * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
+   * @return Boolean flag indicating if scale action was executed or not. If 
true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 
'changeTaskCount'.
+   *         If false, it will do 'changeTaskCount' again after 
'scaleActionPeriodMillis' millis.
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws TimeoutException
+   */
+  private boolean changeTaskCount(Integer desiredActiveTaskCount) throws 
InterruptedException, ExecutionException, TimeoutException

Review comment:
       changed.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(LagBasedAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+  private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String 
dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  )
+  {
+    this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+    this.dataSource = dataSource;
+    final int slots = (int) 
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / 
lagBasedAutoScalerConfig
+        .getLagCollectionIntervalMillis()) + 1;
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Allocation-%d");
+    this.lagComputationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Computation-%d");
+    this.spec = spec;
+    this.supervisor = supervisor;
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> {
+      LOCK.lock();
+      int desiredTaskCount = -1;
+      try {
+        desiredTaskCount = computeDesiredTaskCount(new 
ArrayList<>(lagMetricsQueue));
+
+        if (desiredTaskCount != -1) {
+          lagMetricsQueue.clear();
+        }
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while computing desired task count for [%s]", 
dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+      return desiredTaskCount;
+    };
+
+    lagComputationExec.scheduleAtFixedRate(
+        computeAndCollectLag(),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for 
tasks to start up
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + 
lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),

Review comment:
       changed.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to