[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16170270#comment-16170270 ]

Ewen Cheslack-Postava commented on KAFKA-5716:
----------------------------------------------

The basic analysis sounds right. That javadoc dates back to the initial PR. I 
can't remember whether we even had the separate thread for offset commits at 
that point, so it may simply have become outdated. It may also have been 
inaccurate from the start, based on my development of the initial version, and 
just been missed in the initial review; given that it was an [11k line 
patch|https://github.com/apache/kafka/pull/99], I'm not surprised some things 
were missed in the review...

There's actually another problem that reviewing this revealed, which at best is 
important but not clearly documented and at worst is another bug: 
WorkerSourceTask doesn't have synchronization around at least one call to 
commitSourceTask (which invokes the SourceTask.commit() method), so 
SourceTask.commit() may be called while the main thread is invoking some other 
method on the task. I think we document elsewhere that stop() has this behavior 
(so that it can interrupt polling), and so perhaps people would have handled 
synchronization properly, but more likely there would just be bugs.
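Because commitSourceTask() can run SourceTask.commit() on the offset-committer thread while the task thread is inside poll(), any state a task shares between those methods needs its own locking. The following is a minimal plain-Java sketch under that assumption. It only mimics the poll()/commitRecord()/commit() shape and does not use the real org.apache.kafka.connect.source.SourceTask API; ThreadSafeTaskModel and its fields are hypothetical names. It also acks only IDs confirmed per record, since commit() alone does not guarantee that everything returned by poll() has been sent.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java model of a source task whose methods are invoked from
// several threads. It mimics the shape of SourceTask (poll/commitRecord/commit)
// but is NOT the real org.apache.kafka.connect.source.SourceTask class.
public class ThreadSafeTaskModel {
    private final Object lock = new Object();
    private final Set<Long> inFlight = new HashSet<>();     // returned by poll(), not yet confirmed
    private final List<Long> confirmed = new ArrayList<>(); // confirmed, not yet acked upstream
    private final List<Long> acked = new ArrayList<>();     // acked to the (imaginary) source system
    private long nextId = 0;

    // Task thread: hand out a batch of new record IDs.
    public List<Long> poll(int batchSize) {
        List<Long> batch = new ArrayList<>();
        synchronized (lock) {
            for (int i = 0; i < batchSize; i++) {
                long id = nextId++;
                inFlight.add(id);
                batch.add(id);
            }
        }
        return batch;
    }

    // Per-record confirmation, analogous in spirit to commitRecord().
    public void commitRecord(long id) {
        synchronized (lock) {
            if (inFlight.remove(id)) {
                confirmed.add(id);
            }
        }
    }

    // Committer thread: may run at any moment, even in the middle of poll() on
    // the task thread, so it only acks IDs that were individually confirmed.
    public void commit() {
        synchronized (lock) {
            acked.addAll(confirmed);
            confirmed.clear();
        }
    }

    public int ackedCount() { synchronized (lock) { return acked.size(); } }
    public int inFlightCount() { synchronized (lock) { return inFlight.size(); } }

    public static void main(String[] args) throws InterruptedException {
        ThreadSafeTaskModel task = new ThreadSafeTaskModel();
        Thread taskThread = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                for (long id : task.poll(10)) {
                    task.commitRecord(id); // pretend the record was sent successfully
                }
            }
        });
        Thread committerThread = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                task.commit();
            }
        });
        taskThread.start();
        committerThread.start();
        taskThread.join();
        committerThread.join();
        task.commit(); // one last commit once both threads are done
        System.out.println(task.ackedCount() + " acked, " + task.inFlightCount() + " in flight");
    }
}
```

Running main prints `10000 acked, 0 in flight`. Without the lock, the two threads could corrupt the unsynchronized HashSet and ArrayList.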

Correcting the code would be tough. We could update the javadoc, but I'd 
propose a different solution: let's just remove the method. It's bad that it 
doesn't actually do what it claims, and correcting it would require committing 
offsets synchronously, which we don't want to do either. Given its current 
state, it seems unlikely anyone is actually using this functionality anyway. 
The intent was to allow you to, e.g., delete or ack data once you know it has 
been committed, but it clearly isn't serving that purpose, and commitRecord() 
was added to give finer-grained feedback. If someone did want this 
functionality, we should probably just add a new commit(Map<SourcePartition, 
SourceOffset>) variant that can actually express to the user what we want... 
It'd be interesting to know whether the method is overridden in any 
connectors that we know about in the wild.
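A minimal sketch of the commit(Map<SourcePartition, SourceOffset>) variant floated above, assuming hypothetical SourcePartition and SourceOffset value types (Connect itself carries source partitions and offsets on SourceRecord as Map<String, ?>) and a hypothetical CommittedOffsetsListener interface. The idea is that the framework would pass exactly the offsets it has flushed, so a task can safely delete or ack upstream data up to those positions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a commit() variant that receives exactly the committed offsets.
public class CommittedOffsetsSketch {
    // Hypothetical value types; real Connect records carry partitions/offsets as Map<String, ?>.
    public record SourcePartition(String name) {}
    public record SourceOffset(long position) {}

    // Hypothetical callback: the framework passes the offsets it has actually flushed.
    public interface CommittedOffsetsListener {
        void commit(Map<SourcePartition, SourceOffset> committed);
    }

    // Example task that only deletes upstream data up to what was reported as committed.
    public static class DeletingTask implements CommittedOffsetsListener {
        private final Map<SourcePartition, Long> highestCommitted = new HashMap<>();

        @Override
        public void commit(Map<SourcePartition, SourceOffset> committed) {
            // Keep the highest committed position seen per partition.
            committed.forEach((partition, offset) ->
                highestCommitted.merge(partition, offset.position(), Math::max));
        }

        // Returns -1 when nothing has been committed for the partition yet.
        public long deletableUpTo(SourcePartition partition) {
            return highestCommitted.getOrDefault(partition, -1L);
        }
    }

    public static void main(String[] args) {
        DeletingTask task = new DeletingTask();
        SourcePartition fileA = new SourcePartition("file-a");
        task.commit(Map.of(fileA, new SourceOffset(42)));
        task.commit(Map.of(fileA, new SourceOffset(17)));
        System.out.println(task.deletableUpTo(fileA));
    }
}
```

Here main prints 42: the second, lower offset leaves the mark unchanged because merge keeps the maximum per partition. Taking the maximum is only a defensive choice in this sketch, not something the discussion specifies.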

If we just wanted a short-term fix, we could definitely update the javadoc to 
make it clear what's actually happening and that this probably isn't what you 
want.

> Connect: When SourceTask.commit it is possible not everything from SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> I am not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code inspection alone, 
> so I may be wrong. I have not had the time to write tests to confirm.
> According to the javadoc on SourceTask.commit:
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> As I read this, when the commit method is called, the SourceTask developer is 
> "told" that everything returned from poll up until "now" has been sent/stored, 
> both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation, it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask object) in sendRecords and 
> commitOffsets respectively, preventing the task-thread from adding to 
> outstandingMessages and offsetWriter while the committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily with what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from task.poll
> * Before the task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks 
> in and does its committing, while preventing the task-thread from adding the 
> polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask) before the 
> records just polled from task.poll have been sent or the corresponding 
> connector-offsets flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the javadoc of SourceTask.commit to something along these lines 
> (which I do believe is true):
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> * Or fix the "problem" so that it actually does what the javadoc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation of where my code inspection is wrong, and why it 
> works even though I cannot see it. I do not expect such an explanation, 
> though.
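The interleaving described in the report can be replayed deterministically on a single thread. This is a simplified, hypothetical model: the names sendRecords, commitOffsets, outstandingMessages and offsetWriter echo WorkerSourceTask, but the class, the ackAll() producer callback and the bookkeeping fields are illustrative assumptions, not the real implementation. Both methods synchronize as described, yet commitOffsets() still reaches the stand-in for the task's commit() while a freshly polled batch sits outside the lock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the locking described in KAFKA-5716.
// Names echo WorkerSourceTask, but this is NOT the real implementation.
public class CommitRaceModel {
    public record Rec(String partition, long offset) {}

    private final List<Rec> outstandingMessages = new ArrayList<>(); // sent, not yet acked by Kafka
    private final Map<String, Long> offsetWriter = new HashMap<>();  // offsets staged for flushing
    private final Map<String, Long> flushedOffsets = new HashMap<>(); // offsets actually flushed
    private long lastPolledOffset = -1;         // highest offset the task has returned from poll()
    public long polledAtLastTaskCommit = -1;    // lastPolledOffset when commit() last ran
    public long flushedAtLastTaskCommit = -1;   // flushed offset for "p0" when commit() last ran

    // Task thread, step 1: the task's poll() runs outside the lock.
    public List<Rec> poll() {
        List<Rec> batch = List.of(
            new Rec("p0", lastPolledOffset + 1),
            new Rec("p0", lastPolledOffset + 2));
        lastPolledOffset += 2;
        return batch;
    }

    // Task thread, step 2: only here do the records become visible to the committer.
    public synchronized void sendRecords(List<Rec> batch) {
        for (Rec r : batch) {
            outstandingMessages.add(r);
            offsetWriter.put(r.partition(), r.offset());
        }
    }

    // Hypothetical producer callback: Kafka acknowledged everything sent so far.
    public synchronized void ackAll() {
        outstandingMessages.clear();
        notifyAll();
    }

    // Committer thread: wait until nothing is outstanding, flush, then call the task's commit().
    public synchronized void commitOffsets() {
        while (!outstandingMessages.isEmpty()) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        flushedOffsets.putAll(offsetWriter);
        commitSourceTask();
    }

    // Stand-in for SourceTask.commit(): records what had been polled vs. flushed.
    private void commitSourceTask() {
        polledAtLastTaskCommit = lastPolledOffset;
        flushedAtLastTaskCommit = flushedOffsets.getOrDefault("p0", -1L);
    }

    public static void main(String[] args) {
        CommitRaceModel m = new CommitRaceModel();
        m.sendRecords(m.poll());      // offsets 0 and 1
        m.ackAll();
        m.commitOffsets();            // consistent: polled up to 1, flushed up to 1
        List<Rec> second = m.poll();  // task thread now holds offsets 2 and 3
        m.commitOffsets();            // committer wins the race before sendRecords(second)
        System.out.println("polled up to " + m.polledAtLastTaskCommit
            + ", flushed up to " + m.flushedAtLastTaskCommit);
        m.sendRecords(second);        // offsets 2 and 3 only now reach offsetWriter
    }
}
```

Running main prints `polled up to 3, flushed up to 1`: the stand-in for SourceTask.commit() runs after offsets 2 and 3 were polled but before they were sent or flushed, which is the gap the report describes.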



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
