techdocsmith commented on code in PR #15591: URL: https://github.com/apache/druid/pull/15591#discussion_r1446526062
########## docs/api-reference/supervisor-api.md: ########## @@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT Creates a new supervisor or updates an existing one for the same datasource with a new schema and configuration. Review Comment: "the same datasource" reads ambiguously here. Consider: ```suggestion Creates a new supervisor spec or updates an existing one with new configuration and schema information. When updating a supervisor spec, the datasource must remain the same as the previous supervisor. ``` or something similar ########## docs/api-reference/supervisor-api.md: ########## @@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT Creates a new supervisor or updates an existing one for the same datasource with a new schema and configuration. -You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. +You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. Review Comment: Do we need to mention that the supervisor persists in the metadata database? How does this affect my behavior? It has some potential for confusion because there is a record in the metadata store, but there is also a process (task?) running on the file system to periodically launch ingestion tasks. ########## docs/api-reference/supervisor-api.md: ########## @@ -3063,10 +3245,16 @@ Host: http://ROUTER_IP:ROUTER_PORT ### Reset a supervisor -Resets the specified supervisor. 
This endpoint clears _all_ stored offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. The supervisor will start from the earliest or latest available position, depending on the platform (offsets in Kafka or sequence numbers in Kinesis). It kills and recreates active tasks to read from valid positions. +The supervisor must be running for this endpoint to be available. + +Resets the specified supervisor. This endpoint clears all stored offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. The supervisor will start from the earliest or latest available position, depending on the platform (offsets in Kafka or sequence numbers in Kinesis). +After clearing all stored offsets in Kafka or sequence numbers in Kinesis, the supervisor kills and recreates active tasks, +so that tasks begin reading from valid positions. Use this endpoint to recover from a stopped state due to missing offsets in Kafka or sequence numbers in Kinesis. Use this endpoint with caution as it may result in skipped messages and lead to data loss or duplicate data. +The indexing service keeps track of the latest persisted offsets in Kafka or sequence numbers in Kinesis to provide exactly-once ingestion guarantees across tasks. Subsequent tasks must start reading from where the previous task completed for the generated segments to be accepted. If the messages at the expected starting offsets in Kafka or sequence numbers in Kinesis are no longer available (typically because the message retention period has elapsed or the topic was removed and re-created) the supervisor will refuse to start and in flight tasks will fail. This endpoint enables you to recover from this condition. Review Comment: ```suggestion The indexing service keeps track of the latest persisted offsets in Kafka or sequence numbers in Kinesis to provide exactly-once ingestion guarantees across tasks. 
Subsequent tasks must start reading from where the previous task completed for Druid to accept the generated segments. If the messages at the expected starting offsets in Kafka or sequence numbers in Kinesis are no longer available, the supervisor refuses to start and in-flight tasks will fail. Possible causes for missing messages include the message retention period elapsing or the topic being removed and re-created. Use the `reset` endpoint to recover from this condition. ``` ########## docs/api-reference/supervisor-api.md: ########## @@ -3180,8 +3374,8 @@ The following table defines the fields within the `partitions` object in the res #### Sample request -The following example shows how to reset offsets for a kafka supervisor with the name `social_media`. Let's say the supervisor is reading -from a kafka topic `ads_media_stream` and has the stored offsets: `{"0": 0, "1": 10, "2": 20, "3": 40}`. +The following example shows how to reset offsets for a Kafka supervisor with the name `social_media`. Let's say the supervisor is reading Review Comment: ```suggestion The following example shows how to reset offsets for a Kafka supervisor with the name `social_media`. For example, the supervisor is reading ``` ########## docs/development/extensions-core/kafka-ingestion.md: ########## @@ -130,172 +127,244 @@ The following example demonstrates a supervisor spec for Kafka that uses the `JS } ``` -### Kafka input format supervisor spec example - -If you want to parse the Kafka metadata fields in addition to the Kafka payload value contents, you can use the `kafka` input format. - -The `kafka` input format wraps around the payload parsing input format and augments the data it outputs with the Kafka event timestamp, -the Kafka topic name, the Kafka event headers, and the key field that itself can be parsed using any available InputFormat. 
- -For example, consider the following structure for a Kafka message that represents a fictitious wiki edit in a development environment: - -- **Kafka timestamp**: `1680795276351` -- **Kafka topic**: `wiki-edits` -- **Kafka headers**: - - `env=development` - - `zone=z1` -- **Kafka key**: `wiki-edit` -- **Kafka payload value**: `{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}` - -Using `{ "type": "json" }` as the input format would only parse the payload value. -To parse the Kafka metadata in addition to the payload, use the `kafka` input format. - -You would configure it as follows: - -- `valueFormat`: Define how to parse the payload value. Set this to the payload parsing input format (`{ "type": "json" }`). -- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.timestamp`. -- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.topic`. This field is useful when ingesting data from multiple topics into same datasource. -- `headerFormat`: The default value `string` decodes strings in UTF-8 encoding from the Kafka header. - Other supported encoding formats include the following: - - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1. - - `US-ASCII`: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin block of the Unicode character set. - - `UTF-16`: Sixteen-bit UCS Transformation Format, byte order identified by an optional byte-order mark. - - `UTF-16BE`: Sixteen-bit UCS Transformation Format, big-endian byte order. - - `UTF-16LE`: Sixteen-bit UCS Transformation Format, little-endian byte order. -- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with columns from the payload. The default is `kafka.header.`. 
- Considering the header from the example, Druid maps the headers to the following columns: `kafka.header.env`, `kafka.header.zone`. -`keyFormat`: Supply an input format to parse the key. Only the first value will be used. - If, as in the example, your key values are simple strings, then you can use the `tsv` format to parse them. - ``` - { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } - ``` - Note that for `tsv`,`csv`, and `regex` formats, you need to provide a `columns` array to make a valid input format. Only the first one is used, and its name will be ignored in favor of `keyColumnName`. -`keyColumnName`: Supply the name for the Kafka key column to avoid conflicts with columns from the payload. The default is `kafka.key`. +</details> -Putting it together, the following input format (that uses the default values for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and `keyColumnName`) +### I/O configuration -```json -{ - "type": "kafka", - "valueFormat": { - "type": "json" - }, - "headerFormat": { - "type": "string" - }, - "keyFormat": { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } -} -``` +The following table outlines the configuration options for `ioConfig`: -would parse the example message as follows: +|Property|Type|Description|Required|Default| +|--------|----|-----------|--------|-------| +|`topic`|String|The Kafka topic to read from. Must be a specific topic. Druid does not support topic patterns. To ingest data from multiple topics, see [Ingest from multiple topics](#ingest-from-multiple-topics). |Yes|| +|`inputFormat`|Object|The [input format](../../ingestion/data-formats.md#input-format) to define input data parsing.|Yes|| +|`consumerProperties`|String, Object|A map of properties to pass to the Kafka consumer. 
See [Consumer properties](#consumer-properties) for details.|Yes|| +|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100| +|`replicas`|Integer|The number of replica sets, where 1 is a single set of tasks (no replication). Druid always assigns replicate tasks to different workers to provide resiliency against process failure.|No|1| +|`taskCount`|Integer|The maximum number of reading tasks in a replica set. The maximum number of reading tasks equals `taskCount * replicas`. The total number of tasks, reading and publishing, is greater than this count. See [Capacity planning](../../ingestion/supervisor.md#capacity-planning) for more details. When `taskCount > {numKafkaPartitions}`, the actual number of reading tasks is less than the `taskCount` value.|No|1| +|`taskDuration`|ISO 8601 period|The length of time before tasks stop reading and begin publishing segments.|No|PT1H| +|`startDelay`|ISO 8601 period|The period to wait before the supervisor starts managing tasks.|No|PT5S| +|`period`|ISO 8601 period|Determines how often the supervisor executes its management logic. Note that the supervisor also runs in response to certain events, such as tasks succeeding, failing, and reaching their task duration. The `period` value specifies the maximum time between iterations.|No|PT30S| +|`useEarliestOffset`|Boolean|If a supervisor manages a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks start from where the previous segments ended. Druid only uses `useEarliestOffset` on the first run.|No|`false`| +|`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, your tasks may never publish. 
The publishing clock for a task begins roughly after `taskDuration` elapses.|No|PT30M| +|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No|| +|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline. Note that you can specify only one of the late message rejection properties.|No|| +|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.|No|| +|`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks. 
See [Task autoscaler](../../ingestion/supervisor.md#task-autoscaler) for more information.|No|null| +|`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle supervisor configuration](#idle-supervisor-configuration) for more details.|No|null| -```json -{ - "channel": "#sv.wikipedia", - "timestamp": "2016-06-27T00:00:11.080Z", - "page": "Salo Toraut", - "delta": 31, - "namespace": "Main", - "kafka.timestamp": 1680795276351, - "kafka.topic": "wiki-edits", - "kafka.header.env": "development", - "kafka.header.zone": "z1", - "kafka.key": "wiki-edit" -} -``` +#### Consumer properties -For more information on data formats, see [Data formats](../../ingestion/data-formats.md). +Consumer properties must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. +By default, `isolation.level` is set to `read_committed`. If you use older versions of Kafka servers without transactions support or don't want Druid to consume only committed transactions, set `isolation.level` to `read_uncommitted`. -Finally, add these Kafka metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to auto-detect columns. - -The following supervisor spec demonstrates how to ingest the Kafka header, key, timestamp, and topic into Druid dimensions: +In some cases, you may need to fetch consumer properties at runtime. For example, when `bootstrap.servers` is not known upfront, or is not static. To enable SSL connections, you must provide passwords for `keystore`, `truststore`, and `key` secretly. You can provide configurations at runtime with a dynamic config provider implementation like the environment variable config provider that comes with Druid. For more information, see [Dynamic config provider](../../operations/dynamic-config-provider.md). 
+ +For example, if you are using SASL and SSL with Kafka, set the following environment variables for the Druid user on the machines running the Overlord and the Peon services: ``` -{ - "type": "kafka", - "spec": { - "ioConfig": { - "type": "kafka", - "consumerProperties": { - "bootstrap.servers": "localhost:9092" - }, - "topic": "wiki-edits", - "inputFormat": { - "type": "kafka", - "valueFormat": { - "type": "json" - }, - "headerFormat": { - "type": "string" - }, - "keyFormat": { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } - }, - "useEarliestOffset": true - }, - "dataSchema": { - "dataSource": "wikiticker", - "timestampSpec": { - "column": "timestamp", - "format": "posix" - }, - "dimensionsSpec": "dimensionsSpec": { - "useSchemaDiscovery": true, - "includeAllDimensions": true - }, - "granularitySpec": { - "queryGranularity": "none", - "rollup": false, - "segmentGranularity": "day" +export KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='admin_user' password='admin_password';" +export SSL_KEY_PASSWORD=mysecretkeypassword +export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword +export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword +``` + +```json + "druid.dynamic.config.provider": { + "type": "environment", + "variables": { + "sasl.jaas.config": "KAFKA_JAAS_CONFIG", + "ssl.key.password": "SSL_KEY_PASSWORD", + "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD", + "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD" } - }, - "tuningConfig": { - "type": "kafka" - } } -} ``` -After Druid ingests the data, you can query the Kafka metadata columns as follows: +Verify that you've changed the values for all configurations to match your own environment. In the Druid data loader interface, you can use the environment variable config provider syntax in the **Consumer properties** field on the **Connect tab**. 
When connecting to Kafka, Druid replaces the environment variables with their corresponding values. -```sql -SELECT - "kafka.header.env", - "kafka.key", - "kafka.timestamp", - "kafka.topic" -FROM "wikiticker" -``` +#### Task autoscaler -This query returns: +You can optionally configure autoscaling behavior for ingestion tasks using the `autoScalerConfig` property of the `ioConfig` object. -| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` | -|--------------------|-----------|---------------|---------------| -| `development` | `wiki-edit` | `1680795276351` | `wiki-edits` | +The following table outlines the configuration options for `autoScalerConfig`: -For more information, see [`kafka` data format](../../ingestion/data-formats.md#kafka). +|Property|Description|Required|Default| +|--------|-----------|--------|-------| +|`enableTaskAutoScaler`|Enables the auto scaler. If not specified, Druid disables the auto scaler even when `autoScalerConfig` is not null.|No|`false`| +|`taskCountMax`|Maximum number of ingestion tasks. Set `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales reading tasks up to `{numKafkaPartitions}`. In this case, `taskCountMax` is ignored.|Yes|| +|`taskCountMin`|Minimum number of ingestion tasks. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts with the `taskCountMin` number of tasks to launch.|Yes|| +|`minTriggerScaleActionFrequencyMillis`|Minimum time interval between two scale actions.| No|600000| +|`autoScalerStrategy`|The algorithm of `autoScaler`. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`| -## Submit a supervisor spec +##### Autoscaler strategy -Druid starts a supervisor for a dataSource when you submit a supervisor spec. 
You can use the data loader in the web console or you can submit a supervisor spec to the following endpoint: +The following table outlines the configuration options for `autoScalerStrategy`: -`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor` +|Property|Description|Required|Default| +|--------|-----------|--------|-------| +|`lagCollectionIntervalMillis`|The time period during which Druid collects lag metric points.|No|30000| +|`lagCollectionRangeMillis`|The total time window of lag collection. Use with `lagCollectionIntervalMillis` to specify the intervals at which to collect lag metric points.|No|600000| +|`scaleOutThreshold`|The threshold of scale out action. |No|6000000| +|`triggerScaleOutFractionThreshold`|Enables scale out action if `triggerScaleOutFractionThreshold` percent of lag points is higher than `scaleOutThreshold`.|No|0.3| +|`scaleInThreshold`|The threshold of scale in action.|No|1000000| +|`triggerScaleInFractionThreshold`|Enables scale in action if `triggerScaleInFractionThreshold` percent of lag points is lower than `scaleOutThreshold`.|No|0.9| +|`scaleActionStartDelayMillis`|The number of milliseconds to delay after the supervisor starts before the first scale logic check.|No|300000| +|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered.|No|60000| +|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1| +|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2| -For example: +#### Idle supervisor configuration -``` -curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor -``` +:::info +Idle state transitioning is currently designated as experimental. +::: + +When the supervisor enters the idle state, no new tasks are launched subsequent to the completion of the currently executing tasks. 
This strategy may lead to reduced costs for cluster operators while using topics that get sporadic data. + +The following table outlines the configuration options for `idleConfig`: + +|Property|Description|Required| +|--------|-----------|--------| +|`enabled`|If `true`, the supervisor becomes idle if there is no data on input stream or topic for some time.|No|`false`| +|`inactiveAfterMillis`|The supervisor becomes idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|No|`600_000`| + +The following example shows a supervisor spec with `lagBased` autoscaler and idle configuration enabled: + +<details> + <summary>Click to view the example</summary> -Where the file `supervisor-spec.json` contains your Kafka supervisor spec file. +```json +{ + "type": "kafka", + "spec": { + "dataSchema": { + ... + }, + "ioConfig": { + "topic": "metrics", + "inputFormat": { + "type": "json" + }, + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "autoScalerConfig": { + "enableTaskAutoScaler": true, + "taskCountMax": 6, + "taskCountMin": 2, + "minTriggerScaleActionFrequencyMillis": 600000, + "autoScalerStrategy": "lagBased", + "lagCollectionIntervalMillis": 30000, + "lagCollectionRangeMillis": 600000, + "scaleOutThreshold": 6000000, + "triggerScaleOutFractionThreshold": 0.3, + "scaleInThreshold": 1000000, + "triggerScaleInFractionThreshold": 0.9, + "scaleActionStartDelayMillis": 300000, + "scaleActionPeriodMillis": 60000, + "scaleInStep": 1, + "scaleOutStep": 2 + }, + "taskCount":1, + "replicas":1, + "taskDuration":"PT1H", + "idleConfig": { + "enabled": true, + "inactiveAfterMillis": 600000 + } + }, + "tuningConfig":{ + ... + } + } +} +``` +</details> + +#### Ingest from multiple topics + +:::info +If you enable multi-topic ingestion for a datasource, downgrading to a version older than +28.0.0 will cause the ingestion for that datasource to fail. 
+::: + +To ingest data from multiple topics, you set `topicPattern` instead of `topic in the supervisor `ioConfig` object`. +You can pass multiple topics as a regex pattern as the value for `topicPattern` in `ioConfig`. For example, to +ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in `ioCofig`. +Similarly, you can use `metrics-.*` as the value for `topicPattern` if you want to ingest from all the topics that +start with `metrics-`. If new topics are added to the cluster that match the regex, Druid automatically starts +ingesting from those new topics. A topic name that only matches partially such as `my-metrics-12` will not be Review Comment: ```suggestion ingesting from those new topics. Topic names that match partially, such as `my-metrics-12`, will not be ``` ########## docs/development/extensions-core/kinesis-ingestion.md: ########## @@ -719,9 +406,16 @@ If resharding occurs when the supervisor is suspended and `useEarliestSequence` ## Kinesis known issues -Before you deploy the Kinesis extension to production, consider the following known issues: +Before you deploy the `druid-kinesis-indexing-service` extension to production, consider the following known issues: Review Comment: Might be worth checking with druid devs to see if any of these have been addressed. ########## docs/api-reference/supervisor-api.md: ########## @@ -2218,6 +2399,7 @@ Content-Length: 1359 ### Suspend a running supervisor Suspends a single running supervisor. Returns the updated supervisor spec, where the `suspended` property is set to `true`. The suspended supervisor continues to emit logs and metrics. +Indexing tasks remain suspended until the supervisor is resumed. Review Comment: ```suggestion Indexing tasks remain suspended until the supervisor resumes. ``` Consider active. May need to specify that you have to call the resume endpoint. 
########## docs/api-reference/supervisor-api.md: ########## @@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT Creates a new supervisor or updates an existing one for the same datasource with a new schema and configuration. -You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. +You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. + +The following table lists the properties of a supervisor spec: + +|Property|Type|Description|Required| +|--------|----|-----------|--------| +|`type`|String|The supervisor type. Choose from `kafka` or `kinesis`.|Yes| Review Comment: ```suggestion |`type`|String|The supervisor type. One of `kafka` or `kinesis`.|Yes| ``` I think "One of" is the convention for a list? ########## docs/api-reference/supervisor-api.md: ########## @@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT Creates a new supervisor or updates an existing one for the same datasource with a new schema and configuration. -You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. +You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. 
Once created, the supervisor persists in the metadata database. + +The following table lists the properties of a supervisor spec: + +|Property|Type|Description|Required| +|--------|----|-----------|--------| +|`type`|String|The supervisor type. Choose from `kafka` or `kinesis`.|Yes| +|`spec`|Object|The container object for the supervisor configuration.|Yes| +|`ioConfig`|Object|The I/O configuration object to define the connection and I/O-related settings for the supervisor and indexing task.|Yes| +|`dataSchema`|Object|The schema for the indexing task to use during ingestion. See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for more information.|Yes| +|`tuningConfig`|Object|The tuning configuration object to define performance-related settings for the supervisor and indexing tasks.|No| When you call this endpoint on an existing supervisor for the same datasource, the running supervisor signals its tasks to stop reading and begin publishing, exiting itself. Druid then uses the provided configuration from the request body to create a new supervisor. Druid submits a new schema while retaining existing publishing tasks and starts new tasks at the previous task offsets. Review Comment: See comment line 2216. If you clarify there, I think you can remove "for the same datasource" here, which reads awkwardly. ########## docs/api-reference/supervisor-api.md: ########## @@ -3128,13 +3316,19 @@ Host: http://ROUTER_IP:ROUTER_PORT ``` </details> -### Reset Offsets for a supervisor +### Reset offsets for a supervisor + +The supervisor must be running for this endpoint to be available. + +Resets the specified offsets for partitions without resetting the entire set. + +This endpoint clears only the specified offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume reading data from the specified offsets. +If there are no stored offsets, the specified offsets are set in the metadata store. -Resets the specified offsets for a supervisor. 
This endpoint clears _only_ the specified offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. -If there are no stored offsets, the specified offsets will be set in the metadata store. The supervisor will start from the reset offsets for the partitions specified and for the other partitions from the stored offset. -It kills and recreates active tasks pertaining to the partitions specified to read from valid offsets. +After resetting stored offsets, the supervisor kills and recreates any active tasks pertaining to the specified partitions, +so that tasks begin reading from the specified offsets. For partitions that are not specified in this operation, the supervisor resumes from the last stored offset. -Use this endpoint to selectively reset offsets for partitions without resetting the entire set. +Use this endpoint with caution as it may result in skipped messages, leading to data loss or duplicate data. Review Comment: ```suggestion Use this endpoint with caution. It can cause skipped messages, leading to data loss or duplicate data. ``` ########## docs/api-reference/supervisor-api.md: ########## @@ -3063,10 +3245,16 @@ Host: http://ROUTER_IP:ROUTER_PORT ### Reset a supervisor -Resets the specified supervisor. This endpoint clears _all_ stored offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. The supervisor will start from the earliest or latest available position, depending on the platform (offsets in Kafka or sequence numbers in Kinesis). It kills and recreates active tasks to read from valid positions. +The supervisor must be running for this endpoint to be available. + +Resets the specified supervisor. This endpoint clears all stored offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. The supervisor will start from the earliest or latest available position, depending on the platform (offsets in Kafka or sequence numbers in Kinesis). 
Review Comment: ```suggestion Resets the specified supervisor. This endpoint clears all stored offsets in Kafka or sequence numbers in Kinesis, prompting the supervisor to resume data reading. The supervisor restarts from the earliest or latest available position, depending on the platform: offsets in Kafka or sequence numbers in Kinesis. ``` avoid future. avoid parens. ########## docs/api-reference/supervisor-api.md: ########## @@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT Creates a new supervisor or updates an existing one for the same datasource with a new schema and configuration. -You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. +You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. + +The following table lists the properties of a supervisor spec: + +|Property|Type|Description|Required| +|--------|----|-----------|--------| +|`type`|String|The supervisor type. Choose from `kafka` or `kinesis`.|Yes| +|`spec`|Object|The container object for the supervisor configuration.|Yes| +|`ioConfig`|Object|The I/O configuration object to define the connection and I/O-related settings for the supervisor and indexing task.|Yes| Review Comment: link to I/O config? ########## docs/development/extensions-core/kafka-ingestion.md: ########## @@ -26,45 +26,42 @@ description: "Overview of the Kafka indexing service for Druid. 
Includes example When you enable the Kafka indexing service, you can configure supervisors on the Overlord to manage the creation and lifetime of Kafka indexing tasks. -Kafka indexing tasks read events using Kafka's own partition and offset mechanism to guarantee exactly-once ingestion. The supervisor oversees the state of the indexing tasks to: - - coordinate handoffs - - manage failures - - ensure that scalability and replication requirements are maintained. +Kafka indexing tasks read events using Kafka's own partition and offset mechanism to guarantee exactly-once ingestion. The supervisor oversees the state of the indexing tasks to coordinate handoffs, manage failures, and ensure that scalability and replication requirements are maintained. +This topic contains configuration reference information for the Kafka indexing service supervisor for Apache Druid. -This topic covers how to submit a supervisor spec to ingest event data, also known as message data, from Kafka. See the following for more information: -- For a reference of Kafka supervisor spec configuration options, see the [Kafka supervisor reference](./kafka-supervisor-reference.md). -- For operations reference information to help run and maintain Apache Kafka supervisors, see [Kafka supervisor operations](./kafka-supervisor-operations.md). -- For a walk-through, see the [Loading from Apache Kafka](../../tutorials/tutorial-kafka.md) tutorial. +## Setup -## Kafka support +To use the Kafka indexing service, you must first load the `druid-kafka-indexing-service` extension on both the Overlord and the MiddleManager. See [Loading extensions](../../configuration/extensions.md) for more information. + +### Kafka support The Kafka indexing service supports transactional topics introduced in Kafka 0.11.x by default. The consumer for Kafka indexing service is incompatible with older Kafka brokers. 
If you are using an older version, refer to the [Kafka upgrade guide](https://kafka.apache.org/documentation/#upgrade). Additionally, you can set `isolation.level` to `read_uncommitted` in `consumerProperties` if either: - You don't need Druid to consume transactional topics. -- You need Druid to consume older versions of Kafka. Make sure offsets are sequential, since there is no offset gap check in Druid anymore. - -If your Kafka cluster enables consumer-group based ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group id. +- You need Druid to consume older versions of Kafka. Make sure offsets are sequential, since there is no offset gap check in Druid. -## Load the Kafka indexing service +If your Kafka cluster enables consumer-group based ACLs, you can set `group.id` in `consumerProperties` to override the default auto generated group ID. -To use the Kafka indexing service, load the `druid-kafka-indexing-service` extension on both the Overlord and the MiddleManagers. See [Loading extensions](../../configuration/extensions.md) for instructions on how to configure extensions. +## Supervisor spec -## Define a supervisor spec +Similar to the ingestion spec for batch ingestion, the [supervisor spec](../../ingestion/supervisor.md#supervisor-spec) configures the data ingestion for Kafka streaming ingestion. -Similar to the ingestion spec for batch ingestion, the supervisor spec configures the data ingestion for Kafka streaming ingestion. A supervisor spec has the following sections: -- `dataSchema` to specify the Druid datasource name, primary timestamp, dimensions, metrics, transforms, and any necessary filters. -- `ioConfig` to configure Kafka connection settings and configure how Druid parses the data. Kafka-specific connection details go in the `consumerProperties`. The `ioConfig` is also where you define the input format (`inputFormat`) of your Kafka data. 
For supported formats for Kafka and information on how to configure the input format, see [Data formats](../../ingestion/data-formats.md). -- `tuningConfig` to control various tuning parameters specific to each ingestion method. -For a full description of all the fields and parameters in a Kafka supervisor spec, see the [Kafka supervisor reference](./kafka-supervisor-reference.md). +The following table outlines the high-level configuration options for the Kafka supervisor spec: +|Property|Type|Description|Required| +|--------|----|-----------|--------| +|`type`|String|The supervisor type; this should always be `kafka`.|Yes| Review Comment: ```suggestion |`type`|String|The supervisor type; must be `kafka`.|Yes| ``` ########## docs/api-reference/supervisor-api.md: ########## @@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT Creates a new supervisor or updates an existing one for the same datasource with a new schema and configuration. -You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. +You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. + +The following table lists the properties of a supervisor spec: + +|Property|Type|Description|Required| +|--------|----|-----------|--------| +|`type`|String|The supervisor type. 
Choose from `kafka` or `kinesis`.|Yes| +|`spec`|Object|The container object for the supervisor configuration.|Yes| +|`ioConfig`|Object|The I/O configuration object to define the connection and I/O-related settings for the supervisor and indexing task.|Yes| +|`dataSchema`|Object|The schema for the indexing task to use during ingestion. See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for more information.|Yes| +|`tuningConfig`|Object|The tuning configuration object to define performance-related settings for the supervisor and indexing tasks.|No| When you call this endpoint on an existing supervisor for the same datasource, the running supervisor signals its tasks to stop reading and begin publishing, exiting itself. Druid then uses the provided configuration from the request body to create a new supervisor. Druid submits a new schema while retaining existing publishing tasks and starts new tasks at the previous task offsets. +In this way, configuration changes can be applied without requiring any pause in ingestion. Review Comment: ```suggestion This way, you can apply configuration changes without a pause in ingestion. ``` consider active voice ########## docs/api-reference/supervisor-api.md: ########## @@ -3216,7 +3410,7 @@ Content-Type: application/json } ``` -The above operation will reset offsets only for partitions 0 and 2 to 100 and 650 respectively. After a successful reset, +The above operation will reset offsets only for partitions `0` and `2` to 100 and 650 respectively. After a successful reset, Review Comment: ```suggestion The example operation resets offsets only for partitions `0` and `2` to 100 and 650 respectively. 
After a successful reset, ``` Avoid future, avoid location above/below indicators ########## docs/configuration/index.md: ########## @@ -1159,7 +1159,7 @@ If autoscaling is enabled, you can set these additional configs: |`druid.supervisor.idleConfig.enabled`|If `true`, supervisor can become idle if there is no data on input stream/topic for some time.|false| |`druid.supervisor.idleConfig.inactiveAfterMillis`|Supervisor is marked as idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|`600_000`| -The `druid.supervisor.idleConfig.*` specified in the Overlord runtime properties defines the default behavior for the entire cluster. See [Idle Configuration in Kafka Supervisor IOConfig](../development/extensions-core/kafka-supervisor-reference.md#supervisor-io-configuration) to override it for an individual supervisor. +The `druid.supervisor.idleConfig.*` specified in the Overlord runtime properties defines the default behavior for the entire cluster. See [Idle Configuration in Kafka Supervisor IOConfig](../development/extensions-core/kinesis-ingestion.md#io-configuration) to override it for an individual supervisor. Review Comment: ```suggestion The `druid.supervisor.idleConfig.*` specification in the Overlord runtime properties defines the default behavior for the entire cluster. See [Idle Configuration in Kafka Supervisor IOConfig](../development/extensions-core/kinesis-ingestion.md#io-configuration) to override it for an individual supervisor. ``` ########## docs/api-reference/supervisor-api.md: ########## @@ -2046,9 +2215,22 @@ Host: http://ROUTER_IP:ROUTER_PORT Creates a new supervisor or updates an existing one for the same datasource with a new schema and configuration. 
-You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kafka-ingestion.md#define-a-supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. +You can define a supervisor spec for [Apache Kafka](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) or [Amazon Kinesis](../development/extensions-core/kinesis-ingestion.md#supervisor-spec) streaming ingestion methods. Once created, the supervisor persists in the metadata database. + +The following table lists the properties of a supervisor spec: + +|Property|Type|Description|Required| +|--------|----|-----------|--------| +|`type`|String|The supervisor type. Choose from `kafka` or `kinesis`.|Yes| +|`spec`|Object|The container object for the supervisor configuration.|Yes| +|`ioConfig`|Object|The I/O configuration object to define the connection and I/O-related settings for the supervisor and indexing task.|Yes| +|`dataSchema`|Object|The schema for the indexing task to use during ingestion. See [`dataSchema`](../ingestion/ingestion-spec.md#dataschema) for more information.|Yes| +|`tuningConfig`|Object|The tuning configuration object to define performance-related settings for the supervisor and indexing tasks.|No| When you call this endpoint on an existing supervisor for the same datasource, the running supervisor signals its tasks to stop reading and begin publishing, exiting itself. Druid then uses the provided configuration from the request body to create a new supervisor. Druid submits a new schema while retaining existing publishing tasks and starts new tasks at the previous task offsets. +In this way, configuration changes can be applied without requiring any pause in ingestion. + +You can achieve seamless schema migrations by submitting the new schema using the `/druid/indexer/v1/supervisor` endpoint. 
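For anyone cross-checking the property table, the fields nest as follows in a submitted spec: `ioConfig`, `dataSchema`, and `tuningConfig` sit inside the `spec` container object. This skeleton is illustrative only; the topic and datasource names are placeholders, and the `ioConfig` and `dataSchema` bodies are trimmed to a minimum:

```json
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "my-datasource",
      "timestampSpec": { "column": "timestamp", "format": "iso" }
    },
    "ioConfig": {
      "topic": "my-topic",
      "inputFormat": { "type": "json" },
      "consumerProperties": { "bootstrap.servers": "localhost:9092" }
    },
    "tuningConfig": { "type": "kafka" }
  }
}
```

Submitting this skeleton again for the same datasource with changed configuration exercises the update path described above.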
Review Comment: ```suggestion Submit updated schemas for a datasource to the `/druid/indexer/v1/supervisor` endpoint to achieve seamless schema migrations. ``` This seems a little repetitive of line 2231. If you keep it, consider active voice. "seamless" is setting off an amber flag for me, but not opposed to it. ########## docs/development/extensions-core/kafka-ingestion.md: ########## @@ -130,172 +127,244 @@ The following example demonstrates a supervisor spec for Kafka that uses the `JS } ``` -### Kafka input format supervisor spec example - -If you want to parse the Kafka metadata fields in addition to the Kafka payload value contents, you can use the `kafka` input format. - -The `kafka` input format wraps around the payload parsing input format and augments the data it outputs with the Kafka event timestamp, -the Kafka topic name, the Kafka event headers, and the key field that itself can be parsed using any available InputFormat. - -For example, consider the following structure for a Kafka message that represents a fictitious wiki edit in a development environment: - -- **Kafka timestamp**: `1680795276351` -- **Kafka topic**: `wiki-edits` -- **Kafka headers**: - - `env=development` - - `zone=z1` -- **Kafka key**: `wiki-edit` -- **Kafka payload value**: `{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}` - -Using `{ "type": "json" }` as the input format would only parse the payload value. -To parse the Kafka metadata in addition to the payload, use the `kafka` input format. - -You would configure it as follows: - -- `valueFormat`: Define how to parse the payload value. Set this to the payload parsing input format (`{ "type": "json" }`). -- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.timestamp`. 
-- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.topic`. This field is useful when ingesting data from multiple topics into same datasource. -- `headerFormat`: The default value `string` decodes strings in UTF-8 encoding from the Kafka header. - Other supported encoding formats include the following: - - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1. - - `US-ASCII`: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin block of the Unicode character set. - - `UTF-16`: Sixteen-bit UCS Transformation Format, byte order identified by an optional byte-order mark. - - `UTF-16BE`: Sixteen-bit UCS Transformation Format, big-endian byte order. - - `UTF-16LE`: Sixteen-bit UCS Transformation Format, little-endian byte order. -- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with columns from the payload. The default is `kafka.header.`. - Considering the header from the example, Druid maps the headers to the following columns: `kafka.header.env`, `kafka.header.zone`. -- `keyFormat`: Supply an input format to parse the key. Only the first value will be used. - If, as in the example, your key values are simple strings, then you can use the `tsv` format to parse them. - ``` - { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } - ``` - Note that for `tsv`,`csv`, and `regex` formats, you need to provide a `columns` array to make a valid input format. Only the first one is used, and its name will be ignored in favor of `keyColumnName`. -- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts with columns from the payload. The default is `kafka.key`. 
+</details> -Putting it together, the following input format (that uses the default values for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and `keyColumnName`) +### I/O configuration -```json -{ - "type": "kafka", - "valueFormat": { - "type": "json" - }, - "headerFormat": { - "type": "string" - }, - "keyFormat": { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } -} -``` +The following table outlines the configuration options for `ioConfig`: -would parse the example message as follows: +|Property|Type|Description|Required|Default| +|--------|----|-----------|--------|-------| +|`topic`|String|The Kafka topic to read from. Must be a specific topic. Druid does not support topic patterns. To ingest data from multiple topic, see [Ingest from multiple topics](#ingest-from-multiple-topics). |Yes|| +|`inputFormat`|Object|The [input format](../../ingestion/data-formats.md#input-format) to define input data parsing.|Yes|| +|`consumerProperties`|String, Object|A map of properties to pass to the Kafka consumer. See [Consumer properties](#consumer-properties) for details.|Yes|| +|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100| +|`replicas`|Integer|The number of replica sets, where 1 is a single set of tasks (no replication). Druid always assigns replicate tasks to different workers to provide resiliency against process failure.|No|1| +|`taskCount`|Integer|The maximum number of reading tasks in a replica set. The maximum number of reading tasks equals `taskCount * replicas`. The total number of tasks, reading and publishing, is greater than this count. See [Capacity planning](../../ingestion/supervisor.md#capacity-planning) for more details. 
When `taskCount > {numKafkaPartitions}`, the actual number of reading tasks is less than the `taskCount` value.|No|1| +|`taskDuration`|ISO 8601 period|The length of time before tasks stop reading and begin publishing segments.|No|PT1H| +|`startDelay`|ISO 8601 period|The period to wait before the supervisor starts managing tasks.|No|PT5S| +|`period`|ISO 8601 period|Determines how often the supervisor executes its management logic. Note that the supervisor also runs in response to certain events, such as tasks succeeding, failing, and reaching their task duration. The `period` value specifies the maximum time between iterations.|No|PT30S| +|`useEarliestOffset`|Boolean|If a supervisor manages a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks start from where the previous segments ended. Druid only uses `useEarliestOffset` on the first run.|No|`false`| +|`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|PT30M| +|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No|| +|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. 
For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline. Note that you can specify only one of the late message rejection properties.|No|| +|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.|No|| +|`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks. See [Task autoscaler](../../ingestion/supervisor.md#task-autoscaler) for more information.|No|null| +|`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle supervisor configuration](#idle-supervisor-configuration) for more details.|No|null| -```json -{ - "channel": "#sv.wikipedia", - "timestamp": "2016-06-27T00:00:11.080Z", - "page": "Salo Toraut", - "delta": 31, - "namespace": "Main", - "kafka.timestamp": 1680795276351, - "kafka.topic": "wiki-edits", - "kafka.header.env": "development", - "kafka.header.zone": "z1", - "kafka.key": "wiki-edit" -} -``` +#### Consumer properties -For more information on data formats, see [Data formats](../../ingestion/data-formats.md). 
+Consumer properties must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`.
+By default, `isolation.level` is set to `read_committed`. If you use older versions of Kafka servers without transaction support, or don't want Druid to consume only committed transactions, set `isolation.level` to `read_uncommitted`.
-Finally, add these Kafka metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to auto-detect columns.
-
-The following supervisor spec demonstrates how to ingest the Kafka header, key, timestamp, and topic into Druid dimensions:
+In some cases, you may need to fetch consumer properties at runtime, for example, when `bootstrap.servers` is not known upfront or is not static. To enable SSL connections, you must provide the passwords for the `keystore`, `truststore`, and `key` securely. You can provide configurations at runtime with a dynamic config provider implementation, such as the environment variable config provider that comes with Druid. For more information, see [Dynamic config provider](../../operations/dynamic-config-provider.md).
+ +For example, if you are using SASL and SSL with Kafka, set the following environment variables for the Druid user on the machines running the Overlord and the Peon services: ``` -{ - "type": "kafka", - "spec": { - "ioConfig": { - "type": "kafka", - "consumerProperties": { - "bootstrap.servers": "localhost:9092" - }, - "topic": "wiki-edits", - "inputFormat": { - "type": "kafka", - "valueFormat": { - "type": "json" - }, - "headerFormat": { - "type": "string" - }, - "keyFormat": { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } - }, - "useEarliestOffset": true - }, - "dataSchema": { - "dataSource": "wikiticker", - "timestampSpec": { - "column": "timestamp", - "format": "posix" - }, - "dimensionsSpec": "dimensionsSpec": { - "useSchemaDiscovery": true, - "includeAllDimensions": true - }, - "granularitySpec": { - "queryGranularity": "none", - "rollup": false, - "segmentGranularity": "day" +export KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='admin_user' password='admin_password';" +export SSL_KEY_PASSWORD=mysecretkeypassword +export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword +export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword +``` + +```json + "druid.dynamic.config.provider": { + "type": "environment", + "variables": { + "sasl.jaas.config": "KAFKA_JAAS_CONFIG", + "ssl.key.password": "SSL_KEY_PASSWORD", + "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD", + "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD" } - }, - "tuningConfig": { - "type": "kafka" - } } -} ``` -After Druid ingests the data, you can query the Kafka metadata columns as follows: +Verify that you've changed the values for all configurations to match your own environment. In the Druid data loader interface, you can use the environment variable config provider syntax in the **Consumer properties** field on the **Connect tab**. 
When connecting to Kafka, Druid replaces the environment variables with their corresponding values. -```sql -SELECT - "kafka.header.env", - "kafka.key", - "kafka.timestamp", - "kafka.topic" -FROM "wikiticker" -``` +#### Task autoscaler -This query returns: +You can optionally configure autoscaling behavior for ingestion tasks using the `autoScalerConfig` property of the `ioConfig` object. -| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` | -|--------------------|-----------|---------------|---------------| -| `development` | `wiki-edit` | `1680795276351` | `wiki-edits` | +The following table outlines the configuration options for `autoScalerConfig`: -For more information, see [`kafka` data format](../../ingestion/data-formats.md#kafka). +|Property|Description|Required|Default| +|--------|-----------|--------|-------| +|`enableTaskAutoScaler`|Enables the auto scaler. If not specified, Druid disables the auto scaler even when `autoScalerConfig` is not null.|No|`false`| +|`taskCountMax`|Maximum number of ingestion tasks. Set `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales reading tasks up to `{numKafkaPartitions}`. In this case, `taskCountMax` is ignored.|Yes|| +|`taskCountMin`|Minimum number of ingestion tasks. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts with the `taskCountMin` number of tasks to launch.|Yes|| +|`minTriggerScaleActionFrequencyMillis`|Minimum time interval between two scale actions.| No|600000| +|`autoScalerStrategy`|The algorithm of `autoScaler`. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`| -## Submit a supervisor spec +##### Autoscaler strategy -Druid starts a supervisor for a dataSource when you submit a supervisor spec. 
You can use the data loader in the web console or you can submit a supervisor spec to the following endpoint: +The following table outlines the configuration options for `autoScalerStrategy`: -`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor` +|Property|Description|Required|Default| +|--------|-----------|--------|-------| +|`lagCollectionIntervalMillis`|The time period during which Druid collects lag metric points.|No|30000| +|`lagCollectionRangeMillis`|The total time window of lag collection. Use with `lagCollectionIntervalMillis` to specify the intervals at which to collect lag metric points.|No|600000| +|`scaleOutThreshold`|The threshold of scale out action. |No|6000000| +|`triggerScaleOutFractionThreshold`|Enables scale out action if `triggerScaleOutFractionThreshold` percent of lag points is higher than `scaleOutThreshold`.|No|0.3| +|`scaleInThreshold`|The threshold of scale in action.|No|1000000| +|`triggerScaleInFractionThreshold`|Enables scale in action if `triggerScaleInFractionThreshold` percent of lag points is lower than `scaleOutThreshold`.|No|0.9| +|`scaleActionStartDelayMillis`|The number of milliseconds to delay after the supervisor starts before the first scale logic check.|No|300000| +|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered.|No|60000| +|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1| +|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2| -For example: +#### Idle supervisor configuration -``` -curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor -``` +:::info +Idle state transitioning is currently designated as experimental. +::: + +When the supervisor enters the idle state, no new tasks are launched subsequent to the completion of the currently executing tasks. 
This strategy may lead to reduced costs for cluster operators while using topics that get sporadic data. + +The following table outlines the configuration options for `idleConfig`: + +|Property|Description|Required| +|--------|-----------|--------| +|`enabled`|If `true`, the supervisor becomes idle if there is no data on input stream or topic for some time.|No|`false`| +|`inactiveAfterMillis`|The supervisor becomes idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|No|`600_000`| + +The following example shows a supervisor spec with `lagBased` autoscaler and idle configuration enabled: + +<details> + <summary>Click to view the example</summary> -Where the file `supervisor-spec.json` contains your Kafka supervisor spec file. +```json +{ + "type": "kafka", + "spec": { + "dataSchema": { + ... + }, + "ioConfig": { + "topic": "metrics", + "inputFormat": { + "type": "json" + }, + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "autoScalerConfig": { + "enableTaskAutoScaler": true, + "taskCountMax": 6, + "taskCountMin": 2, + "minTriggerScaleActionFrequencyMillis": 600000, + "autoScalerStrategy": "lagBased", + "lagCollectionIntervalMillis": 30000, + "lagCollectionRangeMillis": 600000, + "scaleOutThreshold": 6000000, + "triggerScaleOutFractionThreshold": 0.3, + "scaleInThreshold": 1000000, + "triggerScaleInFractionThreshold": 0.9, + "scaleActionStartDelayMillis": 300000, + "scaleActionPeriodMillis": 60000, + "scaleInStep": 1, + "scaleOutStep": 2 + }, + "taskCount":1, + "replicas":1, + "taskDuration":"PT1H", + "idleConfig": { + "enabled": true, + "inactiveAfterMillis": 600000 + } + }, + "tuningConfig":{ + ... + } + } +} +``` +</details> + +#### Ingest from multiple topics + +:::info +If you enable multi-topic ingestion for a datasource, downgrading to a version older than +28.0.0 will cause the ingestion for that datasource to fail. 
+:::
+
+To ingest data from multiple topics, you set `topicPattern` instead of `topic` in the supervisor `ioConfig` object.
+You can pass multiple topics as a regex pattern as the value for `topicPattern` in `ioConfig`. For example, to
+ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in `ioConfig`.
+Similarly, you can use `metrics-.*` as the value for `topicPattern` if you want to ingest from all the topics that
+start with `metrics-`. If new topics are added to the cluster that match the regex, Druid automatically starts

Review Comment:
```suggestion
start with `metrics-`. If you add a new topic that matches the regex to the cluster, Druid automatically starts
```

##########
docs/development/extensions-core/kafka-ingestion.md:
##########

@@ -130,172 +127,244 @@ The following example demonstrates a supervisor spec for Kafka that uses the `JS } ```

-### Kafka input format supervisor spec example
-
-If you want to parse the Kafka metadata fields in addition to the Kafka payload value contents, you can use the `kafka` input format.
-
-The `kafka` input format wraps around the payload parsing input format and augments the data it outputs with the Kafka event timestamp,
-the Kafka topic name, the Kafka event headers, and the key field that itself can be parsed using any available InputFormat.
-
-For example, consider the following structure for a Kafka message that represents a fictitious wiki edit in a development environment:
-
-- **Kafka timestamp**: `1680795276351`
-- **Kafka topic**: `wiki-edits`
-- **Kafka headers**:
-  - `env=development`
-  - `zone=z1`
-- **Kafka key**: `wiki-edit`
-- **Kafka payload value**: `{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}`
-
-Using `{ "type": "json" }` as the input format would only parse the payload value.
-To parse the Kafka metadata in addition to the payload, use the `kafka` input format.
- -You would configure it as follows: - -- `valueFormat`: Define how to parse the payload value. Set this to the payload parsing input format (`{ "type": "json" }`). -- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.timestamp`. -- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.topic`. This field is useful when ingesting data from multiple topics into same datasource. -- `headerFormat`: The default value `string` decodes strings in UTF-8 encoding from the Kafka header. - Other supported encoding formats include the following: - - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1. - - `US-ASCII`: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin block of the Unicode character set. - - `UTF-16`: Sixteen-bit UCS Transformation Format, byte order identified by an optional byte-order mark. - - `UTF-16BE`: Sixteen-bit UCS Transformation Format, big-endian byte order. - - `UTF-16LE`: Sixteen-bit UCS Transformation Format, little-endian byte order. -- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with columns from the payload. The default is `kafka.header.`. - Considering the header from the example, Druid maps the headers to the following columns: `kafka.header.env`, `kafka.header.zone`. -- `keyFormat`: Supply an input format to parse the key. Only the first value will be used. - If, as in the example, your key values are simple strings, then you can use the `tsv` format to parse them. - ``` - { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } - ``` - Note that for `tsv`,`csv`, and `regex` formats, you need to provide a `columns` array to make a valid input format. Only the first one is used, and its name will be ignored in favor of `keyColumnName`. 
-- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts with columns from the payload. The default is `kafka.key`. +</details> -Putting it together, the following input format (that uses the default values for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and `keyColumnName`) +### I/O configuration -```json -{ - "type": "kafka", - "valueFormat": { - "type": "json" - }, - "headerFormat": { - "type": "string" - }, - "keyFormat": { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } -} -``` +The following table outlines the configuration options for `ioConfig`: -would parse the example message as follows: +|Property|Type|Description|Required|Default| +|--------|----|-----------|--------|-------| +|`topic`|String|The Kafka topic to read from. Must be a specific topic. Druid does not support topic patterns. To ingest data from multiple topics, see [Ingest from multiple topics](#ingest-from-multiple-topics). |Yes|| +|`inputFormat`|Object|The [input format](../../ingestion/data-formats.md#input-format) to define input data parsing.|Yes|| +|`consumerProperties`|String, Object|A map of properties to pass to the Kafka consumer. See [Consumer properties](#consumer-properties) for details.|Yes|| +|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100| +|`replicas`|Integer|The number of replica sets, where 1 is a single set of tasks (no replication). Druid always assigns replica tasks to different workers to provide resiliency against process failure.|No|1| +|`taskCount`|Integer|The maximum number of reading tasks in a replica set. The maximum number of reading tasks equals `taskCount * replicas`. The total number of tasks, reading and publishing, is greater than this count. See [Capacity planning](../../ingestion/supervisor.md#capacity-planning) for more details. 
When `taskCount > {numKafkaPartitions}`, the actual number of reading tasks is less than the `taskCount` value.|No|1| +|`taskDuration`|ISO 8601 period|The length of time before tasks stop reading and begin publishing segments.|No|PT1H| +|`startDelay`|ISO 8601 period|The period to wait before the supervisor starts managing tasks.|No|PT5S| +|`period`|ISO 8601 period|Determines how often the supervisor executes its management logic. Note that the supervisor also runs in response to certain events, such as tasks succeeding, failing, and reaching their task duration. The `period` value specifies the maximum time between iterations.|No|PT30S| +|`useEarliestOffset`|Boolean|If a supervisor manages a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks start from where the previous segments ended. Druid only uses `useEarliestOffset` on the first run.|No|`false`| +|`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|No|PT30M| +|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No|| +|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. 
For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline. Note that you can specify only one of the late message rejection properties.|No|| +|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.|No|| +|`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks. See [Task autoscaler](../../ingestion/supervisor.md#task-autoscaler) for more information.|No|null| +|`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle supervisor configuration](#idle-supervisor-configuration) for more details.|No|null| -```json -{ - "channel": "#sv.wikipedia", - "timestamp": "2016-06-27T00:00:11.080Z", - "page": "Salo Toraut", - "delta": 31, - "namespace": "Main", - "kafka.timestamp": 1680795276351, - "kafka.topic": "wiki-edits", - "kafka.header.env": "development", - "kafka.header.zone": "z1", - "kafka.key": "wiki-edit" -} -``` +#### Consumer properties -For more information on data formats, see [Data formats](../../ingestion/data-formats.md). 
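The worked examples in the `lateMessageRejectionPeriod` and `earlyMessageRejectionPeriod` rows above come down to simple time arithmetic. This Python sketch reproduces that arithmetic (illustrative only, not Druid's implementation; property names are taken from the table):

```python
from datetime import datetime, timedelta, timezone

# Rejection-window arithmetic from the ioConfig table examples.
task_created = datetime(2016, 1, 1, 12, 0, tzinfo=timezone.utc)
task_duration = timedelta(hours=1)    # taskDuration = PT1H
late_rejection = timedelta(hours=1)   # lateMessageRejectionPeriod = PT1H
early_rejection = timedelta(hours=1)  # earlyMessageRejectionPeriod = PT1H

# Messages with timestamps before this instant are dropped.
earliest_accepted = task_created - late_rejection
# Messages with timestamps after this instant are dropped.
latest_accepted = task_created + task_duration + early_rejection

print(earliest_accepted.isoformat())  # 2016-01-01T11:00:00+00:00
print(latest_accepted.isoformat())    # 2016-01-01T14:00:00+00:00
```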
+Consumer properties must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. +By default, `isolation.level` is set to `read_committed`. If you use older versions of Kafka servers without transactions support or don't want Druid to consume only committed transactions, set `isolation.level` to `read_uncommitted`. -Finally, add these Kafka metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to auto-detect columns. - -The following supervisor spec demonstrates how to ingest the Kafka header, key, timestamp, and topic into Druid dimensions: +In some cases, you may need to fetch consumer properties at runtime. For example, `bootstrap.servers` might not be known upfront or might not be static. To enable SSL connections, you must securely provide passwords for the `keystore`, `truststore`, and `key`. You can provide configurations at runtime with a dynamic config provider implementation like the environment variable config provider that comes with Druid. For more information, see [Dynamic config provider](../../operations/dynamic-config-provider.md). 
+ +For example, if you are using SASL and SSL with Kafka, set the following environment variables for the Druid user on the machines running the Overlord and the Peon services: ``` -{ - "type": "kafka", - "spec": { - "ioConfig": { - "type": "kafka", - "consumerProperties": { - "bootstrap.servers": "localhost:9092" - }, - "topic": "wiki-edits", - "inputFormat": { - "type": "kafka", - "valueFormat": { - "type": "json" - }, - "headerFormat": { - "type": "string" - }, - "keyFormat": { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } - }, - "useEarliestOffset": true - }, - "dataSchema": { - "dataSource": "wikiticker", - "timestampSpec": { - "column": "timestamp", - "format": "posix" - }, - "dimensionsSpec": "dimensionsSpec": { - "useSchemaDiscovery": true, - "includeAllDimensions": true - }, - "granularitySpec": { - "queryGranularity": "none", - "rollup": false, - "segmentGranularity": "day" +export KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='admin_user' password='admin_password';" +export SSL_KEY_PASSWORD=mysecretkeypassword +export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword +export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword +``` + +```json + "druid.dynamic.config.provider": { + "type": "environment", + "variables": { + "sasl.jaas.config": "KAFKA_JAAS_CONFIG", + "ssl.key.password": "SSL_KEY_PASSWORD", + "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD", + "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD" } - }, - "tuningConfig": { - "type": "kafka" - } } -} ``` -After Druid ingests the data, you can query the Kafka metadata columns as follows: +Verify that you've changed the values for all configurations to match your own environment. In the Druid data loader interface, you can use the environment variable config provider syntax in the **Consumer properties** field on the **Connect tab**. 
When connecting to Kafka, Druid replaces the environment variables with their corresponding values. -```sql -SELECT - "kafka.header.env", - "kafka.key", - "kafka.timestamp", - "kafka.topic" -FROM "wikiticker" -``` +#### Task autoscaler -This query returns: +You can optionally configure autoscaling behavior for ingestion tasks using the `autoScalerConfig` property of the `ioConfig` object. -| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` | -|--------------------|-----------|---------------|---------------| -| `development` | `wiki-edit` | `1680795276351` | `wiki-edits` | +The following table outlines the configuration options for `autoScalerConfig`: -For more information, see [`kafka` data format](../../ingestion/data-formats.md#kafka). +|Property|Description|Required|Default| +|--------|-----------|--------|-------| +|`enableTaskAutoScaler`|Enables the auto scaler. If not specified, Druid disables the auto scaler even when `autoScalerConfig` is not null.|No|`false`| +|`taskCountMax`|Maximum number of ingestion tasks. Set `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales reading tasks up to `{numKafkaPartitions}`. In this case, `taskCountMax` is ignored.|Yes|| +|`taskCountMin`|Minimum number of ingestion tasks. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts with the `taskCountMin` number of tasks to launch.|Yes|| +|`minTriggerScaleActionFrequencyMillis`|Minimum time interval between two scale actions.| No|600000| +|`autoScalerStrategy`|The algorithm of `autoScaler`. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`| -## Submit a supervisor spec +##### Autoscaler strategy -Druid starts a supervisor for a dataSource when you submit a supervisor spec. 
You can use the data loader in the web console or you can submit a supervisor spec to the following endpoint: +The following table outlines the configuration options for `autoScalerStrategy`: -`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor` +|Property|Description|Required|Default| +|--------|-----------|--------|-------| +|`lagCollectionIntervalMillis`|The time period during which Druid collects lag metric points.|No|30000| +|`lagCollectionRangeMillis`|The total time window of lag collection. Use with `lagCollectionIntervalMillis` to specify the intervals at which to collect lag metric points.|No|600000| +|`scaleOutThreshold`|The threshold of scale out action. |No|6000000| +|`triggerScaleOutFractionThreshold`|Enables scale out action if `triggerScaleOutFractionThreshold` percent of lag points is higher than `scaleOutThreshold`.|No|0.3| +|`scaleInThreshold`|The threshold of scale in action.|No|1000000| +|`triggerScaleInFractionThreshold`|Enables scale in action if `triggerScaleInFractionThreshold` percent of lag points is lower than `scaleOutThreshold`.|No|0.9| +|`scaleActionStartDelayMillis`|The number of milliseconds to delay after the supervisor starts before the first scale logic check.|No|300000| +|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered.|No|60000| +|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1| +|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2| -For example: +#### Idle supervisor configuration -``` -curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor -``` +:::info +Idle state transitioning is currently designated as experimental. +::: + +When the supervisor enters the idle state, no new tasks are launched subsequent to the completion of the currently executing tasks. 
This strategy may lead to reduced costs for cluster operators while using topics that get sporadic data. + +The following table outlines the configuration options for `idleConfig`: + +|Property|Description|Required| +|--------|-----------|--------| +|`enabled`|If `true`, the supervisor becomes idle if there is no data on input stream or topic for some time.|No|`false`| +|`inactiveAfterMillis`|The supervisor becomes idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|No|`600_000`| + +The following example shows a supervisor spec with `lagBased` autoscaler and idle configuration enabled: + +<details> + <summary>Click to view the example</summary> -Where the file `supervisor-spec.json` contains your Kafka supervisor spec file. +```json +{ + "type": "kafka", + "spec": { + "dataSchema": { + ... + }, + "ioConfig": { + "topic": "metrics", + "inputFormat": { + "type": "json" + }, + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "autoScalerConfig": { + "enableTaskAutoScaler": true, + "taskCountMax": 6, + "taskCountMin": 2, + "minTriggerScaleActionFrequencyMillis": 600000, + "autoScalerStrategy": "lagBased", + "lagCollectionIntervalMillis": 30000, + "lagCollectionRangeMillis": 600000, + "scaleOutThreshold": 6000000, + "triggerScaleOutFractionThreshold": 0.3, + "scaleInThreshold": 1000000, + "triggerScaleInFractionThreshold": 0.9, + "scaleActionStartDelayMillis": 300000, + "scaleActionPeriodMillis": 60000, + "scaleInStep": 1, + "scaleOutStep": 2 + }, + "taskCount":1, + "replicas":1, + "taskDuration":"PT1H", + "idleConfig": { + "enabled": true, + "inactiveAfterMillis": 600000 + } + }, + "tuningConfig":{ + ... + } + } +} +``` +</details> + +#### Ingest from multiple topics + +:::info +If you enable multi-topic ingestion for a datasource, downgrading to a version older than +28.0.0 will cause the ingestion for that datasource to fail. 
+::: + +To ingest data from multiple topics, you set `topicPattern` instead of `topic` in the supervisor `ioConfig` object. +You can pass multiple topics as a regex pattern as the value for `topicPattern` in `ioConfig`. For example, to +ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in `ioConfig`. +Similarly, you can use `metrics-.*` as the value for `topicPattern` if you want to ingest from all the topics that +start with `metrics-`. If new topics are added to the cluster that match the regex, Druid automatically starts +ingesting from those new topics. A topic name that only matches partially, such as `my-metrics-12`, will not be +included for ingestion. + +When ingesting data from multiple topics, partitions are assigned based on the hashcode of the topic name and the Review Comment: ```suggestion When ingesting data from multiple topics, Druid assigns partitions based on the hashcode of the topic name and the ``` ########## docs/development/extensions-core/kafka-ingestion.md: ########## @@ -130,172 +127,244 @@ The following example demonstrates a supervisor spec for Kafka that uses the `JS } ``` -### Kafka input format supervisor spec example - -If you want to parse the Kafka metadata fields in addition to the Kafka payload value contents, you can use the `kafka` input format. - -The `kafka` input format wraps around the payload parsing input format and augments the data it outputs with the Kafka event timestamp, -the Kafka topic name, the Kafka event headers, and the key field that itself can be parsed using any available InputFormat. 
- -For example, consider the following structure for a Kafka message that represents a fictitious wiki edit in a development environment: - -- **Kafka timestamp**: `1680795276351` -- **Kafka topic**: `wiki-edits` -- **Kafka headers**: - - `env=development` - - `zone=z1` -- **Kafka key**: `wiki-edit` -- **Kafka payload value**: `{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}` - -Using `{ "type": "json" }` as the input format would only parse the payload value. -To parse the Kafka metadata in addition to the payload, use the `kafka` input format. - -You would configure it as follows: - -- `valueFormat`: Define how to parse the payload value. Set this to the payload parsing input format (`{ "type": "json" }`). -- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.timestamp`. -- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid schema to avoid conflicts with columns from the payload. The default is `kafka.topic`. This field is useful when ingesting data from multiple topics into same datasource. -- `headerFormat`: The default value `string` decodes strings in UTF-8 encoding from the Kafka header. - Other supported encoding formats include the following: - - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1. - - `US-ASCII`: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin block of the Unicode character set. - - `UTF-16`: Sixteen-bit UCS Transformation Format, byte order identified by an optional byte-order mark. - - `UTF-16BE`: Sixteen-bit UCS Transformation Format, big-endian byte order. - - `UTF-16LE`: Sixteen-bit UCS Transformation Format, little-endian byte order. -- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any conflicts with columns from the payload. The default is `kafka.header.`. 
- Considering the header from the example, Druid maps the headers to the following columns: `kafka.header.env`, `kafka.header.zone`. -- `keyFormat`: Supply an input format to parse the key. Only the first value will be used. - If, as in the example, your key values are simple strings, then you can use the `tsv` format to parse them. - ``` - { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } - ``` - Note that for `tsv`,`csv`, and `regex` formats, you need to provide a `columns` array to make a valid input format. Only the first one is used, and its name will be ignored in favor of `keyColumnName`. -- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts with columns from the payload. The default is `kafka.key`. +</details> -Putting it together, the following input format (that uses the default values for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and `keyColumnName`) +### I/O configuration -```json -{ - "type": "kafka", - "valueFormat": { - "type": "json" - }, - "headerFormat": { - "type": "string" - }, - "keyFormat": { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } -} -``` +The following table outlines the configuration options for `ioConfig`: -would parse the example message as follows: +|Property|Type|Description|Required|Default| +|--------|----|-----------|--------|-------| +|`topic`|String|The Kafka topic to read from. Must be a specific topic. Druid does not support topic patterns. To ingest data from multiple topics, see [Ingest from multiple topics](#ingest-from-multiple-topics). |Yes|| +|`inputFormat`|Object|The [input format](../../ingestion/data-formats.md#input-format) to define input data parsing.|Yes|| +|`consumerProperties`|String, Object|A map of properties to pass to the Kafka consumer. 
See [Consumer properties](#consumer-properties) for details.|Yes|| +|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds.|No|100| +|`replicas`|Integer|The number of replica sets, where 1 is a single set of tasks (no replication). Druid always assigns replica tasks to different workers to provide resiliency against process failure.|No|1| +|`taskCount`|Integer|The maximum number of reading tasks in a replica set. The maximum number of reading tasks equals `taskCount * replicas`. The total number of tasks, reading and publishing, is greater than this count. See [Capacity planning](../../ingestion/supervisor.md#capacity-planning) for more details. When `taskCount > {numKafkaPartitions}`, the actual number of reading tasks is less than the `taskCount` value.|No|1| +|`taskDuration`|ISO 8601 period|The length of time before tasks stop reading and begin publishing segments.|No|PT1H| +|`startDelay`|ISO 8601 period|The period to wait before the supervisor starts managing tasks.|No|PT5S| +|`period`|ISO 8601 period|Determines how often the supervisor executes its management logic. Note that the supervisor also runs in response to certain events, such as tasks succeeding, failing, and reaching their task duration. The `period` value specifies the maximum time between iterations.|No|PT30S| +|`useEarliestOffset`|Boolean|If a supervisor manages a datasource for the first time, it obtains a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks start from where the previous segments ended. Druid only uses `useEarliestOffset` on the first run.|No|`false`| +|`completionTimeout`|ISO 8601 period|The length of time to wait before declaring a publishing task as failed and terminating it. If the value is too low, your tasks may never publish. 
The publishing clock for a task begins roughly after `taskDuration` elapses.|No|PT30M| +|`lateMessageRejectionStartDateTime`|ISO 8601 date time|Configures tasks to reject messages with timestamps earlier than this date time. For example, if this property is set to `2016-01-01T11:00Z` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline.|No|| +|`lateMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps earlier than this period before the task was created. For example, if this property is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps earlier than `2016-01-01T11:00Z`. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments, such as a realtime and a nightly batch ingestion pipeline. Note that you can specify only one of the late message rejection properties.|No|| +|`earlyMessageRejectionPeriod`|ISO 8601 period|Configures tasks to reject messages with timestamps later than this period after the task reached its task duration. For example, if this property is set to `PT1H`, the task duration is set to `PT1H` and the supervisor creates a task at `2016-01-01T12:00Z`, Druid drops messages with timestamps later than `2016-01-01T14:00Z`. Tasks sometimes run past their task duration, such as in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause Druid to drop messages unexpectedly whenever a task runs past its originally configured task duration.|No|| +|`autoScalerConfig`|Object|Defines auto scaling behavior for ingestion tasks. 
See [Task autoscaler](../../ingestion/supervisor.md#task-autoscaler) for more information.|No|null| +|`idleConfig`|Object|Defines how and when the Kafka supervisor can become idle. See [Idle supervisor configuration](#idle-supervisor-configuration) for more details.|No|null| -```json -{ - "channel": "#sv.wikipedia", - "timestamp": "2016-06-27T00:00:11.080Z", - "page": "Salo Toraut", - "delta": 31, - "namespace": "Main", - "kafka.timestamp": 1680795276351, - "kafka.topic": "wiki-edits", - "kafka.header.env": "development", - "kafka.header.zone": "z1", - "kafka.key": "wiki-edit" -} -``` +#### Consumer properties -For more information on data formats, see [Data formats](../../ingestion/data-formats.md). +Consumer properties must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `<BROKER_1>:<PORT_1>,<BROKER_2>:<PORT_2>,...`. +By default, `isolation.level` is set to `read_committed`. If you use older versions of Kafka servers without transactions support or don't want Druid to consume only committed transactions, set `isolation.level` to `read_uncommitted`. -Finally, add these Kafka metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to auto-detect columns. - -The following supervisor spec demonstrates how to ingest the Kafka header, key, timestamp, and topic into Druid dimensions: +In some cases, you may need to fetch consumer properties at runtime. For example, `bootstrap.servers` might not be known upfront or might not be static. To enable SSL connections, you must securely provide passwords for the `keystore`, `truststore`, and `key`. You can provide configurations at runtime with a dynamic config provider implementation like the environment variable config provider that comes with Druid. For more information, see [Dynamic config provider](../../operations/dynamic-config-provider.md). 
+ +For example, if you are using SASL and SSL with Kafka, set the following environment variables for the Druid user on the machines running the Overlord and the Peon services: ``` -{ - "type": "kafka", - "spec": { - "ioConfig": { - "type": "kafka", - "consumerProperties": { - "bootstrap.servers": "localhost:9092" - }, - "topic": "wiki-edits", - "inputFormat": { - "type": "kafka", - "valueFormat": { - "type": "json" - }, - "headerFormat": { - "type": "string" - }, - "keyFormat": { - "type": "tsv", - "findColumnsFromHeader": false, - "columns": ["x"] - } - }, - "useEarliestOffset": true - }, - "dataSchema": { - "dataSource": "wikiticker", - "timestampSpec": { - "column": "timestamp", - "format": "posix" - }, - "dimensionsSpec": "dimensionsSpec": { - "useSchemaDiscovery": true, - "includeAllDimensions": true - }, - "granularitySpec": { - "queryGranularity": "none", - "rollup": false, - "segmentGranularity": "day" +export KAFKA_JAAS_CONFIG="org.apache.kafka.common.security.plain.PlainLoginModule required username='admin_user' password='admin_password';" +export SSL_KEY_PASSWORD=mysecretkeypassword +export SSL_KEYSTORE_PASSWORD=mysecretkeystorepassword +export SSL_TRUSTSTORE_PASSWORD=mysecrettruststorepassword +``` + +```json + "druid.dynamic.config.provider": { + "type": "environment", + "variables": { + "sasl.jaas.config": "KAFKA_JAAS_CONFIG", + "ssl.key.password": "SSL_KEY_PASSWORD", + "ssl.keystore.password": "SSL_KEYSTORE_PASSWORD", + "ssl.truststore.password": "SSL_TRUSTSTORE_PASSWORD" } - }, - "tuningConfig": { - "type": "kafka" - } } -} ``` -After Druid ingests the data, you can query the Kafka metadata columns as follows: +Verify that you've changed the values for all configurations to match your own environment. In the Druid data loader interface, you can use the environment variable config provider syntax in the **Consumer properties** field on the **Connect tab**. 
When connecting to Kafka, Druid replaces the environment variables with their corresponding values. -```sql -SELECT - "kafka.header.env", - "kafka.key", - "kafka.timestamp", - "kafka.topic" -FROM "wikiticker" -``` +#### Task autoscaler -This query returns: +You can optionally configure autoscaling behavior for ingestion tasks using the `autoScalerConfig` property of the `ioConfig` object. -| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` | -|--------------------|-----------|---------------|---------------| -| `development` | `wiki-edit` | `1680795276351` | `wiki-edits` | +The following table outlines the configuration options for `autoScalerConfig`: -For more information, see [`kafka` data format](../../ingestion/data-formats.md#kafka). +|Property|Description|Required|Default| +|--------|-----------|--------|-------| +|`enableTaskAutoScaler`|Enables the auto scaler. If not specified, Druid disables the auto scaler even when `autoScalerConfig` is not null.|No|`false`| +|`taskCountMax`|Maximum number of ingestion tasks. Set `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales reading tasks up to `{numKafkaPartitions}`. In this case, `taskCountMax` is ignored.|Yes|| +|`taskCountMin`|Minimum number of ingestion tasks. When you enable the autoscaler, Druid ignores the value of `taskCount` in `ioConfig` and starts with the `taskCountMin` number of tasks to launch.|Yes|| +|`minTriggerScaleActionFrequencyMillis`|Minimum time interval between two scale actions.| No|600000| +|`autoScalerStrategy`|The algorithm of `autoScaler`. Druid only supports the `lagBased` strategy. See [Autoscaler strategy](#autoscaler-strategy) for more information.|No|`lagBased`| -## Submit a supervisor spec +##### Autoscaler strategy -Druid starts a supervisor for a dataSource when you submit a supervisor spec. 
You can use the data loader in the web console or you can submit a supervisor spec to the following endpoint: +The following table outlines the configuration options for `autoScalerStrategy`: -`http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor` +|Property|Description|Required|Default| +|--------|-----------|--------|-------| +|`lagCollectionIntervalMillis`|The time period during which Druid collects lag metric points.|No|30000| +|`lagCollectionRangeMillis`|The total time window of lag collection. Use with `lagCollectionIntervalMillis` to specify the intervals at which to collect lag metric points.|No|600000| +|`scaleOutThreshold`|The threshold of scale out action. |No|6000000| +|`triggerScaleOutFractionThreshold`|Enables scale out action if `triggerScaleOutFractionThreshold` percent of lag points is higher than `scaleOutThreshold`.|No|0.3| +|`scaleInThreshold`|The threshold of scale in action.|No|1000000| +|`triggerScaleInFractionThreshold`|Enables scale in action if `triggerScaleInFractionThreshold` percent of lag points is lower than `scaleOutThreshold`.|No|0.9| +|`scaleActionStartDelayMillis`|The number of milliseconds to delay after the supervisor starts before the first scale logic check.|No|300000| +|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered.|No|60000| +|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1| +|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2| -For example: +#### Idle supervisor configuration -``` -curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor -``` +:::info +Idle state transitioning is currently designated as experimental. +::: + +When the supervisor enters the idle state, no new tasks are launched subsequent to the completion of the currently executing tasks. 
This strategy may lead to reduced costs for cluster operators while using topics that get sporadic data. + +The following table outlines the configuration options for `idleConfig`: + +|Property|Description|Required| +|--------|-----------|--------| +|`enabled`|If `true`, the supervisor becomes idle if there is no data on input stream or topic for some time.|No|`false`| +|`inactiveAfterMillis`|The supervisor becomes idle if all existing data has been read from input topic and no new data has been published for `inactiveAfterMillis` milliseconds.|No|`600_000`| + +The following example shows a supervisor spec with `lagBased` autoscaler and idle configuration enabled: + +<details> + <summary>Click to view the example</summary> -Where the file `supervisor-spec.json` contains your Kafka supervisor spec file. +```json +{ + "type": "kafka", + "spec": { + "dataSchema": { + ... + }, + "ioConfig": { + "topic": "metrics", + "inputFormat": { + "type": "json" + }, + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "autoScalerConfig": { + "enableTaskAutoScaler": true, + "taskCountMax": 6, + "taskCountMin": 2, + "minTriggerScaleActionFrequencyMillis": 600000, + "autoScalerStrategy": "lagBased", + "lagCollectionIntervalMillis": 30000, + "lagCollectionRangeMillis": 600000, + "scaleOutThreshold": 6000000, + "triggerScaleOutFractionThreshold": 0.3, + "scaleInThreshold": 1000000, + "triggerScaleInFractionThreshold": 0.9, + "scaleActionStartDelayMillis": 300000, + "scaleActionPeriodMillis": 60000, + "scaleInStep": 1, + "scaleOutStep": 2 + }, + "taskCount":1, + "replicas":1, + "taskDuration":"PT1H", + "idleConfig": { + "enabled": true, + "inactiveAfterMillis": 600000 + } + }, + "tuningConfig":{ + ... + } + } +} +``` +</details> + +#### Ingest from multiple topics + +:::info +If you enable multi-topic ingestion for a datasource, downgrading to a version older than +28.0.0 will cause the ingestion for that datasource to fail. 
+:::
+
+To ingest data from multiple topics, set `topicPattern` instead of `topic` in the supervisor `ioConfig` object.
+You can pass multiple topics as a regex pattern as the value for `topicPattern` in `ioConfig`. For example, to
+ingest data from clicks and impressions, set `topicPattern` to `clicks|impressions` in `ioConfig`.
+Similarly, you can use `metrics-.*` as the value for `topicPattern` if you want to ingest from all the topics that
+start with `metrics-`. If new topics are added to the cluster that match the regex, Druid automatically starts
+ingesting from those new topics. A topic name that only matches partially, such as `my-metrics-12`, will not be
+included for ingestion.
+
+When ingesting data from multiple topics, partitions are assigned based on the hashcode of the topic name and the
+ID of the partition within that topic. The partition assignment might not be uniform across all the tasks. Druid also
+assumes that partitions across individual topics have similar load. If you want to ingest from both high and low load
+topics in the same supervisor, it is recommended that you have a higher number of partitions for the high load topic
+and a lower number of partitions for the low load topic.
+
+### Tuning configuration
+
+The `tuningConfig` object is optional. If you don't specify the `tuningConfig` object, Druid uses the default configuration settings.
+
+The following table outlines the configuration options for `tuningConfig`:
+
+|Property|Type|Description|Required|Default|
+|--------|----|-----------|--------|-------|
+|`type`|String|The indexing task type. This should always be `kafka`.|Yes||

Review Comment:
```suggestion
|`type`|String|The indexing task type. Must be `kafka`.|Yes||
```
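The full-name matching behavior of `topicPattern` described in the multi-topic section above (a regex must match the whole topic name, so `my-metrics-12` only partially matches `metrics-.*` and is excluded) can be sketched as follows. The `topics_to_ingest` helper is purely illustrative; Druid performs this matching internally through the Kafka consumer's pattern subscription:

```python
import re

def topics_to_ingest(topic_pattern: str, cluster_topics: list[str]) -> list[str]:
    """Hypothetical helper: return the topics a topicPattern would select.

    The pattern must match the *entire* topic name (fullmatch), mirroring
    how partial matches such as "my-metrics-12" are excluded.
    """
    regex = re.compile(topic_pattern)
    return [t for t in cluster_topics if regex.fullmatch(t)]

topics = ["metrics-east", "metrics-west", "my-metrics-12", "clicks"]
print(topics_to_ingest("metrics-.*", topics))         # ['metrics-east', 'metrics-west']
print(topics_to_ingest("clicks|impressions", topics)) # ['clicks']
```

Note that a newly created topic such as `metrics-north` would also be picked up on the next check, matching the automatic-discovery behavior described above.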
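For readers skimming the `autoScalerStrategy` thresholds in the table above, the documented scale-out/scale-in semantics can be sketched as a simple decision function. This is an illustrative approximation of the documented behavior, not Druid's actual implementation:

```python
def scale_decision(lag_points: list[int], cfg: dict) -> int:
    """Return the task-count delta implied by the collected lag metric points."""
    if not lag_points:
        return 0
    # Fraction of collected lag points above/below the configured thresholds.
    out_frac = sum(p > cfg["scaleOutThreshold"] for p in lag_points) / len(lag_points)
    in_frac = sum(p < cfg["scaleInThreshold"] for p in lag_points) / len(lag_points)
    if out_frac >= cfg["triggerScaleOutFractionThreshold"]:
        return cfg["scaleOutStep"]      # enough high-lag points: scale out
    if in_frac >= cfg["triggerScaleInFractionThreshold"]:
        return -cfg["scaleInStep"]      # enough low-lag points: scale in
    return 0

# Defaults from the autoScalerStrategy table above.
cfg = {
    "scaleOutThreshold": 6000000,
    "triggerScaleOutFractionThreshold": 0.3,
    "scaleInThreshold": 1000000,
    "triggerScaleInFractionThreshold": 0.9,
    "scaleInStep": 1,
    "scaleOutStep": 2,
}
print(scale_decision([7000000, 8000000, 500000], cfg))  # 2 (scale out)
```

The actual autoscaler additionally clamps the resulting task count between `taskCountMin` and `taskCountMax` and rate-limits actions via `minTriggerScaleActionFrequencyMillis`.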
