Suxing Lee created FLINK-35396:
----------------------------------
Summary: DynamoDB Streams Consumer consumption data is out of order
Key: FLINK-35396
URL: https://issues.apache.org/jira/browse/FLINK-35396
Project: Flink
Issue Type: Bug
Components: Connectors / AWS, Connectors / Kinesis
Affects Versions: 1.16.2
Reporter: Suxing Lee
When we use `FlinkDynamoDBStreamsConsumer` in
`flink-connector-aws/flink-connector-kinesis` to consume dynamodb stream data,
there is an out-of-order problem.
The service exception log is as follows:
{noformat}
2024-05-06 00:00:40,639 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Subtask 0 has discovered a new shard
StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***',
shard='{ShardId: shardId-00000001714924828427-d73b6b68,
ParentShardId: shardId-00000001714910797443-fb1d3b22,HashKeyRange:
{StartingHashKey: 0,EndingHashKey: 1},SequenceNumberRange:
{StartingSequenceNumber: 2958376400000000058201168012,}}'} due to resharding,
and will start consuming the shard from sequence number EARLIEST_SEQUENCE_NUM
with ShardConsumer 2807
......
......
......
2024-05-06 00:00:46,729 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Subtask 0 has reached the end of subscribed shard:
StreamShardHandle{streamName='arn:aws:dynamodb:ap-southeast-1:***',
shard='{ShardId: shardId-00000001714910797443-fb1d3b22,ParentShardId:
shardId-00000001714897099372-17932b9a,HashKeyRange: {StartingHashKey:
0,EndingHashKey: 1},SequenceNumberRange: {StartingSequenceNumber:
2955440900000000051102788386,}}'}
{noformat}
It looks like the failure sequence is:
`2024-05-06 00:00:40,639` A new shard is discovered, and the child shard
(ShardId: shardId-00000001714924828427-d73b6b68) is consumed immediately.
`2024-05-06 00:00:46,729` Consumption of the old parent shard (ShardId:
shardId-00000001714910797443-fb1d3b22) ends.
There was a gap of about 6 seconds between the two events. In other words, the
child shard had already started consuming data before consumption of the parent
shard finished. This causes the records we read to be sent downstream out of
order.
https://github.com/apache/flink-connector-aws/blob/c688a8545ac1001c8450e8c9c5fe8bbafa13aeba/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L689-L740
This appears to be because the code submits a `ShardConsumer` to
`shardConsumersExecutor` as soon as `discoverNewShards` finds a new shard, and
`shardConsumersExecutor` is created via `Executors.newCachedThreadPool()`, which
does not limit the number of threads. As a result, parent and child shards are
consumed at the same time, and data consumption becomes out of order.
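The unbounded-executor behavior described above can be demonstrated in
isolation. The following is a minimal sketch (not the connector's actual code;
the class name, method name, and "parent"/"child" task roles are hypothetical):
a cached thread pool runs both submitted tasks concurrently, so a task standing
in for the child shard consumer can start while the parent-shard task is still
in flight.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class CachedPoolDemo {
    // Returns true if the "child shard" task ran while the "parent shard"
    // task was still in flight, i.e. the executor ran both concurrently.
    static boolean childRanBeforeParentFinished() throws Exception {
        // Same construction as the connector's shardConsumersExecutor:
        // threads are created on demand with no upper bound.
        ExecutorService shardConsumersExecutor = Executors.newCachedThreadPool();
        CountDownLatch childStarted = new CountDownLatch(1);

        Future<?> parent = shardConsumersExecutor.submit(() -> {
            try {
                // The parent task blocks until the child task has started;
                // this can only complete if both tasks run at the same time.
                childStarted.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Submitted while the parent task is still running -- a cached pool
        // starts it immediately on a fresh thread.
        shardConsumersExecutor.submit(childStarted::countDown);

        parent.get(5, TimeUnit.SECONDS); // times out if tasks could not overlap
        shardConsumersExecutor.shutdown();
        return true;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("ran concurrently = " + childRanBeforeParentFinished());
    }
}
```

With a bounded, ordering-aware scheduler the child task would have to wait for
the parent to finish, which is the guarantee the out-of-order consumption above
is missing.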
`flink-connector-kinesis` relies on `dynamodb-streams-kinesis-adapter` to
subscribe to messages from the DynamoDB stream. But why does consuming data
directly through `dynamodb-streams-kinesis-adapter` not show a similar problem?
--
This message was sent by Atlassian Jira
(v8.20.10#820010)