himanshug commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r585157002



##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to 
reject messages with timestamps earlier than this date time; for example if 
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no 
(default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps earlier than this period before the task was created; 
for example if this is set to `PT1H` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please 
note that only one of `lateMessageRejectionPeriod` or 
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps later than this period after the task reached its 
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to 
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with 
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks 
sometimes run past their task duration, for example, in cases of supervisor 
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be 
dropped unexpectedly whenever a task runs past its originally configured task 
duration.|no (default == none)|
+|`autoScalerConfig`|Object|Defines how to automatically scale the number of Kafka ingest tasks based on lag metrics. Only supported for Kafka indexing as of now. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable the autoscaler. If set to false or omitted, the autoscaler is disabled even when `autoScalerConfig` is not null. | no (default == false) |
+| `lagCollectionIntervalMillis` | Define the frequency of lag points 
collection.  | no (default == 30000) |

Review comment:
       ```suggestion
   | `lagCollectionIntervalMillis` | Period of lag points collection.  | no 
(default == 30000) |
   ```

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to 
reject messages with timestamps earlier than this date time; for example if 
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no 
(default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps earlier than this period before the task was created; 
for example if this is set to `PT1H` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please 
note that only one of `lateMessageRejectionPeriod` or 
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps later than this period after the task reached its 
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to 
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with 
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks 
sometimes run past their task duration, for example, in cases of supervisor 
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be 
dropped unexpectedly whenever a task runs past its originally configured task 
duration.|no (default == none)|
+|`autoScalerConfig`|Object|Defines how to automatically scale the number of Kafka ingest tasks based on lag metrics. Only supported for Kafka indexing as of now. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable the autoscaler. If set to false or omitted, the autoscaler is disabled even when `autoScalerConfig` is not null. | no (default == false) |
+| `lagCollectionIntervalMillis` | Define the frequency of lag points 
collection.  | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection. Used together with `lagCollectionIntervalMillis`: over the most recent `lagCollectionRangeMillis`, a lag metric point is collected every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The lag threshold for the scale out action. | no (default == 6000000) |
+| `triggerScaleOutThresholdFrequency` | If the fraction of lag points higher than `scaleOutThreshold` exceeds `triggerScaleOutThresholdFrequency`, a scale out action is triggered. | no (default == 0.3) |
+| `scaleInThreshold` | The lag threshold for the scale in action. | no (default == 1000000) |
+| `triggerScaleInThresholdFrequency` | If the fraction of lag points lower than `scaleInThreshold` exceeds `triggerScaleInThresholdFrequency`, a scale in action is triggered. | no (default == 0.9) |
+| `scaleActionStartDelayMillis` | Number of milliseconds to wait after the supervisor starts before the scaling logic is first checked. | no (default == 300000) |
+| `scaleActionPeriodMillis` | How often, in milliseconds, to check whether a scale action should be taken. | no (default == 60000) |
+| `taskCountMax` | Maximum value of the task count. Make sure that `taskCountMax >= taskCountMin`. | yes |
+| `taskCountMin` | Minimum value of the task count. When the autoscaler is enabled, the `taskCount` value in `IOConfig` is ignored; ingestion starts with `taskCountMin` tasks and can scale up to `taskCountMax`. | yes |
+| `scaleInStep` | Number of tasks to remove at a time. | no (default == 1) |
+| `scaleOutStep` | Number of tasks to add at a time. | no (default == 2) |
+| `minTriggerScaleActionFrequencyMillis` | Minimum time interval, in milliseconds, between two scale actions. | no (default == 600000) |
+| `autoScalerStrategy` | The algorithm used by the `autoScaler`. Only `lagBased` is supported for now. | no (default == `lagBased`) |

Review comment:
       Can we make the distinction that the following properties are common to any autoscaler and the rest are specific to the `lagBased` autoscaler? Maybe have two tables.
   
   ```
   autoScalerStrategy
   enableTaskAutoScaler
   taskCountMin
   taskCountMax
   minTriggerScaleActionFrequencyMillis
   
   ```
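
   For context, the autoscaler properties discussed above would sit inside the supervisor spec's `ioConfig` roughly as follows. This is an illustrative sketch, not taken from the PR; the values are made up, and `taskCountMin`/`taskCountMax` have no defaults so they are chosen arbitrarily here:

   ```json
   {
     "autoScalerConfig": {
       "enableTaskAutoScaler": true,
       "taskCountMin": 2,
       "taskCountMax": 6,
       "minTriggerScaleActionFrequencyMillis": 600000,
       "autoScalerStrategy": "lagBased",
       "lagCollectionIntervalMillis": 30000,
       "lagCollectionRangeMillis": 600000,
       "scaleOutThreshold": 6000000,
       "triggerScaleOutThresholdFrequency": 0.3,
       "scaleInThreshold": 1000000,
       "triggerScaleInThresholdFrequency": 0.9,
       "scaleActionStartDelayMillis": 300000,
       "scaleActionPeriodMillis": 60000,
       "scaleInStep": 1,
       "scaleOutStep": 2
     }
   }
   ```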

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +323,127 @@ public void handle()
     }
   }
 
+  // change taskCount without resubmitting.
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    Callable<Integer> scaleAction;
+
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    {
+      this.scaleAction = scaleAction;
+    }
+
+    /**
+     * This method collects lag points and checks whether a dynamic scale action is necessary.
+     */
+    @Override
+    public void handle()
+    {
+      if (autoScalerConfig == null) {
+        log.warn("autoScalerConfig is null but dynamic allocation notice is 
submitted, how can it be ?");
+      } else {
+        try {
+          long nowTime = System.currentTimeMillis();
+          if (spec.isSuspended()) {
+            log.info("Skipping DynamicAllocationTasksNotice execution because 
[%s] supervisor is suspended",
+                    dataSource
+            );
+            return;
+          }
+          log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", 
pendingCompletionTaskGroups,
+                  dataSource
+          );
+          for (CopyOnWriteArrayList<TaskGroup> list : 
pendingCompletionTaskGroups.values()) {
+            if (!list.isEmpty()) {
+              log.info(
+                      "Skipping DynamicAllocationTasksNotice execution for 
datasource [%s] because following tasks are pending [%s]",
+                      dataSource, pendingCompletionTaskGroups
+              );
+              return;
+            }
+          }
+          if (nowTime - dynamicTriggerLastRunTime < 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+            log.info(
+                    "DynamicAllocationTasksNotice submitted again in [%d] 
millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+                    nowTime - dynamicTriggerLastRunTime, 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+            );
+            return;
+          }
+          final Integer desiredTaskCount = scaleAction.call();
+          boolean allocationSuccess = changeTaskCount(desiredTaskCount);
+          if (allocationSuccess) {
+            dynamicTriggerLastRunTime = nowTime;
+          }
+        }
+        catch (Exception ex) {
+          log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+        }
+      }
+    }
+  }
+
+  /**
+   * This method determines how to do scale actions based on collected lag 
points.
+   * If scale action is triggered :
+   *    First of all, call gracefulShutdownInternal() which will change the 
state of current datasource ingest tasks from reading to publishing.
+   *    Secondly, clear all the stateful data structures: 
activelyReadingTaskGroups, partitionGroups, partitionOffsets, 
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt in the next 'RunNotice'.
+   *    Finally, change the taskCount in SeekableStreamSupervisorIOConfig and 
sync it to MetadataStorage.
+   * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next 
RunNotice will create scaled number of ingest tasks without resubmitting the 
supervisor.
+   * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
+   * @return Boolean flag indicating if scale action was executed or not. If 
true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 
'changeTaskCount'.
+   *         If false, it will do 'changeTaskCount' again after 
'scaleActionPeriodMillis' millis.
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws TimeoutException
+   */
+  private boolean changeTaskCount(Integer desiredActiveTaskCount) throws 
InterruptedException, ExecutionException, TimeoutException
+  {
+    int currentActiveTaskCount;
+    Collection<TaskGroup> activeTaskGroups = 
activelyReadingTaskGroups.values();
+    currentActiveTaskCount = activeTaskGroups.size();
+
+    if (desiredActiveTaskCount == -1 || desiredActiveTaskCount == 
currentActiveTaskCount) {

Review comment:
       ```suggestion
       if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == 
currentActiveTaskCount) {
   ```
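
   To illustrate the suggested change: treating any negative value as the skip sentinel makes the guard robust if the scale action ever returns something other than exactly -1. A minimal sketch with hypothetical names, not the PR's actual code:

   ```java
   public class ScaleGuardSketch
   {
     // Returns true when no scale action should be taken: either the autoscaler
     // signalled "skip" (any negative value), or the supervisor is already at
     // the desired task count.
     static boolean shouldSkip(int desiredActiveTaskCount, int currentActiveTaskCount)
     {
       return desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount;
     }

     public static void main(String[] args)
     {
       System.out.println(shouldSkip(-1, 3)); // true: the usual "no action" sentinel
       System.out.println(shouldSkip(-5, 3)); // true: any negative value is treated as skip
       System.out.println(shouldSkip(3, 3));  // true: already at the desired count
       System.out.println(shouldSkip(4, 3));  // false: a scale action is warranted
     }
   }
   ```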

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(LagBasedAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+  private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String 
dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  )
+  {
+    this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+    this.dataSource = dataSource;
+    final int slots = (int) 
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / 
lagBasedAutoScalerConfig
+        .getLagCollectionIntervalMillis()) + 1;
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Allocation-%d");
+    this.lagComputationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Computation-%d");
+    this.spec = spec;
+    this.supervisor = supervisor;
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> {
+      LOCK.lock();
+      int desiredTaskCount = -1;
+      try {
+        desiredTaskCount = computeDesiredTaskCount(new 
ArrayList<>(lagMetricsQueue));
+
+        if (desiredTaskCount != -1) {
+          lagMetricsQueue.clear();
+        }
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while computing desired task count for [%s]", 
dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+      return desiredTaskCount;
+    };
+
+    lagComputationExec.scheduleAtFixedRate(
+        computeAndCollectLag(),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for 
tasks to start up
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + 
lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),
+        lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    log.info(
+        "LagBasedAutoScaler will collect lag every [%d] millis and will keep 
[%d] data points for the last [%d] millis for dataSource [%s]",
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.maxSize(),
+        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    allocationExec.shutdownNow();
+    lagComputationExec.shutdownNow();
+  }
+
+  @Override
+  public void reset()
+  {
+    // clear queue for kafka lags
+    if (lagMetricsQueue != null) {
+      try {
+        LOCK.lock();
+        lagMetricsQueue.clear();
+      }
+      catch (Exception e) {
+        log.warn(e, "Error,when clear queue in rest action");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * This method computes the current consumer lag: it gets the total lag of all partitions and fills in the lagMetricsQueue.
+   *
+   * @return a Runnable object to compute and collect lag.
+   */
+  private Runnable computeAndCollectLag()
+  {
+    return () -> {
+      LOCK.lock();
+      try {
+        if (!spec.isSuspended()) {
+          LagStats lagStats = supervisor.computeLagStats();
+          if (lagStats == null) {
+            lagMetricsQueue.offer(0L);
+          } else {
+            long totalLags = lagStats.getTotalLag();
+            lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);

Review comment:
       why shouldn't we expect lagStats.getTotalLag() to return a value >= 0 ?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +323,127 @@ public void handle()
     }
   }
 
+  // change taskCount without resubmitting.
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    Callable<Integer> scaleAction;
+
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    {
+      this.scaleAction = scaleAction;
+    }
+
+    /**
+     * This method collects lag points and checks whether a dynamic scale action is necessary.
+     */
+    @Override
+    public void handle()
+    {
+      if (autoScalerConfig == null) {
+        log.warn("autoScalerConfig is null but dynamic allocation notice is 
submitted, how can it be ?");
+      } else {
+        try {
+          long nowTime = System.currentTimeMillis();
+          if (spec.isSuspended()) {
+            log.info("Skipping DynamicAllocationTasksNotice execution because 
[%s] supervisor is suspended",
+                    dataSource
+            );
+            return;
+          }
+          log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", 
pendingCompletionTaskGroups,
+                  dataSource
+          );
+          for (CopyOnWriteArrayList<TaskGroup> list : 
pendingCompletionTaskGroups.values()) {
+            if (!list.isEmpty()) {
+              log.info(
+                      "Skipping DynamicAllocationTasksNotice execution for 
datasource [%s] because following tasks are pending [%s]",
+                      dataSource, pendingCompletionTaskGroups
+              );
+              return;
+            }
+          }
+          if (nowTime - dynamicTriggerLastRunTime < 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+            log.info(
+                    "DynamicAllocationTasksNotice submitted again in [%d] 
millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+                    nowTime - dynamicTriggerLastRunTime, 
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+            );
+            return;
+          }
+          final Integer desiredTaskCount = scaleAction.call();
+          boolean allocationSuccess = changeTaskCount(desiredTaskCount);
+          if (allocationSuccess) {
+            dynamicTriggerLastRunTime = nowTime;
+          }
+        }
+        catch (Exception ex) {
+          log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+        }
+      }
+    }
+  }
+
+  /**
+   * This method determines how to do scale actions based on collected lag 
points.
+   * If scale action is triggered :
+   *    First of all, call gracefulShutdownInternal() which will change the 
state of current datasource ingest tasks from reading to publishing.
+   *    Secondly, clear all the stateful data structures: 
activelyReadingTaskGroups, partitionGroups, partitionOffsets, 
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt in the next 'RunNotice'.
+   *    Finally, change the taskCount in SeekableStreamSupervisorIOConfig and 
sync it to MetadataStorage.
+   * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next 
RunNotice will create scaled number of ingest tasks without resubmitting the 
supervisor.
+   * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
+   * @return Boolean flag indicating if scale action was executed or not. If 
true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 
'changeTaskCount'.
+   *         If false, it will do 'changeTaskCount' again after 
'scaleActionPeriodMillis' millis.
+   * @throws InterruptedException
+   * @throws ExecutionException
+   * @throws TimeoutException
+   */
+  private boolean changeTaskCount(Integer desiredActiveTaskCount) throws 
InterruptedException, ExecutionException, TimeoutException

Review comment:
       ```suggestion
     private boolean changeTaskCount(int desiredActiveTaskCount) throws 
InterruptedException, ExecutionException, TimeoutException
   ```

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(LagBasedAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+  private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String 
dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  )
+  {
+    this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String supervisorId = StringUtils.format("Supervisor-%s", 
dataSource);
+    this.dataSource = dataSource;
+    final int slots = (int) 
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / 
lagBasedAutoScalerConfig
+        .getLagCollectionIntervalMillis()) + 1;
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Allocation-%d");
+    this.lagComputationExec = 
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + 
"-Computation-%d");
+    this.spec = spec;
+    this.supervisor = supervisor;
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> {
+      LOCK.lock();
+      int desiredTaskCount = -1;
+      try {
+        desiredTaskCount = computeDesiredTaskCount(new 
ArrayList<>(lagMetricsQueue));
+
+        if (desiredTaskCount != -1) {
+          lagMetricsQueue.clear();
+        }
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while computing desired task count for [%s]", 
dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+      return desiredTaskCount;
+    };
+
+    lagComputationExec.scheduleAtFixedRate(
+        computeAndCollectLag(),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for 
tasks to start up
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + 
lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),
+        lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    log.info(
+        "LagBasedAutoScaler will collect lag every [%d] millis and will keep 
[%d] data points for the last [%d] millis for dataSource [%s]",
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.maxSize(),
+        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    allocationExec.shutdownNow();
+    lagComputationExec.shutdownNow();
+  }
+
+  @Override
+  public void reset()
+  {
+    // clear queue for kafka lags
+    if (lagMetricsQueue != null) {
+      try {
+        LOCK.lock();
+        lagMetricsQueue.clear();
+      }
+      catch (Exception e) {
+        log.warn(e, "Error,when clear queue in rest action");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * This method computes the current consumer lag: it gets the total lag of all partitions and fills in the lagMetricsQueue.
+   *
+   * @return a Runnable object to compute and collect lag.
+   */
+  private Runnable computeAndCollectLag()
+  {
+    return () -> {
+      LOCK.lock();
+      try {
+        if (!spec.isSuspended()) {
+          LagStats lagStats = supervisor.computeLagStats();
+          if (lagStats == null) {
+            lagMetricsQueue.offer(0L);
+          } else {
+            long totalLags = lagStats.getTotalLag();
+            lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+          }
+          log.debug("Current lags [%s] for dataSource [%s].", new 
ArrayList<>(lagMetricsQueue), dataSource);
+        } else {
+          log.warn("[%s] supervisor is suspended, skipping lag collection", 
dataSource);
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error while collecting lags");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    };
+  }
+
+  /**
+   * This method determines whether to do scale actions based on collected lag 
points.
+   * The current scaling algorithm is simple:
+   * First of all, compute the fraction of lag points higher than scaleOutThreshold and the fraction lower than scaleInThreshold.
+   * Secondly, compare those fractions with triggerScaleOutThresholdFrequency and triggerScaleInThresholdFrequency respectively. P.S. The scale out action has higher priority than the scale in action.
+   * Finally, if a fraction exceeds its corresponding trigger frequency, the scale out/in action is triggered.
+   *
+   * @param lags the lag metrics of Stream(Kafka/Kinesis)
+   * @return the target taskCount; -1 means skip the scale action.
+   */
+  private Integer computeDesiredTaskCount(List<Long> lags)

Review comment:
       ```suggestion
     private int computeDesiredTaskCount(List<Long> lags)
   ```
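
   The decision logic described in the javadoc can be sketched as follows. This is a simplified, hypothetical reimplementation using the documented default thresholds (the task-count bounds and steps are illustrative), and it may differ from the PR's actual method:

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class LagBasedDecisionSketch
   {
     // Documented defaults; taskCountMin/Max and steps are illustrative values.
     static final long SCALE_OUT_THRESHOLD = 6_000_000L;
     static final long SCALE_IN_THRESHOLD = 1_000_000L;
     static final double TRIGGER_SCALE_OUT_FREQUENCY = 0.3;
     static final double TRIGGER_SCALE_IN_FREQUENCY = 0.9;
     static final int SCALE_OUT_STEP = 2;
     static final int SCALE_IN_STEP = 1;
     static final int TASK_COUNT_MAX = 8;
     static final int TASK_COUNT_MIN = 1;

     // Returns the desired task count, or -1 to skip any scale action.
     static int computeDesiredTaskCount(List<Long> lags, int currentCount)
     {
       if (lags.isEmpty()) {
         return -1;
       }
       // Fractions of lag points crossing the scale-out / scale-in thresholds.
       long aboveOut = lags.stream().filter(l -> l > SCALE_OUT_THRESHOLD).count();
       long belowIn = lags.stream().filter(l -> l < SCALE_IN_THRESHOLD).count();
       double outFraction = (double) aboveOut / lags.size();
       double inFraction = (double) belowIn / lags.size();

       // Scale out has higher priority than scale in.
       if (outFraction >= TRIGGER_SCALE_OUT_FREQUENCY) {
         int desired = Math.min(currentCount + SCALE_OUT_STEP, TASK_COUNT_MAX);
         return desired == currentCount ? -1 : desired;
       }
       if (inFraction >= TRIGGER_SCALE_IN_FREQUENCY) {
         int desired = Math.max(currentCount - SCALE_IN_STEP, TASK_COUNT_MIN);
         return desired == currentCount ? -1 : desired;
       }
       return -1;
     }

     public static void main(String[] args)
     {
       // Every lag point above scaleOutThreshold -> scale out by scaleOutStep.
       System.out.println(computeDesiredTaskCount(Arrays.asList(7_000_000L, 8_000_000L, 9_000_000L), 2)); // 4
       // Every lag point below scaleInThreshold -> scale in by scaleInStep.
       System.out.println(computeDesiredTaskCount(Arrays.asList(0L, 0L, 0L), 2)); // 1
       // Lags that cross neither trigger frequency -> no action.
       System.out.println(computeDesiredTaskCount(Arrays.asList(2_000_000L, 3_000_000L, 0L), 2)); // -1
     }
   }
   ```

   Note how returning -1 when the clamped desired count equals the current count avoids pointless restarts of the ingest tasks.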

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new 
EmittingLogger(LagBasedAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+  private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  )
+  {
+    this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String supervisorId = StringUtils.format("Supervisor-%s", dataSource);
+    this.dataSource = dataSource;
+    final int slots = (int) (lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / lagBasedAutoScalerConfig
+        .getLagCollectionIntervalMillis()) + 1;
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d");
+    this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
+    this.spec = spec;
+    this.supervisor = supervisor;
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> {
+      LOCK.lock();
+      int desiredTaskCount = -1;
+      try {
+        desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue));
+
+        if (desiredTaskCount != -1) {
+          lagMetricsQueue.clear();
+        }
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while computing desired task count for [%s]", dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+      return desiredTaskCount;
+    };
+
+    lagComputationExec.scheduleAtFixedRate(
+        computeAndCollectLag(),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),

Review comment:
       not sure why `lagCollectionRangeMillis` was added to 
`scaleActionStartDelayMillis` .
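For context on the queue sizing in the constructor above: with the documented defaults, the `CircularFifoQueue` ends up with 21 slots. A standalone sketch of that arithmetic (hypothetical class name, not part of the PR):

```java
class QueueSizing
{
  // mirrors the constructor arithmetic: one slot per collection interval in the range, plus one
  static int slots(long lagCollectionRangeMillis, long lagCollectionIntervalMillis)
  {
    return (int) (lagCollectionRangeMillis / lagCollectionIntervalMillis) + 1;
  }

  public static void main(String[] args)
  {
    // documented defaults: a lag point every 30s over a 10 minute window
    System.out.println(slots(600_000L, 30_000L)); // 21
  }
}
```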

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`autoScalerConfig`|Object|Defines how to auto scale the number of Kafka ingest tasks based on lag metrics. Currently supported only for Kafka indexing. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable the task autoscaler. If set to false or omitted, the autoscaler is disabled even when `autoScalerConfig` is not null. | no (default == false) |
+| `lagCollectionIntervalMillis` | The frequency at which lag points are collected. | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection. Used with `lagCollectionIntervalMillis`: within the most recent `lagCollectionRangeMillis`, a lag metric point is collected every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The lag threshold for triggering a scale out action. | no (default == 6000000) |
+| `triggerScaleOutThresholdFrequency` | If at least `triggerScaleOutThresholdFrequency` (as a fraction) of lag points are higher than `scaleOutThreshold`, a scale out action is triggered. | no (default == 0.3) |
+| `scaleInThreshold` | The lag threshold for triggering a scale in action. | no (default == 1000000) |
+| `triggerScaleInThresholdFrequency` | If at least `triggerScaleInThresholdFrequency` (as a fraction) of lag points are lower than `scaleInThreshold`, a scale in action is triggered. | no (default == 0.9) |
+| `scaleActionStartDelayMillis` | Number of milliseconds after supervisor start before the scaling logic is first checked. | no (default == 300000) |
+| `scaleActionPeriodMillis` | The frequency, in milliseconds, of checking whether to take a scale action. | no (default == 60000) |
+| `taskCountMax` | Maximum task count. Make sure `taskCountMax >= taskCountMin`. | yes |
+| `taskCountMin` | Minimum task count. When the autoscaler is enabled, the `taskCount` value in `IOConfig` is ignored; ingestion starts with `taskCountMin` tasks and can scale up to `taskCountMax`. | yes |
+| `scaleInStep` | How many tasks to reduce at a time. | no (default == 1) |
+| `scaleOutStep` | How many tasks to add at a time. | no (default == 2) |
+| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions. | no (default == 600000) |

Review comment:
       wouldn't time interval between two scale actions be always greater/equal 
to `scaleActionPeriodMillis` ?
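Putting the table's properties together, a hypothetical `autoScalerConfig` block might look like this (values are the documented defaults plus illustrative task-count bounds; placement within the supervisor spec follows the surrounding documentation):

```json
"autoScalerConfig": {
  "enableTaskAutoScaler": true,
  "lagCollectionIntervalMillis": 30000,
  "lagCollectionRangeMillis": 600000,
  "scaleOutThreshold": 6000000,
  "triggerScaleOutThresholdFrequency": 0.3,
  "scaleInThreshold": 1000000,
  "triggerScaleInThresholdFrequency": 0.9,
  "scaleActionStartDelayMillis": 300000,
  "scaleActionPeriodMillis": 60000,
  "taskCountMax": 6,
  "taskCountMin": 2,
  "scaleInStep": 1,
  "scaleOutStep": 2,
  "minTriggerScaleActionFrequencyMillis": 600000
}
```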

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+  private static final EmittingLogger log = new EmittingLogger(LagBasedAutoScaler.class);
+  private final String dataSource;
+  private final CircularFifoQueue<Long> lagMetricsQueue;
+  private final ScheduledExecutorService lagComputationExec;
+  private final ScheduledExecutorService allocationExec;
+  private final SupervisorSpec spec;
+  private final SeekableStreamSupervisor supervisor;
+  private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+  private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+  public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource,
+      LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+  )
+  {
+    this.lagBasedAutoScalerConfig = autoScalerConfig;
+    final String supervisorId = StringUtils.format("Supervisor-%s", dataSource);
+    this.dataSource = dataSource;
+    final int slots = (int) (lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / lagBasedAutoScalerConfig
+        .getLagCollectionIntervalMillis()) + 1;
+    this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+    this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d");
+    this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
+    this.spec = spec;
+    this.supervisor = supervisor;
+  }
+
+  @Override
+  public void start()
+  {
+    Callable<Integer> scaleAction = () -> {
+      LOCK.lock();
+      int desiredTaskCount = -1;
+      try {
+        desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue));
+
+        if (desiredTaskCount != -1) {
+          lagMetricsQueue.clear();
+        }
+      }
+      catch (Exception ex) {
+        log.warn(ex, "Exception while computing desired task count for [%s]", dataSource);
+      }
+      finally {
+        LOCK.unlock();
+      }
+      return desiredTaskCount;
+    };
+
+    lagComputationExec.scheduleAtFixedRate(
+        computeAndCollectLag(),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),
+        lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    log.info(
+        "LagBasedAutoScaler will collect lag every [%d] millis and will keep [%d] data points for the last [%d] millis for dataSource [%s]",
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.size(),
+        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    allocationExec.shutdownNow();
+    lagComputationExec.shutdownNow();
+  }
+
+  @Override
+  public void reset()
+  {
+    // clear queue for kafka lags
+    if (lagMetricsQueue != null) {
+      try {
+        LOCK.lock();
+        lagMetricsQueue.clear();
+      }
+      catch (Exception e) {
+        log.warn(e, "Error when clearing queue in reset action");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * This method computes the current consumer lag. It gets the total lag of all partitions and fills the lagMetricsQueue.
+   *
+   * @return a Runnable object to compute and collect lag.
+   */
+  private Runnable computeAndCollectLag()
+  {
+    return () -> {
+      LOCK.lock();
+      try {
+        if (!spec.isSuspended()) {
+          LagStats lagStats = supervisor.computeLagStats();
+          if (lagStats == null) {
+            lagMetricsQueue.offer(0L);
+          } else {
+            long totalLags = lagStats.getTotalLag();
+            lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+          }
+          log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource);
+        } else {
+          log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error while collecting lags");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    };
+  }
+
+  /**
+   * This method determines whether to take a scale action based on the collected lag points.
+   * The current scaling algorithm is simple:
+   * First, compute the proportion of lag points higher than scaleOutThreshold and the proportion lower than scaleInThreshold.
+   * Second, compare those proportions with triggerScaleOutThresholdFrequency and triggerScaleInThresholdFrequency respectively; a scale out action has higher priority than a scale in action.
+   * Finally, if a proportion exceeds its trigger frequency, the corresponding scale out/in action is triggered.
+   *
+   * @param lags the lag metrics of the stream (Kafka/Kinesis)
+   * @return Integer. The target taskCount; -1 means skip the scale action.
+   */
+  private Integer computeDesiredTaskCount(List<Long> lags)
+  {
+    // if supervisor is not suspended, ensure required tasks are running
+    // if suspended, ensure tasks have been requested to gracefully stop
+    log.debug("Computing desired task count for [%s], based on following lags: [%s]", dataSource, lags);
+    int beyond = 0;
+    int within = 0;
+    int metricsCount = lags.size();
+    for (Long lag : lags) {
+      if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
+        beyond++;
+      }
+      if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
+        within++;
+      }
+    }
+    double beyondProportion = beyond * 1.0 / metricsCount;
+    double withinProportion = within * 1.0 / metricsCount;
+
+    log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
+        withinProportion, dataSource
+    );
+
+    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    if (currentActiveTaskCount < 0) {

Review comment:
       is it legitimate for `supervisor.getActiveTaskGroupsCount()` to return a 
negative value? if not, then `supervisor.getActiveTaskGroupsCount()` should 
always return a value >= 0 and this check shouldn't be needed.
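The proportion-and-threshold decision described in the javadoc above can be sketched standalone as follows. This is an illustrative simplification using the documented default thresholds; the class name and return convention (+1 scale out, -1 scale in, 0 no action) are hypothetical, and the real method additionally applies scaleInStep/scaleOutStep and the taskCountMin/taskCountMax bounds:

```java
import java.util.Arrays;
import java.util.List;

class ScaleDecisionSketch
{
  static final long SCALE_OUT_THRESHOLD = 6_000_000L;
  static final long SCALE_IN_THRESHOLD = 1_000_000L;
  static final double SCALE_OUT_FRACTION = 0.3;  // triggerScaleOutThresholdFrequency
  static final double SCALE_IN_FRACTION = 0.9;   // triggerScaleInThresholdFrequency

  /** Returns +1 for scale out, -1 for scale in, 0 for no action. */
  static int decide(List<Long> lags)
  {
    if (lags.isEmpty()) {
      return 0;
    }
    long beyond = lags.stream().filter(l -> l >= SCALE_OUT_THRESHOLD).count();
    long within = lags.stream().filter(l -> l <= SCALE_IN_THRESHOLD).count();
    double beyondProportion = beyond * 1.0 / lags.size();
    double withinProportion = within * 1.0 / lags.size();
    // scale out has higher priority than scale in
    if (beyondProportion >= SCALE_OUT_FRACTION) {
      return 1;
    }
    if (withinProportion >= SCALE_IN_FRACTION) {
      return -1;
    }
    return 0;
  }

  public static void main(String[] args)
  {
    System.out.println(decide(Arrays.asList(7_000_000L, 8_000_000L, 500_000L))); // mostly high lag
    System.out.println(decide(Arrays.asList(100L, 200L, 300L)));                 // consistently low lag
  }
}
```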

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`autoScalerConfig`|Object|Defines how to auto scale the number of Kafka ingest tasks based on lag metrics. Currently supported only for Kafka indexing. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable the task autoscaler. If set to false or omitted, the autoscaler is disabled even when `autoScalerConfig` is not null. | no (default == false) |
+| `lagCollectionIntervalMillis` | The frequency at which lag points are collected. | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection. Used with `lagCollectionIntervalMillis`: within the most recent `lagCollectionRangeMillis`, a lag metric point is collected every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The lag threshold for triggering a scale out action. | no (default == 6000000) |
+| `triggerScaleOutThresholdFrequency` | If at least `triggerScaleOutThresholdFrequency` (as a fraction) of lag points are higher than `scaleOutThreshold`, a scale out action is triggered. | no (default == 0.3) |
+| `scaleInThreshold` | The lag threshold for triggering a scale in action. | no (default == 1000000) |
+| `triggerScaleInThresholdFrequency` | If at least `triggerScaleInThresholdFrequency` (as a fraction) of lag points are lower than `scaleInThreshold`, a scale in action is triggered. | no (default == 0.9) |

Review comment:
       ```suggestion
   | `triggerScaleInFractionThreshold` | If `triggerScaleInThresholdFrequency` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |
   ```

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`autoScalerConfig`|Object|Defines how to auto scale the number of Kafka ingest tasks based on lag metrics. Currently supported only for Kafka indexing. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable the task autoscaler. If set to false or omitted, the autoscaler is disabled even when `autoScalerConfig` is not null. | no (default == false) |
+| `lagCollectionIntervalMillis` | The frequency at which lag points are collected. | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection. Used with `lagCollectionIntervalMillis`: within the most recent `lagCollectionRangeMillis`, a lag metric point is collected every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The lag threshold for triggering a scale out action. | no (default == 6000000) |
+| `triggerScaleOutThresholdFrequency` | If at least `triggerScaleOutThresholdFrequency` (as a fraction) of lag points are higher than `scaleOutThreshold`, a scale out action is triggered. | no (default == 0.3) |

Review comment:
       not sure if it is a "frequency". maybe `triggerScaleOutFractionThreshold`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
