[ 
https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

krishnendu Das updated KAFKA-14947:
-----------------------------------
    Attachment: Kafka_server_3.1.1_data_duplication_issue_log

> Duplicate records are getting created in the topic. 
> ----------------------------------------------------
>
>                 Key: KAFKA-14947
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14947
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.1.1
>            Reporter: krishnendu Das
>            Priority: Blocker
>         Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
>
> We use the Kafka Connect API (version 2.3.0) with Kafka (version 3.1.1) 
> for data ingestion. Previously we ran Kafka (version 2.6.2) with the same 
> Kafka Connect API (version 2.3.0), and data ingestion worked correctly. 
>  
> Recently we upgraded Kafka from version 2.6.2 to 3.1.1.
> Since the upgrade, we are seeing duplicate records from the source 
> connector in the Kafka topic. While debugging the 3.1.1 code, we found 
> that a new method, {*}updateCommittableOffsets{*}(), was added and is 
> called inside {*}WorkerSourceTask::execute{*}() as part of the fix for 
> "KAFKA-12226: Commit source task offsets without blocking on batch 
> delivery (#11323)"
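> 
> For reference, this is roughly the loop shape after that change (a 
> simplified sketch, not the actual Apache Kafka source; the stub methods 
> stand in for the runtime internals):
> {code:java}
> import java.util.List;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> 
> // Simplified sketch of the WorkerSourceTask.execute() loop after
> // KAFKA-12226 (illustrative only; the real class lives in
> // org.apache.kafka.connect.runtime).
> public class WorkerLoopSketch {
>     private final SourceTask task;
>     private volatile boolean stopping = false;
> 
>     public WorkerLoopSketch(SourceTask task) {
>         this.task = task;
>     }
> 
>     public void execute() throws InterruptedException {
>         while (!stopping) {
>             // New in 3.1.x: collect the offsets of records already
>             // acknowledged by the broker so that a later
>             // commitOffsets() call can flush them.
>             updateCommittableOffsets();
> 
>             // Ask the connector task for the next batch of records.
>             List<SourceRecord> records = task.poll();
> 
>             // Hand the batch to the producer. Acknowledgements arrive
>             // asynchronously, so these offsets only become committable
>             // on a later loop iteration, after the next poll() has
>             // already started.
>             if (records != null && !records.isEmpty()) {
>                 sendRecords(records);
>             }
>         }
>     }
> 
>     private void updateCommittableOffsets() { /* stub for the sketch */ }
> 
>     private void sendRecords(List<SourceRecord> records) { /* stub */ }
> }
> {code}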
>  
> Because of this change, we now observe the following scenario:
>  # At the start of the execute() flow, updateCommittableOffsets() is 
> called to check whether any offsets are ready to commit. Since the first 
> poll has not happened yet, it finds nothing to commit.
>  # Then the Kafka Connect API poll() method is called from 
> WorkerSourceTask::execute(). *---------> 1st poll*
>  # The Kafka Connect API (using the sleepy policy) reads one source file 
> from the cloud source directory.
>  # It reads the whole content of the file and sends the resulting records 
> to the Kafka broker to be written to the Kafka topic.
>  # Before the 2nd poll, updateCommittableOffsets() finds offsets to 
> commit and updates the reference variable committableOffsets, which 
> WorkerSourceTask::commitOffsets() later uses to perform the actual 
> offset commit.
>  # Then the Kafka Connect API poll() method is called from 
> *WorkerSourceTask::execute().* *---------> 2nd poll*
>  # The Kafka Connect API (using the sleepy policy) reads the same source 
> file again from the start, because offsetStorageReader::offset() did not 
> return the latest offset (see the sketch after this list).
>  # It reads the whole content of the file again and sends the resulting 
> records to the Kafka broker to be written to the topic. ---> This creates 
> duplicate records in the topic.
> ................................................
> ................................................
>  # WorkerSourceTask::commitOffsets() commits the offset.
> ................................................
> ................................................
>  # Then the Kafka Connect API poll() method is called from 
> {*}WorkerSourceTask::execute(){*}. ---------> 3rd poll
>  # This time offsetStorageReader::offset() returns the latest offset.
>  # The Kafka Connect API (using the sleepy policy) reads the same source 
> file from the last read position.
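> 
> To make step 7 concrete, here is a hypothetical file-reading source task 
> (illustrative only, not the connector we actually use; the config key, 
> offset keys, topic name, and helper method are made up for the example):
> {code:java}
> import java.io.BufferedReader;
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Paths;
> import java.util.Collections;
> import java.util.List;
> import java.util.Map;
> import org.apache.kafka.connect.data.Schema;
> import org.apache.kafka.connect.source.SourceRecord;
> import org.apache.kafka.connect.source.SourceTask;
> 
> public class ExampleFileSourceTask extends SourceTask {
>     private String filePath;
> 
>     @Override
>     public void start(Map<String, String> props) {
>         filePath = props.get("file.path");  // hypothetical config key
>     }
> 
>     @Override
>     public List<SourceRecord> poll() throws InterruptedException {
>         Map<String, Object> partition =
>                 Collections.singletonMap("path", filePath);
> 
>         // Returns the last *committed* offset for this file. Between
>         // the 1st and 2nd poll the new offsets are still only queued
>         // in committableOffsets and not yet committed, so this call
>         // returns null (or a stale position) and the file is read from
>         // the beginning again, producing the duplicates.
>         Map<String, Object> stored =
>                 context.offsetStorageReader().offset(partition);
>         long position = (stored == null) ? 0L : (Long) stored.get("position");
> 
>         String line = readLineAt(position);
>         if (line == null) {
>             return null;  // nothing new to read
>         }
>         Map<String, Object> nextOffset =
>                 Collections.singletonMap("position", position + 1);
>         return Collections.singletonList(new SourceRecord(
>                 partition, nextOffset, "ingest-topic",
>                 Schema.STRING_SCHEMA, line));
>     }
> 
>     @Override
>     public void stop() {
>     }
> 
>     @Override
>     public String version() {
>         return "0.0.1";
>     }
> 
>     // Reads the line at 'position' (0-based), or returns null at EOF.
>     private String readLineAt(long position) {
>         try (BufferedReader reader =
>                 Files.newBufferedReader(Paths.get(filePath))) {
>             for (long i = 0; i < position; i++) {
>                 if (reader.readLine() == null) {
>                     return null;
>                 }
>             }
>             return reader.readLine();
>         } catch (IOException e) {
>             return null;
>         }
>     }
> }
> {code}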



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
