This is an automated email from the ASF dual-hosted git repository.

zachjsh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new c324f09108d Kinesis input format docs (#16840)
c324f09108d is described below

commit c324f09108d95f37aef5f38ad4ac5098194dac73
Author: zachjsh <[email protected]>
AuthorDate: Tue Aug 6 18:53:10 2024 -0400

    Kinesis input format docs (#16840)
    
    * SQL syntax error should target USER persona
    
    * * revert change to queryHandler and related tests, based on review 
comments
    
    * * add test
    
    * Docs for Kinesis input format
    
    * * remove reference to kafka
    
    * * fix spellcheck error
    
    * Apply suggestions from code review
    
    Co-authored-by: 317brian <[email protected]>
    
    ---------
    
    Co-authored-by: 317brian <[email protected]>
---
 docs/ingestion/data-formats.md      | 142 ++++++++++++++++++++++++++++++++++++
 docs/ingestion/kinesis-ingestion.md |   1 +
 2 files changed, 143 insertions(+)

diff --git a/docs/ingestion/data-formats.md b/docs/ingestion/data-formats.md
index c9c23896a28..96d8597e7f3 100644
--- a/docs/ingestion/data-formats.md
+++ b/docs/ingestion/data-formats.md
@@ -731,6 +731,148 @@ This query returns:
 |--------------------|-----------|---------------|---------------|
 | `development`      | `wiki-edit` | `1680795276351` | `wiki-edits`  |
 
+### Kinesis
+
+The `kinesis` input format lets you parse the Kinesis metadata fields in 
addition to the Kinesis payload value contents.
+It should only be used when ingesting from Kinesis.
+
+The `kinesis` input format wraps around the payload parsing input format and 
augments the data it outputs with the Kinesis event timestamp and partition 
key, the `ApproximateArrivalTimestamp ` and `PartitionKey` fields in the 
Kinesis record.
+
+If there are conflicts between column names in the payload and those created 
from the metadata, the payload takes precedence.
+This ensures that upgrading a Kinesis ingestion to use the Kinesis input 
format (by taking its existing input format and setting it as the 
`valueFormat`) can be done without losing any of the payload data.
+
+Configure the Kinesis `inputFormat` as follows:
+
+| Field | Type | Description                                                   
                                                                                
    | Required | Default             |
+|-------|------|---------------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------------|
+| `type` | String | Set value to `kinesis`. | yes ||
+| `valueFormat` | [InputFormat](#input-format) | The [input 
format](#input-format) to parse the Kinesis value payload. | yes ||
+| `partitionKeyColumnName` | String | The name of the column for the Kinesis 
partition key. This field is useful when ingesting data from multiple 
partitions into the same datasource. | no | `kinesis.partitionKey` |
+| `timestampColumnName` | String | The name of the column for the Kinesis 
timestamp. | no | `kinesis.timestamp` |
+
+#### Example
+
+Using `{ "type": "json" }` as the input format would only parse the payload 
value.
+To parse the Kinesis metadata in addition to the payload, use the `kinesis` 
input format.
+
+For example, consider the following structure for a Kinesis record that 
represents an edit in a development environment:
+
+- **Kinesis timestamp**: `1680795276351`
+- **Kinesis partition key**: `partition-1`
+- **Kinesis payload value**: 
`{"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo 
Toraut","delta":31,"namespace":"Main"}`
+
+You would configure it as follows:
+
+```json
+{
+  "ioConfig": {
+    "inputFormat": {
+      "type": "kinesis",
+      "valueFormat": {
+        "type": "json"
+      },
+      "timestampColumnName": "kinesis.timestamp",
+      "partitionKeyColumnName": "kinesis.partitionKey"
+    }
+  }
+}
+```
+
+You would parse the example record as follows:
+
+```json
+{
+  "channel": "#sv.wikipedia",
+  "timestamp": "2016-06-27T00:00:11.080Z",
+  "page": "Salo Toraut",
+  "delta": 31,
+  "namespace": "Main",
+  "kinesis.timestamp": 1680795276351,
+  "kinesis.partitionKey": "partition-1"
+}
+```
+
+If you want to use `kinesis.timestamp` as Druid's primary timestamp 
(`__time`), specify it as the value for `column` in the `timestampSpec`:
+
+```json
+"timestampSpec": {
+  "column": "kinesis.timestamp",
+  "format": "millis"
+}
+```
+
+Finally, add these Kinesis metadata columns to the `dimensionsSpec` or  set 
your `dimensionsSpec` to automatically detect columns.
+
+The following supervisor spec demonstrates how to ingest the Kinesis 
timestamp, and partition key into Druid dimensions:
+
+<details>
+<summary>Click to view the example</summary>
+
+```json
+{
+  "type": "kinesis",
+  "spec": {
+    "ioConfig": {
+      "type": "kinesis",
+      "consumerProperties": {
+        "bootstrap.servers": "localhost:9092"
+      },
+      "topic": "wiki-edits",
+      "inputFormat": {
+        "type": "kinesis",
+        "valueFormat": {
+          "type": "json"
+        },
+        "headerFormat": {
+          "type": "string"
+        },
+        "keyFormat": {
+          "type": "tsv",
+          "findColumnsFromHeader": false,
+          "columns": ["x"]
+        }
+      },
+      "useEarliestOffset": true
+    },
+    "dataSchema": {
+      "dataSource": "wikiticker",
+      "timestampSpec": {
+        "column": "timestamp",
+        "format": "posix"
+      },
+      "dimensionsSpec": {
+        "useSchemaDiscovery": true,
+        "includeAllDimensions": true
+      },
+      "granularitySpec": {
+        "queryGranularity": "none",
+        "rollup": false,
+        "segmentGranularity": "day"
+      }
+    },
+    "tuningConfig": {
+      "type": "kinesis"
+    }
+  }
+}
+```
+</details>
+
+After Druid ingests the data, you can query the Kinesis metadata columns as 
follows:
+
+```sql
+SELECT
+  "kinesis.timestamp",
+  "kinesis.partitionKey"
+FROM "wikiticker"
+```
+
+This query returns:
+
+| `kinesis.timestamp` | `kinesis.topic` |
+|---------------------|-----------------|
+| `1680795276351`     | `partition-1`   |
+
 ## FlattenSpec
 
 You can use the `flattenSpec` object to flatten nested data, as an alternative 
to the Druid [nested columns](../querying/nested-columns.md) feature, and for 
nested input formats unsupported by the feature. It is an object within the 
`inputFormat` object.
diff --git a/docs/ingestion/kinesis-ingestion.md 
b/docs/ingestion/kinesis-ingestion.md
index 3b4c5de8548..8ea2cfd4af3 100644
--- a/docs/ingestion/kinesis-ingestion.md
+++ b/docs/ingestion/kinesis-ingestion.md
@@ -139,6 +139,7 @@ The Kinesis indexing service supports both 
[`inputFormat`](data-formats.md#input
 
 The Kinesis indexing service supports the following values for `inputFormat`:
 
+* `kinesis`
 * `csv`
 * `tvs`
 * `json`


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to