[ https://issues.apache.org/jira/browse/KAFKA-14947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17718513#comment-17718513 ]

Yash Mayya commented on KAFKA-14947:
------------------------------------

[~krishnendudas] the offset commits for source connectors were asynchronous and 
periodic even in AK 2.6.2; see [this 
class|https://github.com/apache/kafka/blob/da65af02e5856e3429259e26eb49986122e34747/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L47].
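
For illustration, here is a rough sketch of the scheduling pattern that class 
uses (names and structure are simplified, not the actual runtime code): offset 
commits fire on a timer, decoupled from poll() and produce:

{code:java}
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified sketch of the periodic-commit pattern used by
// SourceTaskOffsetCommitter; illustrative only, not the real code.
public class PeriodicCommitterSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    // commitIntervalMs corresponds to the worker's
    // offset.flush.interval.ms setting.
    public void schedule(long commitIntervalMs, Runnable commitOffsets) {
        // Commits run on a fixed delay, independently of when the
        // task's poll() returns records or the producer sends them.
        executor.scheduleWithFixedDelay(commitOffsets,
                commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
    }
}
{code}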

 
{quote}With our existing connect API code and with the Kafka server (2.6.2), 
our ingestion mechanism was working fine in the live environment. We checked 
the Kafka server (2.6.2) WorkerSourceTask::execute() method, and it was 
following the below-mentioned execution path
 # Poll the task for the new data.
 # If get any data, save the new data into the Kafka topic.
 # Commit the offset.{quote}
 

This isn't exactly accurate, as the offset commit isn't done synchronously 
with the poll and the produce (to the Kafka topic). Perhaps the 
*offset.flush.interval.ms* worker configuration (which determines the offset 
commit interval) differs between your two environments?
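
For reference, it is set in the worker properties (60000 ms is the default; 
the value below is just illustrative):

{code}
# Worker configuration (e.g. connect-standalone.properties or
# connect-distributed.properties); the default is 60000 ms.
offset.flush.interval.ms=60000
{code}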

 
{quote}But that won't be persistent. At every start, the object will be reset. 
Any suggestion on how we can make it persistent in the new Kafka server 
(3.1.1)?
{quote}
 

That will always be the case for restarts, which is why I mentioned in my 
previous comment that source offsets are typically queried from the offset 
storage only during connector / task startup. I'm not sure I follow why the 
local in-memory offset can't be used while the connector task is running 
normally.
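
To illustrate what I mean, here's a minimal sketch of that pattern, assuming a 
file-based task; the partition/offset keys ("filename", "position") and the 
two helper methods are hypothetical placeholders, not your connector's actual 
code:

{code:java}
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

// Minimal sketch: query the offset store once in start(), then track
// the position in memory while the task runs. The "filename" /
// "position" keys and the helper methods are hypothetical.
public abstract class InMemoryOffsetTaskSketch extends SourceTask {
    private Map<String, String> sourcePartition;
    private long position;

    @Override
    public void start(Map<String, String> props) {
        sourcePartition = Collections.singletonMap("filename", props.get("file"));
        // Consult the offset store only here, at task startup.
        Map<String, Object> committed =
                context.offsetStorageReader().offset(sourcePartition);
        position = committed == null ? 0L : (Long) committed.get("position");
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Read from the in-memory position; don't re-query the offset
        // store here, since commits land asynchronously and may lag.
        SourceRecord record = readNextRecordFrom(position); // hypothetical helper
        if (record == null) {
            return null;
        }
        position = nextPosition(record); // hypothetical helper
        return Collections.singletonList(record);
    }

    protected abstract SourceRecord readNextRecordFrom(long position);

    protected abstract long nextPosition(SourceRecord record);
}
{code}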

> Duplicate records are getting created in the topic. 
> ----------------------------------------------------
>
>                 Key: KAFKA-14947
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14947
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.1.1
>            Reporter: krishnendu Das
>            Priority: Blocker
>         Attachments: Kafka_server_3.1.1_data_duplication_issue_log
>
>
> We are using the Kafka Connect API (version 2.3.0) and Kafka (version 3.1.1) 
> for data ingestion. Previously we were using Kafka (version 2.6.2) with the 
> same Kafka Connect API (version 2.3.0), and data ingestion was working 
> properly.
>  
> Recently we updated Kafka from version 2.6.2 to 3.1.1.
> After the update, we are seeing duplicate data from the source connector in 
> the Kafka topic. After debugging the 3.1.1 code, we saw that a new function,
> {*}updateCommittableOffsets{*}(), was added and is called inside
> {*}WorkerSourceTask::execute{*}() as part of the bug fix "KAFKA-12226: Commit 
> source task offsets without blocking on batch delivery (#11323)"
>  
> Now, because of this function, we are observing the following scenario:
>  # At the start of execute(), the call goes to updateCommittableOffsets() to 
> check whether there is any offset to commit. As the first poll has not yet 
> happened, this function finds nothing to commit.
>  # Then the Kafka Connect API's poll() method is called from 
> WorkerSourceTask::execute(). *---------> 1st poll*
>  # The Kafka Connect API (using the sleepy policy) reads one source file from 
> the cloud source directory.
>  # It reads the whole content of the file and sends the result set to the 
> Kafka server to be written to the Kafka topic.
>  # During the 2nd poll, updateCommittableOffsets() finds an offset to commit 
> and updates a reference variable, committableOffsets, which is later used by 
> the WorkerSourceTask::commitOffsets() function to perform the actual offset 
> commit.
>  # Then the Kafka Connect API's poll() method is called from 
> *WorkerSourceTask::execute()*. *---------> 2nd poll*
>  # The Kafka Connect API (using the sleepy policy) reads the same source file 
> again from the start, as offsetStorageReader::offset() didn't return the 
> latest offset.
>  # It reads the whole content of the file and sends the result set to the 
> Kafka server to be written to the Kafka topic. ---> This creates duplicate 
> data in the topic.
> ................................................
> ................................................
>  # WorkerSourceTask::commitOffsets() commits the offset.
> ................................................
> ................................................
>  # Then the Kafka Connect API's poll() method is called from 
> {*}WorkerSourceTask::execute(){*}. ---------> 3rd poll
>  # This time offsetStorageReader::offset() returns the latest offset.
>  # The Kafka Connect API (using the sleepy policy) reads the same source file 
> from the last read position.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
