[ 
https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167956#comment-16167956
 ] 

Randall Hauch commented on KAFKA-5716:
--------------------------------------

[~ewencp], if you wouldn't mind, would you take a look at this? In the issue 
description above, [~steff1193] suggests two possible fixes:

1) Changing the JavaDoc to say what the code *actually* does. This _may not_ 
require a KIP if the consensus is that this *clarifies* existing behavior 
rather than changing the behavior.
2) Correcting the code to implement the specified behavior. This definitely 
requires a KIP as it would change the public API to add a method that does the 
correct thing (this could be done in a backward compatible manner).

I'm unable to think of another fix, but perhaps something is more obvious to 
you. Thoughts?

> Connect: When SourceTask.commit it is possible not everthing from 
> SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected 
> recently. If so, I apologize. I found the "problem" by code-inspection alone, 
> so I may be wrong. Have not had the time to write tests to confirm.
> According to java-doc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> As I read this, when commit-method is called, the SourceTask-developer is 
> "told" that everything returned from poll up until "now" has been sent/stored 
> - both the outgoing messages and the associated connect-offsets. Looking at 
> the implementation it also seems that this is what it tries to 
> "guarantee/achieve".
> But as I see read the code, it is not necessarily true
> The following threads are involved
> * Task-thread: WorkerSourceTask has its own thread running 
> WorkerSourceTask.execute.
> * Committer-thread: From time to time SourceTaskOffsetCommitter is scheduled 
> to call WorkerSourceTask.commitOffsets (from a different thread)
> The two thread synchronize (on the WorkerSourceTask-object) in sendRecord and 
> commitOffsets respectively, hindering the task-thread to add to 
> outstandingMessages and offsetWriter while committer-thread is marking what 
> has to be flushed in the offsetWriter and waiting for outstandingMessages to 
> be empty. This means that the offsets committed will be consistent with what 
> has been sent out, but not necessarily what has been polled. At least I do 
> not see why the following is not possible:
> * Task-thread polls something from the task.poll
> * Before task-thread gets to add (all) the polled records to 
> outstandingMessages and offsetWriter in sendRecords, committer-thread kicks 
> in and does its commiting, while hindering the task-thread adding the polled 
> records to outstandingMessages and offsetWriter
> * Consistency will not have been compromised, but committer-thread will end 
> up calling task.commit (via WorkerSourceTask.commitSourceTask), without the 
> records just polled from task.poll has been sent or corresponding 
> connector-offsets flushed.
> If I am right, I guess there are two way to fix it
> * Either change the java-doc of SourceTask.commit, to something a-la (which I 
> do believe is true)
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link 
> #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect 
> will record offsets
> automatically. This hook is provided for systems that also need to store 
> offsets internally
> in their own system.
> {quote}
> * or, fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would 
> appreciate an explanation where my code-inspection is not correct, and why it 
> works even though I cannot see it. I will not expect such an explanation, 
> though.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to