Vadim Vararu created FLINK-35115:
------------------------------------
Summary: Kinesis connector writes wrong sequence number at stop
with savepoint
Key: FLINK-35115
URL: https://issues.apache.org/jira/browse/FLINK-35115
Project: Flink
Issue Type: Bug
Components: Connectors / Kinesis
Affects Versions: 1.16.3
Environment: The issue happens in a *Kinesis -> Flink -> Kafka*
exactly-once setup with:
* Flink versions checked: 1.16.3 and 1.18.1
* Kinesis connector versions checked: 1.16.3 and 4.2.0-1.18
* Checkpointing configured at 1 minute with EXACTLY_ONCE mode:
{code:java}
StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
execEnv.enableCheckpointing(60000, EXACTLY_ONCE);
execEnv.getCheckpointConfig().setCheckpointTimeout(90000);
execEnv.getCheckpointConfig().setCheckpointStorage(CHECKPOINTS_PATH);
{code}
* Kafka sink configured with the EXACTLY_ONCE delivery guarantee:
{code:java}
Properties sinkConfig = new Properties();
sinkConfig.put("transaction.timeout.ms", 480000);

KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("localhost:9092")
        .setTransactionalIdPrefix("test-prefix")
        .setDeliveryGuarantee(EXACTLY_ONCE)
        .setKafkaProducerConfig(sinkConfig)
        .setRecordSerializer(
                (KafkaRecordSerializationSchema<String>) (element, context, timestamp) ->
                        new ProducerRecord<>("test-output-topic", null, element.getBytes()))
        .build();
{code}
* Kinesis consumer defined as:
{code:java}
FlinkKinesisConsumer<ByteBuffer> flinkKinesisConsumer =
        new FlinkKinesisConsumer<>("test-stream",
                new AbstractDeserializationSchema<>() {
                    @Override
                    public ByteBuffer deserialize(byte[] bytes) {
                        // Pass the raw Kinesis record bytes through unchanged
                        return ByteBuffer.wrap(bytes);
                    }
                }, props);
{code}
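* Job wiring, for completeness (a minimal sketch; the UTF-8 decode step and the job name are assumptions, any ByteBuffer-to-String mapping reproduces the issue):
{code:java}
// Hypothetical wiring of the snippets above
// (needs java.nio.charset.StandardCharsets).
execEnv
        .addSource(flinkKinesisConsumer)
        .map(buffer -> StandardCharsets.UTF_8.decode(buffer).toString())
        .sinkTo(sink);
execEnv.execute("kinesis-to-kafka");
{code}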
Reporter: Vadim Vararu
With an exactly-once Kinesis -> Flink -> Kafka job, triggering a stop-with-savepoint
and later resuming from it makes Flink duplicate in Kafka all the records processed
between the last checkpoint and the savepoint (see the consumer sketch after the
timeline):
* Event1 is written to Kinesis
* Event1 is processed by Flink
* Event1 is committed to Kafka at the checkpoint
* ............................................................................
* Event2 is written to Kinesis
* Event2 is processed by Flink
* Stop with savepoint is triggered manually
* Event2 is committed to Kafka
* ............................................................................
* Job is resumed from the savepoint
* *{color:#FF0000}Event2 is written again to Kafka at the first
checkpoint{color}*
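The duplicate is easy to observe with a plain read_committed consumer on the output
topic (a minimal sketch; the group id, poll timeout and counting logic are
illustrative only):
{code:java}
// Count occurrences of each value in test-output-topic. With end-to-end
// EXACTLY_ONCE every value should appear once; after the resume above,
// Event2 shows up twice.
Properties conf = new Properties();
conf.put("bootstrap.servers", "localhost:9092");
conf.put("group.id", "dup-check");               // illustrative group id
conf.put("isolation.level", "read_committed");   // skip uncommitted/aborted txns
conf.put("auto.offset.reset", "earliest");
conf.put("key.deserializer", StringDeserializer.class.getName());
conf.put("value.deserializer", StringDeserializer.class.getName());

Map<String, Integer> counts = new HashMap<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(conf)) {
    consumer.subscribe(Collections.singletonList("test-output-topic"));
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(10))) {
        counts.merge(record.value(), 1, Integer::sum);
    }
}
counts.forEach((value, n) -> {
    if (n > 1) System.out.println("duplicate: " + value + " x" + n);
});
{code}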
I believe this is a Kinesis connector issue, for two reasons:
* I've checked the actual Kinesis sequence number in the _metadata file generated
at stop-with-savepoint, and it is the one from the checkpoint before the savepoint
instead of the one of the last record committed to Kafka (see the sketch below for
reading it programmatically).
* I've tested exactly the same job with Kafka as the source instead of Kinesis, and
the behaviour does not reproduce.
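For reference, a minimal State Processor API sketch for reading those sequence
numbers out of the savepoint (assumptions: Flink 1.18, the source operator was
given .uid("kinesis-source"), the connector's internal union list state is named
"Kinesis-Stream-Shard-State", and the savepoint path is hypothetical):
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint =
        SavepointReader.read(env, "file:///tmp/savepoints/savepoint-xxxx", new HashMapStateBackend());

// FlinkKinesisConsumer keeps its per-shard offsets as union list state
// named "Kinesis-Stream-Shard-State" (a connector-internal detail).
savepoint.readUnionState(
                OperatorIdentifier.forUid("kinesis-source"), // assumed uid
                "Kinesis-Stream-Shard-State",
                TypeInformation.of(new TypeHint<Tuple2<StreamShardMetadata, SequenceNumber>>() {}))
        .print();

env.execute("inspect-kinesis-savepoint");
{code}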
--
This message was sent by Atlassian Jira
(v8.20.10#820010)