[ 
https://issues.apache.org/jira/browse/FLINK-4080?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-4080:
---------------------------------------
    Description: 
I've occasionally experienced unsuccessful ManualExactlyOnceTest after several 
tries.

Kinesis records of the same aggregated batch will have the same sequence 
number, and different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
 The current code of the consumer is committing state every time it finishes 
processing a record, even de-aggregated ones. This is a bug since this will 
incorrectly mark all remaining records of the de-aggregated batch as processed 
in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If we 
encounter a de-aggregated record, don't update state until we finished 
processing the last record of the batch.

  was:
I've occasionally experienced unsuccessful ManualExactlyOnceTest after several 
tries.

Kinesis records of the same aggregated batch will have the same sequence 
number, and different sub-sequence numbers 
(http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
 The current state of the consumer is committing state every time it finishes 
processing a record, even de-aggregated ones. This is a bug since this will 
incorrectly mark all remaining records of the de-aggregated batch as processed 
in the state.

Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
records (either non- or de-aggregated) instead of the basic `Record` class. 
This gives access to whether or not the record was originally aggregated. If we 
encounter a de-aggregated record, don't update state until we finished 
processing the last record of the batch.


> Kinesis consumer not exactly-once if stopped in the middle of processing 
> aggregated records
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-4080
>                 URL: https://issues.apache.org/jira/browse/FLINK-4080
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Kinesis Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Critical
>             Fix For: 1.1.0
>
>
> I've occasionally experienced unsuccessful ManualExactlyOnceTest after 
> several tries.
> Kinesis records of the same aggregated batch will have the same sequence 
> number, and different sub-sequence numbers 
> (http://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-consumer-deaggregation.html).
>  The current code of the consumer is committing state every time it finishes 
> processing a record, even de-aggregated ones. This is a bug since this will 
> incorrectly mark all remaining records of the de-aggregated batch as 
> processed in the state.
> Proposed fix: Use the extended `UserRecord` class in KCL to represent all 
> records (either non- or de-aggregated) instead of the basic `Record` class. 
> This gives access to whether or not the record was originally aggregated. If 
> we encounter a de-aggregated record, don't update state until we finished 
> processing the last record of the batch.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to