zhangyue19921010 commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r569974080
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -3561,4 +3874,11 @@ protected void emitLag()
* sequences. In Kafka, start offsets are always inclusive.
*/
protected abstract boolean
useExclusiveStartSequenceNumberForNonFirstSequence();
+
+ /**
+ * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+ * Only support Kafka ingestion so far.
+ * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+ */
+ protected abstract void collectLag(ArrayList<Long> lags);
Review comment:
Nice idea. Changed.
##########
File path:
extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
##########
@@ -361,6 +362,12 @@ public Supervisor createSupervisor()
);
}
+ @Override
+ public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+ {
+ return null;
+ }
+
Review comment:
changed.
##########
File path:
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
* @param checkpointMetadata metadata for the sequence to currently
checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+ /**
+ * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+ * Only support Kafka ingestion so far.
+ * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+ */
+ void collectLag(ArrayList<Long> lags);
Review comment:
Yes, for now we only support the Kafka autoscaler. But based on
https://github.com/apache/druid/blob/118b50195e5c2989e04e0f5290aa72cae114db39/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L3499
it should not be hard to support a Kinesis autoscaler as well, and I'm glad to
work on it after this PR is merged. So keeping the code abstract seems more
meaningful, and there is plenty of preparation to ensure that users will not
enable Kinesis auto scaling by accident:
1. A mention in the docs
2. `throw new UnsupportedOperationException("Tasks auto scaler for kinesis
is not supported yet. Please remove autoscalerConfig or set it null!");` in
`KinesisSupervisorIOConfig`
3. Setting `autoscalerConfig = null` when calling `super(xxx)` in
`KinesisSupervisorIOConfig`
Also, we can't cast Supervisor to KafkaSupervisor directly unless we add an
extra dependency on `druid-kafka-indexing-service` in the `indexing-service`
module.
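Points 2 and 3 could look roughly like this. This is a minimal sketch: `KinesisIOConfigSketch` is a hypothetical stand-in, and the real `KinesisSupervisorIOConfig` constructor takes many more parameters.

```java
import java.util.Map;

// Hypothetical stand-in for KinesisSupervisorIOConfig: reject any populated
// autoscaler config, and always store null so no autoscaler is ever built.
public class KinesisIOConfigSketch
{
  private final Map<String, Object> autoscalerConfig;

  public KinesisIOConfigSketch(Map<String, Object> autoscalerConfig)
  {
    if (autoscalerConfig != null && !autoscalerConfig.isEmpty()) {
      throw new UnsupportedOperationException(
          "Tasks auto scaler for kinesis is not supported yet. "
              + "Please remove autoscalerConfig or set it null!"
      );
    }
    this.autoscalerConfig = null; // point 3: force null regardless of input
  }

  public Map<String, Object> getAutoscalerConfig()
  {
    return autoscalerConfig;
  }
}
```

An empty map is treated as "not set" here; only a populated config is rejected, so specs that never mention the autoscaler keep working.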
##########
File path:
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
* @param checkpointMetadata metadata for the sequence to currently
checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+ /**
+ * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+ * Only support Kafka ingestion so far.
+ * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+ */
+ void collectLag(ArrayList<Long> lags);
+
+ /**
+ * use for autoscaler
+ */
+ Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction);
Review comment:
Nice catch! I reviewed the code and found that there is no need for a
`buildDynamicAllocationTask` method in `Supervisor.java`: the autoscaler can
build the autoscale notice itself, and the supervisor will perform the scale
action. So I removed the `buildDynamicAllocationTask` function from
`Supervisor.java`.
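The resulting division of labor might be sketched like this. All names here are illustrative; the real Notice machinery in `SeekableStreamSupervisor` is considerably richer.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;

// The autoscaler only enqueues a notice carrying the scale decision; the
// supervisor's notice loop is the single place that mutates task state.
public class ScaleNoticeSketch
{
  private final BlockingQueue<Runnable> notices = new LinkedBlockingQueue<>();
  private int taskCount = 1;

  /** Called from the autoscaler thread: decide, then hand off via a notice. */
  public void submitScaleNotice(Callable<Integer> scaleAction)
  {
    notices.add(() -> {
      try {
        taskCount = scaleAction.call();
      }
      catch (Exception e) {
        throw new RuntimeException(e);
      }
    });
  }

  /** Called from the supervisor thread: drain and apply one pending notice. */
  public void handleNextNotice()
  {
    Runnable notice = notices.poll();
    if (notice != null) {
      notice.run();
    }
  }

  public int getTaskCount()
  {
    return taskCount;
  }
}
```

The point of the design is that the supervisor never exposes a `Runnable` to the autoscaler; it only ever consumes notices on its own thread.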
##########
File path:
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
* @param checkpointMetadata metadata for the sequence to currently
checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+ /**
+ * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+ * Only support Kafka ingestion so far.
+ * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+ */
+ void collectLag(ArrayList<Long> lags);
+
+ /**
+ * use for autoscaler
+ */
+ Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction);
+
+ Map getSupervisorTaskInfos();
Review comment:
Sure. Changed.
##########
File path:
extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
##########
@@ -282,6 +283,23 @@ public void checkpoint(int taskGroupId, DataSourceMetadata
checkpointMetadata)
// do nothing
}
+ @Override
+ public void collectLag(ArrayList<Long> lags)
+ {
+ }
+
+ @Override
+ public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
+ {
+ return null;
+ }
+
+ @Override
+ public Map getSupervisorTaskInfos()
+ {
+ return null;
+ }
+
Review comment:
Sure. Done.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -519,20 +636,40 @@ public SeekableStreamSupervisor(
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
+ this.dynamicAllocationTasksProperties =
ioConfig.getDynamicAllocationTasksProperties();
Review comment:
I changed the way configurations are obtained from a Map to a new interface
`AutoScalerConfig` with a default impl `DefaultAutoScaleConfig`, so that we can
use Jackson to instantiate a config with **default values** instead of calling
map.get/parse everywhere. **This also ensures consistency of default values.**
In this way, we don't need a handle to `SupervisorTaskAutoscaler`, just
`autoScalerConfig.getTaskCountMax()` :)
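The idea can be sketched as follows. Field and property names are assumptions for illustration, not the actual `AutoScalerConfig` API.

```java
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;

// Defaults live once, in the field initializers; Jackson only overwrites the
// properties that actually appear in the supervisor spec JSON.
public class AutoScalerConfigSketch
{
  public boolean enableTaskAutoScaler = false;
  public int taskCountMin = 1;
  public int taskCountMax = 4;

  public static AutoScalerConfigSketch fromJson(String json)
  {
    try {
      return new ObjectMapper()
          .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
          .readValue(json, AutoScalerConfigSketch.class);
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

Any property missing from the JSON keeps its initializer value, so every call site sees the same defaults without a single `map.getOrDefault()`.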
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +322,114 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method will do lags points collection and check dynamic scale
action is necessary or not.
+ */
+ @Override
+ public void handle()
+ {
+ try {
+ long nowTime = System.currentTimeMillis();
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].",
pendingCompletionTaskGroups, dataSource);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
Review comment:
Actually, there are three hard conditions before doing a scale action:
1. Don't scale while the supervisor is suspended.
2. Don't scale while a previous task is handing off, to avoid inconsistent state.
3. Don't scale during the cooldown time, to avoid overly frequent scaling.
No matter what the task type or the autoscaler impl is, it is probably better
to follow these three common conditions.
Users can also define their own conditions, like TaskCountLimitation, in a
specific impl :)
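Pulled out of `handle()`, those three guards reduce to something like this (illustrative field names, not the actual supervisor state):

```java
// Each check mirrors one of the conditions above; scaling proceeds only when
// none of them vetoes it.
public class ScaleGuardSketch
{
  public boolean suspended;
  public int pendingCompletionTasks;     // tasks still handing off
  public long dynamicTriggerLastRunTime; // millis of the last scale action
  public long minTriggerScaleActionFrequencyMillis;

  public boolean mayScale(long nowMillis)
  {
    if (suspended) {
      return false; // 1. supervisor suspended
    }
    if (pendingCompletionTasks > 0) {
      return false; // 2. previous tasks still handing off
    }
    if (nowMillis - dynamicTriggerLastRunTime < minTriggerScaleActionFrequencyMillis) {
      return false; // 3. inside the cooldown window
    }
    return true;
  }
}
```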
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig
getMonitorSchedulerConfig()
@Override
public abstract Supervisor createSupervisor();
+ /**
+ * need to notice that autoScaler would be null which means autoscale is
dissable.
+ * @param supervisor
+ * @return autoScaler, disable autoscale will return dummyAutoScaler and
enable autoscale wiil return defaultAutoScaler by default.
+ */
+ @Override
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification =
"using siwtch(String)")
+ public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+ {
+ String dataSource = getId();
+ SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor,
dataSource);
+ Map<String, Object> dynamicAllocationTasksProperties =
ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+ if (dynamicAllocationTasksProperties != null &&
!dynamicAllocationTasksProperties.isEmpty() &&
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
false)))) {
Review comment:
The reason for designing this condition is that users can easily disable or
enable the autoscaler for a while rather than deleting all of the
autoscaler-related configs.
For example, advertising traffic during the Super Bowl: traffic is much higher
during the breaks and lower during the game (**large traffic fluctuations in
the short term**). If users don't set the scale-related configs properly, this
will trigger scale actions too frequently and create lots of small segments.
For traffic like this we usually set a larger number of tasks temporarily and
set the flag to false to disable the autoscaler for a while.
When scaling algorithms become more advanced, it will be better to remove this
config and let the autoscaler do everything. But for now it is probably better
to keep this parameter :)
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig
getMonitorSchedulerConfig()
@Override
public abstract Supervisor createSupervisor();
+ /**
+ * need to notice that autoScaler would be null which means autoscale is
dissable.
+ * @param supervisor
+ * @return autoScaler, disable autoscale will return dummyAutoScaler and
enable autoscale wiil return defaultAutoScaler by default.
+ */
+ @Override
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification =
"using siwtch(String)")
+ public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+ {
+ String dataSource = getId();
+ SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor,
dataSource);
+ Map<String, Object> dynamicAllocationTasksProperties =
ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+ if (dynamicAllocationTasksProperties != null &&
!dynamicAllocationTasksProperties.isEmpty() &&
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
false)))) {
+ String autoScalerStrategy =
String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy",
"default"));
+
+ // will thorw 'Return value of String.hashCode() ignored :
RV_RETURN_VALUE_IGNORED' just Suppress it.
+ switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+ default: autoScaler = new DefaultAutoScaler(supervisor, dataSource,
dynamicAllocationTasksProperties, this);
+ }
Review comment:
Nice catch! I changed the way configurations are obtained to use Jackson, as
mentioned above.
In this way, users can not only define their own scaling algorithms but also
build the corresponding configuration.
It is also easier to ensure consistency of default values.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig
getMonitorSchedulerConfig()
@Override
public abstract Supervisor createSupervisor();
+ /**
+ * need to notice that autoScaler would be null which means autoscale is
dissable.
+ * @param supervisor
+ * @return autoScaler, disable autoscale will return dummyAutoScaler and
enable autoscale wiil return defaultAutoScaler by default.
+ */
+ @Override
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification =
"using siwtch(String)")
+ public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+ {
+ String dataSource = getId();
+ SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor,
dataSource);
+ Map<String, Object> dynamicAllocationTasksProperties =
ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+ if (dynamicAllocationTasksProperties != null &&
!dynamicAllocationTasksProperties.isEmpty() &&
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
false)))) {
+ String autoScalerStrategy =
String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy",
"default"));
+
+ // will thorw 'Return value of String.hashCode() ignored :
RV_RETURN_VALUE_IGNORED' just Suppress it.
+ switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+ default: autoScaler = new DefaultAutoScaler(supervisor, dataSource,
dynamicAllocationTasksProperties, this);
+ }
Review comment:
Nice catch! I changed the way configurations are obtained to use Jackson, as
mentioned above.
In this way, users can not only define their own scaling algorithms but also
build the corresponding configuration.
It is also easier to ensure consistency of default values.
As `supervisor` is created with `new` everywhere in Druid, is it necessary to
create the autoscaler instance, which holds `supervisor`, using Jackson? 🤕
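One way users could plug in their own configuration, as an alternative to a `switch` over strategy strings, is Jackson polymorphism. This is a sketch with assumed type names, not the actual Druid wiring.

```java
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;

// The strategy string selects the concrete config class; an unrecognized
// strategy fails fast at deserialization time instead of silently falling
// through a `default:` branch.
@JsonTypeInfo(
    use = JsonTypeInfo.Id.NAME,
    property = "autoScalerStrategy",
    defaultImpl = LagBasedSketch.class
)
@JsonSubTypes({@JsonSubTypes.Type(name = "lagBased", value = LagBasedSketch.class)})
interface AutoScalerStrategySketch
{
}

class LagBasedSketch implements AutoScalerStrategySketch
{
  public int taskCountMax = 4; // default declared once, next to the field
}

class StrategyReader
{
  static AutoScalerStrategySketch read(String json)
  {
    try {
      return new ObjectMapper().readValue(json, AutoScalerStrategySketch.class);
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```

Extensions can then register additional `@JsonSubTypes` entries to ship their own strategy plus its configuration in one class.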
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties`
to specify how to auto scale the number of Kafka ingest tasks based on Lag
metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks
Properties) for details.|no (default == null)|
+
+#### Dynamic Allocation Tasks Properties
+
+| Property | Description | Default |
+| ------------- | ------------- | ------------- |
+| `enableDynamicAllocationTasks` | whether enable this feature or not | false |
Review comment:
As I mentioned above, this is a compromise. The current algorithm is relatively
simple but can meet most scenarios (for example, regular traffic peaks or
sudden traffic peaks).
For extreme cases, though, if users don't set the scale-related configs
properly, it will trigger scale actions too frequently and create lots of
small segments. At that point the user needs to control taskCount manually,
and this config makes disabling/enabling more convenient.
When the algorithm is smart enough, it will be better to remove this parameter.
As for the default value of false, I think it is insurance against users
enabling the autoscaler by accident, e.g. by leaving `"autoscalerConfig": {}`
behind after deleting all of the autoscaler-related configs.
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties`
to specify how to auto scale the number of Kafka ingest tasks based on Lag
metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks
Properties) for details.|no (default == null)|
Review comment:
Make sense. Changed
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
##########
@@ -70,6 +72,7 @@ public KinesisSupervisorIOConfig(
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
+ @JsonProperty("dynamicAllocationTasksProperties") Map<String, Object>
dynamicAllocationTasksProperties,
Review comment:
Sure. Done.
##########
File path:
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
* @param checkpointMetadata metadata for the sequence to currently
checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+ /**
+ * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+ * Only support Kafka ingestion so far.
+ * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+ */
+ void collectLag(ArrayList<Long> lags);
Review comment:
Ya, for now we only support the Kafka autoscaler. But based on
https://github.com/apache/druid/blob/118b50195e5c2989e04e0f5290aa72cae114db39/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java#L3499,
it should not be hard to support a Kinesis autoscaler, and I'm glad to work on it
after this PR is merged. So keeping the code abstract is more meaningful, and
there are several safeguards to ensure that users will not enable Kinesis auto
scaling by accident:
1. A mention in the docs
2. `throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoscalerConfig or set it null!");` in `KinesisSupervisorIOConfig`
3. Setting `autoscalerConfig = null` when calling `super(xxx)` in `KinesisSupervisorIOConfig`

Also, we can't cast `Supervisor` to `KafkaSupervisor` directly unless we add an
extra dependency on `druid-kafka-indexing-service` in the `indexing-service` module.
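Safeguards (2) and (3) above can be sketched as a constructor-time guard. This is a simplified, hypothetical shape, not the real `KinesisSupervisorIOConfig` signature:

```java
import java.util.Map;

// Hypothetical sketch: reject any non-null autoscaler config at construction
// time so Kinesis users cannot enable scaling by accident. Field and
// constructor shapes are illustrative, not Druid's real signatures.
class KinesisIOConfigSketch
{
  private final Map<String, Object> autoscalerConfig;

  KinesisIOConfigSketch(Map<String, Object> autoscalerConfig)
  {
    // Safeguard (2): fail fast on any attempt to configure auto scaling.
    if (autoscalerConfig != null) {
      throw new UnsupportedOperationException(
          "Tasks auto scaler for kinesis is not supported yet. Please remove autoscalerConfig or set it null!"
      );
    }
    // Safeguard (3): the stored value is always null for Kinesis.
    this.autoscalerConfig = null;
  }

  Map<String, Object> getAutoscalerConfig()
  {
    return autoscalerConfig;
  }
}

public class KinesisGuardDemo
{
  public static void main(String[] args)
  {
    try {
      new KinesisIOConfigSketch(Map.of("enableDynamicAllocationTasks", true));
    }
    catch (UnsupportedOperationException e) {
      System.out.println("rejected");
    }
    System.out.println(new KinesisIOConfigSketch(null).getAutoscalerConfig());
  }
}
```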
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -519,20 +636,40 @@ public SeekableStreamSupervisor(
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
+    this.dynamicAllocationTasksProperties = ioConfig.getDynamicAllocationTasksProperties();
Review comment:
Sure. It is a little strange to use map.getOrDefault() here, because the
default values are hard to keep unified.
I changed the way configurations are obtained from a Map to a new interface
`AutoScalerConfig` with a default impl `DefaultAutoScaleConfig`, so that we can
use Jackson to instantiate a config with **default values** instead of calling
map.get/parse everywhere. **This also keeps the default values consistent.**
This way we don't need a handle to `SupervisorTaskAutoscaler`,
just `autoScalerConfig.getTaskCountMax()` :)
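A minimal sketch of the difference, assuming hypothetical property names (`taskCountMax`, `minTriggerScaleActionFrequencyMillis`) rather than the PR's actual fields:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the map-based style repeats the default at every call
// site, while the bound-config style states it once as a field initializer.
class AutoScaleConfigSketch
{
  // Single source of truth for each default; property names are hypothetical.
  public int taskCountMax = 4;
  public long minTriggerScaleActionFrequencyMillis = 600_000L;
}

public class DefaultsDemo
{
  public static void main(String[] args) throws Exception
  {
    // Map style: the default "4" must be repeated wherever the value is read.
    Map<String, Object> props = new HashMap<>();
    int fromMap = Integer.parseInt(String.valueOf(props.getOrDefault("taskCountMax", 4)));

    // Jackson style: missing JSON keys simply keep the field defaults.
    AutoScaleConfigSketch fromJson = new ObjectMapper().readValue("{}", AutoScaleConfigSketch.class);

    System.out.println(fromMap + " " + fromJson.taskCountMax);
  }
}
```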
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +322,114 @@ public void handle()
}
}
+  // change taskCount without resubmitting.
+  private class DynamicAllocationTasksNotice implements Notice
+  {
+    Callable<Integer> scaleAction;
+
+    DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+    {
+      this.scaleAction = scaleAction;
+    }
+
+    /**
+     * This method will do lags points collection and check dynamic scale action is necessary or not.
+     */
+    @Override
+    public void handle()
+    {
+      try {
+        long nowTime = System.currentTimeMillis();
+        // Only queue is full and over minTriggerDynamicFrequency can trigger scale out/in
+        if (spec.isSuspended()) {
+          log.info("[%s] supervisor is suspended, skip to check dynamic allocate task logic", dataSource);
+          return;
+        }
+        log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+        for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) {
+          if (!list.isEmpty()) {
+            log.info("Still hand off tasks unfinished, skip to do scale action [%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+            return;
+          }
+        }
+        if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
Review comment:
Thanks for your attention.
Actually, there are three hard conditions before any scale action:
1. Don't scale when the supervisor is suspended.
2. Don't scale when a previous task is still handing off, to avoid inconsistent state.
3. Don't scale during the cooldown time, to avoid overly frequent scaling.

And I think no matter what the task type or the autoscaler impl is, it is
better to follow these three common conditions. Users can also define their own
conditions, like TaskCountLimitation, in a specific impl :)
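The three conditions can be sketched as a single guard method. The parameters are simplified stand-ins for the supervisor's real state, not Druid's actual fields:

```java
import java.util.List;
import java.util.Map;

// Sketch of the three common pre-conditions described above.
public class ScaleGuardDemo
{
  static boolean shouldSkipScaleAction(
      boolean suspended,
      Map<String, List<String>> pendingCompletionTaskGroups,
      long nowMillis,
      long lastScaleMillis,
      long minTriggerFrequencyMillis
  )
  {
    // 1. Don't scale while the supervisor is suspended.
    if (suspended) {
      return true;
    }
    // 2. Don't scale while any previous task group is still handing off.
    for (List<String> group : pendingCompletionTaskGroups.values()) {
      if (!group.isEmpty()) {
        return true;
      }
    }
    // 3. Don't scale during the cooldown window after the last action.
    return nowMillis - lastScaleMillis < minTriggerFrequencyMillis;
  }

  public static void main(String[] args)
  {
    System.out.println(shouldSkipScaleAction(true, Map.of(), 1000L, 0L, 100L));   // suspended
    System.out.println(shouldSkipScaleAction(false, Map.of(), 50L, 0L, 100L));    // in cooldown
    System.out.println(shouldSkipScaleAction(false, Map.of(), 1000L, 0L, 100L));  // ok to scale
  }
}
```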
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()
   @Override
   public abstract Supervisor createSupervisor();
+  /**
+   * need to notice that autoScaler would be null which means autoscale is dissable.
+   * @param supervisor
+   * @return autoScaler, disable autoscale will return dummyAutoScaler and enable autoscale wiil return defaultAutoScaler by default.
+   */
+  @Override
+  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification = "using siwtch(String)")
+  public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+  {
+    String dataSource = getId();
+    SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor, dataSource);
+    Map<String, Object> dynamicAllocationTasksProperties = ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+    if (dynamicAllocationTasksProperties != null && !dynamicAllocationTasksProperties.isEmpty() && Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks", false)))) {
Review comment:
The reason for designing this condition is that users can easily disable/enable
the autoscaler for a while using this config, rather than deleting all the
autoscaler-related configs.
For example, take advertising traffic during the Super Bowl: traffic is much
higher during the breaks and lower during the game (**large traffic
fluctuations in the short term**). If users don't set the scale-related configs
properly, scaling will trigger too frequently and create lots of small
segments. For traffic like this, we usually set a larger number of tasks
temporarily and set the flag to false to disable the autoscaler for a while.
When the scaling algorithms become more advanced, it will be better to remove
this config and let the autoscaler handle everything. But for now it is better
to keep this parameter :)
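As a hedged illustration (property names follow this PR's discussion; the exact shape may differ in the merged code), the toggle could look like this in the supervisor spec, leaving the other settings in place:

```json
"ioConfig": {
  "topic": "metrics",
  "dynamicAllocationTasksProperties": {
    "enableDynamicAllocationTasks": false,
    "autoScalerStrategy": "default",
    "minTriggerDynamicFrequency": 600000
  }
}
```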
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties` to specify how to auto scale the number of Kafka ingest tasks based on Lag metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks Properties) for details.|no (default == null)|
+
+#### Dynamic Allocation Tasks Properties
+
+| Property | Description | Default |
+| ------------- | ------------- | ------------- |
+| `enableDynamicAllocationTasks` | whether enable this feature or not | false |
Review comment:
Like I mentioned above, this is an insurance and a compromise. The current
algorithm is relatively simple but can meet most scenarios (for example,
regular or sudden traffic peaks).
But in extreme cases, if users don't set the scale-related configs properly,
scaling will trigger too frequently and create lots of small segments. At that
point the user needs to control taskCount manually, and this config makes
disabling/enabling more convenient. When the algorithm is smart enough, it will
be better to remove this parameter.
As for the default value of false, I think it is insurance against users
enabling the autoscaler by accident, e.g. leaving `"autoscalerConfig": {}`
behind after deleting all the other autoscaler-related configs.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]