[
https://issues.apache.org/jira/browse/NIFI-14285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
David Handermann resolved NIFI-14285.
-------------------------------------
Fix Version/s: 2.3.0
Resolution: Fixed
> Add Output Strategy with Wrapper Option to ConsumeKinesisStream
> ---------------------------------------------------------------
>
> Key: NIFI-14285
> URL: https://issues.apache.org/jira/browse/NIFI-14285
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 2.2.0
> Reporter: Dariusz Seweryn
> Assignee: Dariusz Seweryn
> Priority: Major
> Fix For: 2.3.0
>
> Time Spent: 2h
> Remaining Estimate: 0h
>
> h1. Proposal
> Add a `USE_WRAPPER` property to the `ConsumeKinesisStream` processor. Once the
> `RECORD_WRITER` property is set, `USE_WRAPPER` can be set to either:
> * "Use Content as Value": preserves the previous behavior, i.e. the record
> content equals the Kinesis record data
> * "Use Wrapper": wraps the Kinesis record content under a `value` key and adds
> a `metadata` key, like so:
> ??{??
> ?? "metadata": {??
> ?? "kinesis.stream": string,??
> ?? "aws.kinesis.shard.id": string,??
> ?? "aws.kinesis.sequence.number": string,??
> ?? "aws.kinesis.partition.key": string,??
> ?? "aws.kinesis.approximate.arrival.timestamp": long??
> ?? },??
> ?? value: record // original schema??
> ??}??
> h1. Context
> ConsumeKinesisStream supports a `Record Reader` and `Record Writer`, which
> allows multiple records to be placed into a single output FlowFile for
> efficient passing between processors.
> The problem is that such a FlowFile carries only a single
> `aws.kinesis.sequence.number` attribute, which limits the ability to build an
> efficient setup with exactly-once delivery semantics. The sequence number is
> committed to DynamoDB only every few seconds, leaving the possibility of
> duplicate records after a restart.
> If the sequence number of the last uploaded row is stored, all rows that
> would otherwise be duplicated can be filtered out. For that, the sequence
> number must be available on each individual record.
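The filtering step above can be sketched as follows. Assuming each record exposes its own sequence number (as the wrapper proposal enables), records at or before the last stored sequence number are dropped. Kinesis sequence numbers are large decimal strings, so they are compared as `BigInteger`; note the comparison is only meaningful within a single shard. The class and method names here are hypothetical.

```java
import java.math.BigInteger;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical dedup sketch: keep only records strictly after the last
// sequence number the downstream system already stored. Valid per shard only.
public class SequenceFilter {
    static List<String> dropAlreadyStored(List<String> sequenceNumbers, String lastStored) {
        BigInteger last = new BigInteger(lastStored);
        return sequenceNumbers.stream()
                .filter(s -> new BigInteger(s).compareTo(last) > 0)
                .collect(Collectors.toList());
    }
}
```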
> Prior art: `ConsumeKafka` has a `USE_WRAPPER` property that leads to using a
> `WrapperRecordStreamKafkaMessageConverter`, which alters the `RecordSchema` of
> the output message and wraps the metadata and content so that each record
> looks like:
> ??{??
> ?? "metadata": {??
> ?? "topic": string,??
> ?? "partition": long,??
> ?? "offset": long,??
> ?? "timestamp": long??
> ?? },??
> ?? "headers": map,??
> ?? "key": object,??
> ?? "value": record // original schema??
> ??}??
--
This message was sent by Atlassian Jira
(v8.20.10#820010)