[ https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718513#comment-17718513 ]
Yash Mayya commented on KAFKA-14947:
------------------------------------

[~krishnendudas] The offset commits for source connectors were asynchronous and periodic even in AK 2.6.2; see [this class|https://github.com/apache/kafka/blob/da65af02e5856e3429259e26eb49986122e34747/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L47].

{quote}With our existing Connect API code and with the Kafka server (2.6.2), our ingestion mechanism was working fine in the live environment. We checked the Kafka server (2.6.2) WorkerSourceTask::execute() method, and it was following the below-mentioned execution path:
# Poll the task for new data.
# If it gets any data, save the new data into the Kafka topic.
# Commit the offset.{quote}

This isn't exactly accurate, as the offset commit isn't done synchronously with the poll and the produce (to the Kafka topic). Perhaps the *offset.flush.interval.ms* worker configuration (which determines the offset commit interval) differs between your two environments?

{quote}But that won't be persistent. At every start, the object will be reset. Any suggestion on how we can make it persistent in the new Kafka server (3.1.1)?{quote}

That will always be the case for restarts, which is why I mentioned in my previous comment that source offsets are typically queried from the offset storage only during connector / task startup. I'm not sure I follow why the local in-memory offset can't be used during regular running of the connector task?

> Duplicate records are getting created in the topic.
> ----------------------------------------------------
>
>                 Key: KAFKA-14947
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14947
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer
>    Affects Versions: 3.1.1
>            Reporter: krishnendu Das
>            Priority: Blocker
>         Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
>
> We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1)
> for data ingestion purposes. Previously we were using Kafka (version 2.6.2)
> with the same Kafka Connect API (version 2.3.0), and data ingestion was
> happening properly.
>
> Recently we updated the Kafka version from 2.6.2 to 3.1.1. Post-update we
> are facing duplicate data issues from the source connector into the Kafka
> topic. After debugging the 3.1.1 code, we saw that one new function,
> {*}updateCommittableOffsets{*}(), was added and called inside
> {*}WorkerSourceTask::execute{*}() as part of the bug fix "KAFKA-12226:
> Commit source task offsets without blocking on batch delivery (#11323)".
>
> Because of this function, we are observing the following scenario:
> # Inside execute(), at the start of the flow, the call goes to
> updateCommittableOffsets() to check whether there is anything to commit.
> As the first poll has not yet happened, this function finds nothing to
> commit.
> # Then the Kafka Connect API poll() method is called from
> WorkerSourceTask::execute(). *---------> 1st poll*
> # The Kafka Connect API (using the sleepy policy) reads one source file
> from the cloud source directory.
> # It reads the whole content of the file and sends the result set to the
> Kafka server to write to the Kafka topic.
> # During the 2nd poll, updateCommittableOffsets() finds some offsets to
> commit and updates a reference variable, committableOffsets, which is used
> later by the WorkerSourceTask::commitOffsets() function to perform the
> actual offset commit.
> # Then the Kafka Connect API poll() method is called from
> *WorkerSourceTask::execute()*. *---------> 2nd poll*
> # The Kafka Connect API (using the sleepy policy) reads the same source
> file again from the start, as offsetStorageReader::offset() didn't give
> the latest offset.
> # It reads the whole content of the file and sends the result set to the
> Kafka server to write to the Kafka topic. ---> This creates duplicate data
> in the topic.
> ................................................
> ................................................
> # WorkerSourceTask::commitOffsets() commits the offset.
> ................................................
> ................................................
> # Then the Kafka Connect API poll() method is called from
> {*}WorkerSourceTask::execute(){*}. ---------> 3rd poll
> # This time offsetStorageReader::offset() is able to give the latest
> offset.
> # The Kafka Connect API (using the sleepy policy) reads the same source
> file from the last read position.

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
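The 1st-poll / 2nd-poll / 3rd-poll walkthrough in the issue description can be reproduced outside Connect with a small simulation: a task that re-queries the persisted offset on every poll, while commits are flushed only periodically. All names below are illustrative stand-ins, not real Connect APIs; this is a sketch of the staleness window, not of WorkerSourceTask itself.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simulates a source task that (wrongly) trusts the persisted offset for
 * its start position on every poll. Between a poll and the next periodic
 * flush, the persisted offset is stale, so the next poll re-reads the
 * same range -- producing duplicates, as described in the issue.
 */
class StaleOffsetSimulation {
    long persistedOffset = 0;   // what offsetStorageReader::offset() would return
    long pendingOffset = 0;     // produced but not yet flushed to the offset topic

    /** One poll that starts from the persisted (possibly stale) offset. */
    List<Long> poll(long records) {
        List<Long> read = new ArrayList<>();
        for (long i = persistedOffset; i < persistedOffset + records; i++) {
            read.add(i);        // the record "offsets" read during this poll
        }
        pendingOffset = persistedOffset + records;
        return read;
    }

    /** The periodic flush, analogous to WorkerSourceTask::commitOffsets(). */
    void flush() {
        persistedOffset = pendingOffset;
    }
}
```

Two polls with no flush in between return the same range (the duplicates); after a flush, the next poll continues from the committed position.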
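Yash's question about *offset.flush.interval.ms* matters because the flush cadence determines how long offsetStorageReader::offset() can lag behind what the task has already produced. It is a worker-level setting; a snippet of the relevant worker properties (the values here are illustrative, not recommendations):

```properties
# connect-distributed.properties (or connect-standalone.properties)
# How often the worker flushes source offsets to the offset topic.
# Default is 60000 ms; a larger value widens the window during which
# offsetStorageReader::offset() returns a stale offset.
offset.flush.interval.ms=10000
# How long each flush attempt may take before it is cancelled.
offset.flush.timeout.ms=5000
```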
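Yash's suggested pattern, querying the offset storage only during task startup and relying on an in-memory position afterwards, can be sketched in isolation. The `OffsetReader` interface and `InMemoryOffsetTask` class below are hypothetical stand-ins for Connect's OffsetStorageReader and a SourceTask implementation, under the assumption that offsets are a single numeric file position:

```java
/** Illustrative stand-in for Connect's OffsetStorageReader (not the real API). */
interface OffsetReader {
    Long persistedPosition();   // null if nothing was ever committed
}

/**
 * Sketch of a task that reads the persisted offset exactly once, at
 * startup, and afterwards treats its own in-memory position as
 * authoritative. This sidesteps the window in which asynchronous,
 * periodic commits have not yet flushed the latest offset.
 */
class InMemoryOffsetTask {
    private long position;   // authoritative after start()

    /** Mirrors SourceTask.start(): the only place the offset store is read. */
    void start(OffsetReader reader) {
        Long persisted = reader.persistedPosition();
        // Fall back to the beginning of the stream if nothing was committed.
        position = (persisted == null) ? 0L : persisted;
    }

    /** One poll: emit `records` records starting at the in-memory position. */
    long poll(long records) {
        long startFrom = position;   // deliberately NOT re-queried from storage
        position += records;
        return startFrom;
    }

    long position() {
        return position;
    }
}
```

Because successive `poll()` calls never consult the (possibly stale) offset store mid-run, the task never re-emits a range it has already produced, regardless of when the framework's periodic commit lands.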