[
https://issues.apache.org/jira/browse/FLINK-33484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17784716#comment-17784716
]
Martijn Visser commented on FLINK-33484:
----------------------------------------
[~lintingbin] Can you please use the connector version I've listed above and
try again? That's a newer version with more fixes for various bugs
> Flink Kafka Connector Offset Lag Issue with Transactional Data and Read
> Committed Isolation Level
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-33484
> URL: https://issues.apache.org/jira/browse/FLINK-33484
> Project: Flink
> Issue Type: Bug
> Components: Connectors / Kafka
> Affects Versions: 1.17.1
> Environment: Flink 1.17.1
> kafka 2.5.1
> Reporter: Darcy Lin
> Priority: Major
>
> We have encountered an issue with the Flink Kafka connector when consuming
> transactional data from Kafka with the {{isolation.level}} set to
> {{read_committed}} ({{{}setProperty("isolation.level",
> "read_committed"){}}}). The problem is that even when all the data from a
> topic is consumed, the offset lag is not 0, but 1. However, when using the
> Kafka Java client to consume the same data, this issue does not occur.
> We suspect that this issue arises due to the way Flink Kafka connector
> calculates the offset. The problem seems to be in the
> {{KafkaRecordEmitter.java}} file, specifically in the {{emitRecord}} method.
> When saving the offset, the method calls
> {{{}splitState.setCurrentOffset(consumerRecord.offset() + 1);{}}}. While this
> statement works correctly in a regular Kafka scenario, it might not be
> accurate when the {{read_committed}} mode is used. We believe that it should
> be {{{}splitState.setCurrentOffset(consumerRecord.offset() + 2);{}}}, as
> transactional data in Kafka occupies an additional offset to store the
> transaction marker.
> We request the Flink team to investigate this issue and provide us with
> guidance on how to resolve it.
> Thank you for your attention and support.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)