[
https://issues.apache.org/jira/browse/FLINK-35115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17838121#comment-17838121
]
Vadim Vararu commented on FLINK-35115:
--------------------------------------
[~a.pilipenko] Yes, I can reproduce this consistently.
I've enabled this logger:
{code:java}
logger.kinesis.name = org.apache.flink.streaming.connectors.kinesis
logger.kinesis.level = DEBUG {code}
and these are the last logs on the TM before triggering the stop-with-savepoint (the
entry at 2024-04-17 14:05:11,753 corresponds to the last checkpoint):
{code:java}
2024-04-17 14:05:06,330 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Subtask 0 is trying to discover new shards that were created due to resharding
...
2024-04-17 14:05:11,753 DEBUG
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] -
Snapshotting state ...
2024-04-17 14:05:11,753 DEBUG
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] -
Snapshotted state, last processed sequence numbers:
{StreamShardMetadata{streamName='kinesis-dev-1-20210513-v3-contract-impression',
shardId='shardId-000000000000', parentShardId='null',
adjacentParentShardId='null', startingHashKey='0',
endingHashKey='340282366920938463463374607431768211455',
startingSequenceNumber='49618213417511572504838906841289148356109207047268990978',
endingSequenceNumber='null'}=49646826022549514041791139259235973731492142339223191554},
checkpoint id: 1, timestamp: 1713351911711
2024-04-17 14:05:16,652 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Subtask 0 is trying to discover new shards that were created due to resharding
...
2024-04-17 14:05:26,930 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Subtask 0 is trying to discover new shards that were created due to resharding
...
2024-04-17 14:05:27,032 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] -
stream: kinesis-dev-1-20210513-v3-contract-impression, shard:
shardId-000000000000, millis behind latest: 0, batch size: 120
2024-04-17 14:05:37,229 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Subtask 0 is trying to discover new shards that were created due to resharding
...
2024-04-17 14:05:43,079 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] -
stream: kinesis-dev-1-20210513-v3-contract-impression, shard:
shardId-000000000000, millis behind latest: 0, batch size: 1
2024-04-17 14:05:47,752 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Subtask 0 is trying to discover new shards that were created due to resharding
...
2024-04-17 14:05:50,677 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] -
stream: kinesis-dev-1-20210513-v3-contract-impression, shard:
shardId-000000000000, millis behind latest: 0, batch size: 1{code}
Now I trigger the stop-with-savepoint:
{code:java}
2024-04-17 14:05:52,168 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0
...
2024-04-17 14:05:52,169 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Cancelled discovery
2024-04-17 14:05:52,169 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Shutting down the shard consumer threads of subtask 0 ...
2024-04-17 14:05:52,645 DEBUG
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] -
snapshotState() called on closed source; returning null.
2024-04-17 14:05:52,669 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Starting shutdown of shard consumer threads and AWS SDK resources of subtask 0
...
2024-04-17 14:05:52,670 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Shutting down the shard consumer threads of subtask 0 ... {code}
And here I restart the job from the savepoint:
{code:java}
2024-04-17 14:12:56,691 INFO
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Setting
restore state in the FlinkKinesisConsumer. Using the following offsets:
{org.apache.flink.streaming.connectors.kinesis.model.StreamShardMetadata$EquivalenceWrapper@f5191c51=49646826022549514041791139259235973731492142339223191554}
2024-04-17 14:12:58,370 INFO
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask
0 is seeding the fetcher with restored shard
StreamShardHandle{streamName='kinesis-dev-1-20210513-v3-contract-impression',
shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey:
0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49618213417511572504838906841289148356109207047268990978,}}'}, starting state
set to the restored sequence number
49646826022549514041791139259235973731492142339223191554
2024-04-17 14:12:58,371 INFO
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Subtask 0 will start consuming seeded shard
StreamShardHandle{streamName='kinesis-dev-1-20210513-v3-contract-impression',
shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey:
0,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange:
{StartingSequenceNumber:
49618213417511572504838906841289148356109207047268990978,}}'} from sequence
number 49646826022549514041791139259235973731492142339223191554 with
ShardConsumer 0
2024-04-17 14:12:59,211 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] -
Subtask 0 is trying to discover new shards that were created due to resharding
...
2024-04-17 14:12:59,297 DEBUG
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer [] -
stream: kinesis-dev-1-20210513-v3-contract-impression, shard:
shardId-000000000000, millis behind latest: 0, batch size: 3 {code}
There are no other logs between the above actions.
I did not dive into the source code, but it seems odd to me that a line like this:
{code:java}
2024-04-17 14:05:11,753 DEBUG
org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] -
Snapshotted state, last processed sequence numbers:
{StreamShardMetadata{streamName='kinesis-dev-1-20210513-v3-contract-impression',
shardId='shardId-000000000000', parentShardId='null',
adjacentParentShardId='null', startingHashKey='0',
endingHashKey='340282366920938463463374607431768211455',
startingSequenceNumber='49618213417511572504838906841289148356109207047268990978',
endingSequenceNumber='null'}=49646826022549514041791139259235973731492142339223191554},
checkpoint id: 1, timestamp: 1713351911711 {code}
was logged at the last checkpoint, but nothing similar appears at the
stop-with-savepoint.
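My guess, without having dived into the connector source, is that the "snapshotState() called on closed source; returning null." DEBUG line from the stop-with-savepoint logs means the consumer short-circuits once the source has been closed, so the savepoint keeps the sequence numbers of the previous checkpoint. A rough sketch of what I imagine (the names and structure are assumptions for illustration, not the actual connector code):
{code:java}
// Hypothetical sketch of the early return suggested by the DEBUG line above;
// the "running" flag and the surrounding logic are assumptions, not the real code.
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    if (!running) {
        // The source has already been shut down (as during stop-with-savepoint),
        // so no fresh per-shard sequence numbers are written into the savepoint.
        LOG.debug("snapshotState() called on closed source; returning null.");
        return;
    }
    // ... otherwise snapshot the last processed sequence number per shard ...
} {code}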
I've filtered the output to only the logs from
_org.apache.flink.streaming.connectors.kinesis_. Let me know if you need any
other logs.
> Kinesis connector writes wrong Kinesis sequence number at stop with savepoint
> -----------------------------------------------------------------------------
>
> Key: FLINK-35115
> URL: https://issues.apache.org/jira/browse/FLINK-35115
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kinesis
> Affects Versions: 1.16.3, 1.18.1
> Environment: The issue happens in a *Kinesis -> Flink -> Kafka*
> exactly-once setup with:
> * Flink versions checked 1.16.3 and 1.18.1
> * Kinesis connector checked 1.16.3 and 4.2.0-1.18
> * checkpointing configured at 1 minute with EXACTLY_ONCE mode:
> {code:java}
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.enableCheckpointing(60000, EXACTLY_ONCE);
> execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
> execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH); {code}
> * Kafka sink configured with EXACTLY_ONCE semantic/delivery guarantee:
> {code:java}
> Properties sinkConfig = new Properties();
> sinkConfig.put("transaction.timeout.ms", 480000);
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers("localhost:9092")
>         .setTransactionalIdPrefix("test-prefix")
>         .setDeliverGuarantee(EXACTLY_ONCE)
>         .setKafkaProducerConfig(sinkConfig)
>         .setRecordSerializer(
>                 (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
>                         new ProducerRecord<>("test-output-topic", null, element.getBytes()))
>         .build(); {code}
> * Kinesis consumer defined as follows (see the combined wiring sketch after this list):
> {code:java}
> FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer = new FlinkKinesisConsumer<>(
>         "test-stream",
>         new AbstractDeserializationSchema<>() {
>             @Override
>             public ByteBuffer deserialize(byte[] bytes) {
>                 // Pass the raw Kinesis record bytes through without parsing
>                 return ByteBuffer.wrap(bytes);
>             }
>         },
>         props); {code}
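> The fragments above can be wired together roughly like this (a sketch only; the
> map step and the job name are illustrative, and the consumer/sink variables are
> the ones defined above):
> {code:java}
> // Hypothetical end-to-end wiring of the fragments above; not the exact job,
> // just an illustration of the Kinesis -> Flink -> Kafka exactly-once setup.
> StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
> execEnv.enableCheckpointing(60000, EXACTLY_ONCE);
> execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
> execEnv.addSource(flinkKinesisConsumer)
>         .map(buffer -> StandardCharsets.UTF_8.decode(buffer).toString())
>         .sinkTo(sink);
> execEnv.execute("kinesis-to-kafka-exactly-once"); {code}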
>
> Reporter: Vadim Vararu
> Assignee: Aleksandr Pilipenko
> Priority: Major
> Labels: kinesis
>
> With an exactly-once Kinesis -> Flink -> Kafka job, triggering a
> stop-with-savepoint and then resuming causes Flink to duplicate in Kafka all
> the records between the last checkpoint and the savepoint:
> * Event1 is written to Kinesis
> * Event1 is processed by Flink
> * Event1 is committed to Kafka at the checkpoint
> *
> ............................................................................
> * Event2 is written to Kinesis
> * Event2 is processed by Flink
> * Stop with savepoint is triggered manually
> * Event2 is committed to Kafka
> *
> ............................................................................
> * Job is resumed from the savepoint
> * *{color:#FF0000}Event2 is written again to Kafka at the first
> checkpoint{color}*
>
> I believe that this is a Kinesis connector issue, for two reasons:
> * I've checked the actual Kinesis sequence number in the _metadata file
> generated at stop-with-savepoint, and it is the one from the checkpoint before
> the savepoint rather than the one of the last record committed to Kafka.
> * I've tested exactly the same job with Kafka as the source instead of
> Kinesis, and the behaviour does not reproduce.