himanshug commented on a change in pull request #10524:
URL: https://github.com/apache/druid/pull/10524#discussion_r585157002
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`autoScalerConfig`|Object|Defines how to auto-scale the number of Kafka ingest
tasks based on lag metrics. Currently supported ONLY for Kafka indexing. See
[Task Autoscaler Properties](#task-autoscaler-properties) for details.|no
(default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable this feature. If set to false or
omitted, the `autoScaler` is disabled even when `autoScalerConfig` is not
null. | no (default == false) |
+| `lagCollectionIntervalMillis` | Define the frequency of lag points
collection. | no (default == 30000) |
Review comment:
```suggestion
| `lagCollectionIntervalMillis` | Period of lag points collection. | no
(default == 30000) |
```
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
+|`autoScalerConfig`|Object|Defines how to auto-scale the number of Kafka ingest
tasks based on lag metrics. Currently supported ONLY for Kafka indexing. See
[Task Autoscaler Properties](#task-autoscaler-properties) for details.|no
(default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable this feature. If set to false or
omitted, the `autoScaler` is disabled even when `autoScalerConfig` is not
null. | no (default == false) |
+| `lagCollectionIntervalMillis` | Define the frequency of lag points
collection. | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection. Used
together with `lagCollectionIntervalMillis`: within the most recent
`lagCollectionRangeMillis`, lag metric points are collected every
`lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The threshold of the scale-out action. | no (default ==
6000000) |
+| `triggerScaleOutThresholdFrequency` | If the proportion of lag points higher
than `scaleOutThreshold` exceeds `triggerScaleOutThresholdFrequency`, a scale-out
action is triggered. | no (default == 0.3) |
+| `scaleInThreshold` | The threshold of the scale-in action. | no (default ==
1000000) |
+| `triggerScaleInThresholdFrequency` | If the proportion of lag points lower
than `scaleInThreshold` exceeds `triggerScaleInThresholdFrequency`, a scale-in
action is triggered. | no (default == 0.9) |
+| `scaleActionStartDelayMillis` | Number of milliseconds to wait after the
supervisor starts before the first check of the scale logic. | no (default == 300000) |
+| `scaleActionPeriodMillis` | The frequency, in milliseconds, of checking
whether to perform a scale action. | no (default == 60000) |
+| `taskCountMax` | Maximum number of tasks. Make sure `taskCountMax >=
taskCountMin`. | yes |
+| `taskCountMin` | Minimum number of tasks. When the autoscaler is enabled, the
value of `taskCount` in `IOConfig` is ignored; ingestion starts with
`taskCountMin` tasks and can scale up to `taskCountMax`. | yes |
+| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
+| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
+| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two
scale actions | no (default == 600000) |
+| `autoScalerStrategy` | The algorithm of `autoScaler`. ONLY `lagBased` is
supported for now. | no (default == `lagBased`) |
Review comment:
Can we make the distinction that the following properties are common to any
autoscaler while the rest are specific to the `lagBased` autoscaler? Maybe
have two tables:
```
autoScalerStrategy
enableTaskAutoScaler
taskCountMin
taskCountMax
minTriggerScaleActionFrequencyMillis
```
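For illustration only (property names are taken from the table under review; the grouping itself is just this suggestion, not current behavior), a full `autoScalerConfig` covering both groups might look like:

```json
{
  "autoScalerConfig": {
    "enableTaskAutoScaler": true,
    "autoScalerStrategy": "lagBased",
    "taskCountMin": 1,
    "taskCountMax": 6,
    "minTriggerScaleActionFrequencyMillis": 600000,
    "lagCollectionIntervalMillis": 30000,
    "lagCollectionRangeMillis": 600000,
    "scaleOutThreshold": 6000000,
    "triggerScaleOutThresholdFrequency": 0.3,
    "scaleInThreshold": 1000000,
    "triggerScaleInThresholdFrequency": 0.9
  }
}
```

The first five properties would go in the common table; the remaining ones in a `lagBased`-specific table.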
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +323,127 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method collects lag points and checks whether a dynamic scale action
is necessary.
+ */
+ @Override
+ public void handle()
+ {
+ if (autoScalerConfig == null) {
+ log.warn("autoScalerConfig is null but dynamic allocation notice is
submitted, how can it be ?");
+ } else {
+ try {
+ long nowTime = System.currentTimeMillis();
+ if (spec.isSuspended()) {
+ log.info("Skipping DynamicAllocationTasksNotice execution because
[%s] supervisor is suspended",
+ dataSource
+ );
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]",
pendingCompletionTaskGroups,
+ dataSource
+ );
+ for (CopyOnWriteArrayList<TaskGroup> list :
pendingCompletionTaskGroups.values()) {
+ if (!list.isEmpty()) {
+ log.info(
+ "Skipping DynamicAllocationTasksNotice execution for
datasource [%s] because following tasks are pending [%s]",
+ dataSource, pendingCompletionTaskGroups
+ );
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime <
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+ log.info(
+ "DynamicAllocationTasksNotice submitted again in [%d]
millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+ nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+ );
+ return;
+ }
+ final Integer desiredTaskCount = scaleAction.call();
+ boolean allocationSuccess = changeTaskCount(desiredTaskCount);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+ }
+ }
+ }
+ }
+
+ /**
+ * This method determines how to do scale actions based on collected lag
points.
+ * If scale action is triggered :
+ * First of all, call gracefulShutdownInternal() which will change the
state of current datasource ingest tasks from reading to publishing.
+ * Secondly, clear all the stateful data structures:
activelyReadingTaskGroups, partitionGroups, partitionOffsets,
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt in
the next 'RunNotice'.
+ * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and
sync it to MetadataStorage.
+ * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next
RunNotice will create scaled number of ingest tasks without resubmitting the
supervisor.
+ * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
+ * @return Boolean flag indicating if scale action was executed or not. If
true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next
'changeTaskCount'.
+ * If false, it will do 'changeTaskCount' again after
'scaleActionPeriodMillis' millis.
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ private boolean changeTaskCount(Integer desiredActiveTaskCount) throws
InterruptedException, ExecutionException, TimeoutException
+ {
+ int currentActiveTaskCount;
+ Collection<TaskGroup> activeTaskGroups =
activelyReadingTaskGroups.values();
+ currentActiveTaskCount = activeTaskGroups.size();
+
+ if (desiredActiveTaskCount == -1 || desiredActiveTaskCount ==
currentActiveTaskCount) {
Review comment:
```suggestion
if (desiredActiveTaskCount < 0 || desiredActiveTaskCount ==
currentActiveTaskCount) {
```
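A tiny standalone sketch (hypothetical class name, not part of the PR) of why the `< 0` form is safer: it treats any negative sentinel as "skip", not only the exact value -1.

```java
public class GuardSketch
{
  // Mirrors the suggested guard: skip when the callback returned any
  // negative sentinel, or when there is nothing to change.
  static boolean shouldSkip(int desiredActiveTaskCount, int currentActiveTaskCount)
  {
    return desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount;
  }

  public static void main(String[] args)
  {
    System.out.println(shouldSkip(-1, 3)); // true: the usual sentinel
    System.out.println(shouldSkip(-2, 3)); // true: any other negative value
    System.out.println(shouldSkip(5, 3));  // false: a real scale action
  }
}
```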
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+ private static final EmittingLogger log = new
EmittingLogger(LagBasedAutoScaler.class);
+ private final String dataSource;
+ private final CircularFifoQueue<Long> lagMetricsQueue;
+ private final ScheduledExecutorService lagComputationExec;
+ private final ScheduledExecutorService allocationExec;
+ private final SupervisorSpec spec;
+ private final SeekableStreamSupervisor supervisor;
+ private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+ private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+ public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String
dataSource,
+ LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+ )
+ {
+ this.lagBasedAutoScalerConfig = autoScalerConfig;
+ final String supervisorId = StringUtils.format("Supervisor-%s",
dataSource);
+ this.dataSource = dataSource;
+ final int slots = (int)
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() /
lagBasedAutoScalerConfig
+ .getLagCollectionIntervalMillis()) + 1;
+ this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+ this.allocationExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Allocation-%d");
+ this.lagComputationExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Computation-%d");
+ this.spec = spec;
+ this.supervisor = supervisor;
+ }
+
+ @Override
+ public void start()
+ {
+ Callable<Integer> scaleAction = () -> {
+ LOCK.lock();
+ int desiredTaskCount = -1;
+ try {
+ desiredTaskCount = computeDesiredTaskCount(new
ArrayList<>(lagMetricsQueue));
+
+ if (desiredTaskCount != -1) {
+ lagMetricsQueue.clear();
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Exception while computing desired task count for [%s]",
dataSource);
+ }
+ finally {
+ LOCK.unlock();
+ }
+ return desiredTaskCount;
+ };
+
+ lagComputationExec.scheduleAtFixedRate(
+ computeAndCollectLag(),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for
tasks to start up
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ allocationExec.scheduleAtFixedRate(
+ supervisor.buildDynamicAllocationTask(scaleAction),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() +
lagBasedAutoScalerConfig
+ .getLagCollectionRangeMillis(),
+ lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ log.info(
+ "LagBasedAutoScaler will collect lag every [%d] millis and will keep
[%d] data points for the last [%d] millis for dataSource [%s]",
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
lagMetricsQueue.size(),
+ lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+ );
+ }
+
+ @Override
+ public void stop()
+ {
+ allocationExec.shutdownNow();
+ lagComputationExec.shutdownNow();
+ }
+
+ @Override
+ public void reset()
+ {
+ // clear queue for kafka lags
+ if (lagMetricsQueue != null) {
+ try {
+ LOCK.lock();
+ lagMetricsQueue.clear();
+ }
+ catch (Exception e) {
log.warn(e, "Error when clearing queue in reset action");
+ }
+ finally {
+ LOCK.unlock();
+ }
+ }
+ }
+
+ /**
+ * This method computes the current consumer lag: it gets the total lag of all
partitions and fills the lagMetricsQueue.
+ *
+ * @return a Runnable object to compute and collect lag.
+ */
+ private Runnable computeAndCollectLag()
+ {
+ return () -> {
+ LOCK.lock();
+ try {
+ if (!spec.isSuspended()) {
+ LagStats lagStats = supervisor.computeLagStats();
+ if (lagStats == null) {
+ lagMetricsQueue.offer(0L);
+ } else {
+ long totalLags = lagStats.getTotalLag();
+ lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
Review comment:
Why shouldn't we expect `lagStats.getTotalLag()` to return a value >= 0?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
##########
@@ -318,6 +323,127 @@ public void handle()
}
}
+ // change taskCount without resubmitting.
+ private class DynamicAllocationTasksNotice implements Notice
+ {
+ Callable<Integer> scaleAction;
+
+ DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
+ {
+ this.scaleAction = scaleAction;
+ }
+
+ /**
+ * This method collects lag points and checks whether a dynamic scale action
is necessary.
+ */
+ @Override
+ public void handle()
+ {
+ if (autoScalerConfig == null) {
+ log.warn("autoScalerConfig is null but dynamic allocation notice is
submitted, how can it be ?");
+ } else {
+ try {
+ long nowTime = System.currentTimeMillis();
+ if (spec.isSuspended()) {
+ log.info("Skipping DynamicAllocationTasksNotice execution because
[%s] supervisor is suspended",
+ dataSource
+ );
+ return;
+ }
+ log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]",
pendingCompletionTaskGroups,
+ dataSource
+ );
+ for (CopyOnWriteArrayList<TaskGroup> list :
pendingCompletionTaskGroups.values()) {
+ if (!list.isEmpty()) {
+ log.info(
+ "Skipping DynamicAllocationTasksNotice execution for
datasource [%s] because following tasks are pending [%s]",
+ dataSource, pendingCompletionTaskGroups
+ );
+ return;
+ }
+ }
+ if (nowTime - dynamicTriggerLastRunTime <
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
+ log.info(
+ "DynamicAllocationTasksNotice submitted again in [%d]
millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
+ nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
+ );
+ return;
+ }
+ final Integer desiredTaskCount = scaleAction.call();
+ boolean allocationSuccess = changeTaskCount(desiredTaskCount);
+ if (allocationSuccess) {
+ dynamicTriggerLastRunTime = nowTime;
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
+ }
+ }
+ }
+ }
+
+ /**
+ * This method determines how to do scale actions based on collected lag
points.
+ * If scale action is triggered :
+ * First of all, call gracefulShutdownInternal() which will change the
state of current datasource ingest tasks from reading to publishing.
+ * Secondly, clear all the stateful data structures:
activelyReadingTaskGroups, partitionGroups, partitionOffsets,
pendingCompletionTaskGroups, partitionIds. These structures will be rebuilt in
the next 'RunNotice'.
+ * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and
sync it to MetadataStorage.
+ * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next
RunNotice will create scaled number of ingest tasks without resubmitting the
supervisor.
+ * @param desiredActiveTaskCount desired taskCount computed from AutoScaler
+ * @return Boolean flag indicating if scale action was executed or not. If
true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next
'changeTaskCount'.
+ * If false, it will do 'changeTaskCount' again after
'scaleActionPeriodMillis' millis.
+ * @throws InterruptedException
+ * @throws ExecutionException
+ * @throws TimeoutException
+ */
+ private boolean changeTaskCount(Integer desiredActiveTaskCount) throws
InterruptedException, ExecutionException, TimeoutException
Review comment:
```suggestion
private boolean changeTaskCount(int desiredActiveTaskCount) throws
InterruptedException, ExecutionException, TimeoutException
```
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+ private static final EmittingLogger log = new
EmittingLogger(LagBasedAutoScaler.class);
+ private final String dataSource;
+ private final CircularFifoQueue<Long> lagMetricsQueue;
+ private final ScheduledExecutorService lagComputationExec;
+ private final ScheduledExecutorService allocationExec;
+ private final SupervisorSpec spec;
+ private final SeekableStreamSupervisor supervisor;
+ private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+ private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+ public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String
dataSource,
+ LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+ )
+ {
+ this.lagBasedAutoScalerConfig = autoScalerConfig;
+ final String supervisorId = StringUtils.format("Supervisor-%s",
dataSource);
+ this.dataSource = dataSource;
+ final int slots = (int)
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() /
lagBasedAutoScalerConfig
+ .getLagCollectionIntervalMillis()) + 1;
+ this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+ this.allocationExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Allocation-%d");
+ this.lagComputationExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Computation-%d");
+ this.spec = spec;
+ this.supervisor = supervisor;
+ }
+
+ @Override
+ public void start()
+ {
+ Callable<Integer> scaleAction = () -> {
+ LOCK.lock();
+ int desiredTaskCount = -1;
+ try {
+ desiredTaskCount = computeDesiredTaskCount(new
ArrayList<>(lagMetricsQueue));
+
+ if (desiredTaskCount != -1) {
+ lagMetricsQueue.clear();
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Exception while computing desired task count for [%s]",
dataSource);
+ }
+ finally {
+ LOCK.unlock();
+ }
+ return desiredTaskCount;
+ };
+
+ lagComputationExec.scheduleAtFixedRate(
+ computeAndCollectLag(),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for
tasks to start up
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ allocationExec.scheduleAtFixedRate(
+ supervisor.buildDynamicAllocationTask(scaleAction),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() +
lagBasedAutoScalerConfig
+ .getLagCollectionRangeMillis(),
+ lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ log.info(
+ "LagBasedAutoScaler will collect lag every [%d] millis and will keep
[%d] data points for the last [%d] millis for dataSource [%s]",
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
lagMetricsQueue.size(),
+ lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+ );
+ }
+
+ @Override
+ public void stop()
+ {
+ allocationExec.shutdownNow();
+ lagComputationExec.shutdownNow();
+ }
+
+ @Override
+ public void reset()
+ {
+ // clear queue for kafka lags
+ if (lagMetricsQueue != null) {
+ try {
+ LOCK.lock();
+ lagMetricsQueue.clear();
+ }
+ catch (Exception e) {
log.warn(e, "Error when clearing queue in reset action");
+ }
+ finally {
+ LOCK.unlock();
+ }
+ }
+ }
+
+ /**
+ * This method computes the current consumer lag: it gets the total lag of all
partitions and fills the lagMetricsQueue.
+ *
+ * @return a Runnable object to compute and collect lag.
+ */
+ private Runnable computeAndCollectLag()
+ {
+ return () -> {
+ LOCK.lock();
+ try {
+ if (!spec.isSuspended()) {
+ LagStats lagStats = supervisor.computeLagStats();
+ if (lagStats == null) {
+ lagMetricsQueue.offer(0L);
+ } else {
+ long totalLags = lagStats.getTotalLag();
+ lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+ }
+ log.debug("Current lags [%s] for dataSource [%s].", new
ArrayList<>(lagMetricsQueue), dataSource);
+ } else {
+ log.warn("[%s] supervisor is suspended, skipping lag collection",
dataSource);
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error while collecting lags");
+ }
+ finally {
+ LOCK.unlock();
+ }
+ };
+ }
+
+ /**
+ * This method determines whether to do scale actions based on collected lag
points.
+ * The current scaling algorithm is simple:
+ * First of all, compute the proportions of lag points higher than
scaleOutThreshold and lower than scaleInThreshold.
+ * Secondly, compare those proportions with
triggerScaleOutThresholdFrequency/triggerScaleInThresholdFrequency. Note that
scale-out actions have higher priority than scale-in actions.
+ * Finally, if a proportion is higher than the corresponding
triggerScaleOutThresholdFrequency/triggerScaleInThresholdFrequency, the
scale-out/in action is triggered.
+ *
+ * @param lags the lag metrics of Stream(Kafka/Kinesis)
+ * @return the target taskCount; -1 means skip the scale action.
+ */
+ private Integer computeDesiredTaskCount(List<Long> lags)
Review comment:
```suggestion
private int computeDesiredTaskCount(List<Long> lags)
```
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+ private static final EmittingLogger log = new
EmittingLogger(LagBasedAutoScaler.class);
+ private final String dataSource;
+ private final CircularFifoQueue<Long> lagMetricsQueue;
+ private final ScheduledExecutorService lagComputationExec;
+ private final ScheduledExecutorService allocationExec;
+ private final SupervisorSpec spec;
+ private final SeekableStreamSupervisor supervisor;
+ private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+ private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+ public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String
dataSource,
+ LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+ )
+ {
+ this.lagBasedAutoScalerConfig = autoScalerConfig;
+ final String supervisorId = StringUtils.format("Supervisor-%s",
dataSource);
+ this.dataSource = dataSource;
+ final int slots = (int)
(lagBasedAutoScalerConfig.getLagCollectionRangeMillis() /
lagBasedAutoScalerConfig
+ .getLagCollectionIntervalMillis()) + 1;
+ this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+ this.allocationExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Allocation-%d");
+ this.lagComputationExec =
Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) +
"-Computation-%d");
+ this.spec = spec;
+ this.supervisor = supervisor;
+ }
+
+ @Override
+ public void start()
+ {
+ Callable<Integer> scaleAction = () -> {
+ LOCK.lock();
+ int desiredTaskCount = -1;
+ try {
+ desiredTaskCount = computeDesiredTaskCount(new
ArrayList<>(lagMetricsQueue));
+
+ if (desiredTaskCount != -1) {
+ lagMetricsQueue.clear();
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Exception while computing desired task count for [%s]",
dataSource);
+ }
+ finally {
+ LOCK.unlock();
+ }
+ return desiredTaskCount;
+ };
+
+ lagComputationExec.scheduleAtFixedRate(
+ computeAndCollectLag(),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for
tasks to start up
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ allocationExec.scheduleAtFixedRate(
+ supervisor.buildDynamicAllocationTask(scaleAction),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() +
lagBasedAutoScalerConfig
+ .getLagCollectionRangeMillis(),
Review comment:
Not sure why `lagCollectionRangeMillis` was added to
`scaleActionStartDelayMillis`.
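For what it's worth, one plausible reading (my assumption, not confirmed by the PR) is that the extra term lets a full lag-collection window fill before the first scale decision. With the documented defaults, the initial delay of the allocation schedule works out as:

```java
public class ScaleDelaySketch
{
  // Mirrors the initial-delay expression passed to
  // allocationExec.scheduleAtFixedRate in LagBasedAutoScaler.start().
  static long firstScaleCheckDelayMillis(long scaleActionStartDelayMillis,
                                         long lagCollectionRangeMillis)
  {
    return scaleActionStartDelayMillis + lagCollectionRangeMillis;
  }

  public static void main(String[] args)
  {
    // Defaults: scaleActionStartDelayMillis = 300000, lagCollectionRangeMillis = 600000.
    System.out.println(firstScaleCheckDelayMillis(300_000L, 600_000L)); // 900000
  }
}
```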
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`autoScalerConfig`|Object|Defines how to auto-scale the number of Kafka ingestion tasks based on lag metrics. Currently supported only for Kafka indexing. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable the task autoscaler. If this is false or omitted, the autoscaler is disabled even when `autoScalerConfig` is not null. | no (default == false) |
+| `lagCollectionIntervalMillis` | How often, in milliseconds, to collect a lag data point. | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection. Used with `lagCollectionIntervalMillis`: within the most recent `lagCollectionRangeMillis`, a lag metric point is collected every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The lag threshold that triggers a scale-out action. | no (default == 6000000) |
+| `triggerScaleOutThresholdFrequency` | If at least this fraction of the collected lag points is higher than `scaleOutThreshold`, a scale-out action is triggered. | no (default == 0.3) |
+| `scaleInThreshold` | The lag threshold that triggers a scale-in action. | no (default == 1000000) |
+| `triggerScaleInThresholdFrequency` | If at least this fraction of the collected lag points is lower than `scaleInThreshold`, a scale-in action is triggered. | no (default == 0.9) |
+| `scaleActionStartDelayMillis` | The delay, in milliseconds, after the supervisor starts before the first scaling check runs. | no (default == 300000) |
+| `scaleActionPeriodMillis` | How often, in milliseconds, to check whether a scale action should run. | no (default == 60000) |
+| `taskCountMax` | The maximum task count. Must satisfy `taskCountMax >= taskCountMin`. | yes |
+| `taskCountMin` | The minimum task count. When the autoscaler is enabled, the `taskCount` value in `IOConfig` is ignored; ingestion starts with `taskCountMin` tasks and can scale up to `taskCountMax`. | yes |
+| `scaleInStep` | The number of tasks to remove per scale-in action. | no (default == 1) |
+| `scaleOutStep` | The number of tasks to add per scale-out action. | no (default == 2) |
+| `minTriggerScaleActionFrequencyMillis` | The minimum time interval between two scale actions. | no (default == 600000) |
Review comment:
wouldn't time interval between two scale actions be always greater/equal
to `scaleActionPeriodMillis` ?
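For orientation, a supervisor spec might embed an `autoScalerConfig` along these lines; this is a sketch using the documented defaults plus illustrative `taskCountMin`/`taskCountMax` values, and the exact placement within the spec should be checked against the final docs:

```json
"autoScalerConfig": {
  "enableTaskAutoScaler": true,
  "lagCollectionIntervalMillis": 30000,
  "lagCollectionRangeMillis": 600000,
  "scaleOutThreshold": 6000000,
  "triggerScaleOutThresholdFrequency": 0.3,
  "scaleInThreshold": 1000000,
  "triggerScaleInThresholdFrequency": 0.9,
  "scaleActionStartDelayMillis": 300000,
  "scaleActionPeriodMillis": 60000,
  "taskCountMax": 6,
  "taskCountMin": 2,
  "scaleInStep": 1,
  "scaleOutStep": 2,
  "minTriggerScaleActionFrequencyMillis": 600000
}
```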
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
##########
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
+
+import org.apache.commons.collections4.queue.CircularFifoQueue;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
+{
+ private static final EmittingLogger log = new EmittingLogger(LagBasedAutoScaler.class);
+ private final String dataSource;
+ private final CircularFifoQueue<Long> lagMetricsQueue;
+ private final ScheduledExecutorService lagComputationExec;
+ private final ScheduledExecutorService allocationExec;
+ private final SupervisorSpec spec;
+ private final SeekableStreamSupervisor supervisor;
+ private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
+
+ private static final ReentrantLock LOCK = new ReentrantLock(true);
+
+ public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource,
+ LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
+ )
+ {
+ this.lagBasedAutoScalerConfig = autoScalerConfig;
+ final String supervisorId = StringUtils.format("Supervisor-%s", dataSource);
+ this.dataSource = dataSource;
+ final int slots = (int) (lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / lagBasedAutoScalerConfig.getLagCollectionIntervalMillis()) + 1;
+ this.lagMetricsQueue = new CircularFifoQueue<>(slots);
+ this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d");
+ this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
+ this.spec = spec;
+ this.supervisor = supervisor;
+ }
+
+ @Override
+ public void start()
+ {
+ Callable<Integer> scaleAction = () -> {
+ LOCK.lock();
+ int desiredTaskCount = -1;
+ try {
+ desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue));
+
+ if (desiredTaskCount != -1) {
+ lagMetricsQueue.clear();
+ }
+ }
+ catch (Exception ex) {
+ log.warn(ex, "Exception while computing desired task count for [%s]", dataSource);
+ }
+ finally {
+ LOCK.unlock();
+ }
+ return desiredTaskCount;
+ };
+
+ lagComputationExec.scheduleAtFixedRate(
+ computeAndCollectLag(),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ allocationExec.scheduleAtFixedRate(
+ supervisor.buildDynamicAllocationTask(scaleAction),
+ lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig.getLagCollectionRangeMillis(),
+ lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+ TimeUnit.MILLISECONDS
+ );
+ log.info(
+ "LagBasedAutoScaler will collect lag every [%d] millis and will keep [%d] data points for the last [%d] millis for dataSource [%s]",
+ lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.size(),
+ lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+ );
+ }
+
+ @Override
+ public void stop()
+ {
+ allocationExec.shutdownNow();
+ lagComputationExec.shutdownNow();
+ }
+
+ @Override
+ public void reset()
+ {
+ // clear queue for kafka lags
+ if (lagMetricsQueue != null) {
+ try {
+ LOCK.lock();
+ lagMetricsQueue.clear();
+ }
+ catch (Exception e) {
+ log.warn(e, "Error while clearing the lag metrics queue during reset");
+ }
+ finally {
+ LOCK.unlock();
+ }
+ }
+ }
+
+ /**
+ * This method computes the current consumer lag: it gets the total lag across all partitions and adds it to the lagMetricsQueue.
+ *
+ * @return a Runnable object to compute and collect lag.
+ */
+ private Runnable computeAndCollectLag()
+ {
+ return () -> {
+ LOCK.lock();
+ try {
+ if (!spec.isSuspended()) {
+ LagStats lagStats = supervisor.computeLagStats();
+ if (lagStats == null) {
+ lagMetricsQueue.offer(0L);
+ } else {
+ long totalLags = lagStats.getTotalLag();
+ lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+ }
+ log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource);
+ } else {
+ log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Error while collecting lags");
+ }
+ finally {
+ LOCK.unlock();
+ }
+ };
+ }
+
+ /**
+ * This method determines whether to take a scale action based on the collected lag points.
+ * The current scaling algorithm is simple:
+ * First, compute the proportions of lag points higher than scaleOutThreshold and lower than scaleInThreshold.
+ * Second, compare those proportions with triggerScaleOutThresholdFrequency and triggerScaleInThresholdFrequency. Note that a scale-out action has higher priority than a scale-in action.
+ * Finally, if a proportion exceeds the corresponding trigger frequency, the scale out/in action is triggered.
+ *
+ * @param lags the lag metrics of the stream (Kafka/Kinesis)
+ * @return Integer. The target task count; -1 means skip the scale action.
+ */
+ private Integer computeDesiredTaskCount(List<Long> lags)
+ {
+ // if supervisor is not suspended, ensure required tasks are running
+ // if suspended, ensure tasks have been requested to gracefully stop
+ log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
+ int beyond = 0;
+ int within = 0;
+ int metricsCount = lags.size();
+ for (Long lag : lags) {
+ if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
+ beyond++;
+ }
+ if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
+ within++;
+ }
+ }
+ double beyondProportion = beyond * 1.0 / metricsCount;
+ double withinProportion = within * 1.0 / metricsCount;
+
+ log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion, withinProportion, dataSource);
+
+ int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+ if (currentActiveTaskCount < 0) {
Review comment:
is it legitimate for `supervisor.getActiveTaskGroupsCount()` to return a
negative value? if not, then `supervisor.getActiveTaskGroupsCount()` should
always return a value >= 0 and this check shouldn't be needed.
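To make the decision logic under review easier to follow, here is a standalone paraphrase of the fraction-based check described in the method's javadoc; the class name, signature, and return convention (+1 scale out, -1 scale in, 0 no action) are my own sketch, not the PR's code.

```java
import java.util.List;

// Hypothetical sketch of the lag-fraction scale decision: count the lag points
// beyond the scale-out threshold and within the scale-in threshold, then
// compare each fraction against its trigger value.
public class ScaleDecisionSketch
{
  static int decide(
      List<Long> lags,
      long scaleOutThreshold,
      double triggerScaleOutFraction,
      long scaleInThreshold,
      double triggerScaleInFraction
  )
  {
    if (lags.isEmpty()) {
      return 0; // no samples yet, take no action
    }
    int beyond = 0;
    int within = 0;
    for (long lag : lags) {
      if (lag >= scaleOutThreshold) {
        beyond++;
      }
      if (lag <= scaleInThreshold) {
        within++;
      }
    }
    double beyondFraction = beyond * 1.0 / lags.size();
    double withinFraction = within * 1.0 / lags.size();
    // scale out takes priority over scale in
    if (beyondFraction >= triggerScaleOutFraction) {
      return 1;   // scale out (by scaleOutStep)
    }
    if (withinFraction >= triggerScaleInFraction) {
      return -1;  // scale in (by scaleInStep)
    }
    return 0;     // no action
  }
}
```

For example, with the documented defaults (scale-out threshold 6000000, trigger fraction 0.3), a window where two of three lag points exceed the threshold would trigger a scale-out.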
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
+|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the
number of Kafka ingest tasks based on Lag metrics. ONLY supported for Kafka
indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler
Properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether enable this feature or not. Set false or
ignored here will disable `autoScaler` even though `autoScalerConfig` is not
null| no (default == false) |
+| `lagCollectionIntervalMillis` | Define the frequency of lag points
collection. | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection, Use
with `lagCollectionIntervalMillis`,it means that in the recent
`lagCollectionRangeMillis`, collect lag metric points every
`lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The Threshold of scale out action | no (default ==
6000000) |
+| `triggerScaleOutThresholdFrequency` | If `triggerScaleOutThresholdFrequency`
percent of lag points are higher than `scaleOutThreshold`, then do scale out
action. | no (default == 0.3) |
+| `scaleInThreshold` | The Threshold of scale in action | no (default ==
1000000) |
+| `triggerScaleInThresholdFrequency` | If `triggerScaleInThresholdFrequency`
percent of lag points are lower than `scaleOutThreshold`, then do scale in
action. | no (default == 0.9) |
Review comment:
```suggestion
| `triggerScaleInFractionThreshold` | If `triggerScaleInThresholdFrequency`
percent of lag points are lower than `scaleOutThreshold`, then do scale in
action. | no (default == 0.9) |
```
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -146,6 +146,26 @@ A sample supervisor spec is shown below:
+|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the
number of Kafka ingest tasks based on Lag metrics. ONLY supported for Kafka
indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler
Properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether enable this feature or not. Set false or
ignored here will disable `autoScaler` even though `autoScalerConfig` is not
null| no (default == false) |
+| `lagCollectionIntervalMillis` | Define the frequency of lag points
collection. | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection, Use
with `lagCollectionIntervalMillis`,it means that in the recent
`lagCollectionRangeMillis`, collect lag metric points every
`lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The Threshold of scale out action | no (default ==
6000000) |
+| `triggerScaleOutThresholdFrequency` | If `triggerScaleOutThresholdFrequency`
percent of lag points are higher than `scaleOutThreshold`, then do scale out
action. | no (default == 0.3) |
Review comment:
not sure if it is a "frequency". maybe `triggerScaleOutFractionThreshold`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]