[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16120618#comment-16120618
 ] 

Per Steffensen edited comment on KAFKA-5716 at 8/9/17 8:36 PM:
---------------------------------------------------------------

Maybe something along the lines of the change I made in the attached 
KAFKA-5716.patch (relative to branch 0.10.2). I took the liberty of changing 
the method naming and Javadoc in SourceTask to something that I like better. 
Those changes can of course be left out! I made them because I really do not 
think of commitRecord (renamed to recordCommitted) and commit (renamed to 
offsetsCommitted) as (only) a chance for the SourceTask to commit something. I 
think it is rare that it wants to "commit" something, but it is likely that it 
wants to do something else - I know I do in my SourceTask implementations. In 
general those methods are notifications to the SourceTask that the framework 
has sent/acknowledged/written/flushed something, and a chance for the 
SourceTask to react appropriately (potentially doing lots of different things 
that cannot be characterized as committing).
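
To illustrate the "notification" view, here is a minimal Java sketch. It is not 
taken from the attached patch. The SourceNotifications interface is a 
hypothetical stand-in that only borrows the proposed method names; it is not 
the Kafka Connect SourceTask API. The AckingTask class and its fields are 
likewise assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class NotificationSketch {
    /** Hypothetical stand-in for the renamed hooks; NOT the Kafka Connect API. */
    public interface SourceNotifications {
        /** The framework reports that one record has been sent and acknowledged. */
        void recordCommitted(String record);

        /** The framework reports that offsets have been flushed. */
        void offsetsCommitted();
    }

    /** Example task that reacts by acknowledging messages in its upstream system. */
    public static class AckingTask implements SourceNotifications {
        public final List<String> pendingAcks = new ArrayList<>();
        public final List<String> ackedUpstream = new ArrayList<>();

        @Override
        public void recordCommitted(String record) {
            // Nothing is "committed" here; the task only remembers what is now safe to ack.
            pendingAcks.add(record);
        }

        @Override
        public void offsetsCommitted() {
            // React to the notification: ack everything that was confirmed so far.
            ackedUpstream.addAll(pendingAcks);
            pendingAcks.clear();
        }
    }

    public static void main(String[] args) {
        AckingTask task = new AckingTask();
        task.recordCommitted("r1");
        task.recordCommitted("r2");
        task.offsetsCommitted();
        System.out.println(task.ackedUpstream);
    }
}
```

In this sketch neither hook stores an offset. Both only react to something the 
framework has already done, which is the reading the new names are meant to 
suggest.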


was (Author: steff1193):
Maybe something along the lines of the change I made in the attached 
KAFKA-5716.patch. I took the liberty of changing the method naming and Javadoc 
in SourceTask to something that I like better. Those changes can of course be 
left out! I made them because I really do not think of commitRecord (renamed to 
recordCommitted) and commit (renamed to offsetsCommitted) as (only) a chance 
for the SourceTask to commit something. I think it is rare that it wants to 
"commit" something, but it is likely that it wants to do something else - I 
know I do in my SourceTask implementations. In general those methods are 
notifications to the SourceTask that the framework has 
sent/acknowledged/written/flushed something, and a chance for the SourceTask to 
react appropriately (potentially doing lots of different things that cannot be 
characterized as committing).

> Connect: When SourceTask.commit it is possible not everything from 
> SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. Have not had the time to write tests to confirm.
> According to the Javadoc on SourceTask.commit:
> {quote}
> Commit the offsets, up to the offsets that have been returned by
> {@link #poll()}. This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect
> will record offsets automatically. This hook is provided for systems that
> also need to store offsets internally in their own system.
> {quote}
> As I read this, when the commit method is called, the SourceTask developer is 
> "told" that everything returned from poll up until "now" has been sent/stored 
> - both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation, it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I read the code, this is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two threads synchronize (on the WorkerSourceTask object) in sendRecords 
> and commitOffsets respectively. This prevents the task-thread from adding to 
> outstandingMessages and offsetWriter while the committer-thread is marking 
> what has to be flushed in the offsetWriter and waiting for 
> outstandingMessages to be empty. This means that the offsets committed will 
> be consistent with what has been sent out, but not necessarily with what has 
> been polled. At least I do not see why the following is not possible:
> * The task-thread polls some records from task.poll
> * Before the task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, the committer-thread 
> kicks in and does its committing, while preventing the task-thread from 
> adding the polled records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but the committer-thread will 
> end up calling task.commit (via WorkerSourceTask.commitSourceTask) without 
> the records just polled from task.poll having been sent or the corresponding 
> connector-offsets having been flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the Javadoc of SourceTask.commit to something like the 
> following (which I do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by
> {@link #poll()}
> *and confirmed by a call to {@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect
> will record offsets automatically. This hook is provided for systems that
> also need to store offsets internally in their own system.
> {quote}
> * Or fix the "problem" so that it actually does what the Javadoc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation of where my code-inspection is incorrect, and why 
> it works even though I cannot see it. I do not expect such an explanation, 
> though.
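
The interleaving described in the issue can be replayed deterministically with 
a small Java model. This is only a sketch under stated assumptions: WorkerModel 
and its fields are hypothetical names and a heavy simplification, not the real 
WorkerSourceTask. The steps run on one thread in the order the issue describes, 
instead of relying on real thread scheduling.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CommitRaceSketch {
    /** Simplified, hypothetical model of the worker; NOT the real WorkerSourceTask. */
    public static class WorkerModel {
        public final List<String> polled = new ArrayList<>();
        public final List<String> sent = new ArrayList<>();
        public final List<String> coveredByLastCommit = new ArrayList<>();

        /** Task-thread, step 1: records handed back by task.poll(). No lock is held. */
        public void poll(List<String> batch) {
            polled.addAll(batch);
        }

        /** Task-thread, step 2: register one record as sent, under the shared lock. */
        public synchronized void sendRecord(String record) {
            sent.add(record);
        }

        /** Committer-thread: snapshot what has been sent so far, under the same lock. */
        public synchronized void commitOffsets() {
            coveredByLastCommit.clear();
            coveredByLastCommit.addAll(sent);
            // In the scenario from the issue, task.commit would be notified at this point.
        }
    }

    public static void main(String[] args) {
        WorkerModel worker = new WorkerModel();
        List<String> batch = Arrays.asList("r1", "r2", "r3");

        worker.poll(batch);               // task-thread has polled three records
        worker.sendRecord(batch.get(0));  // only the first one is registered as sent
        worker.commitOffsets();           // committer kicks in mid-batch
        worker.sendRecord(batch.get(1));  // the rest is sent after the commit
        worker.sendRecord(batch.get(2));

        System.out.println("polled before commit: " + worker.polled);
        System.out.println("covered by commit:    " + worker.coveredByLastCommit);
    }
}
```

In this model the commit covers only r1 although r1, r2 and r3 had already been 
returned by poll. What was committed is consistent with what was sent, but it 
lags behind what was polled.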



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
