[ https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
krishnendu Das updated KAFKA-14947:
-----------------------------------
    Attachment: (was: Kafka_server_3.1.1_data_duplication_issue_log)

> Duplicate records are getting created in the topic.
> ----------------------------------------------------
>
>                 Key: KAFKA-14947
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14947
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 3.1.1
>            Reporter: krishnendu Das
>            Priority: Blocker
>         Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
> We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1)
> for data ingestion. Previously we were using Kafka (version 2.6.2) with the
> same Kafka Connect API (version 2.3.0), and data ingestion worked correctly.
>
> Recently we upgraded Kafka from 2.6.2 to 3.1.1. Since the upgrade we have
> been seeing duplicate records written by the source connector to the Kafka
> topic. While debugging the 3.1.1 code, we found that a new function,
> {*}updateCommittableOffsets{*}(), was added and is called inside
> {*}WorkerSourceTask::execute{*}() as part of the fix for "KAFKA-12226: Commit
> source task offsets without blocking on batch delivery (#11323)".
>
> Because of this function, we observe the following scenario:
> # At the start of execute(), the call goes to updateCommittableOffsets() to
> check whether there is anything to commit. Since the first poll has not yet
> happened, this function finds nothing to commit.
> # The Kafka Connect poll() method is then called from
> WorkerSourceTask::execute(). *---------> 1st poll*
> # The Kafka Connect API (using the sleepy policy) reads one source file from
> the cloud source directory.
> # It reads the whole content of the file and sends the result set to the
> Kafka server to be written to the Kafka topic.
> # During the 2nd poll, updateCommittableOffsets() finds offsets to commit
> and updates the reference variable committableOffsets, which is later used
> by WorkerSourceTask::commitOffsets() to perform the actual offset commit.
> # The Kafka Connect poll() method is then called from
> *WorkerSourceTask::execute().* *---------> 2nd poll*
> # The Kafka Connect API (using the sleepy policy) reads the same source file
> again from the start, because offsetStorageReader::offset() did not return
> the latest offset.
> # It reads the whole content of the file and sends the result set to the
> Kafka server to be written to the Kafka topic. ---> This creates duplicate
> records in the topic.
> ................................................
> ................................................
> # WorkerSourceTask::commitOffsets() commits the offset.
> ................................................
> ................................................
> # The Kafka Connect poll() method is then called from
> {*}WorkerSourceTask::execute(){*}. ---------> 3rd poll
> # This time offsetStorageReader::offset() returns the latest offset.
> # The Kafka Connect API (using the sleepy policy) reads the same source file
> from the last read position.

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
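The scenario above can be sketched as a minimal, self-contained simulation. This is not the actual Connect code: OffsetLagSketch, poll(), commitOffsets(), and the in-memory maps are hypothetical stand-ins for OffsetStorageReader, the task's poll loop, and WorkerSourceTask::commitOffsets(). The point it illustrates is the lag: the reader only sees *committed* offsets, so any poll that runs before the commit re-reads the file from the start.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the race described in the issue, assuming a file
// source that resumes from the last committed offset.
public class OffsetLagSketch {
    // Stands in for OffsetStorageReader: exposes only *committed* offsets.
    static final Map<String, Integer> committedOffsets = new HashMap<>();
    // Stands in for the committableOffsets reference updated between polls.
    static final Map<String, Integer> committableOffsets = new HashMap<>();

    static final String FILE = "source-file";
    static final int FILE_SIZE = 3; // pretend the file holds 3 records

    // One poll: read from the last committed offset to end of file,
    // and stage the new end-of-file offset for a later commit.
    static int poll() {
        int start = committedOffsets.getOrDefault(FILE, 0);
        int recordsRead = FILE_SIZE - start;
        committableOffsets.put(FILE, FILE_SIZE);
        return recordsRead;
    }

    // Stands in for WorkerSourceTask::commitOffsets().
    static void commitOffsets() {
        committedOffsets.putAll(committableOffsets);
    }

    public static void main(String[] args) {
        int first = poll();  // 1st poll: reads all 3 records
        int second = poll(); // 2nd poll: commit has not run yet, so the
                             // same 3 records are read again -> duplicates
        commitOffsets();     // offsets finally committed
        int third = poll();  // 3rd poll: reader now sees the latest offset
        System.out.println(first + " " + second + " " + third); // 3 3 0
    }
}
```

Because the commit happens asynchronously after delivery (the KAFKA-12226 change), the second poll in this model duplicates the whole file; once commitOffsets() runs, subsequent polls resume from the last read position, matching steps described for the 3rd poll.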