himanshug commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r556981649



##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -3561,4 +3874,11 @@ protected void emitLag()
    * sequences. In Kafka, start offsets are always inclusive.
    */
   protected abstract boolean 
useExclusiveStartSequenceNumberForNonFirstSequence();
+
+  /**
+   * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+   * Only support Kafka ingestion so far.
+   * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+   */
+  protected abstract void collectLag(ArrayList<Long> lags);

Review comment:
       nit: I see that the concept of storing lag stats in an `ArrayList<Long>` 
predates your PR, but it might be simpler to define a new class like `LagStats`, 
change this method to return it as in the suggestion below, and make the related 
changes in the other places where this `ArrayList` is used.
   
   ```suggestion
     protected abstract LagStats computeLagStats();
     
     static class LagStats
     {
       private final long maxLag;
       private final long totalLag;
       private final long avgLag;
   
       public LagStats(long maxLag, long totalLag, long avgLag)
       {
         this.maxLag = maxLag;
         this.totalLag = totalLag;
         this.avgLag = avgLag;
       }
   
       public long getMaxLag()
       {
         return maxLag;
       }
   
       public long getTotalLag()
       {
         return totalLag;
       }
   
       public long getAvgLag()
       {
         return avgLag;
       }
     }
   ```

##########
File path: 
extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
##########
@@ -361,6 +362,12 @@ public Supervisor createSupervisor()
     );
   }
 
+  @Override
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    return null;
+  }
+

Review comment:
       Can this be added as a `default` implementation in the `SupervisorSpec` interface?
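
A minimal sketch of the idea, assuming simplified stand-ins for `Supervisor`, `SupervisorTaskAutoscaler` and `SupervisorSpec` (the real interfaces have more members), with `MaterializedViewSpecSketch` as a hypothetical implementor:

```java
// Simplified stand-ins for the Druid types named in the diff; not the real interfaces.
interface Supervisor {}

interface SupervisorTaskAutoscaler {}

interface SupervisorSpec
{
  Supervisor createSupervisor();

  // Default body: specs that do not support autoscaling inherit this and need no override.
  default SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
  {
    return null;
  }
}

// Hypothetical spec that relies on the inherited default instead of overriding it.
class MaterializedViewSpecSketch implements SupervisorSpec
{
  @Override
  public Supervisor createSupervisor()
  {
    return new Supervisor() {};
  }
}
```

With this shape, only specs that actually support autoscaling would need to override `createAutoscaler`.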

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig 
getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * need to notice that autoScaler would be null which means autoscale is 
dissable.
+   * @param supervisor
+   * @return autoScaler, disable autoscale will return dummyAutoScaler and 
enable autoscale wiil return defaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = 
"using siwtch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, 
dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = 
ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && 
!dynamicAllocationTasksProperties.isEmpty() && 
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
 false)))) {

Review comment:
       I am not sure why we need the extra property `enableDynamicAllocationTasks`. 
If the user added a non-null `dynamicAllocationTasksProperties`, that alone should 
mean that the user wants to enable autoscaling.
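
Roughly what the check could reduce to, as a sketch only. `AutoscalerToggleSketch` and `isAutoscalingEnabled` are hypothetical names, and treating an empty map as enabled is an assumption that follows from "non-null alone":

```java
// Hypothetical helper; the class and method names are not from the PR.
final class AutoscalerToggleSketch
{
  private AutoscalerToggleSketch() {}

  // Presence of the config section is the only signal; no separate enable flag is consulted.
  static boolean isAutoscalingEnabled(java.util.Map<String, Object> dynamicAllocationTasksProperties)
  {
    return dynamicAllocationTasksProperties != null;
  }
}
```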

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to 
reject messages with timestamps earlier than this date time; for example if 
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no 
(default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps earlier than this period before the task was created; 
for example if this is set to `PT1H` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please 
note that only one of `lateMessageRejectionPeriod` or 
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps later than this period after the task reached its 
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to 
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with 
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks 
sometimes run past their task duration, for example, in cases of supervisor 
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be 
dropped unexpectedly whenever a task runs past its originally configured task 
duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties` 
to specify how to auto scale the number of Kafka ingest tasks based on Lag 
metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks 
Properties) for details.|no (default == null)|
+
+#### Dynamic Allocation Tasks Properties
+
+| Property | Description | Default |
+| ------------- | ------------- | ------------- |
+| `enableDynamicAllocationTasks` | whether enable this feature or not | false |

Review comment:
       Not sure we need this. If the user added a 
`dynamicAllocationTasksProperties` section to the supervisor spec, shouldn't that 
alone be enough to enable autoscaling?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig 
getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
 
+  /**
+   * need to notice that autoScaler would be null which means autoscale is 
dissable.
+   * @param supervisor
+   * @return autoScaler, disable autoscale will return dummyAutoScaler and 
enable autoscale wiil return defaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = 
"using siwtch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, 
dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = 
ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && 
!dynamicAllocationTasksProperties.isEmpty() && 
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
 false)))) {
+      String autoScalerStrategy = 
String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy",
 "default"));
+
+      // will thorw 'Return value of String.hashCode() ignored : 
RV_RETURN_VALUE_IGNORED' just Suppress it.
+      switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+        default: autoScaler = new DefaultAutoScaler(supervisor, dataSource, 
dynamicAllocationTasksProperties, this);
+      }

Review comment:
       Can we create the autoscaler instance using Jackson, i.e. something 
like `jsonMapper.readValueFrom...()`?

##########
File path: 
extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
##########
@@ -282,6 +283,23 @@ public void checkpoint(int taskGroupId, DataSourceMetadata 
checkpointMetadata)
     // do nothing
   }
 
+  @Override
+  public void collectLag(ArrayList<Long> lags)
+  {
+  }
+
+  @Override
+  public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
+  {
+    return null;
+  }
+
+  @Override
+  public Map getSupervisorTaskInfos()
+  {
+    return null;
+  }
+

Review comment:
       These should throw `UnsupportedOperationException` instead, as they are 
not supposed to be called.
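
For illustration, a sketch of the three overrides, assuming a cut-down `SupervisorSketch` interface that holds only the methods added in this diff. The class names and exception messages are made up, and the raw `Map` return type is parameterized here to keep the example warning-free:

```java
// Simplified stand-in for the three methods added to the Supervisor interface in the diff.
interface SupervisorSketch
{
  void collectLag(java.util.ArrayList<Long> lags);

  Runnable buildDynamicAllocationTask(java.util.concurrent.Callable<Integer> scaleAction);

  java.util.Map<String, Object> getSupervisorTaskInfos();
}

// Hypothetical supervisor that does not take part in autoscaling.
class MaterializedViewSupervisorSketch implements SupervisorSketch
{
  @Override
  public void collectLag(java.util.ArrayList<Long> lags)
  {
    // Fail loudly rather than silently leaving the list untouched.
    throw new UnsupportedOperationException("collectLag is not supported by this supervisor");
  }

  @Override
  public Runnable buildDynamicAllocationTask(java.util.concurrent.Callable<Integer> scaleAction)
  {
    // Returning null would only surface later as a NullPointerException at the call site.
    throw new UnsupportedOperationException("buildDynamicAllocationTask is not supported by this supervisor");
  }

  @Override
  public java.util.Map<String, Object> getSupervisorTaskInfos()
  {
    throw new UnsupportedOperationException("getSupervisorTaskInfos is not supported by this supervisor");
  }
}
```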

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to 
reject messages with timestamps earlier than this date time; for example if 
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no 
(default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps earlier than this period before the task was created; 
for example if this is set to `PT1H` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please 
note that only one of `lateMessageRejectionPeriod` or 
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps later than this period after the task reached its 
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to 
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with 
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks 
sometimes run past their task duration, for example, in cases of supervisor 
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be 
dropped unexpectedly whenever a task runs past its originally configured task 
duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties` 
to specify how to auto scale the number of Kafka ingest tasks based on Lag 
metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks 
Properties) for details.|no (default == null)|

Review comment:
       I think autoscaling better describes this feature, so maybe call it 
`autoscalerConfig`.

##########
File path: 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
    * @param checkpointMetadata metadata for the sequence to currently 
checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+   * Only support Kafka ingestion so far.
+   * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+   */
+  void collectLag(ArrayList<Long> lags);
+
+  /**
+   * use for autoscaler
+   */
+  Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction);

Review comment:
       Can we instead have `void reconcileTaskCount()`, which looks at the current 
task count in the IO config and does whatever is needed to match that number of 
active tasks?
   
   The autoscaler impl would be responsible for updating the task count in the 
task IO config and then calling this method.
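
A rough sketch of that division of responsibility. Everything here is a hypothetical stand-in: `IoConfigSketch`, `ReconcilingSupervisorSketch` and `AutoscalerSketch` are invented names, and active tasks are reduced to a plain counter, whereas a real supervisor manages task groups:

```java
// Hypothetical, heavily simplified stand-ins; a real supervisor manages task groups, not a counter.
class IoConfigSketch
{
  private int taskCount;

  IoConfigSketch(int taskCount)
  {
    this.taskCount = taskCount;
  }

  int getTaskCount()
  {
    return taskCount;
  }

  void setTaskCount(int taskCount)
  {
    this.taskCount = taskCount;
  }
}

class ReconcilingSupervisorSketch
{
  private final IoConfigSketch ioConfig;
  private int activeTasks;

  ReconcilingSupervisorSketch(IoConfigSketch ioConfig)
  {
    this.ioConfig = ioConfig;
    this.activeTasks = ioConfig.getTaskCount();
  }

  // Bring the number of active tasks in line with the desired count in the IO config.
  void reconcileTaskCount()
  {
    int desired = ioConfig.getTaskCount();
    while (activeTasks < desired) {
      activeTasks++; // stand-in for starting one task
    }
    while (activeTasks > desired) {
      activeTasks--; // stand-in for stopping one task
    }
  }

  int getActiveTasks()
  {
    return activeTasks;
  }
}

class AutoscalerSketch
{
  private final IoConfigSketch ioConfig;
  private final ReconcilingSupervisorSketch supervisor;

  AutoscalerSketch(IoConfigSketch ioConfig, ReconcilingSupervisorSketch supervisor)
  {
    this.ioConfig = ioConfig;
    this.supervisor = supervisor;
  }

  // The autoscaler owns the decision: it updates the config, then asks the supervisor to catch up.
  void scaleTo(int desiredTaskCount)
  {
    ioConfig.setTaskCount(desiredTaskCount);
    supervisor.reconcileTaskCount();
  }
}
```

The supervisor then never needs to know why the count changed, only what the count should be.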

##########
File path: 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
    * @param checkpointMetadata metadata for the sequence to currently 
checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+   * Only support Kafka ingestion so far.
+   * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+   */
+  void collectLag(ArrayList<Long> lags);

Review comment:
       At this time, I think this is very specific to `KafkaSupervisor`, and it 
seems that we currently only want to support autoscaling for Kafka indexing. 
So I would say that in this PR we rename `DefaultAutoScaler` to 
`KafkaIndexingDefaultAutoScaler` and let `KafkaIndexingDefaultAutoScaler` cast 
`Supervisor` to `KafkaSupervisor`, so that it uses `KafkaSupervisor.collectLag(..)` 
directly and the method does not need to be in the interface.
   
   If Kinesis starts using it in some form later, the `Supervisor` interface 
can be modified at that time.
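
Something along these lines, as a sketch. `Supervisor` and `KafkaSupervisor` below are empty stand-ins that only share names with the real classes, the lag values are dummies, and the explicit `instanceof` check with `IllegalArgumentException` is an addition that the comment above does not ask for:

```java
// Stand-ins that only borrow the names of the real Druid classes.
interface Supervisor {}

class KafkaSupervisor implements Supervisor
{
  // Stand-in body: appends dummy values in the documented order maxLag, totalLag, avgLag.
  void collectLag(java.util.ArrayList<Long> lags)
  {
    lags.add(30L);
    lags.add(60L);
    lags.add(20L);
  }
}

class KafkaIndexingDefaultAutoScaler
{
  private final KafkaSupervisor supervisor;

  KafkaIndexingDefaultAutoScaler(Supervisor supervisor)
  {
    // Assumption: reject other supervisor types up front instead of failing with ClassCastException.
    if (!(supervisor instanceof KafkaSupervisor)) {
      throw new IllegalArgumentException("Autoscaling is only supported for Kafka supervisors");
    }
    this.supervisor = (KafkaSupervisor) supervisor;
  }

  // collectLag stays a KafkaSupervisor method and is absent from the Supervisor interface.
  java.util.ArrayList<Long> currentLags()
  {
    java.util.ArrayList<Long> lags = new java.util.ArrayList<>();
    supervisor.collectLag(lags);
    return lags;
  }
}
```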

##########
File path: 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
    * @param checkpointMetadata metadata for the sequence to currently 
checkpoint
    */
   void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+  /**
+   * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+   * Only support Kafka ingestion so far.
+   * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+   */
+  void collectLag(ArrayList<Long> lags);
+
+  /**
+   * use for autoscaler
+   */
+  Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction);
+
+  Map getSupervisorTaskInfos();

Review comment:
       It seems we only really need the active task group count, so can we have 
`int getActiveTaskGroupsCount()` instead?
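
As a sketch of the narrower method, assuming the supervisor tracks its active task groups in a map keyed by group id. `SupervisorCountSketch`, `StreamSupervisorSketch`, the field name and the `String` value type are all placeholders:

```java
// Hypothetical narrow interface exposing only what the autoscaler needs.
interface SupervisorCountSketch
{
  int getActiveTaskGroupsCount();
}

class StreamSupervisorSketch implements SupervisorCountSketch
{
  // Assumption: active groups are tracked in a concurrent map keyed by task group id.
  private final java.util.concurrent.ConcurrentHashMap<Integer, String> activeTaskGroups =
      new java.util.concurrent.ConcurrentHashMap<>();

  void addTaskGroup(int groupId, String description)
  {
    activeTaskGroups.put(groupId, description);
  }

  @Override
  public int getActiveTaskGroupsCount()
  {
    // Callers get a number, not a reference to the supervisor's internal state.
    return activeTaskGroups.size();
  }
}
```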

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +322,114 @@ public void handle()
     }
   }
 
+  // change taskCount without resubmitting.
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    Callable<Integer> scaleAction;
+
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    {
+      this.scaleAction = scaleAction;
+    }
+
+    /**
+     * This method will do lags points collection and check dynamic scale 
action is necessary or not.
+     */
+    @Override
+    public void handle()
+    {
+      try {
+        long nowTime = System.currentTimeMillis();
+        // Only queue is full and over minTriggerDynamicFrequency can trigger 
scale out/in
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic 
allocate task logic", dataSource);
+          return;
+        }
+        log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].", 
pendingCompletionTaskGroups, dataSource);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) 
{
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action 
[%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {

Review comment:
       I think this type of logic should live inside the autoscaler impl, which 
should decide when to trigger the autoscaling.
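
A sketch of how the gating from the hunk above (suspended supervisor, pending handoff, minimum trigger interval) might look once moved into the autoscaler. `CooldownAutoscalerSketch` and `shouldScale` are hypothetical names, and the two boolean parameters stand in for state the autoscaler would have to query from the supervisor:

```java
// Hypothetical autoscaler-side gate; the names and parameters are not from the PR.
class CooldownAutoscalerSketch
{
  private final long minTriggerIntervalMillis;
  private boolean hasRun;
  private long lastRunMillis;

  CooldownAutoscalerSketch(long minTriggerIntervalMillis)
  {
    this.minTriggerIntervalMillis = minTriggerIntervalMillis;
  }

  // Returns true and records the run time when scaling may proceed, false when a gate blocks it.
  boolean shouldScale(long nowMillis, boolean supervisorSuspended, boolean handoffPending)
  {
    if (supervisorSuspended || handoffPending) {
      return false;
    }
    if (hasRun && nowMillis - lastRunMillis < minTriggerIntervalMillis) {
      return false; // still inside the cooldown window
    }
    hasRun = true;
    lastRunMillis = nowMillis;
    return true;
  }
}

class CooldownAutoscalerDemo
{
  public static void main(String[] args)
  {
    CooldownAutoscalerSketch gate = new CooldownAutoscalerSketch(1000L);
    System.out.println(gate.shouldScale(0L, false, false));
    System.out.println(gate.shouldScale(500L, false, false));
  }
}
```

Passing the clock value in as a parameter keeps the gate easy to test without sleeping.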

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -519,20 +636,40 @@ public SeekableStreamSupervisor(
     this.useExclusiveStartingSequence = useExclusiveStartingSequence;
     this.dataSource = spec.getDataSchema().getDataSource();
     this.ioConfig = spec.getIoConfig();
+    this.dynamicAllocationTasksProperties = 
ioConfig.getDynamicAllocationTasksProperties();

Review comment:
       Could we instead get a handle to `SupervisorTaskAutoscaler` and have 
`SupervisorTaskAutoscaler.getMaxTaskCount()` provide the maximum task count?
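
One possible shape, as a sketch. The `SupervisorTaskAutoscaler` below is a one-method stand-in, `MaxTaskCountSupervisorSketch` is a made-up name, and falling back to the configured task count when no autoscaler is present is an assumption:

```java
// One-method stand-in for the autoscaler type; the real interface would have more members.
interface SupervisorTaskAutoscaler
{
  int getMaxTaskCount();
}

// Hypothetical supervisor that holds the autoscaler handle instead of the raw properties map.
class MaxTaskCountSupervisorSketch
{
  private final SupervisorTaskAutoscaler autoscaler; // null when autoscaling is not configured
  private final int configuredTaskCount;

  MaxTaskCountSupervisorSketch(SupervisorTaskAutoscaler autoscaler, int configuredTaskCount)
  {
    this.autoscaler = autoscaler;
    this.configuredTaskCount = configuredTaskCount;
  }

  // Assumption: without an autoscaler, the configured task count is also the maximum.
  int maxTaskCount()
  {
    return autoscaler == null ? configuredTaskCount : autoscaler.getMaxTaskCount();
  }
}
```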

##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to 
reject messages with timestamps earlier than this date time; for example if 
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no 
(default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps earlier than this period before the task was created; 
for example if this is set to `PT1H` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please 
note that only one of `lateMessageRejectionPeriod` or 
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps later than this period after the task reached its 
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to 
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with 
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks 
sometimes run past their task duration, for example, in cases of supervisor 
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be 
dropped unexpectedly whenever a task runs past its originally configured task 
duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties` 
to specify how to auto scale the number of Kafka ingest tasks based on Lag 
metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks 
Properties) for details.|no (default == null)|

Review comment:
       Be very specific that this is ONLY supported for Kafka indexing as of 
now.

##########
File path: 
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
##########
@@ -70,6 +72,7 @@ public KinesisSupervisorIOConfig(
       @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
       @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
       @JsonProperty("awsExternalId") String awsExternalId,
+      @JsonProperty("dynamicAllocationTasksProperties") Map<String, Object> 
dynamicAllocationTasksProperties,

Review comment:
       We should actually throw an exception if someone sets this on a Kinesis 
supervisor spec, as that is not expected.
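
For example, a constructor-time check along these lines. This is a sketch only: `KinesisIoConfigSketch` is a stand-in that drops every other constructor parameter, and `IllegalArgumentException` is just one plausible exception type:

```java
// Stand-in for the Kinesis IO config, reduced to the one parameter under discussion.
class KinesisIoConfigSketch
{
  KinesisIoConfigSketch(java.util.Map<String, Object> dynamicAllocationTasksProperties)
  {
    // Reject the unsupported section at spec-parsing time rather than ignoring it silently.
    if (dynamicAllocationTasksProperties != null) {
      throw new IllegalArgumentException(
          "dynamicAllocationTasksProperties is not supported for Kinesis supervisors"
      );
    }
  }
}
```

Failing at construction means a misconfigured spec is rejected on submission instead of running without the autoscaling the user asked for.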



