[ 
https://issues.apache.org/jira/browse/NIFI-14285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Handermann updated NIFI-14285:
------------------------------------
    Summary: Add Output Strategy with Wrapper Option to ConsumeKinesisStream  
(was: ConsumeKinesisStream wrap record metadata into FlowFile content)

> Add Output Strategy with Wrapper Option to ConsumeKinesisStream
> ---------------------------------------------------------------
>
>                 Key: NIFI-14285
>                 URL: https://issues.apache.org/jira/browse/NIFI-14285
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 2.2.0
>            Reporter: Dariusz Seweryn
>            Assignee: Dariusz Seweryn
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> h1. Proposal
> Add `USE_WRAPPER` property to `ConsumeKinesisStream` processor. Once a 
> `RECORD_WRITER` property is set — `USE_WRAPPER` could be set to either:
>  * "Use Content as Value" — preserves previous functionality, i.e. record 
> content is equal to Kinesis record data
>  * "Use Wrapper" — wraps Kinesis record content under `value` key and adds 
> `metadata` key, like so:
> ??{??
> ??  "metadata": {??
> ??    "kinesis.stream": string,??
> ??    "aws.kinesis.shard.id": string,??
> ??    "aws.kinesis.sequence.number": string,??
> ??    "aws.kinesis.partition.key": string,??
> ??    "aws.kinesis.approximate.arrival.timestamp": long??
> ??  },??
> ??  value: record // original schema??
> ??}??
> h1. Context
> ConsumeKinesisStream supports using `Record Reader` and `Record Writer` which 
> allows for putting multiple records into a single FlowFile output for 
> efficient passing between processors.
> Problem is that such FlowFile has only a single `aws.kinesis.sequence.number` 
> attribute — this limits the ability for constructing an efficient setup with 
> exactly-once deliverability semantics. The sequence number is being committed 
> to DynamoDB every few seconds leaving a possibility of duplicate records in 
> case of restarts.
> Given one can store the sequence number of the last uploaded row — it is 
> possible to filter out all rows that would be otherwise duplicated. To do 
> that the Sequence Number should be available for each individual record.
> Prior art: `ConsumeKafka` has a property `USE_WRAPPER` that leads to using a 
> `WrapperRecordStreamKafkaMessageConverter` that alters the `RecordSchema` of 
> the output message and wraps the metadata and content so each record content 
> looks like:
> ??{??
> ??  "metadata": {??
> ??    "topic": string,??
> ??    "partition": long,??
> ??    "offset": long,??
> ??    "timestamp": long??
> ??  },??
> ??  "headers": map,??
> ??  "key": object,??
> ??  "value": record // original schema??
> ??}??



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to