Darcy Lin created FLINK-33484:
---------------------------------
Summary: Flink Kafka Connector Offset Lag Issue with Transactional
Data and Read Committed Isolation Level
Key: FLINK-33484
URL: https://issues.apache.org/jira/browse/FLINK-33484
Project: Flink
Issue Type: Bug
Components: Connectors / Kafka
Affects Versions: 1.17.1
Environment: Flink 1.17.1
Kafka 2.5.1
Reporter: Darcy Lin

We have encountered an issue with the Flink Kafka connector when consuming
transactional data from Kafka with {{isolation.level}} set to
{{read_committed}} ({{setProperty("isolation.level", "read_committed")}}).
Even after all data in a topic has been consumed, the reported offset lag
stays at 1 instead of 0. When we consume the same data with the Kafka Java
client, the lag drops to 0 as expected.
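
The symptom can be reproduced on paper with a small sketch. It assumes lag is computed as the partition end offset minus the committed offset, and it uses a made-up partition layout (three data records followed by one commit marker); the class and method names are hypothetical and not part of Flink or Kafka.

```java
import java.util.Properties;

public class ReadCommittedLagSketch {

    // Lag as assumed here: partition end offset minus the committed offset.
    public static long lag(long endOffset, long committedOffset) {
        return endOffset - committedOffset;
    }

    public static void main(String[] args) {
        // The consumer setting named in the report.
        Properties props = new Properties();
        props.setProperty("isolation.level", "read_committed");

        // Assumed toy partition: one committed transaction with data records
        // at offsets 0, 1, 2 and its commit marker at offset 3.
        long lastDataOffset = 2;
        long endOffset = 4; // first offset after the marker

        // Offset stored after the last record, per the report: offset + 1.
        long committed = lastDataOffset + 1;

        System.out.println("isolation.level = " + props.getProperty("isolation.level"));
        System.out.println("committed = " + committed
                + ", lag = " + lag(endOffset, committed));
    }
}
```

Under these assumptions the stored offset stops at the marker, so the computed lag never reaches 0 even though no data record is left to read.
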
We suspect the cause is the way the Flink Kafka connector calculates the
offset it stores. The relevant code appears to be the {{emitRecord}} method in
{{KafkaRecordEmitter.java}}, which saves the offset by calling
{{splitState.setCurrentOffset(consumerRecord.offset() + 1);}}. This statement
is correct for non-transactional data, but it may be inaccurate in
{{read_committed}} mode, because transactional data in Kafka occupies an
additional offset to store the transaction marker. We believe the call should
instead be {{splitState.setCurrentOffset(consumerRecord.offset() + 2);}}.
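
To compare the two expressions, the sketch below evaluates them on a toy log. The log layout, the class, and the {{skipMarkers}} helper are assumptions made for illustration only. A {{ConsumerRecord}} does not expose marker positions, so a real fix would need another source for this value, possibly the consumer's own position.

```java
public class NextOffsetSketch {

    // Toy partition log, index = offset. true = transaction marker (control
    // record), false = data record. Assumed layout for illustration:
    // transaction A = offsets 0-1 plus marker 2, transaction B = offset 3 plus marker 4.
    public static final boolean[] IS_MARKER = {false, false, true, false, true};

    // Expression currently used in emitRecord, per the report.
    public static long plusOne(long recordOffset) {
        return recordOffset + 1;
    }

    // Expression proposed in the report.
    public static long plusTwo(long recordOffset) {
        return recordOffset + 2;
    }

    // Hypothetical alternative: step past the record, then past any markers
    // that directly follow it.
    public static long skipMarkers(boolean[] isMarker, long recordOffset) {
        int next = (int) (recordOffset + 1);
        while (next < isMarker.length && isMarker[next]) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        long endOffset = IS_MARKER.length;
        for (int offset = 0; offset < IS_MARKER.length; offset++) {
            if (IS_MARKER[offset]) {
                continue; // markers are never handed to the application
            }
            System.out.printf(
                    "record %d: +1 -> %d, +2 -> %d, skipMarkers -> %d (end offset %d)%n",
                    offset, plusOne(offset), plusTwo(offset),
                    skipMarkers(IS_MARKER, offset), endOffset);
        }
    }
}
```

In this toy log, {{+ 2}} reaches the end offset for the last record of a transaction (record 3), but for record 0 it points past data record 1, so a fixed increment may only be safe when the record is known to be the last one in its transaction.
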
We would like to ask the Flink team to investigate this issue and advise us
on how to resolve it.
Thank you for your attention and support.