317brian commented on code in PR #14049:
URL: https://github.com/apache/druid/pull/14049#discussion_r1160981550
##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -131,40 +132,83 @@ The following example demonstrates a supervisor spec for
Kafka that uses the `JS
### Kafka input format supervisor spec example
-If you want to ingest data from other fields in addition to the Kafka message
contents, you can use the `kafka` input format. The `kafka` input format lets
you ingest:
-- the event key field
-- event headers
-- the Kafka event timestamp
-- the Kafka event value that stores the payload.
-
-For example, consider the following structure for a message that represents a
fictitious wiki edit in a development environment:
-- **Event headers**: {"environment": "development"}
-- **Event key**: {"key: "wiki-edit"}
-- **Event value**: \<JSON object with event payload containing the change
details\>
-- **Event timestamp**: "Nov. 10, 2021 at 14:06"
-
-When you use the `kafka` input format, you configure the way that Druid names
the dimensions created from the Kafka message:
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with named dimensions. The default is `kafka.header`. Considering the
header from the example, Druid maps the header to the following column:
`kafka.header.environment`.
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with other time columns. The default is
`kafka.timestamp`.
-- `keyColumnName`: Supply the name for the Kafka key column in Druid. The
default is `kafka.key`.
-Additionally, you must provide information about how Druid should parse the
data in the Kafka message:
+If you want to parse the Kafka metadata fields in addition to the Kafka
payload value contents, you can use the `kafka` input format.
+
+The `kafka` input format wraps around the payload parsing input format and
augments the data it outputs with the Kafka event timestamp,
+the Kafka event headers, and the key field that itself can be parsed using any
available InputFormat.
+
+For example, consider the following structure for a Kafka message that
represents a fictitious wiki edit in a development environment:
+
+- **Kafka timestamp**: `1680795276351`
+- **Kafka headers**:
+ - `env=development`
+ - `zone=z1`
+- **Kafka key**: `wiki-edit`
+- **Kafka payload value**:
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo
Toraut","delta":31,"namespace":"Main"}`
+
+Using a `{ "type": "json" }` input format would only parse the payload value.
+You can parse out the Kafka metadata in addition to the payload by using the
`kafka` input format.
Review Comment:
```suggestion
To parse the Kafka metadata in addition to the payload, use the `kafka`
input format.
```
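The augmentation this hunk describes — wrapping the payload input format and adding metadata columns with the default names `kafka.timestamp`, `kafka.header.`, and `kafka.key` — can be sketched as follows. This is a hypothetical illustration of the column mapping, not Druid code; the function name and signature are invented for the example.

```python
# Hypothetical sketch of how the `kafka` input format maps a parsed payload
# plus Kafka metadata to output columns (illustration only, not Druid code).

def augment_with_kafka_metadata(payload, kafka_ts, headers, key,
                                timestamp_col="kafka.timestamp",
                                header_prefix="kafka.header.",
                                key_col="kafka.key"):
    row = {timestamp_col: kafka_ts, key_col: key}
    for name, value in headers.items():
        row[header_prefix + name] = value
    # On a name conflict, payload columns take precedence over metadata.
    row.update(payload)
    return row

row = augment_with_kafka_metadata(
    {"channel": "#sv.wikipedia", "page": "Salo Toraut", "delta": 31},
    1680795276351,
    {"env": "development", "zone": "z1"},
    "wiki-edit",
)
```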
##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -131,40 +132,83 @@ The following example demonstrates a supervisor spec for
Kafka that uses the `JS
### Kafka input format supervisor spec example
-If you want to ingest data from other fields in addition to the Kafka message
contents, you can use the `kafka` input format. The `kafka` input format lets
you ingest:
-- the event key field
-- event headers
-- the Kafka event timestamp
-- the Kafka event value that stores the payload.
-
-For example, consider the following structure for a message that represents a
fictitious wiki edit in a development environment:
-- **Event headers**: {"environment": "development"}
-- **Event key**: {"key: "wiki-edit"}
-- **Event value**: \<JSON object with event payload containing the change
details\>
-- **Event timestamp**: "Nov. 10, 2021 at 14:06"
-
-When you use the `kafka` input format, you configure the way that Druid names
the dimensions created from the Kafka message:
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with named dimensions. The default is `kafka.header`. Considering the
header from the example, Druid maps the header to the following column:
`kafka.header.environment`.
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with other time columns. The default is
`kafka.timestamp`.
-- `keyColumnName`: Supply the name for the Kafka key column in Druid. The
default is `kafka.key`.
-Additionally, you must provide information about how Druid should parse the
data in the Kafka message:
+If you want to parse the Kafka metadata fields in addition to the Kafka
payload value contents, you can use the `kafka` input format.
+
+The `kafka` input format wraps around the payload parsing input format and
augments the data it outputs with the Kafka event timestamp,
+the Kafka event headers, and the key field that itself can be parsed using any
available InputFormat.
+
+For example, consider the following structure for a Kafka message that
represents a fictitious wiki edit in a development environment:
+
+- **Kafka timestamp**: `1680795276351`
+- **Kafka headers**:
+ - `env=development`
+ - `zone=z1`
+- **Kafka key**: `wiki-edit`
+- **Kafka payload value**:
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo
Toraut","delta":31,"namespace":"Main"}`
+
+Using a `{ "type": "json" }` input format would only parse the payload value.
Review Comment:
```suggestion
Using `{ "type": "json" }` as the input format would only parse the payload
value.
```
##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -131,40 +132,83 @@ The following example demonstrates a supervisor spec for
Kafka that uses the `JS
### Kafka input format supervisor spec example
-If you want to ingest data from other fields in addition to the Kafka message
contents, you can use the `kafka` input format. The `kafka` input format lets
you ingest:
-- the event key field
-- event headers
-- the Kafka event timestamp
-- the Kafka event value that stores the payload.
-
-For example, consider the following structure for a message that represents a
fictitious wiki edit in a development environment:
-- **Event headers**: {"environment": "development"}
-- **Event key**: {"key: "wiki-edit"}
-- **Event value**: \<JSON object with event payload containing the change
details\>
-- **Event timestamp**: "Nov. 10, 2021 at 14:06"
-
-When you use the `kafka` input format, you configure the way that Druid names
the dimensions created from the Kafka message:
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with named dimensions. The default is `kafka.header`. Considering the
header from the example, Druid maps the header to the following column:
`kafka.header.environment`.
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with other time columns. The default is
`kafka.timestamp`.
-- `keyColumnName`: Supply the name for the Kafka key column in Druid. The
default is `kafka.key`.
-Additionally, you must provide information about how Druid should parse the
data in the Kafka message:
+If you want to parse the Kafka metadata fields in addition to the Kafka
payload value contents, you can use the `kafka` input format.
+
+The `kafka` input format wraps around the payload parsing input format and
augments the data it outputs with the Kafka event timestamp,
+the Kafka event headers, and the key field that itself can be parsed using any
available InputFormat.
+
+For example, consider the following structure for a Kafka message that
represents a fictitious wiki edit in a development environment:
+
+- **Kafka timestamp**: `1680795276351`
+- **Kafka headers**:
+ - `env=development`
+ - `zone=z1`
+- **Kafka key**: `wiki-edit`
+- **Kafka payload value**:
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo
Toraut","delta":31,"namespace":"Main"}`
+
+Using a `{ "type": "json" }` input format would only parse the payload value.
+You can parse out the Kafka metadata in addition to the payload by using the
`kafka` input format.
+
+You would configure it as follows:
+
+- `valueFormat`: Define how to parse the payload value. Set this to the
payload parsing input format (`{ "type": "json" }`).
+- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with columns from the payload. The default is
`kafka.timestamp`.
- `headerFormat`: The default "string" decodes UTF8-encoded strings from the
Kafka header. If you need another format, you can implement your own parser.
-- `keyFormat`: Takes a Druid `inputFormat` and uses the value for the first
key it finds. According to the example the value is "wiki-edit". It discards
the key name in this case. If you store the key as a string, use the `CSV`
input format. For example, if you have simple string for the the key
`wiki-edit`, you can use the following to parse the key:
+- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with columns from the payload. The default is `kafka.header.`.
+ Considering the header from the example, Druid maps the headers to the
following columns: `kafka.header.env`, `kafka.header.zone`.
+- `keyFormat`: Supply an input format to parse the key. Only the first value
will be used.
+ If, as in the example, you key values are simple stings then you can use the
`tsv` format to parse them.
Review Comment:
```suggestion
If, as in the example, your key values are simple strings, then you can
use the `tsv` format to parse them.
```
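The key-parsing rule in this hunk — parse the key with an input format, keep only the first value, and store it under `keyColumnName` while ignoring the configured column name — can be sketched like this. The helper is hypothetical; it only illustrates the documented behavior, assuming the default `kafka.key` column name.

```python
# Hypothetical sketch of key parsing with a tsv-style format: only the
# first parsed value is kept, and the configured column name (e.g. "x")
# is ignored in favor of keyColumnName.

def parse_kafka_key(raw_key, delimiter="\t", key_column_name="kafka.key"):
    values = raw_key.split(delimiter)
    return {key_column_name: values[0]} if values else {}

key_row = parse_kafka_key("wiki-edit")
```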
##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -131,40 +132,83 @@ The following example demonstrates a supervisor spec for
Kafka that uses the `JS
### Kafka input format supervisor spec example
-If you want to ingest data from other fields in addition to the Kafka message
contents, you can use the `kafka` input format. The `kafka` input format lets
you ingest:
-- the event key field
-- event headers
-- the Kafka event timestamp
-- the Kafka event value that stores the payload.
-
-For example, consider the following structure for a message that represents a
fictitious wiki edit in a development environment:
-- **Event headers**: {"environment": "development"}
-- **Event key**: {"key: "wiki-edit"}
-- **Event value**: \<JSON object with event payload containing the change
details\>
-- **Event timestamp**: "Nov. 10, 2021 at 14:06"
-
-When you use the `kafka` input format, you configure the way that Druid names
the dimensions created from the Kafka message:
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with named dimensions. The default is `kafka.header`. Considering the
header from the example, Druid maps the header to the following column:
`kafka.header.environment`.
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with other time columns. The default is
`kafka.timestamp`.
-- `keyColumnName`: Supply the name for the Kafka key column in Druid. The
default is `kafka.key`.
-Additionally, you must provide information about how Druid should parse the
data in the Kafka message:
+If you want to parse the Kafka metadata fields in addition to the Kafka
payload value contents, you can use the `kafka` input format.
+
+The `kafka` input format wraps around the payload parsing input format and
augments the data it outputs with the Kafka event timestamp,
+the Kafka event headers, and the key field that itself can be parsed using any
available InputFormat.
+
+For example, consider the following structure for a Kafka message that
represents a fictitious wiki edit in a development environment:
+
+- **Kafka timestamp**: `1680795276351`
+- **Kafka headers**:
+ - `env=development`
+ - `zone=z1`
+- **Kafka key**: `wiki-edit`
+- **Kafka payload value**:
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo
Toraut","delta":31,"namespace":"Main"}`
+
+Using a `{ "type": "json" }` input format would only parse the payload value.
+You can parse out the Kafka metadata in addition to the payload by using the
`kafka` input format.
+
+You would configure it as follows:
+
+- `valueFormat`: Define how to parse the payload value. Set this to the
payload parsing input format (`{ "type": "json" }`).
+- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with columns from the payload. The default is
`kafka.timestamp`.
- `headerFormat`: The default "string" decodes UTF8-encoded strings from the
Kafka header. If you need another format, you can implement your own parser.
-- `keyFormat`: Takes a Druid `inputFormat` and uses the value for the first
key it finds. According to the example the value is "wiki-edit". It discards
the key name in this case. If you store the key as a string, use the `CSV`
input format. For example, if you have simple string for the the key
`wiki-edit`, you can use the following to parse the key:
+- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with columns from the payload. The default is `kafka.header.`.
+ Considering the header from the example, Druid maps the headers to the
following columns: `kafka.header.env`, `kafka.header.zone`.
+- `keyFormat`: Supply an input format to parse the key. Only the first value
will be used.
+ If, as in the example, you key values are simple stings then you can use the
`tsv` format to parse them.
```
- "keyFormat": {
- "type": "csv",
- "hasHeaderRow": false,
+ {
+ "type": "tsv",
"findColumnsFromHeader": false,
- "columns": ["key"]
- }
+ "columns": ["x"]
+ }
```
-- `valueFormat`: Define how to parse the message contents. You can use any of
the Druid input formats that work for Kafka.
+ Note that in case of the `tsv` format (also `csv` and `regex`) you need to
provide a `columns` array to make a valid input format but only the first one
will be used and its name will be ignored in favor of `keyColumnName`.
+- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts
with columns from the payload. The default is `kafka.key`.
+
+Putting it together, the following input format (that accepts default values
for `timestampColumnName`, `headerColumnPrefix`, and `keyColumnName`)
+
+```json
+{
+ "type": "kafka",
+ "valueFormat": {
+ "type": "json"
+ },
+ "headerFormat": {
+ "type": "string"
+ },
+ "keyFormat": {
+ "type": "tsv",
+ "findColumnsFromHeader": false,
+ "columns": ["x"]
+ }
+}
+```
+
+would parse the example message as follows:
+
+```json
+{
+ "channel": "#sv.wikipedia",
+ "timestamp": "2016-06-27T00:00:11.080Z",
+ "page": "Salo Toraut",
+ "delta": 31,
+ "namespace": "Main",
+ "kafka.timestamp": 1680795276351,
+ "kafka.header.env": "development",
+ "kafka.header.zone": "z1",
+ "kafka.key": "wiki-edit"
+}
+```
For more information on data formats, see [Data
formats](../../ingestion/data-formats.md).
-Finally, add the Kafka message columns to the `dimensionsSpec`. For the key
and timestamp, you can use the dimension names you defined for `keyColumnName`
and `timestampColumnName`. For header dimensions, append the header key to the
`headerColumnPrefix`. For example `kafka.header.environment`.
+Finally, you would want to add these Kafka metadata columns to the
`dimensionsSpec` (alternatively you can set your `dimensionsSpec` to
auto-detect columns.
Review Comment:
```suggestion
Finally, add these Kafka metadata columns to the `dimensionsSpec` or set
your `dimensionsSpec` to auto-detect columns.
```
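The auto-detect alternative the suggestion mentions corresponds to a `dimensionsSpec` like the one used in the supervisor spec elsewhere in this PR:

```json
"dimensionsSpec": {
  "useSchemaDiscovery": true,
  "includeAllDimensions": true
}
```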
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
+It should only be used when ingesting from Apache Kafka.
+
+Configure the Kafka `inputFormat` as follows:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| `type` | String | Set value to `kafka`. | yes |
+| `valueFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka value payload. For details
about specifying the input format, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| yes |
+| `timestampColumnName` | String | Name of the column for the kafka record's
timestamp.| no (default = "kafka.timestamp") |
+| `headerColumnPrefix` | String | Custom prefix for all the header columns. |
no (default = "kafka.header.") |
+| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka
headers. Supports String types. Because Kafka header values are bytes, the
parser decodes them as UTF-8 encoded strings. To change this behavior,
implement your own parser based on the encoding style. Change the 'encoding'
type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
Review Comment:
```suggestion
| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka
headers. Supports string types. Because Kafka header values are bytes, the
parser decodes them as UTF-8 encoded strings. To change this behavior,
implement your own parser based on the encoding style. Change the 'encoding'
type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
```
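The default `headerFormat` behavior described in this table row — header values arrive as bytes and are decoded as UTF-8 strings under the header prefix — can be sketched as follows. The function is a hypothetical illustration, assuming the default `kafka.header.` prefix.

```python
# Hypothetical sketch of the default "string" header parsing: Kafka header
# values are bytes and are decoded as UTF-8 strings into prefixed columns.

def decode_headers(raw_headers, prefix="kafka.header."):
    return {prefix + name: value.decode("utf-8")
            for name, value in raw_headers}

header_cols = decode_headers([("env", b"development"), ("zone", b"z1")])
```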
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
+It should only be used when ingesting from Apache Kafka.
+
+Configure the Kafka `inputFormat` as follows:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| `type` | String | Set value to `kafka`. | yes |
+| `valueFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka value payload. For details
about specifying the input format, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| yes |
+| `timestampColumnName` | String | Name of the column for the kafka record's
timestamp.| no (default = "kafka.timestamp") |
+| `headerColumnPrefix` | String | Custom prefix for all the header columns. |
no (default = "kafka.header.") |
+| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka
headers. Supports String types. Because Kafka header values are bytes, the
parser decodes them as UTF-8 encoded strings. To change this behavior,
implement your own parser based on the encoding style. Change the 'encoding'
type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
+| `keyFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka key. It only processes the
first entry of the input format. For details, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| no |
+| `keyColumnName` | String | Name of the column for the kafka record's key.|
no (default = "kafka.key") |
+
+The Kafka input format augments the payload with information form the Kafka
timestamp, headers, and key.
Review Comment:
```suggestion
The Kafka input format augments the payload with information from the Kafka
timestamp, headers, and key.
```
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
+It should only be used when ingesting from Apache Kafka.
+
+Configure the Kafka `inputFormat` as follows:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| `type` | String | Set value to `kafka`. | yes |
+| `valueFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka value payload. For details
about specifying the input format, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| yes |
+| `timestampColumnName` | String | Name of the column for the kafka record's
timestamp.| no (default = "kafka.timestamp") |
+| `headerColumnPrefix` | String | Custom prefix for all the header columns. |
no (default = "kafka.header.") |
+| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka
headers. Supports String types. Because Kafka header values are bytes, the
parser decodes them as UTF-8 encoded strings. To change this behavior,
implement your own parser based on the encoding style. Change the 'encoding'
type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
+| `keyFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka key. It only processes the
first entry of the input format. For details, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| no |
+| `keyColumnName` | String | Name of the column for the kafka record's key.|
no (default = "kafka.key") |
+
+The Kafka input format augments the payload with information form the Kafka
timestamp, headers, and key.
+
+If there are conflicts between column names in the payload and those created
from the metadata the payload takes precedence.
+This is done to ensure that upgrading a Kafka ingestion to use the Kafka input
format (by taking its existing input format and setting it as the `valueFormat`
in the kafka format) can be done without losing any of the payload data.
Review Comment:
```suggestion
This ensures that upgrading a Kafka ingestion to use the Kafka input format
(by taking its existing input format and setting it as the `valueFormat`) can
be done without losing any of the payload data.
```
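The upgrade path this hunk describes — take an existing Kafka ingestion's input format and set it as the `valueFormat` of a `kafka` input format, leaving payload parsing untouched — amounts to a small spec rewrite. A hypothetical sketch:

```python
# Hypothetical sketch of upgrading an ingestion spec to the `kafka` input
# format: the existing inputFormat moves into valueFormat unchanged, so no
# payload data is lost (payload columns win on name conflicts).

def wrap_as_kafka_input_format(existing_input_format):
    return {"type": "kafka", "valueFormat": existing_input_format}

upgraded = wrap_as_kafka_input_format({"type": "json"})
```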
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
+It should only be used when ingesting from Apache Kafka.
+
+Configure the Kafka `inputFormat` as follows:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| `type` | String | Set value to `kafka`. | yes |
+| `valueFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka value payload. For details
about specifying the input format, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| yes |
+| `timestampColumnName` | String | Name of the column for the kafka record's
timestamp.| no (default = "kafka.timestamp") |
+| `headerColumnPrefix` | String | Custom prefix for all the header columns. |
no (default = "kafka.header.") |
+| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka
headers. Supports String types. Because Kafka header values are bytes, the
parser decodes them as UTF-8 encoded strings. To change this behavior,
implement your own parser based on the encoding style. Change the 'encoding'
type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
+| `keyFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka key. It only processes the
first entry of the input format. For details, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| no |
+| `keyColumnName` | String | Name of the column for the kafka record's key.|
no (default = "kafka.key") |
+
+The Kafka input format augments the payload with information form the Kafka
timestamp, headers, and key.
Review Comment:
```suggestion
The Kafka input format augments the payload with information from the Kafka
timestamp, headers, and key.
```
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
Review Comment:
```suggestion
to parse the Kafka metadata (timestamp, headers, and key) that is part of
Kafka messages.
```
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
+It should only be used when ingesting from Apache Kafka.
Review Comment:
```suggestion
Only use it when ingesting from Apache Kafka.
```
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
+It should only be used when ingesting from Apache Kafka.
+
+Configure the Kafka `inputFormat` as follows:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| `type` | String | Set value to `kafka`. | yes |
+| `valueFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka value payload. For details
about specifying the input format, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| yes |
+| `timestampColumnName` | String | Name of the column for the kafka record's
timestamp.| no (default = "kafka.timestamp") |
Review Comment:
```suggestion
| `timestampColumnName` | String | Name of the column for the Kafka record's
timestamp.| no (default = "kafka.timestamp") |
```
##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -177,84 +221,61 @@ The following supervisor spec demonstrates how to ingest
the Kafka header, key,
"topic": "wiki-edits",
"inputFormat": {
"type": "kafka",
- "headerColumnPrefix": "kafka.header.",
- "timestampColumnName": "kafka.timestamp",
- "keyColumnName": "kafka.key",
+ "valueFormat": {
+ "type": "json"
+ },
"headerFormat": {
"type": "string"
},
"keyFormat": {
- "type": "json"
- },
- "valueFormat": {
- "type": "json"
- },
- "findColumnsFromHeader": false
+ "type": "tsv",
+ "findColumnsFromHeader": false,
+ "columns": ["x"]
+ }
},
"useEarliestOffset": true
},
- "tuningConfig": {
- "type": "kafka"
- },
"dataSchema": {
"dataSource": "wikiticker",
"timestampSpec": {
"column": "timestamp",
"format": "posix"
},
- "dimensionsSpec": {
- "dimensions": [
- {
- "type": "string",
- "name": "kafka.key"
- },
- {
- "type": "string",
- "name": "kafka.timestamp"
- },
- {
- "type": "string",
- "name": "kafka.header.environment"
- },
- "$schema",
- {
- "type": "long",
- "name": "id"
- },
- "type",
- {
- "type": "long",
- "name": "namespace"
- },
- "title",
- "comment",
- "user",]
- ]
+ "dimensionsSpec": {
+ "useSchemaDiscovery": true,
+ "includeAllDimensions": true
},
"granularitySpec": {
"queryGranularity": "none",
"rollup": false,
"segmentGranularity": "day"
}
+ },
+ "tuningConfig": {
+ "type": "kafka"
}
- },
- "tuningConfig": {
- "type": "kafka"
}
}
```
-After Druid ingests the data, you can query the Kafka message columns as
follows:
-```unix
+
+After Druid ingests the data, you can query the Kafka metadata columns as
follows:
+
+```sql
SELECT
- "kafka.header.environment",
+ "kafka.header.env",
"kafka.key",
"kafka.timestamp"
FROM "wikiticker"
-
-kafka.header.environment kafka.key kafka.timestamp
-development wiki-edit 1636399229823
```
+
+This query would output:
Review Comment:
```suggestion
This query returns:
```
##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -131,40 +132,83 @@ The following example demonstrates a supervisor spec for
Kafka that uses the `JS
### Kafka input format supervisor spec example
-If you want to ingest data from other fields in addition to the Kafka message
contents, you can use the `kafka` input format. The `kafka` input format lets
you ingest:
-- the event key field
-- event headers
-- the Kafka event timestamp
-- the Kafka event value that stores the payload.
-
-For example, consider the following structure for a message that represents a
fictitious wiki edit in a development environment:
-- **Event headers**: {"environment": "development"}
-- **Event key**: {"key: "wiki-edit"}
-- **Event value**: \<JSON object with event payload containing the change
details\>
-- **Event timestamp**: "Nov. 10, 2021 at 14:06"
-
-When you use the `kafka` input format, you configure the way that Druid names
the dimensions created from the Kafka message:
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with named dimensions. The default is `kafka.header`. Considering the
header from the example, Druid maps the header to the following column:
`kafka.header.environment`.
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with other time columns. The default is
`kafka.timestamp`.
-- `keyColumnName`: Supply the name for the Kafka key column in Druid. The
default is `kafka.key`.
-Additionally, you must provide information about how Druid should parse the
data in the Kafka message:
+If you want to parse the Kafka metadata fields in addition to the Kafka
payload value contents, you can use the `kafka` input format.
+
+The `kafka` input format wraps around the payload parsing input format and
augments the data it outputs with the Kafka event timestamp,
+the Kafka event headers, and the key field that itself can be parsed using any
available InputFormat.
+
+For example, consider the following structure for a Kafka message that
represents a fictitious wiki edit in a development environment:
+
+- **Kafka timestamp**: `1680795276351`
+- **Kafka headers**:
+ - `env=development`
+ - `zone=z1`
+- **Kafka key**: `wiki-edit`
+- **Kafka payload value**:
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo
Toraut","delta":31,"namespace":"Main"}`
+
+Using a `{ "type": "json" }` input format would only parse the payload value.
+You can parse out the Kafka metadata in addition to the payload by using the
`kafka` input format.
+
+You would configure it as follows:
+
+- `valueFormat`: Define how to parse the payload value. Set this to the
payload parsing input format (`{ "type": "json" }`).
+- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with columns from the payload. The default is
`kafka.timestamp`.
- `headerFormat`: The default "string" decodes UTF8-encoded strings from the
Kafka header. If you need another format, you can implement your own parser.
-- `keyFormat`: Takes a Druid `inputFormat` and uses the value for the first
key it finds. According to the example the value is "wiki-edit". It discards
the key name in this case. If you store the key as a string, use the `CSV`
input format. For example, if you have simple string for the the key
`wiki-edit`, you can use the following to parse the key:
+- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with columns from the payload. The default is `kafka.header.`.
+ Considering the header from the example, Druid maps the headers to the
following columns: `kafka.header.env`, `kafka.header.zone`.
+- `keyFormat`: Supply an input format to parse the key. Only the first value
is used.
+  If, as in the example, your key values are simple strings, you can use the
`tsv` format to parse them.
```
- "keyFormat": {
- "type": "csv",
- "hasHeaderRow": false,
+ {
+ "type": "tsv",
"findColumnsFromHeader": false,
- "columns": ["key"]
- }
+ "columns": ["x"]
+ }
```
-- `valueFormat`: Define how to parse the message contents. You can use any of
the Druid input formats that work for Kafka.
+  Note that for the `tsv`, `csv`, and `regex` formats, you need to provide a
`columns` array to make a valid input format, but only the first column is
used and its name is ignored in favor of `keyColumnName`.
+- `keyColumnName`: Supply the name for the Kafka key column to avoid conflicts
with columns from the payload. The default is `kafka.key`.
+
+Putting it together, the following input format (that accepts default values
for `timestampColumnName`, `headerColumnPrefix`, and `keyColumnName`)
Review Comment:
```suggestion
Putting it together, the following input format (that uses the default
values for `timestampColumnName`, `headerColumnPrefix`, and `keyColumnName`)
```
##########
docs/development/extensions-core/kafka-ingestion.md:
##########
@@ -131,40 +132,83 @@ The following example demonstrates a supervisor spec for
Kafka that uses the `JS
### Kafka input format supervisor spec example
-If you want to ingest data from other fields in addition to the Kafka message
contents, you can use the `kafka` input format. The `kafka` input format lets
you ingest:
-- the event key field
-- event headers
-- the Kafka event timestamp
-- the Kafka event value that stores the payload.
-
-For example, consider the following structure for a message that represents a
fictitious wiki edit in a development environment:
-- **Event headers**: {"environment": "development"}
-- **Event key**: {"key: "wiki-edit"}
-- **Event value**: \<JSON object with event payload containing the change
details\>
-- **Event timestamp**: "Nov. 10, 2021 at 14:06"
-
-When you use the `kafka` input format, you configure the way that Druid names
the dimensions created from the Kafka message:
-- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with named dimensions. The default is `kafka.header`. Considering the
header from the example, Druid maps the header to the following column:
`kafka.header.environment`.
-- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with other time columns. The default is
`kafka.timestamp`.
-- `keyColumnName`: Supply the name for the Kafka key column in Druid. The
default is `kafka.key`.
-Additionally, you must provide information about how Druid should parse the
data in the Kafka message:
+If you want to parse the Kafka metadata fields in addition to the Kafka
payload value contents, you can use the `kafka` input format.
+
+The `kafka` input format wraps around the payload-parsing input format and
augments the data it outputs with the Kafka event timestamp,
+the Kafka event headers, and the Kafka key, which can itself be parsed using
any available InputFormat.
+
+For example, consider the following structure for a Kafka message that
represents a fictitious wiki edit in a development environment:
+
+- **Kafka timestamp**: `1680795276351`
+- **Kafka headers**:
+ - `env=development`
+ - `zone=z1`
+- **Kafka key**: `wiki-edit`
+- **Kafka payload value**:
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo
Toraut","delta":31,"namespace":"Main"}`
+
+Using a `{ "type": "json" }` input format would only parse the payload value.
+You can parse out the Kafka metadata in addition to the payload by using the
`kafka` input format.
+
+You would configure it as follows:
+
+- `valueFormat`: Define how to parse the payload value. Set this to the
payload parsing input format (`{ "type": "json" }`).
+- `timestampColumnName`: Supply a custom name for the Kafka timestamp in the
Druid schema to avoid conflicts with columns from the payload. The default is
`kafka.timestamp`.
- `headerFormat`: The default "string" decodes UTF8-encoded strings from the
Kafka header. If you need another format, you can implement your own parser.
-- `keyFormat`: Takes a Druid `inputFormat` and uses the value for the first
key it finds. According to the example the value is "wiki-edit". It discards
the key name in this case. If you store the key as a string, use the `CSV`
input format. For example, if you have simple string for the the key
`wiki-edit`, you can use the following to parse the key:
+- `headerColumnPrefix`: Supply a prefix to the Kafka headers to avoid any
conflicts with columns from the payload. The default is `kafka.header.`.
+ Considering the header from the example, Druid maps the headers to the
following columns: `kafka.header.env`, `kafka.header.zone`.
+- `keyFormat`: Supply an input format to parse the key. Only the first value
is used.
+  If, as in the example, your key values are simple strings, you can use the
`tsv` format to parse them.
```
- "keyFormat": {
- "type": "csv",
- "hasHeaderRow": false,
+ {
+ "type": "tsv",
"findColumnsFromHeader": false,
- "columns": ["key"]
- }
+ "columns": ["x"]
+ }
```
-- `valueFormat`: Define how to parse the message contents. You can use any of
the Druid input formats that work for Kafka.
+ Note that in case of the `tsv` format (also `csv` and `regex`) you need to
provide a `columns` array to make a valid input format but only the first one
will be used and it's name will be ignored in favor of `keyColumnName`.
Review Comment:
```suggestion
Note that for `tsv`,`csv`, and `regex` formats, you need to provide a
`columns` array to make a valid input format. Only the first one is used, and
its name will be ignored in favor of `keyColumnName`.
```
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
+It should only be used when ingesting from Apache Kafka.
+
+Configure the Kafka `inputFormat` as follows:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| `type` | String | Set value to `kafka`. | yes |
+| `valueFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka value payload. For details
about specifying the input format, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| yes |
+| `timestampColumnName` | String | Name of the column for the kafka record's
timestamp.| no (default = "kafka.timestamp") |
+| `headerColumnPrefix` | String | Custom prefix for all the header columns. |
no (default = "kafka.header.") |
+| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka
headers. Supports String types. Because Kafka header values are bytes, the
parser decodes them as UTF-8 encoded strings. To change this behavior,
implement your own parser based on the encoding style. Change the 'encoding'
type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
+| `keyFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka key. It only processes the
first entry of the input format. For details, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| no |
Review Comment:
```suggestion
| `keyFormat` | [InputFormat](#input-format) | Any [input
format](#input-format) to parse the Kafka key. It only processes the first
entry of the `inputFormat` field. For details, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| no |
```
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
+It should only be used when ingesting from Apache Kafka.
+
+Configure the Kafka `inputFormat` as follows:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| `type` | String | Set value to `kafka`. | yes |
+| `valueFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka value payload. For details
about specifying the input format, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| yes |
+| `timestampColumnName` | String | Name of the column for the kafka record's
timestamp.| no (default = "kafka.timestamp") |
+| `headerColumnPrefix` | String | Custom prefix for all the header columns. |
no (default = "kafka.header.") |
+| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka
headers. Supports String types. Because Kafka header values are bytes, the
parser decodes them as UTF-8 encoded strings. To change this behavior,
implement your own parser based on the encoding style. Change the 'encoding'
type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
+| `keyFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka key. It only processes the
first entry of the input format. For details, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| no |
+| `keyColumnName` | String | Name of the column for the kafka record's key.|
no (default = "kafka.key") |
+
+The Kafka input format augments the payload with information from the Kafka
timestamp, headers, and key.
+
+If there are conflicts between column names in the payload and those created
from the metadata, the payload takes precedence.
+This ensures that you can upgrade an existing Kafka ingestion to the Kafka
input format, by moving its existing input format into `valueFormat`, without
losing any of the payload data.
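As a rough illustration of this precedence rule (a hypothetical helper, not actual Druid code), conflict resolution amounts to letting payload columns overwrite metadata-derived columns:

```python
# Hypothetical sketch of the kafka input format's column-precedence rule:
# metadata-derived columns (kafka.timestamp, kafka.header.*, kafka.key) are
# added first, then payload columns overwrite any name conflicts.
def merge_row(metadata_columns, payload_columns):
    row = dict(metadata_columns)
    row.update(payload_columns)  # payload wins on conflicts
    return row

row = merge_row(
    {"kafka.timestamp": 1680795276351, "kafka.key": "wiki-edit"},
    {"page": "Salo Toraut", "delta": 31},
)
```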
+
+Here is a minimal example that only augments the parsed payload with the Kafka
timestamp column:
+
+```
+"ioConfig": {
+ "inputFormat": {
+ "type": "kafka",
+ "valueFormat": {
+ "type": "json"
+ }
+ },
+ ...
+}
+```
+
+Here is a complete example:
+
+```
+"ioConfig": {
+ "inputFormat": {
+ "type": "kafka",
+ "valueFormat": {
+ "type": "json"
+  },
+ "timestampColumnName": "kafka.timestamp",
+ "headerFormat": {
+ "type": "string",
+ "encoding": "UTF-8"
+ },
+ "headerColumnPrefix": "kafka.header.",
+ "keyFormat": {
+ "type": "tsv",
+ "findColumnsFromHeader": false,
+ "columns": ["x"]
+ },
+  "keyColumnName": "kafka.key"
+ },
+ ...
+}
+```
+
+Note that if you want to use the `kafka.timestamp` as Druid's primary
timestamp (`__time`) you would need to refer to it as the `column` in the
`timestampSpec`:
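A sketch of such a `timestampSpec` (using the `millis` format, since the Kafka timestamp is epoch milliseconds):

```
"timestampSpec": {
  "column": "kafka.timestamp",
  "format": "millis"
}
```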
Review Comment:
```suggestion
If you want to use `kafka.timestamp` as Druid's primary timestamp
(`__time`), specify it as the value for `column` in the `timestampSpec`:
```
##########
docs/ingestion/data-formats.md:
##########
@@ -604,7 +538,88 @@ For example:
}
```
-### FlattenSpec
+### Kafka
+
+`kafka` is a special input format that wraps a regular input format (which
goes in `valueFormat`) and allows you
+to parse the Kafka metadata (kafka timestamp, headers, and key) that is part
of Kafka messages.
+It should only be used when ingesting from Apache Kafka.
+
+Configure the Kafka `inputFormat` as follows:
+
+| Field | Type | Description | Required |
+|-------|------|-------------|----------|
+| `type` | String | Set value to `kafka`. | yes |
+| `valueFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka value payload. For details
about specifying the input format, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| yes |
+| `timestampColumnName` | String | Name of the column for the kafka record's
timestamp.| no (default = "kafka.timestamp") |
+| `headerColumnPrefix` | String | Custom prefix for all the header columns. |
no (default = "kafka.header.") |
+| `headerFormat` | Object | `headerFormat` specifies how to parse the Kafka
headers. Supports String types. Because Kafka header values are bytes, the
parser decodes them as UTF-8 encoded strings. To change this behavior,
implement your own parser based on the encoding style. Change the 'encoding'
type in `KafkaStringHeaderFormat` to match your custom implementation. | no |
+| `keyFormat` | [InputFormat](#input-format) | Any
[InputFormat](#input-format) to parse the Kafka key. It only processes the
first entry of the input format. For details, see [Specifying data
format](../development/extensions-core/kafka-supervisor-reference.md#specifying-data-format).
| no |
+| `keyColumnName` | String | Name of the column for the kafka record's key.|
no (default = "kafka.key") |
+
+The Kafka input format augments the payload with information from the Kafka
timestamp, headers, and key.
+
+If there are conflicts between column names in the payload and those created
from the metadata the payload takes precedence.
Review Comment:
```suggestion
If there are conflicts between column names in the payload and those created
from the metadata, the payload takes precedence.
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]