loquisgon commented on a change in pull request #11912:
URL: https://github.com/apache/druid/pull/11912#discussion_r747871743
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -119,403 +127,142 @@ Where the file `supervisor-spec.json` contains a Kafka
supervisor spec:
}
```
-## Supervisor Configuration
-
-|Field|Description|Required|
-|--------|-----------|---------|
-|`type`|Supervisor type. For Kafka streaming, set to `kafka`.|yes|
-|`spec`| Container object for the supervisor configuration. | yes |
-|`dataSchema`|Schema for the Kafka indexing task to use during ingestion.|yes|
-|`ioConfig`|A `KafkaSupervisorIOConfig` object to define the Kafka connection
and I/O-related settings for the supervisor and indexing task. See
[KafkaSupervisorIOConfig](#kafkasupervisorioconfig).|yes|
-|`tuningConfig`|A KafkaSupervisorTuningConfig object to define
performance-related settings for the supervisor and indexing tasks. See
[KafkaSupervisorTuningConfig](#kafkasupervisortuningconfig).|no|
-
-### KafkaSupervisorIOConfig
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|`topic`|String|The Kafka topic to read from. Must be a specific topic. Topic
patterns are not supported.|yes|
-|`inputFormat`|Object|`inputFormat` to define input data parsing. See
[Specifying data format](#specifying-data-format) for details about specifying
the input format.|yes|
-|`consumerProperties`|Map<String, Object>|A map of properties to pass to the
Kafka consumer. See [More on consumer
properties](#more-on-consumerproperties).|yes|
-|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll
records, in milliseconds|no (default == 100)|
-|`replicas`|Integer|The number of replica sets. "1" means a single set of
tasks without replication. Druid always assigns replica tasks to different
workers to provide resiliency against worker failure.|no (default == 1)|
-|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*.
The maximum number of reading tasks equals `taskCount * replicas`. Therefore,
the total number of tasks, *reading* + *publishing*, is greater than this
count. See [Capacity Planning](#capacity-planning) for more details. When
`taskCount > {numKafkaPartitions}`, the actual number of reading tasks is less
than the `taskCount` value.|no (default == 1)|
-|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading
and begin publishing segments.|no (default == PT1H)|
-|`startDelay`|ISO8601 Period|The period to wait before the supervisor starts
managing tasks.|no (default == PT5S)|
-|`period`|ISO8601 Period|Frequency at which the supervisor executes its
management logic. The supervisor also runs in response to certain events. For
example task success, task failure, and tasks reaching their `taskDuration`.
The `period` value specifies the maximum time between iterations.|no (default
== PT30S)|
-|`useEarliestOffset`|Boolean|If a supervisor manages a dataSource for the
first time, it obtains a set of starting offsets from Kafka. This flag
determines whether it retrieves the earliest or latest offsets in Kafka. Under
normal circumstances, subsequent tasks will start from where the previous
segments ended. Therefore Druid only uses `useEarliestOffset` on first run.|no
(default == false)|
-|`completionTimeout`|ISO8601 Period|The length of time to wait before
declaring a publishing task as failed and terminating it. If the value is too
low, your tasks may never publish. The publishing clock for a task begins
roughly after `taskDuration` elapses.|no (default == PT30M)|
-|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to
reject messages with timestamps earlier than this date time; for example if
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at
*2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than
*2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no
(default == none)|
-|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps earlier than this period before the task was created;
for example if this is set to `PT1H` and the supervisor creates a task at
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
-|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
-|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest
tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no
(default == null)|
-
-#### Task Autoscaler Properties
-
-> Note that Task AutoScaler is currently designated as experimental.
-
-| Property | Description | Required |
-| ------------- | ------------- | ------------- |
-| `enableTaskAutoScaler` | Enable or disable autoscaling. `false` or blank
disables the `autoScaler` even when `autoScalerConfig` is not null| no (default
== false) |
-| `taskCountMax` | Maximum number of ingestion tasks. Set `taskCountMax >=
taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales
reading tasks up to the `{numKafkaPartitions}`. In this case `taskCountMax` is
ignored. | yes |
-| `taskCountMin` | Minimum number of ingestion tasks. When you enable
autoscaler, Druid ignores the value of taskCount in `IOConfig` and starts with
the `taskCountMin` number of tasks.| yes |
-| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two
scale actions. | no (default == 600000) |
-| `autoScalerStrategy` | The algorithm of `autoScaler`. Only supports
`lagBased`. See [Lag Based AutoScaler Strategy Related
Properties](#lag-based-autoscaler-strategy-related-properties) for details.| no
(default == `lagBased`) |
-
-##### Lag Based AutoScaler Strategy Related Properties
-| Property | Description | Required |
-| ------------- | ------------- | ------------- |
-| `lagCollectionIntervalMillis` | Period of lag points collection. | no
(default == 30000) |
-| `lagCollectionRangeMillis` | The total time window of lag collection. Use
with `lagCollectionIntervalMillis`,it means that in the recent
`lagCollectionRangeMillis`, collect lag metric points every
`lagCollectionIntervalMillis`. | no (default == 600000) |
-| `scaleOutThreshold` | The threshold of scale out action | no (default ==
6000000) |
-| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold`
percent of lag points are higher than `scaleOutThreshold`, then do scale out
action. | no (default == 0.3) |
-| `scaleInThreshold` | The Threshold of scale in action | no (default ==
1000000) |
-| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold`
percent of lag points are lower than `scaleOutThreshold`, then do scale in
action. | no (default == 0.9) |
-| `scaleActionStartDelayMillis` | Number of milliseconds after supervisor
starts when first check scale logic. | no (default == 300000) |
-| `scaleActionPeriodMillis` | The frequency of checking whether to do scale
action in millis | no (default == 60000) |
-| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
-| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
-
-The following example demonstrates supervisor spec with `lagBased` autoScaler
enabled:
-```json
+### Kafka input format supervisor spec example
+If you want to ingest data from other fields in addition to the Kafka message
contents, you can use the `kafka` input format. The `kafka` input format lets
you ingest:
+- the message key field.
+- message headers.
+- the Kafka message timestamp.
+
Review comment:
It also lets you ingest the Kafka "value" (i.e. the payload) of the Kafka
event. In Kafka it is more common to call each entry in a topic an "event"
rather than a "message" (https://kafka.apache.org/intro). In general, a Kafka
event can have a key, a value, headers, and a timestamp. The Kafka input
format of the supervisor spec lets you ingest any one of those, including the
event's value, which is missing from the list you provide above.
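To make the discussion concrete, a minimal `kafka` input format spec covering all four event parts might look like the sketch below. This is illustrative only: the field names (`keyFormat`, `valueFormat`, `headerFormat`, `headerLabelPrefix`, `timestampColumnName`) reflect my reading of the feature under review and should be verified against the merged documentation.

```json
{
  "type": "kafka",
  "headerLabelPrefix": "kafka.header.",
  "timestampColumnName": "kafka.timestamp",
  "keyFormat": { "type": "json" },
  "valueFormat": { "type": "json" },
  "headerFormat": { "type": "string" }
}
```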
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -119,403 +127,142 @@ Where the file `supervisor-spec.json` contains a Kafka
supervisor spec:
}
```
Review comment:
Unfortunately, I checked the terminology for "events" in Druid's previous
Kafka documentation, and Druid calls Kafka events "messages", so calling them
"messages" here is fine for consistency with the existing documentation.
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -41,19 +46,22 @@ Additionally, you can set `isolation.level` to
`read_uncommitted` in `consumerPr
If your Kafka cluster enables consumer-group based ACLs, you can set
`group.id` in `consumerProperties` to override the default auto generated group
id.
-## Submitting a Supervisor Spec
-
-To use the Kafka indexing service, load the `druid-kafka-indexing-service`
extension on both the Overlord and the MiddleManagers. Druid starts a
supervisor for a dataSource when you submit a supervisor spec. You can use the
following endpoint:
+## Load the Kafka indexing service
-`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor`
+To use the Kafka indexing service, load the `druid-kafka-indexing-service`
extension on both the Overlord and the MiddleManagers. See [Loading
extensions](../extensions.md#loading-extensions) for instructions on how to
configure extensions.
-For example:
+## Define a supervisor spec
+Similar to the ingestion spec for batch ingestion, the supervisor spec
configures the data ingestion for Kafka streaming ingestion. A supervisor spec
has the following sections:
+- `dataSchema` to specify the Druid datasource name, primary timestamp,
dimensions, metrics, transforms, and any necessary filters.
+- `ioConfig` to configure Druid to connect to Kafka and how to parse the data.
Kafka-specific connection details go in the `consumerProperties`. For more
information, see the [Kafka supervisor
reference](./kafka-supervisor-reference.md).
+- `tuningConfig` to control various tuning parameters specific to each
ingestion method.
+For a full description of all the fields and parameters in a Kafka supervisor
spec, see the [Kafka supervisor reference](./kafka-supervisor-reference.md).
+For information on how to configure the input format, see [Data
formats](../../ingestion/data-formats.md).
-```
-curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json
http://localhost:8090/druid/indexer/v1/supervisor
-```
+The following sections contain examples to help you get started with
supervisor specs.
-Where the file `supervisor-spec.json` contains a Kafka supervisor spec:
+### JSON input format supervisor spec example
Review comment:
I find this example confusing since it uses a field called `value`, and we
are also dealing with Kafka topics, which are made up of `key`/`value` pairs.
This is a pre-existing example, so it's not an issue introduced by this
commit, but I don't feel the added text ("the following example...") adds any
value; it seems to make things more confusing.
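For context on the three sections the hunk above describes, a minimal Kafka supervisor spec skeleton could look like the sketch below. The datasource name, topic, and values are hypothetical; the structure follows the `dataSchema`/`ioConfig`/`tuningConfig` breakdown in the docs, and details should be checked against the Kafka supervisor reference.

```json
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "my-wiki-edits",
      "timestampSpec": { "column": "timestamp", "format": "iso" },
      "dimensionsSpec": { "dimensions": [] },
      "granularitySpec": { "type": "uniform", "segmentGranularity": "HOUR" }
    },
    "ioConfig": {
      "topic": "wiki-edits",
      "inputFormat": { "type": "json" },
      "consumerProperties": { "bootstrap.servers": "localhost:9092" }
    },
    "tuningConfig": { "type": "kafka" }
  }
}
```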
##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -119,403 +127,142 @@ Where the file `supervisor-spec.json` contains a Kafka
supervisor spec:
}
```
+### Kafka input format supervisor spec example
+If you want to ingest data from other fields in addition to the Kafka message
contents, you can use the `kafka` input format. The `kafka` input format lets
you ingest:
+- the message key field.
+- message headers.
+- the Kafka message timestamp.
+
+> The Kafka inputFormat is currently designated as experimental.
+
+For example, consider the following structure for a message that represents a fictitious wiki edit in a development environment:
+- **Event headers**: {"environment": "development"}
+- **Event key**: {"key": "wiki-edit"}
+- **Event value**: \<JSON object with message contents of the change details\>
+- **Event timestamp**: "Nov. 10, 2021 at 14:06"
+
+When you use the `kafka` input format, you configure the way that Druid names
the dimensions created from the Kafka message:
+- `headerLabelPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with named dimensions. The default is `kafka.header.`. Considering
the header from the example, Druid maps the header to the following column:
`kafka.header.enviornment`.
Review comment:
Spelling: `kafka.header.environment`
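With the spelling corrected, the example header `{"environment": "development"}` under the default prefix would surface as a dimension roughly like this (an illustrative row, not actual Druid output):

```json
{
  "__time": "2021-11-10T14:06:00.000Z",
  "kafka.header.environment": "development"
}
```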
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z*
will be dropped. This may help prevent concurrency issues if your data stream
has late messages and you have multiple pipelines that need to operate on the
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please
note that only one of `lateMessageRejectionPeriod` or
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
-|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject
messages with timestamps later than this period after the task reached its
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks
sometimes run past their task duration, for example, in cases of supervisor
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be
dropped unexpectedly whenever a task runs past its originally configured task
duration.|no (default == none)|
-|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest
tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no
(default == null)|
-
-#### Task Autoscaler Properties
-
-> Note that Task AutoScaler is currently designated as experimental.
-
-| Property | Description | Required |
-| ------------- | ------------- | ------------- |
-| `enableTaskAutoScaler` | Enable or disable autoscaling. `false` or blank
disables the `autoScaler` even when `autoScalerConfig` is not null| no (default
== false) |
-| `taskCountMax` | Maximum number of ingestion tasks. Set `taskCountMax >=
taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales
reading tasks up to the `{numKafkaPartitions}`. In this case `taskCountMax` is
ignored. | yes |
-| `taskCountMin` | Minimum number of ingestion tasks. When you enable
autoscaler, Druid ignores the value of taskCount in `IOConfig` and starts with
the `taskCountMin` number of tasks.| yes |
-| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two
scale actions. | no (default == 600000) |
-| `autoScalerStrategy` | The algorithm of `autoScaler`. Only supports
`lagBased`. See [Lag Based AutoScaler Strategy Related
Properties](#lag-based-autoscaler-strategy-related-properties) for details.| no
(default == `lagBased`) |
-
-##### Lag Based AutoScaler Strategy Related Properties
-| Property | Description | Required |
-| ------------- | ------------- | ------------- |
-| `lagCollectionIntervalMillis` | Period of lag points collection. | no
(default == 30000) |
-| `lagCollectionRangeMillis` | The total time window of lag collection. Use
with `lagCollectionIntervalMillis`,it means that in the recent
`lagCollectionRangeMillis`, collect lag metric points every
`lagCollectionIntervalMillis`. | no (default == 600000) |
-| `scaleOutThreshold` | The threshold of scale out action | no (default ==
6000000) |
-| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold`
percent of lag points are higher than `scaleOutThreshold`, then do scale out
action. | no (default == 0.3) |
-| `scaleInThreshold` | The Threshold of scale in action | no (default ==
1000000) |
-| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold`
percent of lag points are lower than `scaleOutThreshold`, then do scale in
action. | no (default == 0.9) |
-| `scaleActionStartDelayMillis` | Number of milliseconds after supervisor
starts when first check scale logic. | no (default == 300000) |
-| `scaleActionPeriodMillis` | The frequency of checking whether to do scale
action in millis | no (default == 60000) |
-| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
-| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
-
-The following example demonstrates supervisor spec with `lagBased` autoScaler
enabled:
-```json
+### Kafka input format supervisor spec example
+If you want to ingest data from other fields in addition to the Kafka message
contents, you can use the `kafka` input format. The `kafka` input format lets
you ingest:
+- the message key field.
+- message headers.
+- the Kafka message timestamp.
+
+> The Kafka inputFormat is currently designated as experimental.
+
+For example, consider the following structure for a message that represents a
fictitious wiki edit in a development environment:
+- **Event headers**: {"environment": "development"}
+- **Event key**: {"key": "wiki-edit"}
+- **Event value**: \<JSON object with message contents of the change details\>
+- **Event timestamp**: "Nov. 10, 2021 at 14:06"
+
+When you use the `kafka` input format, you configure the way that Druid names
the dimensions created from the Kafka message:
+- `headerLabelPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with named dimensions. The default is `kafka.header.`. Considering
the header from the example, Druid maps the header to the following column:
`kafka.header.environment`.
+- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with other time columns. The default is
`kafka.timestamp`.
+- `keyColumnName`: Supply the name for the Kafka key column in Druid. The
default is `kafka.key`.
+Additionally, you must provide information about how Druid should parse the
data in the Kafka message:
+- `headerFormat`: The default `string` decodes UTF-8-encoded strings from the
Kafka header. If you need another format, you can implement your own parser.
+- `keyFormat`: Takes a Druid `inputFormat` and uses the value of the first key
it finds, discarding the key name. In the example, the value is "wiki-edit".
If you store the key as a string, use the `csv` input format. For example, if
you have a simple string key such as `wiki-edit`, you can use the following to
parse the key:
+ ```
+ "keyFormat": {
+ "type": "csv",
+ "hasHeaderRow": false,
+ "findColumnsFromHeader": false,
+ "columns": ["key"]
+ }
+ ```
+- `valueFormat`: Define how to parse the message contents. You can use any of
the Druid input formats that work for Kafka.
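Putting the pieces above together, a minimal sketch of a complete `kafka` `inputFormat` object might look like the following. The `json` `valueFormat` is an assumption for illustration; the column names shown are the defaults described above:

```json
"inputFormat": {
  "type": "kafka",
  "headerLabelPrefix": "kafka.header.",
  "timestampColumnName": "kafka.timestamp",
  "keyColumnName": "kafka.key",
  "headerFormat": {
    "type": "string"
  },
  "keyFormat": {
    "type": "csv",
    "hasHeaderRow": false,
    "findColumnsFromHeader": false,
    "columns": ["key"]
  },
  "valueFormat": {
    "type": "json"
  }
}
```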
+
+For more information on data formats, see [Data
formats](../../ingestion/data-formats.md).
+
+Finally, add the Kafka message columns to the `dimensionsSpec`. For the key
and timestamp, use the dimension names you defined for `keyColumnName` and
`timestampColumnName`. For header dimensions, append the header key to the
`headerLabelPrefix`. For example, `kafka.header.environment`.
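As a sketch, assuming the default column names above and the `environment` header from the example, the resulting `dimensionsSpec` entries might look like:

```json
"dimensionsSpec": {
  "dimensions": [
    "kafka.key",
    "kafka.timestamp",
    "kafka.header.environment"
  ]
}
```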
Review comment:
Reword this: "supply the append the header key"
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]