Dariusz Seweryn created NIFI-14285:
--------------------------------------
Summary: ConsumeKinesisStream wrap record metadata into FlowFile
content
Key: NIFI-14285
URL: https://issues.apache.org/jira/browse/NIFI-14285
Project: Apache NiFi
Issue Type: New Feature
Components: Extensions
Affects Versions: 2.2.0
Reporter: Dariusz Seweryn
h1. Proposal
Add a `USE_WRAPPER` property to the `ConsumeKinesisStream` processor. Once the
`RECORD_WRITER` property is set, `USE_WRAPPER` could be set to either:
* "Use Content as Value": preserves the existing behavior, i.e. each record's
content is equal to the Kinesis record data
* "Use Wrapper": wraps the Kinesis record content under a `value` key and adds
a `metadata` key, like so:
```
{
  "metadata": {
    "kinesis.stream": string,
    "aws.kinesis.shard.id": string,
    "aws.kinesis.sequence.number": string,
    "aws.kinesis.partition.key": string,
    "aws.kinesis.approximate.arrival.timestamp": long
  },
  "value": record // original schema
}
```
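To make the proposed output shape concrete, here is a minimal sketch in plain Java. It deliberately avoids NiFi's Record API: `java.util.Map` stands in for a NiFi `Record`/`RecordSchema`, and `OutputStrategy`, `KinesisMetadata` and `toOutput` are hypothetical names introduced for illustration, not existing parts of `ConsumeKinesisStream`. The metadata keys are the ones listed above; the values in `main` are made up.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: plain Java maps stand in for NiFi's Record API.
public class KinesisWrapperSketch {

    // Hypothetical enum mirroring the two proposed USE_WRAPPER values.
    enum OutputStrategy { USE_CONTENT_AS_VALUE, USE_WRAPPER }

    // Hypothetical holder for the per-record Kinesis metadata listed in the proposal.
    record KinesisMetadata(String stream, String shardId, String sequenceNumber,
                           String partitionKey, long approximateArrivalTimestamp) { }

    // Returns the record as-is, or wrapped under "metadata" and "value" keys.
    static Map<String, Object> toOutput(OutputStrategy strategy,
                                        KinesisMetadata meta,
                                        Map<String, Object> value) {
        if (strategy == OutputStrategy.USE_CONTENT_AS_VALUE) {
            return value; // existing behavior: content equals the Kinesis record data
        }
        // LinkedHashMap keeps the key order shown in the proposed schema.
        Map<String, Object> metadata = new LinkedHashMap<>();
        metadata.put("kinesis.stream", meta.stream());
        metadata.put("aws.kinesis.shard.id", meta.shardId());
        metadata.put("aws.kinesis.sequence.number", meta.sequenceNumber());
        metadata.put("aws.kinesis.partition.key", meta.partitionKey());
        metadata.put("aws.kinesis.approximate.arrival.timestamp", meta.approximateArrivalTimestamp());

        Map<String, Object> wrapper = new LinkedHashMap<>();
        wrapper.put("metadata", metadata);
        wrapper.put("value", value); // original record, schema unchanged
        return wrapper;
    }

    public static void main(String[] args) {
        Map<String, Object> value = Map.of("id", 1);
        KinesisMetadata meta = new KinesisMetadata("orders", "shardId-000000000000",
                "12345", "pk-1", 1700000000000L);
        System.out.println(toOutput(OutputStrategy.USE_WRAPPER, meta, value));
    }
}
```

In a real implementation the processor would presumably also have to derive a wrapper `RecordSchema` (as `WrapperRecordStreamKafkaMessageConverter` does for Kafka) so that the configured Record Writer can serialize the nested `metadata` and `value` fields.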
h1. Context
`ConsumeKinesisStream` supports a `Record Reader` and a `Record Writer`, which
allow multiple records to be written into a single output FlowFile so they can
be passed efficiently between processors.
The problem is that such a FlowFile has only a single
`aws.kinesis.sequence.number` attribute, which makes it hard to build an
efficient setup with exactly-once delivery semantics. The sequence number is
committed to DynamoDB only every few seconds, so records may be duplicated
after a restart.
If the sequence number of the last uploaded row is stored, it is possible to
filter out all rows that would otherwise be duplicated. To do that, the
sequence number must be available for each individual record.
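Once every record carries its own sequence number, the downstream filtering could look roughly like the sketch below. It assumes that the last uploaded sequence number is kept per shard (ordering is only meaningful within a shard) and that sequence numbers are compared numerically; because they are decimal strings too long for a `long`, the sketch uses `BigInteger`. `WrappedRecord`, `dropDuplicates` and the `lastUploaded` store are hypothetical.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SequenceNumberDedupSketch {

    // Hypothetical per-record view of the fields the wrapper would expose.
    record WrappedRecord(String shardId, String sequenceNumber, String value) { }

    // Keeps only records whose sequence number is greater than the last one
    // stored for their shard. lastUploaded is an assumed shard id -> sequence
    // number store maintained by the downstream system.
    static List<WrappedRecord> dropDuplicates(List<WrappedRecord> batch,
                                              Map<String, String> lastUploaded) {
        List<WrappedRecord> fresh = new ArrayList<>();
        for (WrappedRecord r : batch) {
            String last = lastUploaded.get(r.shardId());
            // No stored position for this shard: nothing can be a duplicate yet.
            if (last == null
                    || new BigInteger(r.sequenceNumber()).compareTo(new BigInteger(last)) > 0) {
                fresh.add(r);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        Map<String, String> last = new HashMap<>();
        last.put("shardId-000000000000", "200");
        List<WrappedRecord> batch = List.of(
                new WrappedRecord("shardId-000000000000", "100", "a"),
                new WrappedRecord("shardId-000000000000", "300", "b"),
                new WrappedRecord("shardId-000000000001", "50", "c"));
        System.out.println(dropDuplicates(batch, last)); // keeps b and c
    }
}
```

After a restart, records replayed from the last DynamoDB checkpoint would then fall at or below the stored position and be dropped, which is what the single FlowFile-level attribute cannot support.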
Prior art: `ConsumeKafka` has a `USE_WRAPPER` property that selects a
`WrapperRecordStreamKafkaMessageConverter`, which alters the `RecordSchema` of
the output message and wraps the metadata and content so that each record
looks like:
```
{
  "metadata": {
    "topic": string,
    "partition": long,
    "offset": long,
    "timestamp": long
  },
  "headers": map,
  "key": object,
  "value": record
}
```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)