Dariusz Seweryn created NIFI-14285:
--------------------------------------

             Summary: ConsumeKinesisStream wrap record metadata into FlowFile 
content
                 Key: NIFI-14285
                 URL: https://issues.apache.org/jira/browse/NIFI-14285
             Project: Apache NiFi
          Issue Type: New Feature
          Components: Extensions
    Affects Versions: 2.2.0
            Reporter: Dariusz Seweryn


h1. Proposal

Add a `USE_WRAPPER` property to the `ConsumeKinesisStream` processor. Once the 
`RECORD_WRITER` property is set, `USE_WRAPPER` can be set to either:
 * "Use Content as Value": preserves the previous behavior, i.e. the record 
content is equal to the Kinesis record data
 * "Use Wrapper": wraps the Kinesis record content under a `value` key and adds 
a `metadata` key, like so:
```
{
  "metadata": {
    "kinesis.stream": string,
    "aws.kinesis.shard.id": string,
    "aws.kinesis.sequence.number": string,
    "aws.kinesis.partition.key": string,
    "aws.kinesis.approximate.arrival.timestamp": long
  },
  "value": record // original schema
}
```
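
As a rough illustration of the layout above (not the proposed NiFi implementation, which would be written in Java against the record API), the wrapping step could be sketched in Python as follows. The function name `wrap_record` and its parameters are hypothetical; only the key names come from the schema above.

```python
from typing import Any, Dict


def wrap_record(value: Dict[str, Any], stream: str, shard_id: str,
                sequence_number: str, partition_key: str,
                arrival_timestamp_ms: int) -> Dict[str, Any]:
    """Build the proposed wrapper for one Kinesis record.

    Hypothetical helper: the key names follow the proposal, everything
    else is an assumption made for illustration.
    """
    return {
        "metadata": {
            "kinesis.stream": stream,
            "aws.kinesis.shard.id": shard_id,
            "aws.kinesis.sequence.number": sequence_number,
            "aws.kinesis.partition.key": partition_key,
            "aws.kinesis.approximate.arrival.timestamp": arrival_timestamp_ms,
        },
        # The original record is kept unchanged under "value".
        "value": value,
    }
```

With "Use Content as Value" the writer would emit only the inner `value` part, so existing flows would be unaffected.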

h1. Context

`ConsumeKinesisStream` supports a `Record Reader` and a `Record Writer`, which 
allow multiple records to be put into a single output FlowFile for efficient 
passing between processors.

The problem is that such a FlowFile has only a single 
`aws.kinesis.sequence.number` attribute, which limits the ability to construct 
an efficient setup with exactly-once delivery semantics. The sequence number is 
committed to DynamoDB every few seconds, leaving a possibility of duplicate 
records in case of restarts.

If one stores the sequence number of the last uploaded row, it is possible to 
filter out all rows that would otherwise be duplicated. To do that, the 
sequence number must be available for each individual record.
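
A minimal Python sketch of such a downstream filter, assuming records arrive in the wrapped layout from the proposal and that the sequence number of the last stored row is tracked per shard. The function name `drop_already_stored` and the `last_stored` mapping are hypothetical. The sketch also assumes that sequence numbers are decimal strings that can be compared as integers within a single shard.

```python
from typing import Any, Dict, Iterable, Iterator


def drop_already_stored(records: Iterable[Dict[str, Any]],
                        last_stored: Dict[str, str]) -> Iterator[Dict[str, Any]]:
    """Yield only records newer than the last stored one for their shard.

    `last_stored` maps shard id -> sequence number of the last uploaded row
    (hypothetical bookkeeping kept by the destination system).
    """
    for record in records:
        meta = record["metadata"]
        shard_id = meta["aws.kinesis.shard.id"]
        # Sequence numbers are long digit strings; compare them numerically.
        sequence = int(meta["aws.kinesis.sequence.number"])
        last = last_stored.get(shard_id)
        if last is not None and sequence <= int(last):
            continue  # already uploaded before the restart, skip the duplicate
        yield record
```

The comparison is only meaningful within one shard, which is why the lookup is keyed by `aws.kinesis.shard.id`.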

Prior art: `ConsumeKafka` has a `USE_WRAPPER` property that leads to using a 
`WrapperRecordStreamKafkaMessageConverter`, which alters the `RecordSchema` of 
the output message and wraps the metadata and content so that each record 
looks like:
```
{
  "metadata": {
    "topic": string,
    "partition": long,
    "offset": long,
    "timestamp": long
  },
  "headers": map,
  "key": object,
  "value": record
}
```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
