zhangyue19921010 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r569974080



##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -3561,4 +3874,11 @@ protected void emitLag()
    * sequences. In Kafka, start offsets are always inclusive.
    */
   protected abstract boolean useExclusiveStartSequenceNumberForNonFirstSequence();
+
+  /**
+   * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+   * Only support Kafka ingestion so far.
+   * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+   */
+  protected abstract void collectLag(ArrayList<Long> lags);

Review comment:
       Nice idea. Changed.
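   For context, a minimal sketch of what a `collectLag`-style implementation could compute. The per-partition lag map and all names here are hypothetical; only the value order (maxLag, totalLag, avgLag) comes from the javadoc above:

```java
import java.util.List;
import java.util.Map;

public class LagSketch
{
  /**
   * Fills lags with maxLag, totalLag and avgLag, in that order, mirroring the
   * documented contract. partitionLags is a hypothetical map of partition id -> current lag.
   */
  public static void collectLag(Map<Integer, Long> partitionLags, List<Long> lags)
  {
    long max = 0;
    long total = 0;
    for (Long lag : partitionLags.values()) {
      max = Math.max(max, lag);
      total += lag;
    }
    long avg = partitionLags.isEmpty() ? 0 : total / partitionLags.size();
    lags.add(max);
    lags.add(total);
    lags.add(avg);
  }
}
```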

##########
File path: extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
##########
@@ -361,6 +362,12 @@ public Supervisor createSupervisor()
     );
   }
 
+  @Override
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    return null;
+  }
+

Review comment:
       changed.

##########
File path: server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
   * @param checkpointMetadata metadata for the sequence to currently checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+   * Only support Kafka ingestion so far.
+   * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+   */
+  void collectLag(ArrayList<Long> lags);

Review comment:
       Yes, for now we only support the Kafka autoscaler. But based on https://github.com/apache/druid/blob/118b50195e5c2989e04e0f5290aa72cae114db39/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L3499, it may not be hard to support a Kinesis autoscaler, and I'd be glad to work on it after this PR is merged. So keeping the code abstract is more meaningful, and there is plenty of preparation to ensure that users will not enable Kinesis auto scaling by accident:
   1. Mention it in the docs
   2. `throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoscalerConfig or set it null!");` in `KinesisSupervisorIOConfig`
   3. Set `autoscalerConfig = null` when calling `super(xxx)` in `KinesisSupervisorIOConfig`
   
   Also, we can't cast Supervisor to KafkaSupervisor directly unless we add an extra dependency on `druid-kafka-indexing-service` in the `indexing-service` module.
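   Safeguard 2 could be sketched roughly like this (hypothetical class and method names, not the actual `KinesisSupervisorIOConfig` constructor; only the exception message comes from the comment above):

```java
// Hypothetical guard for a constructor: reject any non-null autoscaler
// config until Kinesis autoscaling is actually supported.
public class AutoscalerGuard
{
  public static void checkAutoscalerConfig(Object autoscalerConfig)
  {
    if (autoscalerConfig != null) {
      throw new UnsupportedOperationException(
          "Tasks auto scaler for kinesis is not supported yet. Please remove autoscalerConfig or set it null!"
      );
    }
  }
}
```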

##########
File path: server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
   * @param checkpointMetadata metadata for the sequence to currently checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+   * Only support Kafka ingestion so far.
+   * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+   */
+  void collectLag(ArrayList<Long> lags);
+
+  /**
+   * use for autoscaler
+   */
+  Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction);

Review comment:
       Nice catch! I reviewed the code and found that there is no need for this `buildDynamicAllocationTask` method in `Supervisor.java`: the autoscaler can build the autoscale notice itself, and the supervisor will perform the scale action.
   So I removed the `buildDynamicAllocationTask` method from `Supervisor.java`.
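   The resulting pattern might be sketched like this (all names hypothetical, not the actual Druid classes): the autoscaler builds the notice around a scale action, and the supervisor simply runs it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

public class NoticeSketch
{
  interface Notice
  {
    void handle();
  }

  /** The autoscaler builds the notice itself, wrapping a scale action. */
  static Notice buildScaleNotice(IntSupplier scaleAction, AtomicInteger taskCount)
  {
    return () -> taskCount.set(scaleAction.getAsInt());
  }

  /** The supervisor just executes whatever notice was enqueued. */
  static void runNotice(Notice notice)
  {
    notice.handle();
  }
}
```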

##########
File path: server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
   * @param checkpointMetadata metadata for the sequence to currently checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+   * Only support Kafka ingestion so far.
+   * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+   */
+  void collectLag(ArrayList<Long> lags);
+
+  /**
+   * use for autoscaler
+   */
+  Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction);
+
+  Map getSupervisorTaskInfos();

Review comment:
       Sure. Changed.

##########
File path: extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
##########
@@ -282,6 +283,23 @@ public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
     // do nothing
   }
 
+  @Override
+  public void collectLag(ArrayList<Long> lags)
+  {
+  }
+
+  @Override
+  public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
+  {
+    return null;
+  }
+
+  @Override
+  public Map getSupervisorTaskInfos()
+  {
+    return null;
+  }
+

Review comment:
       Sure. Done.

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -519,20 +636,40 @@ public SeekableStreamSupervisor(
     this.useExclusiveStartingSequence = useExclusiveStartingSequence;
     this.dataSource = spec.getDataSchema().getDataSource();
     this.ioConfig = spec.getIoConfig();
+    this.dynamicAllocationTasksProperties = ioConfig.getDynamicAllocationTasksProperties();

Review comment:
       I changed the way configurations are obtained from a Map to a new interface, `AutoScalerConfig`, with a default impl `DefaultAutoScaleConfig`, so that we can use Jackson to instantiate a config with **default values** instead of calling map.get/parse everywhere. **This also ensures consistency of default values.**
   
   In this way, we don't need to get a handle to `SupervisorTaskAutoscaler`; just call `autoScalerConfig.getTaskCountMax()` :)
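   The idea of centralizing defaults can be sketched as follows (hypothetical names and default values; in the PR the real `AutoScalerConfig` is deserialized by Jackson from the supervisor spec, which is omitted here):

```java
public class AutoScalerConfigSketch
{
  // Defaults live in one place instead of map.getOrDefault(...) scattered everywhere.
  private final boolean enableTaskAutoScaler;
  private final int taskCountMax;

  // In the real PR, Jackson would invoke a @JsonCreator-style constructor like
  // this one, passing null for absent JSON fields.
  public AutoScalerConfigSketch(Boolean enableTaskAutoScaler, Integer taskCountMax)
  {
    this.enableTaskAutoScaler = enableTaskAutoScaler != null && enableTaskAutoScaler;
    this.taskCountMax = taskCountMax == null ? 4 : taskCountMax;  // 4 is an assumed default
  }

  public boolean isEnableTaskAutoScaler()
  {
    return enableTaskAutoScaler;
  }

  public int getTaskCountMax()
  {
    return taskCountMax;
  }
}
```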

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +322,114 @@ public void handle()
     }
   }
 
+  // change taskCount without resubmitting.
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    Callable<Integer> scaleAction;
+
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    {
+      this.scaleAction = scaleAction;
+    }
+
+    /**
+     * This method will do lags points collection and check dynamic scale action is necessary or not.
+     */
+    @Override
+    public void handle()
+    {
+      try {
+        long nowTime = System.currentTimeMillis();
+        // Only queue is full and over minTriggerDynamicFrequency can trigger scale out/in
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic allocate task logic", dataSource);
+          return;
+        }
+        log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) {
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action [%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {

Review comment:
       Actually, there are three hard conditions before doing a scale action:
   1. Don't scale when the supervisor is suspended.
   2. Don't scale while a previous task is handing off, to avoid inconsistent state.
   3. Don't scale during the cool-down time, to avoid overly frequent scaling.
   
   And I think no matter what the task type is, and no matter what the autoscaler impl is, it may be better to follow the above three common conditions.
   
   Also, users can define their own conditions, like a `TaskCountLimitation`, in a specific impl :)
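   The three conditions above could be summarized in a small, hypothetical helper (names are illustrative; the flags mirror the checks in the quoted `handle()` snippet):

```java
public class ScaleGuards
{
  /**
   * Returns true only when none of the three hard conditions blocks scaling:
   * 1. the supervisor is suspended,
   * 2. a previous task is still handing off,
   * 3. we are still inside the cool-down window.
   */
  public static boolean mayScale(
      boolean suspended,
      boolean handoffPending,
      long nowMillis,
      long lastTriggerMillis,
      long cooldownMillis
  )
  {
    if (suspended) {
      return false;
    }
    if (handoffPending) {
      return false;
    }
    return nowMillis - lastTriggerMillis >= cooldownMillis;
  }
}
```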
    

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * need to notice that autoScaler would be null which means autoscale is dissable.
+   * @param supervisor
+   * @return autoScaler, disable autoscale will return dummyAutoScaler and enable autoscale wiil return defaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using siwtch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && !dynamicAllocationTasksProperties.isEmpty() && Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks", false)))) {

Review comment:
       The reason for designing this condition is that users can easily disable/enable the autoscaler for a while rather than deleting all the autoscaler-related configs.
   
   For example, an advertising business during the Super Bowl: traffic is much higher during the breaks and lower during the game (**large traffic fluctuations in the short term**). If users don't set the scale-related configs properly, it will trigger scale actions too frequently and create lots of small segments.
   
   For traffic like this we usually set a larger number of tasks temporarily and set this flag to false to disable the autoscaler for a while.
   
   Also, when the scaling algorithms become more advanced, it would be better to remove this config and let the autoscaler do everything. But for now it may be better to keep this parameter :)

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * need to notice that autoScaler would be null which means autoscale is dissable.
+   * @param supervisor
+   * @return autoScaler, disable autoscale will return dummyAutoScaler and enable autoscale wiil return defaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using siwtch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && !dynamicAllocationTasksProperties.isEmpty() && Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks", false)))) {
+      String autoScalerStrategy = String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy", "default"));
+
+      // will thorw 'Return value of String.hashCode() ignored : RV_RETURN_VALUE_IGNORED' just Suppress it.
+      switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+        default: autoScaler = new DefaultAutoScaler(supervisor, dataSource, dynamicAllocationTasksProperties, this);
+      }

Review comment:
       Nice catch! I changed the way configurations are obtained to use Jackson, as I mentioned above.
   This way, users can not only define their own scaling algorithms but also build the corresponding configuration.
   It is also easier to ensure consistency of default values.
   
   As `supervisor` is instantiated with `new` throughout Druid, is it necessary to create the autoscaler instance using Jackson? 🤕

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties` to specify how to auto scale the number of Kafka ingest tasks based on Lag metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks Properties) for details.|no (default == null)|
+
+#### Dynamic Allocation Tasks Properties
+
+| Property | Description | Default |
+| ------------- | ------------- | ------------- |
+| `enableDynamicAllocationTasks` | whether enable this feature or not | false |

Review comment:
       As I mentioned above, this is a compromise. The current algorithm is relatively simple but can meet most scenarios (for example, regular or sudden traffic peaks).
   But for extreme cases, if users don't set the scale-related configs properly, it will trigger scale actions too frequently and create lots of small segments. At that point the user needs to control taskCount manually, and this config makes disabling/enabling more convenient.
   
   When the algorithm is smart enough, it would be better to remove this parameter.
   
   As for the default value of false, I think it is insurance to prevent users from enabling the autoscaler by accident, e.g. leaving `"autoscalerConfig": {}` behind after deleting all the autoscaler-related configs.

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties` to specify how to auto scale the number of Kafka ingest tasks based on Lag metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks Properties) for details.|no (default == null)|

Review comment:
       Make sense. Changed

##########
File path: extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
##########
@@ -70,6 +72,7 @@ public KinesisSupervisorIOConfig(
       @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
       @JsonProperty("awsExternalId") String awsExternalId,
+      @JsonProperty("dynamicAllocationTasksProperties") Map<String, Object> dynamicAllocationTasksProperties,

Review comment:
       Sure. Done.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig 
getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * need to notice that autoScaler would be null which means autoscale is 
dissable.
+   * @param supervisor
+   * @return autoScaler, disable autoscale will return dummyAutoScaler and 
enable autoscale wiil return defaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = 
"using siwtch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, 
dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = 
ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && 
!dynamicAllocationTasksProperties.isEmpty() && 
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
 false)))) {
+      String autoScalerStrategy = 
String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy",
 "default"));
+
+      // will thorw 'Return value of String.hashCode() ignored : 
RV_RETURN_VALUE_IGNORED' just Suppress it.
+      switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+        default: autoScaler = new DefaultAutoScaler(supervisor, dataSource, 
dynamicAllocationTasksProperties, this);
+      }

Review comment:
       Nice catch! I modified the way of obtaining configurations to use Jackson, as I 
mentioned above.
   This way, users can not only define their own scale algorithms but also build the 
corresponding configuration.
   It also makes it easier to ensure consistency of default values.
   
   Since `supervisor` is newed everywhere in Druid, is it necessary to create the 
autoscaler, which holds a `supervisor` instance, using Jackson? 🤕 

##########
File path: 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
    * @param checkpointMetadata metadata for the sequence to currently checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Collect maxLag, totalLag, avgLag into the given ArrayList<Long> lags.
+   * Only Kafka ingestion is supported so far.
+   * @param lags note: the order of values is maxLag, totalLag, avgLag.
+   */
+  void collectLag(ArrayList<Long> lags);

Review comment:
       Yeah, for now we only support the Kafka autoscaler. But based on 
https://github.com/apache/druid/blob/118b50195e5c2989e04e0f5290aa72cae114db39/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L3499
 it may not be hard to support a Kinesis autoscaler, and I'd be glad to work on it 
after this PR is merged. So keeping the code abstract is probably more meaningful, and 
there is plenty of preparation to ensure that users will not enable Kinesis auto 
scaling by accident:
   1. A mention in the docs
   2. `throw new UnsupportedOperationException("Tasks auto scaler for kinesis 
is not supported yet. Please remove autoscalerConfig or set it null!");` in 
`KinesisSupervisorIOConfig`
   3. Setting `autoscalerConfig = null` when calling `super(xxx)` in 
`KinesisSupervisorIOConfig`
   
   Also, we can't cast Supervisor to KafkaSupervisor directly without adding an 
extra dependency on `druid-kafka-indexing-service` in the `indexing-service` module.
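   To make the documented ordering concrete, here is a minimal sketch of how a Kafka-style implementation might fill the list. The per-partition lag map and the helper class are assumptions for illustration, not the PR's actual code; only the maxLag/totalLag/avgLag ordering comes from the javadoc above.

```java
import java.util.ArrayList;
import java.util.Map;

class LagCollector
{
  // Hypothetical helper: derive maxLag, totalLag, avgLag from per-partition lags
  // and append them in the documented order: maxLag, totalLag, avgLag.
  static ArrayList<Long> collectLag(Map<Integer, Long> partitionLags)
  {
    long total = partitionLags.values().stream().mapToLong(Long::longValue).sum();
    long max = partitionLags.values().stream().mapToLong(Long::longValue).max().orElse(0L);
    long avg = partitionLags.isEmpty() ? 0L : total / partitionLags.size();
    ArrayList<Long> lags = new ArrayList<>();
    lags.add(max);   // index 0: maxLag
    lags.add(total); // index 1: totalLag
    lags.add(avg);   // index 2: avgLag
    return lags;
  }
}
```

   A positional contract like this is fragile, which is why the thread below leans toward keeping the collection behind an abstract method rather than exposing the raw list shape to callers.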

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -519,20 +636,40 @@ public SeekableStreamSupervisor(
     this.useExclusiveStartingSequence = useExclusiveStartingSequence;
     this.dataSource = spec.getDataSchema().getDataSource();
     this.ioConfig = spec.getIoConfig();
+    this.dynamicAllocationTasksProperties = ioConfig.getDynamicAllocationTasksProperties();

Review comment:
       Sure, it is a little strange to use map.getOrDefault() here, because the default 
values are hard to unify.
   
   I changed the way configurations are obtained from a Map to a new interface 
`AutoScalerConfig` with a default impl `DefaultAutoScaleConfig`, so that we can 
use Jackson to instantiate a config with **default values** instead of calling 
map.get/parse everywhere. **This also ensures consistency of default values.**
   
   This way, we don't need to get a handle to `SupervisorTaskAutoscaler`; 
just call `autoScalerConfig.getTaskCountMax()` :)
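   As a rough sketch of the default-value centralization being described: `taskCountMax` is the one accessor named in the thread, while the default value of 4, the class body, and the Jackson wiring (which would use `@JsonCreator`/`@JsonProperty` in the real PR) are placeholders.

```java
// Sketch of a DefaultAutoScaleConfig-style class: the default lives in one place
// (the constructor) instead of map.getOrDefault() at every call site.
// The default value 4 is made up for illustration.
class DefaultAutoScaleConfig
{
  private final int taskCountMax;

  DefaultAutoScaleConfig(Integer taskCountMax)
  {
    // Jackson passes null for absent JSON fields, so the default is applied exactly once, here.
    this.taskCountMax = taskCountMax == null ? 4 : taskCountMax;
  }

  int getTaskCountMax()
  {
    return taskCountMax;
  }
}
```

   The design payoff is that a spec with a missing field and a spec with an explicit value go through the same constructor, so there is no way for two call sites to disagree about the default.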

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +322,114 @@ public void handle()
     }
   }
 
+  // change taskCount without resubmitting.
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    Callable<Integer> scaleAction;
+
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    {
+      this.scaleAction = scaleAction;
+    }
+
+    /**
+     * This method collects lag points and checks whether a dynamic scale action is necessary.
+     */
+    @Override
+    public void handle()
+    {
+      try {
+        long nowTime = System.currentTimeMillis();
+        // Scale out/in is triggered only when the lag queue is full and more than minTriggerDynamicFrequency has passed since the last run.
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skipping the dynamic task allocation check", dataSource);
+          return;
+        }
+        log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) {
+          if (!list.isEmpty()) {
+            log.info("Hand-off of tasks [%s] is still unfinished, skipping the scale action for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {

Review comment:
       Thanks for your attention.
   
   Actually, there are three hard conditions checked before doing a scale action:
   1. Don't scale when the supervisor is suspended.
   2. Don't scale while a previous task is handing off, to avoid inconsistent state.
   3. Don't scale during the cooldown time, to avoid overly frequent scaling.
   
   And I think that no matter what the task type or the autoscaler impl is, it may be 
better to follow these three common conditions.
   
   Also, users can define their own conditions, like TaskCountLimitation, in a specific 
impl :)
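   The three guard conditions above can be condensed into a single predicate. This is a hypothetical standalone sketch; the method name and the boolean/long parameters stand in for supervisor state that the actual PR reads from fields.

```java
class ScaleGuards
{
  // Returns true when a scale action must be skipped, per the three hard conditions.
  static boolean shouldSkipScaleAction(
      boolean suspended,
      boolean handOffInProgress,
      long nowMillis,
      long lastRunMillis,
      long cooldownMillis)
  {
    if (suspended) {
      return true; // 1. supervisor is suspended
    }
    if (handOffInProgress) {
      return true; // 2. previous tasks are still handing off
    }
    // 3. still inside the cooldown window since the last trigger
    return nowMillis - lastRunMillis < cooldownMillis;
  }
}
```

   Keeping these checks in the shared base class means any autoscaler impl gets them for free, and impl-specific conditions can layer on top.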
    

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * Note that autoScaler would be null, which means autoscaling is disabled.
+   * @param supervisor
+   * @return autoScaler; when autoscaling is disabled this returns a DummyAutoScaler, and when enabled it returns a DefaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using switch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && !dynamicAllocationTasksProperties.isEmpty() && Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks", false)))) {

Review comment:
       The reason for designing this condition is that users can easily disable/enable 
the autoscaler for a while using this config rather than deleting all the 
autoscaler-related configs.
   
   For example, consider an advertising business during the Super Bowl. Traffic is much 
higher during break time and lower during the game (**large traffic fluctuations in 
the short term**). If users don't set the scale-related configs properly, scale 
actions will trigger too frequently and create lots of small segments.
   
   For traffic like this we usually set a larger number of tasks temporarily and set 
the flag to false to disable the autoscaler for a while. 
   
   Also, when the scale algorithms become more advanced, it would be better to remove 
this config and let the autoscaler do everything. But for now it is probably better 
to keep this parameter :)
   
   

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * Note that autoScaler would be null, which means autoscaling is disabled.
+   * @param supervisor
+   * @return autoScaler; when autoscaling is disabled this returns a DummyAutoScaler, and when enabled it returns a DefaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using switch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && !dynamicAllocationTasksProperties.isEmpty() && Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks", false)))) {
+      String autoScalerStrategy = String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy", "default"));
+
+      // SpotBugs will throw 'Return value of String.hashCode() ignored : RV_RETURN_VALUE_IGNORED'; just suppress it.
+      switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+        default: autoScaler = new DefaultAutoScaler(supervisor, dataSource, dynamicAllocationTasksProperties, this);
+      }

Review comment:
       Nice catch! I modified the way of obtaining configurations to use Jackson, as I 
mentioned above.
   This way, users can not only define their own scale algorithms but also build the 
corresponding configuration.
   It also makes it easier to ensure consistency of default values, and we don't 
need to do any map.get/parse work anymore.
   
   Since `supervisor` is newed everywhere in Druid, is it necessary to create the 
autoscaler, which holds a `supervisor` instance, using Jackson? 🤕 

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * Note that autoScaler would be null, which means autoscaling is disabled.
+   * @param supervisor
+   * @return autoScaler; when autoscaling is disabled this returns a DummyAutoScaler, and when enabled it returns a DefaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using switch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && !dynamicAllocationTasksProperties.isEmpty() && Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks", false)))) {
+      String autoScalerStrategy = String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy", "default"));
+
+      // SpotBugs will throw 'Return value of String.hashCode() ignored : RV_RETURN_VALUE_IGNORED'; just suppress it.
+      switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+        default: autoScaler = new DefaultAutoScaler(supervisor, dataSource, dynamicAllocationTasksProperties, this);
+      }

Review comment:
       Nice catch! I modified the way of obtaining configurations to use Jackson, as I 
mentioned above.
   This way, users can not only define their own scale algorithms but also build the 
corresponding configuration.
   It also makes it easier to ensure consistency of default values, and we don't 
need to do any map.get/parse work anymore.
   
   Since `supervisor` is newed everywhere in Druid, is it necessary to create the 
autoscaler, which holds a `supervisor` instance, using Jackson? 🤕 

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|Specifies how to auto scale the number of Kafka ingestion tasks based on lag metrics. See [Dynamic Allocation Tasks Properties](#dynamic-allocation-tasks-properties) for details.|no (default == null)|
+
+#### Dynamic Allocation Tasks Properties
+
+| Property | Description | Default |
+| ------------- | ------------- | ------------- |
+| `enableDynamicAllocationTasks` | whether to enable this feature or not | false |

Review comment:
       Like I mentioned above, this is an insurance and a compromise. The current 
algorithm is relatively simple but can meet most scenarios (e.g. a regular traffic 
peak or a sudden traffic peak).
   
   But in extreme cases, if users don't set the scale-related configs properly, scale 
actions will trigger too frequently and create lots of small segments. At that 
point, the user needs to manually control taskCount, and this config makes 
disable/enable work more convenient.
   
   When the algorithm is smart enough, it would be better to remove this parameter.
   
   As for the default value of false, I think it is insurance against users enabling 
the autoscaler by accident, e.g. by leaving `"autoscalerConfig": {}` behind after 
deleting all the other autoscaler-related configs.
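   To make the toggle concrete, a sketch of how it might appear inside the supervisor's ioConfig — `enableDynamicAllocationTasks` and `autoScalerStrategy` are the only field names visible in this diff; any other properties in a real spec would be autoscaler-specific:

```
"dynamicAllocationTasksProperties": {
  "enableDynamicAllocationTasks": true,
  "autoScalerStrategy": "default"
}
```

   Flipping `enableDynamicAllocationTasks` to `false` (or removing the object entirely) disables autoscaling while leaving the rest of the configuration in place, which is exactly the temporary-disable workflow described above.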
   

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * Note that autoScaler would be null, which means autoscaling is disabled.
+   * @param supervisor
+   * @return autoScaler; when autoscaling is disabled this returns a DummyAutoScaler, and when enabled it returns a DefaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using switch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && !dynamicAllocationTasksProperties.isEmpty() && Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks", false)))) {
+      String autoScalerStrategy = String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy", "default"));
+
+      // SpotBugs will throw 'Return value of String.hashCode() ignored : RV_RETURN_VALUE_IGNORED'; just suppress it.
+      switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+        default: autoScaler = new DefaultAutoScaler(supervisor, dataSource, dynamicAllocationTasksProperties, this);
+      }

Review comment:
       Nice catch! I modified the way of obtaining configurations to use Jackson, as I 
mentioned above.
   This way, users can not only define their own scale algorithms but also build the 
corresponding configuration.
   It also makes it easier to ensure consistency of default values, and we don't 
need to do any map.get/parse work anymore.
   
   Since `supervisor` is newed everywhere in Druid, is it necessary to create the 
autoscaler, which holds a `supervisor` instance, using Jackson? 🤕 

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * Note that autoScaler would be null, which means autoscaling is disabled.
+   * @param supervisor
+   * @return autoScaler; when autoscaling is disabled this returns a DummyAutoScaler, and when enabled it returns a DefaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using switch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && !dynamicAllocationTasksProperties.isEmpty() && Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks", false)))) {
+      String autoScalerStrategy = String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy", "default"));
+
+      // SpotBugs will throw 'Return value of String.hashCode() ignored : RV_RETURN_VALUE_IGNORED'; just suppress it.
+      switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+        default: autoScaler = new DefaultAutoScaler(supervisor, dataSource, dynamicAllocationTasksProperties, this);
+      }

Review comment:
       Nice catch! I modified the way of obtaining configurations to use Jackson, as I 
mentioned above.
   This way, users can not only define their own scale algorithms but also build the 
corresponding configuration.
   It also makes it easier to ensure consistency of default values, and we don't 
need to do any map.get/parse work anymore.
   
   Since `supervisor` instances are newed everywhere in Druid, is it necessary to 
create the autoscaler, which holds a `supervisor` instance, using Jackson? 🤕 

##########
File path: 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
    * @param checkpointMetadata metadata for the sequence to currently checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Collect maxLag, totalLag, avgLag into the given ArrayList<Long> lags.
+   * Only Kafka ingestion is supported so far.
+   * @param lags note: the order of values is maxLag, totalLag, avgLag.
+   */
+  void collectLag(ArrayList<Long> lags);

Review comment:
       Yeah, for now we only support the Kafka autoscaler. But based on 
https://github.com/apache/druid/blob/118b50195e5c2989e04e0f5290aa72cae114db39/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L3499
 it may not be hard to support a Kinesis autoscaler, and I'm glad to work on it 
soon. So keeping the code abstract is probably more meaningful, and there is plenty 
of preparation to ensure that users will not enable Kinesis auto scaling by accident:
   1. A mention in the docs
   2. `throw new UnsupportedOperationException("Tasks auto scaler for kinesis 
is not supported yet. Please remove autoscalerConfig or set it null!");` in 
`KinesisSupervisorIOConfig`
   3. Setting `autoscalerConfig = null` when calling `super(xxx)` in 
`KinesisSupervisorIOConfig`
   
   Also, we can't cast Supervisor to KafkaSupervisor directly without adding an 
extra dependency on `druid-kafka-indexing-service` in the `indexing-service` module.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
