[ 
https://issues.apache.org/jira/browse/NIFI-14285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dariusz Seweryn updated NIFI-14285:
-----------------------------------
    Description: 
h1. Proposal

Add a `USE_WRAPPER` property to the `ConsumeKinesisStream` processor. Once the 
`RECORD_WRITER` property is set, `USE_WRAPPER` can be set to either:
 * "Use Content as Value": preserves the previous behavior, i.e. the record 
content equals the Kinesis record data
 * "Use Wrapper": wraps the Kinesis record content under a `value` key and adds 
a `metadata` key, like so:

??{??
??  "metadata": {??
??    "kinesis.stream": string,??
??    "aws.kinesis.shard.id": string,??
??    "aws.kinesis.sequence.number": string,??
??    "aws.kinesis.partition.key": string,??
??    "aws.kinesis.approximate.arrival.timestamp": long??
??  },??
??  "value": record // original schema??
??}??
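
The wrapped layout above can be illustrated with a minimal sketch. This is not the processor's implementation; the function `wrap_record` and its parameter names are hypothetical, and only the key names are taken from the proposal.

```python
from typing import Any, Dict


def wrap_record(
    value: Any,
    stream: str,
    shard_id: str,
    sequence_number: str,
    partition_key: str,
    arrival_timestamp_ms: int,
) -> Dict[str, Any]:
    """Wrap one Kinesis record's content together with its metadata.

    Hypothetical helper: mirrors the proposed "Use Wrapper" layout only.
    """
    return {
        "metadata": {
            "kinesis.stream": stream,
            "aws.kinesis.shard.id": shard_id,
            "aws.kinesis.sequence.number": sequence_number,
            "aws.kinesis.partition.key": partition_key,
            "aws.kinesis.approximate.arrival.timestamp": arrival_timestamp_ms,
        },
        # The original record is kept unchanged under "value".
        "value": value,
    }


wrapped = wrap_record(
    {"user": "alice", "amount": 12},
    stream="example-stream",
    shard_id="shardId-000000000000",
    sequence_number="49590338271490256608559692538361571095921575989136588898",
    partition_key="alice",
    arrival_timestamp_ms=1739000000000,
)
```

With "Use Content as Value" the output record would be just the `value` part; with "Use Wrapper" every record in the FlowFile carries its own sequence number.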

h1. Context

`ConsumeKinesisStream` supports a `Record Reader` and a `Record Writer`, which 
allow multiple records to be written to a single output FlowFile for efficient 
passing between processors.

The problem is that such a FlowFile has only a single 
`aws.kinesis.sequence.number` attribute, which limits the ability to build an 
efficient setup with exactly-once delivery semantics. The sequence number is 
committed to DynamoDB only every few seconds, so after a restart the records 
consumed since the last commit can be read again and produce duplicates.

If the sequence number of the last uploaded row is stored, all rows that would 
otherwise be duplicated can be filtered out. For that, the sequence number 
must be available for each individual record.
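
The filtering step described above could look roughly like the following sketch. It assumes the wrapped record layout from the proposal, that the last uploaded sequence number is tracked per shard, and that sequence numbers are compared as integers within a shard. The function `filter_new_records` and the `last_uploaded` mapping are hypothetical names.

```python
from typing import Any, Dict, List


def filter_new_records(
    records: List[Dict[str, Any]],
    last_uploaded: Dict[str, str],
) -> List[Dict[str, Any]]:
    """Drop records at or before the last uploaded sequence number of their shard.

    `last_uploaded` maps shard id -> sequence number (decimal string) of the
    last row already stored downstream. Assumption: ordering is only compared
    between records of the same shard.
    """
    fresh = []
    for record in records:
        meta = record["metadata"]
        shard_id = meta["aws.kinesis.shard.id"]
        # Sequence numbers are long decimal strings; compare them as integers,
        # because plain string comparison would rank "9" above "10".
        sequence = int(meta["aws.kinesis.sequence.number"])
        last = last_uploaded.get(shard_id)
        if last is None or sequence > int(last):
            fresh.append(record)
    return fresh


def _record(shard_id: str, sequence_number: str) -> Dict[str, Any]:
    # Test fixture with only the metadata fields the filter reads.
    return {
        "metadata": {
            "aws.kinesis.shard.id": shard_id,
            "aws.kinesis.sequence.number": sequence_number,
        },
        "value": {"seq": sequence_number},
    }


replayed = [_record("shard-a", "9"), _record("shard-a", "10"),
            _record("shard-a", "11"), _record("shard-b", "5")]
kept = filter_new_records(replayed, {"shard-a": "9"})
```

Here the replayed record with sequence number 9 on `shard-a` is dropped, while the later records and the record from the untracked `shard-b` are kept.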

Prior art: `ConsumeKafka` has a `USE_WRAPPER` property that selects a 
`WrapperRecordStreamKafkaMessageConverter`, which alters the `RecordSchema` of 
the output message and wraps the metadata and content so that each record 
looks like:

??{??
??  "metadata": {??
??    "topic": string,??
??    "partition": long,??
??    "offset": long,??
??    "timestamp": long??
??  },??
??  "headers": map,??
??  "key": object,??
??  "value": record // original schema??
??}??

  was:
h1. Proposal

Add `USE_WRAPPER` property to `ConsumeKinesisStream` processor. Once a 
`RECORD_WRITER` property is set — `USE_WRAPPER` could be set to either:
 * "Use Content as Value" — preserves previous functionality, i.e. record 
content is equal to Kinesis record data
 * "Use Wrapper" — wraps Kinesis record content under `value` key and adds 
`metadata` key, like so:

{{{}}
{{{}  "metadata": {
    "kinesis.stream": string,
    "aws.kinesis.shard.id": string,
    "aws.kinesis.sequence.number": string,
    "aws.kinesis.partition.key": string,
    "aws.kinesis.approximate.arrival.timestamp": long
  }{}}}{{{},{}}}
{{  value: record // original schema}}
{{}}}

h1. Context

ConsumeKinesisStream supports using `Record Reader` and `Record Writer` which 
allows for putting multiple records into a single FlowFile output for efficient 
passing between processors.

Problem is that such FlowFile has only a single `aws.kinesis.sequence.number` 
attribute — this limits the ability for constructing an efficient setup with 
exactly-once deliverability semantics. The sequence number is being committed 
to DynamoDB every few seconds leaving a possibility of duplicate records in 
case of restarts.

Given one can store the sequence number of the last uploaded row — it is 
possible to filter out all rows that would be otherwise duplicated. To do that 
the Sequence Number should be available for each individual record.

Prior art: `ConsumeKafka` has a property `USE_WRAPPER` that leads to using a 
`WrapperRecordStreamKafkaMessageConverter` that alters the `RecordSchema` of 
the output message and wraps the metadata and content so each record content 
looks like:

{{{}}
{{  "metadata": }}{{{}{
    "topic": string,
    "partition": long,
    "offset": long,
    "timestamp": long
  }{}}}{{{},{}}}
{{  "headers": map,}}
{{  "key": object,}}
{{  "value": record // original schema}}
{{}}}


> ConsumeKinesisStream wrap record metadata into FlowFile content
> ---------------------------------------------------------------
>
>                 Key: NIFI-14285
>                 URL: https://issues.apache.org/jira/browse/NIFI-14285
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>    Affects Versions: 2.2.0
>            Reporter: Dariusz Seweryn
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
