[
https://issues.apache.org/jira/browse/NIFI-14285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dariusz Seweryn updated NIFI-14285:
-----------------------------------
Description:
h1. Proposal
Add a `USE_WRAPPER` property to the `ConsumeKinesisStream` processor. Once the
`RECORD_WRITER` property is set, `USE_WRAPPER` could be set to either:
* "Use Content as Value": preserves the previous behaviour, i.e. the record
content is equal to the Kinesis record data
* "Use Wrapper": wraps the Kinesis record content under a `value` key and adds
a `metadata` key, like so:
```
{
  "metadata": {
    "kinesis.stream": string,
    "aws.kinesis.shard.id": string,
    "aws.kinesis.sequence.number": string,
    "aws.kinesis.partition.key": string,
    "aws.kinesis.approximate.arrival.timestamp": long
  },
  "value": record // original schema
}
```
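For illustration, a single wrapped record could look like the following (all field values here are made up, chosen only to show the shape):

```json
{
  "metadata": {
    "kinesis.stream": "orders-stream",
    "aws.kinesis.shard.id": "shardId-000000000001",
    "aws.kinesis.sequence.number": "49590338271490256608559692538361571095921575989136588898",
    "aws.kinesis.partition.key": "order-42",
    "aws.kinesis.approximate.arrival.timestamp": 1717430400000
  },
  "value": { "orderId": 42, "amount": 10.5 }
}
```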
h1. Context
`ConsumeKinesisStream` supports using a `Record Reader` and `Record Writer`,
which allows multiple records to be put into a single output FlowFile for
efficient transfer between processors.
The problem is that such a FlowFile has only a single
`aws.kinesis.sequence.number` attribute, which limits the ability to construct
an efficient setup with exactly-once delivery semantics. The sequence number is
committed to DynamoDB only every few seconds, leaving the possibility of
duplicate records after a restart.
If one stores the sequence number of the last uploaded row, it is possible to
filter out all rows that would otherwise be duplicated. To do that, the
sequence number must be available for each individual record.
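A minimal sketch of the intended dedup step, assuming each record carries its own sequence number under the proposed `metadata` key (the helper and the plain-dict record representation are hypothetical, not part of NiFi):

```python
def filter_duplicates(records, last_committed_seq):
    """Drop records at or before the last committed Kinesis sequence number.

    Kinesis sequence numbers are decimal strings that increase over time
    within a shard, so they are compared as integers here. This assumes all
    records come from the same shard.
    """
    last = int(last_committed_seq)
    return [
        r for r in records
        if int(r["metadata"]["aws.kinesis.sequence.number"]) > last
    ]
```

With the per-record sequence number available, a consumer restarted from an older checkpoint can discard everything it has already uploaded instead of reprocessing the whole FlowFile.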
Prior art: `ConsumeKafka` has a `USE_WRAPPER` property that leads to using a
`WrapperRecordStreamKafkaMessageConverter`, which alters the `RecordSchema` of
the output message and wraps the metadata and content so that each record
looks like:
```
{
  "metadata": {
    "topic": string,
    "partition": long,
    "offset": long,
    "timestamp": long
  },
  "headers": map,
  "key": object,
  "value": record // original schema
}
```
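By analogy, the Kinesis-side conversion could be sketched as follows. This is a plain-Python illustration of the proposed structure only; the function name and the dict-based record representation are made up (NiFi's actual Record API is Java):

```python
def wrap_kinesis_record(record_value, stream_name, shard_id,
                        sequence_number, partition_key, arrival_ts_millis):
    """Wrap a deserialized Kinesis record under "value" and attach the
    per-record metadata proposed above."""
    return {
        "metadata": {
            "kinesis.stream": stream_name,
            "aws.kinesis.shard.id": shard_id,
            "aws.kinesis.sequence.number": sequence_number,
            "aws.kinesis.partition.key": partition_key,
            "aws.kinesis.approximate.arrival.timestamp": arrival_ts_millis,
        },
        "value": record_value,
    }
```

When "Use Content as Value" is selected the converter would simply return `record_value` unchanged, preserving today's behaviour.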
was:
h1. Proposal
Add a `USE_WRAPPER` property to the `ConsumeKinesisStream` processor. Once the
`RECORD_WRITER` property is set, `USE_WRAPPER` could be set to either:
* "Use Content as Value": preserves the previous behaviour, i.e. the record
content is equal to the Kinesis record data
* "Use Wrapper": wraps the Kinesis record content under a `value` key and adds
a `metadata` key, like so:
```
{
"metadata": {
"kinesis.stream": string,
"aws.kinesis.shard.id": string,
"aws.kinesis.sequence.number": string,
"aws.kinesis.partition.key": string,
"aws.kinesis.approximate.arrival.timestamp": long
},
"value": record // original schema
}
```
h1. Context
`ConsumeKinesisStream` supports using a `Record Reader` and `Record Writer`,
which allows multiple records to be put into a single output FlowFile for
efficient transfer between processors.
The problem is that such a FlowFile has only a single
`aws.kinesis.sequence.number` attribute, which limits the ability to construct
an efficient setup with exactly-once delivery semantics. The sequence number is
committed to DynamoDB only every few seconds, leaving the possibility of
duplicate records after a restart.
If one stores the sequence number of the last uploaded row, it is possible to
filter out all rows that would otherwise be duplicated. To do that, the
sequence number must be available for each individual record.
Prior art: `ConsumeKafka` has a `USE_WRAPPER` property that leads to using a
`WrapperRecordStreamKafkaMessageConverter`, which alters the `RecordSchema` of
the output message and wraps the metadata and content so that each record
looks like:
```
{
"metadata": {
"topic": string,
"partition": long,
"offset": long,
"timestamp": long
},
"headers": map,
"key": object,
"value": record
}
```
> ConsumeKinesisStream wrap record metadata into FlowFile content
> ---------------------------------------------------------------
>
> Key: NIFI-14285
> URL: https://issues.apache.org/jira/browse/NIFI-14285
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Affects Versions: 2.2.0
> Reporter: Dariusz Seweryn
> Priority: Major
>
> h1. Proposal
> Add a `USE_WRAPPER` property to the `ConsumeKinesisStream` processor. Once the
> `RECORD_WRITER` property is set, `USE_WRAPPER` could be set to either:
> * "Use Content as Value": preserves the previous behaviour, i.e. the record
> content is equal to the Kinesis record data
> * "Use Wrapper": wraps the Kinesis record content under a `value` key and adds
> a `metadata` key, like so:
> ```
> {
>   "metadata": {
>     "kinesis.stream": string,
>     "aws.kinesis.shard.id": string,
>     "aws.kinesis.sequence.number": string,
>     "aws.kinesis.partition.key": string,
>     "aws.kinesis.approximate.arrival.timestamp": long
>   },
>   "value": record // original schema
> }
> ```
> h1. Context
> `ConsumeKinesisStream` supports using a `Record Reader` and `Record Writer`,
> which allows multiple records to be put into a single output FlowFile for
> efficient transfer between processors.
> The problem is that such a FlowFile has only a single
> `aws.kinesis.sequence.number` attribute, which limits the ability to construct
> an efficient setup with exactly-once delivery semantics. The sequence number
> is committed to DynamoDB only every few seconds, leaving the possibility of
> duplicate records after a restart.
> If one stores the sequence number of the last uploaded row, it is possible to
> filter out all rows that would otherwise be duplicated. To do that, the
> sequence number must be available for each individual record.
> Prior art: `ConsumeKafka` has a `USE_WRAPPER` property that leads to using a
> `WrapperRecordStreamKafkaMessageConverter`, which alters the `RecordSchema`
> of the output message and wraps the metadata and content so that each record
> looks like:
> ```
> {
>   "metadata": {
>     "topic": string,
>     "partition": long,
>     "offset": long,
>     "timestamp": long
>   },
>   "headers": map,
>   "key": object,
>   "value": record // original schema
> }
> ```
--
This message was sent by Atlassian Jira
(v8.20.10#820010)