[ 
https://issues.apache.org/jira/browse/NIFI-14285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932441#comment-17932441
 ] 

ASF subversion and git services commented on NIFI-14285:
--------------------------------------------------------

Commit 6d7805f2035f9d14683795cebfd2bd3b98634b67 in nifi's branch 
refs/heads/main from Dariusz Seweryn
[ https://gitbox.apache.org/repos/asf?p=nifi.git;h=6d7805f203 ]

NIFI-14285 Added Output Strategy with Wrapper to ConsumeKinesisStream (#9738)

Signed-off-by: David Handermann <[email protected]>

> ConsumeKinesisStream wrap record metadata into FlowFile content
> ---------------------------------------------------------------
>
>                 Key: NIFI-14285
>                 URL: https://issues.apache.org/jira/browse/NIFI-14285
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 2.2.0
>            Reporter: Dariusz Seweryn
>            Assignee: Dariusz Seweryn
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> h1. Proposal
> Add `USE_WRAPPER` property to `ConsumeKinesisStream` processor. Once a 
> `RECORD_WRITER` property is set — `USE_WRAPPER` could be set to either:
>  * "Use Content as Value" — preserves previous functionality, i.e. record 
> content is equal to Kinesis record data
>  * "Use Wrapper" — wraps Kinesis record content under `value` key and adds 
> `metadata` key, like so:
> ??{??
> ??  "metadata": {??
> ??    "kinesis.stream": string,??
> ??    "aws.kinesis.shard.id": string,??
> ??    "aws.kinesis.sequence.number": string,??
> ??    "aws.kinesis.partition.key": string,??
> ??    "aws.kinesis.approximate.arrival.timestamp": long??
> ??  },??
> ??  value: record // original schema??
> ??}??
> h1. Context
> ConsumeKinesisStream supports using `Record Reader` and `Record Writer` which 
> allows for putting multiple records into a single FlowFile output for 
> efficient passing between processors.
> Problem is that such FlowFile has only a single `aws.kinesis.sequence.number` 
> attribute — this limits the ability for constructing an efficient setup with 
> exactly-once deliverability semantics. The sequence number is being committed 
> to DynamoDB every few seconds leaving a possibility of duplicate records in 
> case of restarts.
> Given one can store the sequence number of the last uploaded row — it is 
> possible to filter out all rows that would be otherwise duplicated. To do 
> that the Sequence Number should be available for each individual record.
> Prior art: `ConsumeKafka` has a property `USE_WRAPPER` that leads to using a 
> `WrapperRecordStreamKafkaMessageConverter` that alters the `RecordSchema` of 
> the output message and wraps the metadata and content so each record content 
> looks like:
> ??{??
> ??  "metadata": {??
> ??    "topic": string,??
> ??    "partition": long,??
> ??    "offset": long,??
> ??    "timestamp": long??
> ??  },??
> ??  "headers": map,??
> ??  "key": object,??
> ??  "value": record // original schema??
> ??}??



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to