loquisgon commented on a change in pull request #11912:
URL: https://github.com/apache/druid/pull/11912#discussion_r747875023



##########
File path: docs/development/extensions-core/kafka-ingestion.md
##########
@@ -119,403 +127,142 @@ Where the file `supervisor-spec.json` contains a Kafka 
supervisor spec:
 }
 ```
 
-## Supervisor Configuration
-
-|Field|Description|Required|
-|--------|-----------|---------|
-|`type`|Supervisor type. For Kafka streaming, set to `kafka`.|yes|
-|`spec`| Container object for the supervisor configuration. | yes |
-|`dataSchema`|Schema for the Kafka indexing task to use during ingestion.|yes|
-|`ioConfig`|A `KafkaSupervisorIOConfig` object to define the Kafka connection 
and I/O-related settings for the supervisor and indexing task. See 
[KafkaSupervisorIOConfig](#kafkasupervisorioconfig).|yes|
-|`tuningConfig`|A KafkaSupervisorTuningConfig object to define 
performance-related settings for the supervisor and indexing tasks. See 
[KafkaSupervisorTuningConfig](#kafkasupervisortuningconfig).|no|
-
-### KafkaSupervisorIOConfig
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|`topic`|String|The Kafka topic to read from. Must be a specific topic. Topic 
patterns are not supported.|yes|
-|`inputFormat`|Object|`inputFormat` to define input data parsing. See 
[Specifying data format](#specifying-data-format) for details about specifying 
the input format.|yes|
-|`consumerProperties`|Map<String, Object>|A map of properties to pass to the 
Kafka consumer. See [More on consumer 
properties](#more-on-consumerproperties).|yes|
-|`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll 
records, in milliseconds|no (default == 100)|
-|`replicas`|Integer|The number of replica sets. "1" means a single set of 
tasks without replication. Druid always assigns replica tasks to different 
workers to provide resiliency against worker failure.|no (default == 1)|
-|`taskCount`|Integer|The maximum number of *reading* tasks in a *replica set*. 
The maximum number of reading tasks equals `taskCount * replicas`. Therefore, 
the total number of tasks, *reading* + *publishing*, is greater than this 
count. See [Capacity Planning](#capacity-planning) for more details. When 
`taskCount > {numKafkaPartitions}`, the actual number of reading tasks is less 
than the `taskCount` value.|no (default == 1)|
-|`taskDuration`|ISO8601 Period|The length of time before tasks stop reading 
and begin publishing segments.|no (default == PT1H)|
-|`startDelay`|ISO8601 Period|The period to wait before the supervisor starts 
managing tasks.|no (default == PT5S)|
-|`period`|ISO8601 Period|Frequency at which the supervisor executes its 
management logic. The supervisor also runs in response to certain events. For 
example task success, task failure, and tasks reaching their `taskDuration`. 
The `period` value specifies the maximum time between iterations.|no (default 
== PT30S)|
-|`useEarliestOffset`|Boolean|If a supervisor manages a dataSource for the 
first time, it obtains a set of starting offsets from Kafka. This flag 
determines whether it retrieves the earliest or latest offsets in Kafka. Under 
normal circumstances, subsequent tasks will start from where the previous 
segments ended. Therefore Druid only uses `useEarliestOffset` on first run.|no 
(default == false)|
-|`completionTimeout`|ISO8601 Period|The length of time to wait before 
declaring a publishing task as failed and terminating it. If the value is too 
low, your tasks may never publish. The publishing clock for a task begins 
roughly after `taskDuration` elapses.|no (default == PT30M)|
-|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to 
reject messages with timestamps earlier than this date time; for example if 
this is set to `2016-01-01T11:00Z` and the supervisor creates a task at 
*2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than 
*2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no 
(default == none)|
-|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps earlier than this period before the task was created; 
for example if this is set to `PT1H` and the supervisor creates a task at 
*2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* 
will be dropped. This may help prevent concurrency issues if your data stream 
has late messages and you have multiple pipelines that need to operate on the 
same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please 
note that only one of `lateMessageRejectionPeriod` or 
`lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
-|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject 
messages with timestamps later than this period after the task reached its 
taskDuration; for example if this is set to `PT1H`, the taskDuration is set to 
`PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with 
timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks 
sometimes run past their task duration, for example, in cases of supervisor 
failover. Setting earlyMessageRejectionPeriod too low may cause messages to be 
dropped unexpectedly whenever a task runs past its originally configured task 
duration.|no (default == none)|
-|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest 
tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no 
(default == null)|
-
-#### Task Autoscaler Properties
-
-> Note that Task AutoScaler is currently designated as experimental.
-
-| Property | Description | Required |
-| ------------- | ------------- | ------------- |
-| `enableTaskAutoScaler` | Enable or disable autoscaling. `false` or blank 
disables the `autoScaler` even when `autoScalerConfig` is not null| no (default 
== false) |
-| `taskCountMax` | Maximum number of ingestion tasks. Set `taskCountMax >= 
taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, Druid only scales 
reading tasks up to the `{numKafkaPartitions}`. In this case `taskCountMax` is 
ignored.  | yes |
-| `taskCountMin` | Minimum number of ingestion tasks. When you enable 
autoscaler, Druid ignores the value of taskCount in `IOConfig` and starts with 
the `taskCountMin` number of tasks.| yes |
-| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two 
scale actions. | no (default == 600000) |
-| `autoScalerStrategy` | The algorithm of `autoScaler`. Only supports 
`lagBased`. See [Lag Based AutoScaler Strategy Related 
Properties](#lag-based-autoscaler-strategy-related-properties) for details.| no 
(default == `lagBased`) |
-
-##### Lag Based AutoScaler Strategy Related Properties
-| Property | Description | Required |
-| ------------- | ------------- | ------------- |
-| `lagCollectionIntervalMillis` | Period of lag points collection.  | no 
(default == 30000) |
-| `lagCollectionRangeMillis` | The total time window of lag collection. Use 
with `lagCollectionIntervalMillis`,it means that in the recent 
`lagCollectionRangeMillis`, collect lag metric points every 
`lagCollectionIntervalMillis`. | no (default == 600000) |
-| `scaleOutThreshold` | The threshold of scale out action | no (default == 
6000000) |
-| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold` 
percent of lag points are higher than `scaleOutThreshold`, then do scale out 
action. | no (default == 0.3) |
-| `scaleInThreshold` | The Threshold of scale in action | no (default == 
1000000) |
-| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold` 
percent of lag points are lower than `scaleOutThreshold`, then do scale in 
action. | no (default == 0.9) |
-| `scaleActionStartDelayMillis` | Number of milliseconds after supervisor 
starts when first check scale logic. | no (default == 300000) |
-| `scaleActionPeriodMillis` | The frequency of checking whether to do scale 
action in millis | no (default == 60000) |
-| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
-| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
-
-The following example demonstrates supervisor spec with `lagBased` autoScaler 
enabled:
-```json
+### Kafka input format supervisor spec example
+If you want to ingest data from other fields in addition to the Kafka message 
contents, you can use the `kafka` input format. The `kafka` input format lets 
you ingest:
+- the message key field.
+- message headers.
+- the Kafka message timestamp.
+
+> The Kafka inputFormat is currently designated as experimental.
+
+For example, consider the following structure for a message that represents an 
fictitious wiki edit in a development environment:
+- **Event headers**: {"environment": "development"}
+- **Event key**: {"key: "wiki-edit"}
+- **Event value**: \<JSON object with message contents of the change details\>
+- **Event timestamp**: "Nov. 10, 2021 at 14:06"
+
+When you use the `kafka` input format, you configure the way that Druid names 
the dimensions created from the Kafka message:
+- `headerLabelPrefix`: Supply a prefix to the Kafka headers to avoid any 
conflicts with named dimensions. The default is `kafka.header.`. Considering 
the header from the example, Druid maps the header to the following column: 
`kafka.header.enviornment`.
+- `timestampColumnName`:Supply a custom name for the Kafka timestamp in the 
Druid schema to avoid conflicts with other time columns. The default is 
`kafka.timestamp`.
+- `keyColumnName`: Supply the name for the Kafka key column in Druid. The 
default is `kafka.key`.
+Additionally, you must provide information about how Druid should parse the 
data in the Kafka message:
+- `headerFormat`: The default "string" decodes UTF8-encoded strings from the 
Kafka header. If you need another format, you can implement your own parser.
+- `keyFormat`: Takes a Druid `inputFormat` and uses the value for first key it 
finds. According to the example the value is "wiki-edit". It discards the key 
name in this case. If you store the key as a string, use the `CSV` input 
format. For example if you have simple string for the the key `wiki-edit`, you 
can use the following to parse the key:
+  ```
+  "keyFormat": {
+    "type": "csv",
+    "hasHeaderRow": false,
+    "findColumnsFromHeader": false,
+    "columns": ["key"]
+    } 
+  ```
+- `valueFormat`: Define how to parse the message contents. You can use any of 
the Druid input formats that work for Kafka.
+
+For more information on data formats, see [Data 
formats](../../ingestion/data-formats.md).
+
+Finally, add the Kafka message columns to the `dimensionsSpec`. For the key 
and timestamp, you can use the dimension names you defined for `keyColumnName` 
and `timestampColumnName`. For header dimensions, supply the append the header 
key to the `headerLabelPrefix`. For example `kafka.header.environment`.

Review comment:
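       Please reword this: "supply the append the header key" has two
       conflicting verbs. It probably should read "append the header key
       to the `headerLabelPrefix`".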
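
       For reference, a minimal sketch of the `dimensionsSpec` this
       paragraph seems to describe. It assumes the default names quoted in
       the diff (`kafka.key`, `kafka.timestamp`, and the `kafka.header.`
       prefix) and the `environment` header from the example message.
       Listing the columns as plain string dimensions is an assumption,
       not something the PR text specifies:

       ```json
       {
         "dimensionsSpec": {
           "dimensions": [
             "kafka.key",
             "kafka.timestamp",
             "kafka.header.environment"
           ]
         }
       }
       ```

       The header column name here is just the prefix followed by the
       header key, which is what the reworded sentence should convey.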




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


