This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c7b237c22 Add docs for ingesting Kafka topic name (#14894)
3c7b237c22 is described below

commit 3c7b237c22505e195c4c6278c4123707755da7e4
Author: Abhishek Agarwal <[email protected]>
AuthorDate: Thu Aug 24 19:19:59 2023 +0530

    Add docs for ingesting Kafka topic name (#14894)
    
    Add documentation on how to extract the Kafka topic name and ingest it into 
the data.
---
 docs/development/extensions-core/kafka-ingestion.md | 18 +++++++++++-------
 docs/ingestion/data-formats.md                      |  5 ++++-
 website/.spelling                                   |  1 +
 3 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/docs/development/extensions-core/kafka-ingestion.md 
b/docs/development/extensions-core/kafka-ingestion.md
index 46426e55f2..329967747b 100644
--- a/docs/development/extensions-core/kafka-ingestion.md
+++ b/docs/development/extensions-core/kafka-ingestion.md
@@ -135,11 +135,12 @@ The following example demonstrates a supervisor spec for 
Kafka that uses the `JS
 If you want to parse the Kafka metadata fields in addition to the Kafka 
payload value contents, you can use the `kafka` input format.
 
 The `kafka` input format wraps around the payload parsing input format and 
augments the data it outputs with the Kafka event timestamp,
-the Kafka event headers, and the key field that itself can be parsed using any 
available InputFormat.
+the Kafka topic name, the Kafka event headers, and the key field that itself 
can be parsed using any available InputFormat.
 
 For example, consider the following structure for a Kafka message that 
represents a fictitious wiki edit in a development environment:
 
 - **Kafka timestamp**: `1680795276351`
+- **Kafka topic**: `wiki-edits`
 - **Kafka headers**:
   - `env=development`
   - `zone=z1`
@@ -153,6 +154,7 @@ You would configure it as follows:
 
 - `valueFormat`: Define how to parse the payload value. Set this to the 
payload parsing input format (`{ "type": "json" }`).
 - `timestampColumnName`: Supply a custom name for the Kafka timestamp in the 
Druid schema to avoid conflicts with columns from the payload. The default is 
`kafka.timestamp`.
+- `topicColumnName`: Supply a custom name for the Kafka topic in the Druid 
schema to avoid conflicts with columns from the payload. The default is 
`kafka.topic`. This field is useful when ingesting data from multiple topics 
into same datasource.
 - `headerFormat`: The default value `string` decodes strings in UTF-8 encoding 
from the Kafka header.
    Other supported encoding formats include the following:
    - `ISO-8859-1`: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1.
@@ -174,7 +176,7 @@ You would configure it as follows:
   Note that for `tsv`,`csv`, and `regex` formats, you need to provide a 
`columns` array to make a valid input format. Only the first one is used, and 
its name will be ignored in favor of `keyColumnName`.
 - `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts 
with columns from the payload. The default is `kafka.key`.
 
-Putting it together, the following input format (that uses the default values 
for `timestampColumnName`, `headerColumnPrefix`, and `keyColumnName`)
+Putting it together, the following input format (that uses the default values 
for `timestampColumnName`, `topicColumnName`, `headerColumnPrefix`, and 
`keyColumnName`)
 
 ```json
 {
@@ -203,6 +205,7 @@ would parse the example message as follows:
   "delta": 31,
   "namespace": "Main",
   "kafka.timestamp": 1680795276351,
+  "kafka.topic": "wiki-edits",
   "kafka.header.env": "development",
   "kafka.header.zone": "z1",
   "kafka.key": "wiki-edit"
@@ -213,7 +216,7 @@ For more information on data formats, see [Data 
formats](../../ingestion/data-fo
 
 Finally, add these Kafka metadata columns to the `dimensionsSpec` or  set your 
`dimensionsSpec` to auto-detect columns.
      
-The following supervisor spec demonstrates how to ingest the Kafka header, 
key, and timestamp into Druid dimensions:
+The following supervisor spec demonstrates how to ingest the Kafka header, 
key, timestamp, and topic into Druid dimensions:
 
 ```
 {
@@ -270,15 +273,16 @@ After Druid ingests the data, you can query the Kafka 
metadata columns as follow
 SELECT
   "kafka.header.env",
   "kafka.key",
-  "kafka.timestamp"
+  "kafka.timestamp",
+  "kafka.topic"
 FROM "wikiticker"
 ```
 
 This query returns:
 
-| `kafka.header.env` | `kafka.key` | `kafka.timestamp` |
-|--------------------|-----------|---------------|
-| `development`      | `wiki-edit` | `1680795276351` |
+| `kafka.header.env` | `kafka.key` | `kafka.timestamp` | `kafka.topic` |
+|--------------------|-----------|---------------|---------------|
+| `development`      | `wiki-edit` | `1680795276351` | `wiki-edits`  |
 
 For more information, see [`kafka` data 
format](../../ingestion/data-formats.md#kafka).
 
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index e3ad22a916..7dd1b10c7f 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -560,17 +560,19 @@ Configure the Kafka `inputFormat` as follows:
 | `type` | String | Set value to `kafka`. | yes |
 | `valueFormat` | [InputFormat](#input-format) | Any 
[InputFormat](#input-format) to parse the Kafka value payload. For details 
about specifying the input format, see [Specifying data 
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
 | yes |
 | `timestampColumnName` | String | Name of the column for the kafka record's 
timestamp.| no (default = "kafka.timestamp") |
+| `topicColumnName` | String |Name of the column for the kafka record's topic. 
It is useful when ingesting data from multiple topics.| no (default = 
"kafka.timestamp") |
 | `headerColumnPrefix` | String | Custom prefix for all the header columns. | 
no (default = "kafka.header.") |
 | `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka 
headers. Supports String types. Because Kafka header values are bytes, the 
parser decodes them as UTF-8 encoded strings. To change this behavior, 
implement your own parser based on the encoding style. Change the 'encoding' 
type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
 | `keyFormat` | [InputFormat](#input-format) | Any [input 
format](#input-format) to parse the Kafka key. It only processes the first 
entry of the `inputFormat` field. For details, see [Specifying data 
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
 | no |
 | `keyColumnName` | String | Name of the column for the kafka record's key.| 
no (default = "kafka.key") |
 
+
 The Kafka input format augments the payload with information from the Kafka 
timestamp, headers, and key.
 
 If there are conflicts between column names in the payload and those created 
from the metadata, the payload takes precedence.
 This ensures that upgrading a Kafka ingestion to use the Kafka input format 
(by taking its existing input format and setting it as the `valueFormat`) can 
be done without losing any of the payload data.  
 
-Here is a minimal example that only augments the parsed payload with the Kafka 
timestamp column:
+Here is a minimal example that only augments the parsed payload with the Kafka 
timestamp column and kafka topic column:
 
 ```
 "ioConfig": {
@@ -594,6 +596,7 @@ Here is a complete example:
       "type": "json"
     }
     "timestampColumnName": "kafka.timestamp",
+    "topicColumnName": "kafka.topic",
     "headerFormat": {
       "type": "string",
       "encoding": "UTF-8"
diff --git a/website/.spelling b/website/.spelling
index 32f7786b1b..cc4e02fcf2 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -1258,6 +1258,7 @@ KafkaStringHeaderFormat
 kafka.header.
 kafka.key
 kafka.timestamp
+kafka.topic
 keyColumnName
 keyFormat
 listDelimiter


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to