This is an automated email from the ASF dual-hosted git repository.
zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new c324f09108d Kinesis input format docs (#16840)
c324f09108d is described below
commit c324f09108d95f37aef5f38ad4ac5098194dac73
Author: zachjsh <[email protected]>
AuthorDate: Tue Aug 6 18:53:10 2024 -0400
Kinesis input format docs (#16840)
* SQL syntax error should target USER persona
* * revert change to queryHandler and related tests, based on review comments
* * add test
* Docs for Kinesis input format
* * remove reference to kafka
* * fix spellcheck error
* Apply suggestions from code review
Co-authored-by: 317brian <[email protected]>
---------
Co-authored-by: 317brian <[email protected]>
---
docs/ingestion/data-formats.md | 142 ++++++++++++++++++++++++++++++++++++
docs/ingestion/kinesis-ingestion.md | 1 +
2 files changed, 143 insertions(+)
diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index c9c23896a28..96d8597e7f3 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -731,6 +731,148 @@ This query returns:
|--------------------|-----------|---------------|---------------|
| `development` | `wiki-edit` | `1680795276351` | `wiki-edits` |
+### Kinesis
+
+The `kinesis` input format lets you parse the Kinesis metadata fields in addition to the Kinesis payload value contents.
+It should only be used when ingesting from Kinesis.
+
+The `kinesis` input format wraps around the payload parsing input format and augments the data it outputs with the Kinesis event timestamp and partition key, taken from the `ApproximateArrivalTimestamp` and `PartitionKey` fields in the Kinesis record.
+
+If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence.
+This ensures that upgrading a Kinesis ingestion to use the Kinesis input format (by taking its existing input format and setting it as the `valueFormat`) can be done without losing any of the payload data.
+
+Configure the Kinesis `inputFormat` as follows:
+
+| Field | Type | Description | Required | Default |
+|-------|------|---------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------|
+| `type` | String | Set value to `kinesis`. | yes ||
+| `valueFormat` | [InputFormat](#input-format) | The [input format](#input-format) to parse the Kinesis value payload. | yes ||
+| `partitionKeyColumnName` | String | The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource. | no | `kinesis.partitionKey` |
+| `timestampColumnName` | String | The name of the column for the Kinesis timestamp. | no | `kinesis.timestamp` |
+
+#### Example
+
+Using `{ "type": "json" }` as the input format would only parse the payload value.
+To parse the Kinesis metadata in addition to the payload, use the `kinesis` input format.
+
+For example, consider the following structure for a Kinesis record that represents an edit in a development environment:
+
+- **Kinesis timestamp**: `1680795276351`
+- **Kinesis partition key**: `partition-1`
+- **Kinesis payload value**: `{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}`
+
+You would configure it as follows:
+
+```json
+{
+  "ioConfig": {
+    "inputFormat": {
+      "type": "kinesis",
+      "valueFormat": {
+        "type": "json"
+      },
+      "timestampColumnName": "kinesis.timestamp",
+      "partitionKeyColumnName": "kinesis.partitionKey"
+    }
+  }
+}
+```
+
+The `kinesis` input format parses the example record as follows:
+
+```json
+{
+  "channel": "#sv.wikipedia",
+  "timestamp": "2016-06-27T00:00:11.080Z",
+  "page": "Salo Toraut",
+  "delta": 31,
+  "namespace": "Main",
+  "kinesis.timestamp": 1680795276351,
+  "kinesis.partitionKey": "partition-1"
+}
+```
+
+If you want to use `kinesis.timestamp` as Druid's primary timestamp (`__time`), specify it as the value for `column` in the `timestampSpec`:
+
+```json
+"timestampSpec": {
+  "column": "kinesis.timestamp",
+  "format": "millis"
+}
+```
+
+Finally, add these Kinesis metadata columns to the `dimensionsSpec` or set your `dimensionsSpec` to automatically detect columns.
+
+The following supervisor spec demonstrates how to ingest the Kinesis timestamp and partition key into Druid dimensions:
+
+<details>
+<summary>Click to view the example</summary>
+
+```json
+{
+  "type": "kinesis",
+  "spec": {
+    "ioConfig": {
+      "type": "kinesis",
+      "stream": "wiki-edits",
+      "inputFormat": {
+        "type": "kinesis",
+        "valueFormat": {
+          "type": "json"
+        }
+      },
+      "useEarliestSequenceNumber": true
+    },
+    "dataSchema": {
+      "dataSource": "wikiticker",
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "iso"
+      },
+      "dimensionsSpec": {
+        "useSchemaDiscovery": true,
+        "includeAllDimensions": true
+      },
+      "granularitySpec": {
+        "queryGranularity": "none",
+        "rollup": false,
+        "segmentGranularity": "day"
+      }
+    },
+    "tuningConfig": {
+      "type": "kinesis"
+    }
+  }
+}
+```
+</details>
+
+After Druid ingests the data, you can query the Kinesis metadata columns as follows:
+
+```sql
+SELECT
+  "kinesis.timestamp",
+  "kinesis.partitionKey"
+FROM "wikiticker"
+```
+
+This query returns:
+
+| `kinesis.timestamp` | `kinesis.partitionKey` |
+|---------------------|-----------------|
+| `1680795276351` | `partition-1` |
+
## FlattenSpec
You can use the `flattenSpec` object to flatten nested data, as an alternative to the Druid [nested columns](../querying/nested-columns.md) feature, and for nested input formats unsupported by the feature. It is an object within the `inputFormat` object.
diff --git a/docs/ingestion/kinesis-ingestion.md b/docs/ingestion/kinesis-ingestion.md
index 3b4c5de8548..8ea2cfd4af3 100644
--- a/docs/ingestion/kinesis-ingestion.md
+++ b/docs/ingestion/kinesis-ingestion.md
@@ -139,6 +139,7 @@ The Kinesis indexing service supports both [`inputFormat`](data-formats.md#input
The Kinesis indexing service supports the following values for `inputFormat`:
+* `kinesis`
* `csv`
* `tsv`
* `json`
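
A minimal sketch of the precedence rule documented in the Kinesis section above, for readers who want the merge order spelled out. It is written in Python for illustration only and is not Druid's implementation; the helper name `merge_kinesis_record` and its parameters are hypothetical, and only the default column names come from the documentation.

```python
def merge_kinesis_record(payload, arrival_timestamp_ms, partition_key,
                         timestamp_column="kinesis.timestamp",
                         partition_key_column="kinesis.partitionKey"):
    """Hypothetical helper: combine parsed payload fields with Kinesis metadata."""
    # Start from the metadata columns (names default to the documented ones).
    row = {
        timestamp_column: arrival_timestamp_ms,
        partition_key_column: partition_key,
    }
    # Apply the payload last, so a payload field wins on a column-name conflict.
    row.update(payload)
    return row


# The example record from the documentation (payload abbreviated).
payload = {"channel": "#sv.wikipedia", "page": "Salo Toraut", "delta": 31}
row = merge_kinesis_record(payload, 1680795276351, "partition-1")

# A payload that already has a "kinesis.timestamp" field keeps its own value.
conflict = merge_kinesis_record({"kinesis.timestamp": 5}, 1680795276351, "partition-1")
```

Applying the payload after the metadata is what lets an existing ingestion adopt the `kinesis` input format without losing payload columns, at the cost of hiding a metadata column whose name collides with a payload field.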