[ 
https://issues.apache.org/jira/browse/NIFI-14285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dariusz Seweryn updated NIFI-14285:
-----------------------------------
    Description: 
h1. Proposal

Add a `USE_WRAPPER` property to the `ConsumeKinesisStream` processor. Once the 
`RECORD_WRITER` property is set, `USE_WRAPPER` can be set to either:
 * "Use Content as Value": preserves the previous functionality, i.e. the 
record content is equal to the Kinesis record data
 * "Use Wrapper": wraps the Kinesis record content under a `value` key and adds 
a `metadata` key, like so:

{noformat}
{
  "metadata": {
    "kinesis.stream": string,
    "aws.kinesis.shard.id": string,
    "aws.kinesis.sequence.number": string,
    "aws.kinesis.partition.key": string,
    "aws.kinesis.approximate.arrival.timestamp": long
  },
  "value": record // original schema
}
{noformat}
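
To make the proposed shape concrete, here is a minimal Python sketch of the wrapping step. It is only an illustration of the layout above, not NiFi code: the function name `wrap_record` and its parameters are hypothetical, and the real processor would build a `RecordSchema` rather than a plain dictionary.

```python
from typing import Any, Dict


def wrap_record(
    value: Dict[str, Any],
    stream: str,
    shard_id: str,
    sequence_number: str,
    partition_key: str,
    arrival_timestamp_ms: int,
) -> Dict[str, Any]:
    """Hypothetical helper: nest the original record under "value" and
    attach the per-record Kinesis metadata under "metadata"."""
    return {
        "metadata": {
            "kinesis.stream": stream,
            "aws.kinesis.shard.id": shard_id,
            "aws.kinesis.sequence.number": sequence_number,
            "aws.kinesis.partition.key": partition_key,
            "aws.kinesis.approximate.arrival.timestamp": arrival_timestamp_ms,
        },
        # The original record is left untouched, keeping its own schema.
        "value": value,
    }
```

With "Use Content as Value" the output would be just the `value` part, so existing flows would not change.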

h1. Context

`ConsumeKinesisStream` supports a `Record Reader` and a `Record Writer`, which 
allow multiple records to be written to a single output FlowFile for efficient 
passing between processors.

The problem is that such a FlowFile has only a single 
`aws.kinesis.sequence.number` attribute, which limits the ability to build an 
efficient setup with exactly-once delivery semantics. The sequence number is 
committed to DynamoDB only every few seconds, which leaves a possibility of 
duplicate records after a restart.

If one stores the sequence number of the last uploaded row, it is possible to 
filter out all rows that would otherwise be duplicated. For that, the sequence 
number must be available for each individual record.
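
The filtering idea can be sketched in Python as follows. This is an illustration under stated assumptions, not part of the proposal: the function `drop_already_stored` and the `last_stored` mapping are hypothetical, records are assumed to carry the proposed `metadata` wrapper, and sequence numbers are assumed to be decimal strings that increase within a single shard, so they are tracked per shard and compared as integers.

```python
from typing import Any, Dict, Iterable, List


def drop_already_stored(
    records: Iterable[Dict[str, Any]],
    last_stored: Dict[str, str],
) -> List[Dict[str, Any]]:
    """Keep only records newer than the last sequence number stored per shard.

    `last_stored` maps a shard id to the sequence number of the last row
    that was successfully uploaded from that shard (hypothetical bookkeeping
    kept by the downstream system).
    """
    fresh = []
    for record in records:
        metadata = record["metadata"]
        shard_id = metadata["aws.kinesis.shard.id"]
        # Compare as integers: string comparison would order "9" after "10".
        sequence = int(metadata["aws.kinesis.sequence.number"])
        last = last_stored.get(shard_id)
        if last is None or sequence > int(last):
            fresh.append(record)
    return fresh
```

After a restart, records re-read from before the last stored sequence number would be dropped, while records from shards with no stored position pass through unchanged.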

Prior art: `ConsumeKafka` has a `USE_WRAPPER` property that selects a 
`WrapperRecordStreamKafkaMessageConverter`, which alters the `RecordSchema` of 
the output message and wraps the metadata and content so that each record 
looks like:

{noformat}
{
  "metadata": {
    "topic": string,
    "partition": long,
    "offset": long,
    "timestamp": long
  },
  "headers": map,
  "key": object,
  "value": record // original schema
}
{noformat}

  was:
h1. Proposal

Add `USE_WRAPPER` property to `ConsumeKinesisStream` processor. Once a 
`RECORD_WRITER` property is set — `USE_WRAPPER` could be set to either:
 * "Use Content as Value" — preserves previous functionality, i.e. record 
content is equal to Kinesis record data
 * "Use Wrapper" — wraps Kinesis record content under `value` key and adds 
`metadata` key, like so:
```
{
  "metadata": {
    "kinesis.stream": string,
    "aws.kinesis.shard.id": string,
    "aws.kinesis.sequence.number": string,
    "aws.kinesis.partition.key": string,
    "aws.kinesis.approximate.arrival.timestamp": long
  },
  value: record // original schema
}
```

h1. Context

ConsumeKinesisStream supports using `Record Reader` and `Record Writer` which 
allows for putting multiple records into a single FlowFile output for efficient 
passing between processors.

Problem is that such FlowFile has only a single `aws.kinesis.sequence.number` 
attribute — this limits the ability for constructing an efficient setup with 
exactly-once deliverability semantics. The sequence number is being committed 
to DynamoDB every few seconds leaving a possibility of duplicate records in 
case of restarts.

Given one can store the sequence number of the last uploaded row — it is 
possible to filter out all rows that would be otherwise duplicated. To do that 
the Sequence Number should be available for each individual record.

Prior art: `ConsumeKafka` has a property `USE_WRAPPER` that leads to using a 
`WrapperRecordStreamKafkaMessageConverter` that alters the `RecordSchema` of 
the output message and wraps the metadata and content so each record content 
looks like:
```
{
  "metadata": {
    "topic": string,
    "partition": long,
    "offset": long,
    "timestamp": long,
  },
  "headers": map,
  "key": object,
  "value": record,
}
```


> ConsumeKinesisStream wrap record metadata into FlowFile content
> ---------------------------------------------------------------
>
>                 Key: NIFI-14285
>                 URL: https://issues.apache.org/jira/browse/NIFI-14285
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 2.2.0
>            Reporter: Dariusz Seweryn
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
