[ 
https://issues.apache.org/jira/browse/FLINK-35396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suxing Lee updated FLINK-35396:
-------------------------------
    Affects Version/s: 1.16.2

> DynamoDB Streams Consumer consumption data is out of order
> ----------------------------------------------------------
>
>                 Key: FLINK-35396
>                 URL: https://issues.apache.org/jira/browse/FLINK-35396
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / AWS, Connectors / Kinesis
>    Affects Versions: 1.16.2, aws-connector-4.2.0
>            Reporter: Suxing Lee
>            Priority: Major
>              Labels: AWS
>
> When we use `FlinkDynamoDBStreamsConsumer` in 
> `flink-connector-aws/flink-connector-kinesis` to consume DynamoDB Streams 
> data, records are delivered out of order.
> The relevant service log output is as follows:
> {noformat}
> 2024-05-06 00:00:40,639 INFO  
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] 
> - Subtask 0 has discovered a new shard 
> StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', 
> shard='{ShardId: shardId-00000001714924828427-d73b6b68,
>   ParentShardId: shardId-00000001714910797443-fb1d3b22,HashKeyRange: 
> {StartingHashKey: 0,EndingHashKey: 1},SequenceNumberRange: 
> {StartingSequenceNumber: 2958376400000000058201168012,}}'} due to resharding, 
> and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM 
> with ShardConsumer 2807
> ......
> ......
> ......
> 2024-05-06 00:00:46,729 INFO  
> org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] 
> - Subtask 0 has reached the end of subscribed shard: 
> StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***', 
> shard='{ShardId: shardId-00000001714910797443-fb1d3b22,ParentShardId: 
> shardId-00000001714897099372-17932b9a,HashKeyRange: {StartingHashKey: 
> 0,EndingHashKey: 1},SequenceNumberRange: {StartingSequenceNumber: 
> 2955440900000000051102788386,}}'}
> {noformat}
> The sequence of events appears to be:
> `2024-05-06 00:00:40,639` The new child shard is discovered and its 
> consumption starts immediately (ShardId: shardId-00000001714924828427-d73b6b68).
> `2024-05-06 00:00:46,729` Consumption of the old parent shard reaches its 
> end (ShardId: shardId-00000001714910797443-fb1d3b22).
> That is a gap of about 6 seconds. In other words, the child shard started 
> being consumed before the parent shard had been fully read, so the records 
> we read are sent downstream out of order.
> https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L689-L740
>  
> Is this because the code submits a `ShardConsumer` to 
> `shardConsumersExecutor` as soon as `discoverNewShards` reports a new shard? 
> `shardConsumersExecutor` is created through `Executors.newCachedThreadPool()`, 
> which does not limit the number of threads, so the parent and child shards 
> would be consumed at the same time and the data would arrive out of order.
> `flink-connector-kinesis` relies on `dynamodb-streams-kinesis-adapter` to 
> subscribe to messages from the DynamoDB stream. Why does consuming data 
> directly with `dynamodb-streams-kinesis-adapter` not show a similar 
> problem?



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
