himanshug commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r556981649
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -3561,4 +3874,11 @@ protected void emitLag()
* sequences. In Kafka, start offsets are always inclusive.
*/
protected abstract boolean
useExclusiveStartSequenceNumberForNonFirstSequence();
+
+ /**
+ * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+ * Only support Kafka ingestion so far.
+ * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+ */
+ protected abstract void collectLag(ArrayList<Long> lags);
Review comment:
nit: I see that the concept of storing lag stats in `ArrayList<Long>`
predates your PR, but it might be simpler to define a new class like the one
suggested here, change this method to return it, and make related changes in
the other places where this `ArrayList` is used.
```suggestion
protected abstract LagStats computeLagStats();

static class LagStats
{
  private final long maxLag;
  private final long totalLag;
  private final long avgLag;

  public LagStats(long maxLag, long totalLag, long avgLag)
  {
    this.maxLag = maxLag;
    this.totalLag = totalLag;
    this.avgLag = avgLag;
  }

  public long getMaxLag()
  {
    return maxLag;
  }

  public long getTotalLag()
  {
    return totalLag;
  }

  public long getAvgLag()
  {
    return avgLag;
  }
}
```
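For illustration, here is a minimal sketch of how an implementation might
build such a value from per-partition lags. The `Map<Integer, Long>` input,
the `LagStatsSketch` holder and the static `computeLagStats` helper are
assumptions made for this example, not code from the PR. The average uses
integer division, and an empty map yields zeros.

```java
import java.util.Map;

// Trimmed, hypothetical copy of the suggested value class.
final class LagStatsSketch
{
  final long maxLag;
  final long totalLag;
  final long avgLag;

  LagStatsSketch(long maxLag, long totalLag, long avgLag)
  {
    this.maxLag = maxLag;
    this.totalLag = totalLag;
    this.avgLag = avgLag;
  }
}

final class LagStatsExample
{
  // Assumption: lag is available as a map of partition id -> lag.
  static LagStatsSketch computeLagStats(Map<Integer, Long> partitionLags)
  {
    long max = 0;
    long total = 0;
    for (long lag : partitionLags.values()) {
      max = Math.max(max, lag);
      total += lag;
    }
    // Guard against division by zero when no partitions are reported.
    long avg = partitionLags.isEmpty() ? 0 : total / partitionLags.size();
    return new LagStatsSketch(max, total, avg);
  }
}
```

Callers then read named fields instead of relying on the position of each
value inside a list.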
##########
File path:
extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java
##########
@@ -361,6 +362,12 @@ public Supervisor createSupervisor()
);
}
+ @Override
+ public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+ {
+ return null;
+ }
+
Review comment:
Can this be added as a `default` implementation in the `SupervisorSpec`
interface?
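Roughly what I have in mind, as a sketch only. The interfaces here are
simplified stand-ins for the real Druid types, and `NoAutoscaleSpec` is a
hypothetical spec that does not support autoscaling.

```java
// Simplified, hypothetical stand-ins for the Druid types named in the diff.
interface Supervisor {}

interface SupervisorTaskAutoscaler {}

interface SupervisorSpec
{
  Supervisor createSupervisor();

  // Default: no autoscaler. Specs that support autoscaling override this.
  default SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
  {
    return null;
  }
}

// A spec without autoscaling support no longer needs its own override.
class NoAutoscaleSpec implements SupervisorSpec
{
  @Override
  public Supervisor createSupervisor()
  {
    return new Supervisor() {};
  }
}
```

With this shape, the override shown in the hunk could be dropped from specs
that do not autoscale.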
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig
getMonitorSchedulerConfig()
@Override
public abstract Supervisor createSupervisor();
+ /**
+ * need to notice that autoScaler would be null which means autoscale is
dissable.
+ * @param supervisor
+ * @return autoScaler, disable autoscale will return dummyAutoScaler and
enable autoscale wiil return defaultAutoScaler by default.
+ */
+ @Override
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification =
"using siwtch(String)")
+ public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+ {
+ String dataSource = getId();
+ SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor,
dataSource);
+ Map<String, Object> dynamicAllocationTasksProperties =
ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+ if (dynamicAllocationTasksProperties != null &&
!dynamicAllocationTasksProperties.isEmpty() &&
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
false)))) {
Review comment:
I am not sure why we need the extra property `enableDynamicAllocationTasks`.
If the user added a non-null `dynamicAllocationTasksProperties`, that alone
should mean that the user wants to enable autoscaling.
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties`
to specify how to auto scale the number of Kafka ingest tasks based on Lag
metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks
Properties) for details.|no (default == null)|
+
+#### Dynamic Allocation Tasks Properties
+
+| Property | Description | Default |
+| ------------- | ------------- | ------------- |
+| `enableDynamicAllocationTasks` | whether enable this feature or not | false |
Review comment:
Not sure we need this. If the user added a
`dynamicAllocationTasksProperties` section to the supervisor spec, shouldn't
that alone be enough to enable autoscaling?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java
##########
@@ -151,6 +156,29 @@ public DruidMonitorSchedulerConfig
getMonitorSchedulerConfig()
@Override
public abstract Supervisor createSupervisor();
+ /**
+ * need to notice that autoScaler would be null which means autoscale is
dissable.
+ * @param supervisor
+ * @return autoScaler, disable autoscale will return dummyAutoScaler and
enable autoscale wiil return defaultAutoScaler by default.
+ */
+ @Override
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED", justification =
"using siwtch(String)")
+ public SupervisorTaskAutoscaler createAutoscaler(Supervisor supervisor)
+ {
+ String dataSource = getId();
+ SupervisorTaskAutoscaler autoScaler = new DummyAutoScaler(supervisor,
dataSource);
+ Map<String, Object> dynamicAllocationTasksProperties =
ingestionSchema.getIOConfig().getDynamicAllocationTasksProperties();
+ if (dynamicAllocationTasksProperties != null &&
!dynamicAllocationTasksProperties.isEmpty() &&
Boolean.parseBoolean(String.valueOf(dynamicAllocationTasksProperties.getOrDefault("enableDynamicAllocationTasks",
false)))) {
+ String autoScalerStrategy =
String.valueOf(dynamicAllocationTasksProperties.getOrDefault("autoScalerStrategy",
"default"));
+
+ // will thorw 'Return value of String.hashCode() ignored :
RV_RETURN_VALUE_IGNORED' just Suppress it.
+ switch (StringUtils.toLowerCase(autoScalerStrategy)) {
+ default: autoScaler = new DefaultAutoScaler(supervisor, dataSource,
dynamicAllocationTasksProperties, this);
+ }
Review comment:
Can we create the autoscaler instance using Jackson, i.e. something
like `jsonMapper.readValueFrom...()`?
##########
File path:
extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
##########
@@ -282,6 +283,23 @@ public void checkpoint(int taskGroupId, DataSourceMetadata
checkpointMetadata)
// do nothing
}
+ @Override
+ public void collectLag(ArrayList<Long> lags)
+ {
+ }
+
+ @Override
+ public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
+ {
+ return null;
+ }
+
+ @Override
+ public Map getSupervisorTaskInfos()
+ {
+ return null;
+ }
+
Review comment:
These should throw `UnsupportedOperationException` instead, as they are not
supposed to be called.
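Something along these lines, as a sketch. The `Supervisor` interface below is
a trimmed, hypothetical copy of the three methods from the diff, the
`Map<String, Object>` type parameters are my assumption (the PR uses a raw
`Map`), and the messages are placeholders.

```java
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical, trimmed-down version of the methods this PR adds.
interface Supervisor
{
  void collectLag(ArrayList<Long> lags);

  Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction);

  Map<String, Object> getSupervisorTaskInfos();
}

class MaterializedViewSupervisorSketch implements Supervisor
{
  @Override
  public void collectLag(ArrayList<Long> lags)
  {
    // Fail loudly instead of silently doing nothing.
    throw new UnsupportedOperationException("collectLag is not supported");
  }

  @Override
  public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
  {
    throw new UnsupportedOperationException("autoscaling is not supported");
  }

  @Override
  public Map<String, Object> getSupervisorTaskInfos()
  {
    // Throwing avoids handing a null back to a caller that may dereference it.
    throw new UnsupportedOperationException("task infos are not supported");
  }
}
```

An accidental call then surfaces immediately rather than as a
`NullPointerException` somewhere downstream.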
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties`
to specify how to auto scale the number of Kafka ingest tasks based on Lag
metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks
Properties) for details.|no (default == null)|
Review comment:
I think "autoscaling" better describes this feature, so maybe call it
`autoscalerConfig`.
##########
File path:
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
* @param checkpointMetadata metadata for the sequence to currently
checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+ /**
+ * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+ * Only support Kafka ingestion so far.
+ * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+ */
+ void collectLag(ArrayList<Long> lags);
+
+ /**
+ * use for autoscaler
+ */
+ Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction);
Review comment:
Can we instead have `void reconcileTaskCount()`, which looks at the current
task count in the IO config and does whatever is needed to match that number
of active tasks?
The autoscaler impl would be responsible for updating the task count in the
task IO config and then calling this method.
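To make the division of responsibility concrete, here is a rough sketch. All
class names are hypothetical, the IO config is modelled as a mutable holder,
and "tasks" are just strings; the real supervisor would start and stop actual
ingestion tasks.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical mutable IO config holding the desired task count.
class IoConfigSketch
{
  private int taskCount;

  IoConfigSketch(int taskCount)
  {
    this.taskCount = taskCount;
  }

  int getTaskCount()
  {
    return taskCount;
  }

  void setTaskCount(int taskCount)
  {
    this.taskCount = taskCount;
  }
}

class SupervisorSketch
{
  private final IoConfigSketch ioConfig;
  private final List<String> activeTasks = new ArrayList<>();
  private int nextId = 0;

  SupervisorSketch(IoConfigSketch ioConfig)
  {
    this.ioConfig = ioConfig;
  }

  // Start or stop tasks until the active count matches the IO config.
  void reconcileTaskCount()
  {
    while (activeTasks.size() < ioConfig.getTaskCount()) {
      activeTasks.add("task-" + nextId++);
    }
    while (activeTasks.size() > ioConfig.getTaskCount()) {
      activeTasks.remove(activeTasks.size() - 1);
    }
  }

  int getActiveTaskCount()
  {
    return activeTasks.size();
  }
}

class AutoscalerSketch
{
  private final IoConfigSketch ioConfig;
  private final SupervisorSketch supervisor;

  AutoscalerSketch(IoConfigSketch ioConfig, SupervisorSketch supervisor)
  {
    this.ioConfig = ioConfig;
    this.supervisor = supervisor;
  }

  // The autoscaler only updates the desired count and asks for reconciliation.
  void scaleTo(int desiredTaskCount)
  {
    ioConfig.setTaskCount(desiredTaskCount);
    supervisor.reconcileTaskCount();
  }
}
```

The supervisor never needs to know why the count changed, so no
`Callable<Integer>` has to cross the interface.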
##########
File path:
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
* @param checkpointMetadata metadata for the sequence to currently
checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+ /**
+ * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+ * Only support Kafka ingestion so far.
+ * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+ */
+ void collectLag(ArrayList<Long> lags);
Review comment:
At this time, I think this is very specific to `KafkaSupervisor`, and it
seems that we currently only want to support autoscaling for Kafka indexing.
So I would say that in this PR we rename `DefaultAutoScaler` to
`KafkaIndexingDefaultAutoScaler` and let `KafkaIndexingDefaultAutoScaler` cast
`Supervisor` to `KafkaSupervisor`, so that it can use
`KafkaSupervisor.collectLag(..)` directly and we do not need the method in the
interface.
If, at a later time, Kinesis starts using it in some form, then the
`Supervisor` interface can be modified at that time.
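A sketch of the cast I am describing. The types below are hypothetical
stand-ins with the same names as the real classes, the lag values are
placeholders, and the choice of `IllegalArgumentException` for a
wrong supervisor type is my assumption.

```java
import java.util.ArrayList;

// Stand-in for the Supervisor interface, without collectLag.
interface Supervisor {}

class KafkaSupervisor implements Supervisor
{
  // Same shape as in the diff: appends maxLag, totalLag, avgLag in that order.
  void collectLag(ArrayList<Long> lags)
  {
    lags.add(60L); // placeholder maxLag
    lags.add(90L); // placeholder totalLag
    lags.add(30L); // placeholder avgLag
  }
}

class KafkaIndexingDefaultAutoScaler
{
  private final KafkaSupervisor supervisor;

  KafkaIndexingDefaultAutoScaler(Supervisor supervisor)
  {
    // Reject anything that is not a Kafka supervisor before casting.
    if (!(supervisor instanceof KafkaSupervisor)) {
      throw new IllegalArgumentException("only KafkaSupervisor is supported");
    }
    this.supervisor = (KafkaSupervisor) supervisor;
  }

  long currentMaxLag()
  {
    ArrayList<Long> lags = new ArrayList<>();
    supervisor.collectLag(lags);
    return lags.get(0);
  }
}
```

This keeps the Kafka-only method off the shared interface until a second
implementation actually needs it.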
##########
File path:
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
##########
@@ -64,4 +66,18 @@ default Boolean isHealthy()
* @param checkpointMetadata metadata for the sequence to currently
checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
+
+ /**
+ * Collect maxLag, totalLag, avgLag into ArrayList<Long> lags
+ * Only support Kafka ingestion so far.
+ * @param lags , Notice : The order of values is maxLag, totalLag and avgLag.
+ */
+ void collectLag(ArrayList<Long> lags);
+
+ /**
+ * use for autoscaler
+ */
+ Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction);
+
+ Map getSupervisorTaskInfos();
Review comment:
It seems we only really need the active task group count, so can we have
`int getActiveTaskGroupsCount()` instead?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +322,114 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method will do lags points collection and check dynamic scale
action is necessary or not.
+ */
+ @Override
+ public void handle()
+ {
+ try {
+ long nowTime = System.currentTimeMillis();
+ // Only queue is full and over minTriggerDynamicFrequency can trigger
scale out/in
+ if (spec.isSuspended()) {
+ log.info("[%s] supervisor is suspended, skip to check dynamic
allocate task logic", dataSource);
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s].",
pendingCompletionTaskGroups, dataSource);
+ for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values())
{
+ if (!list.isEmpty()) {
+ log.info("Still hand off tasks unfinished, skip to do scale action
[%s] for dataSource [%s].", pendingCompletionTaskGroups, dataSource);
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime < minTriggerDynamicFrequency) {
Review comment:
I think this type of logic should live inside the autoscaler impl, which
should decide when to trigger the autoscaling.
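As a sketch of what moving it could look like: the class name, the
`shouldTrigger` method and the injected clock are all hypothetical, and the
conditions simply mirror the checks in the hunk (suspended, handoff pending,
minimum interval between runs).

```java
import java.util.function.LongSupplier;

// Hypothetical autoscaler-side gate for scale actions.
class ThrottledAutoscalerSketch
{
  private final long minTriggerIntervalMillis;
  private final LongSupplier clock;
  private long lastRunMillis;
  private boolean hasRun = false;

  ThrottledAutoscalerSketch(long minTriggerIntervalMillis, LongSupplier clock)
  {
    this.minTriggerIntervalMillis = minTriggerIntervalMillis;
    this.clock = clock;
  }

  // Returns true when a scale action may run now, and records the run time.
  boolean shouldTrigger(boolean supervisorSuspended, boolean handoffPending)
  {
    if (supervisorSuspended || handoffPending) {
      return false;
    }
    long now = clock.getAsLong();
    if (hasRun && now - lastRunMillis < minTriggerIntervalMillis) {
      return false;
    }
    lastRunMillis = now;
    hasRun = true;
    return true;
  }
}
```

In production the clock would be `System::currentTimeMillis`. Injecting it
lets a test drive time explicitly.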
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -519,20 +636,40 @@ public SeekableStreamSupervisor(
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
+ this.dynamicAllocationTasksProperties =
ioConfig.getDynamicAllocationTasksProperties();
Review comment:
Could we instead get a handle to `SupervisorTaskAutoscaler` and have
`SupervisorTaskAutoscaler.getMaxTaskCount()` provide the maximum task count?
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`dynamicAllocationTasksProperties`|Object|`dynamicAllocationTasksProperties`
to specify how to auto scale the number of Kafka ingest tasks based on Lag
metrics. See [Dynamic Allocation Tasks Properties](#Dynamic Allocation Tasks
Properties) for details.|no (default == null)|
Review comment:
Be very specific that this is ONLY supported for Kafka indexing as of
now.
##########
File path:
extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java
##########
@@ -70,6 +72,7 @@ public KinesisSupervisorIOConfig(
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
+ @JsonProperty("dynamicAllocationTasksProperties") Map<String, Object>
dynamicAllocationTasksProperties,
Review comment:
We should actually throw an exception if someone sets this on a Kinesis
supervisor spec, as that is not expected.
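For example, a check along these lines in the constructor. This is a sketch
only: `KinesisIoConfigSketch` is a hypothetical stand-in that keeps just the
one parameter under discussion, and both the exception type and the message
are my assumptions.

```java
import java.util.Map;

// Hypothetical stand-in for KinesisSupervisorIOConfig with one parameter.
class KinesisIoConfigSketch
{
  KinesisIoConfigSketch(Map<String, Object> dynamicAllocationTasksProperties)
  {
    // Autoscaling is Kafka-only for now, so reject the property outright.
    if (dynamicAllocationTasksProperties != null) {
      throw new IllegalArgumentException(
          "dynamicAllocationTasksProperties is not supported for Kinesis"
      );
    }
  }
}
```

Failing at spec submission tells the user right away that the setting has no
effect, instead of silently ignoring it.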