[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16160407#comment-16160407 ]
Randall Hauch commented on KAFKA-5716:
--------------------------------------

[~steff1193], thanks for logging the issue and submitting a proposal for the change. Your analysis does seem to be correct, though I need to spend a bit more time verifying it, and I'd love to hear [~hachikuji]'s thoughts. Have you tried replicating this with a test case? It might not be straightforward, since it depends on the timing of the WorkerSourceTask implementation, but it seems like it should be possible to replicate with a mock offset writer and consumer. Not only will this help prove your conjecture, but it will also help verify that we can correctly fix it.

Also, please be aware that all changes to the Connect API *must* be binary compatible with previous releases, so that existing SourceTask implementations need not be changed or even recompiled to run on newer versions of the framework. This has two implications for your proposed patch.

First, we cannot change the signature of public API methods, so we need to avoid changing the name of {{commitRecord(...)}}. Like it or not, we're stuck with that naming convention. FWIW, I personally think the names of {{commitRecord(...)}}, {{commit()}}, and even a new {{commitOffsets(SourceRecord)}} on the SourceTask actually _do_ make sense, since they reflect the action that the source task should take.

Second, we have to keep the existing {{commit()}} method, but we can deprecate it and add a new method such as {{commitOffsets(SourceRecord)}} that has the behavior you describe and that by default calls the existing {{commit()}}. Doing this would allow existing SourceTask implementations to continue to work, albeit potentially with the incorrect assumptions/behavior you describe above. However, any developer who wants the new, correct behavior can change their {{SourceTask}} implementation to implement {{commitOffsets(SourceRecord)}} instead of {{commit()}}.
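To make the "deprecate and delegate" idea concrete, here is a minimal sketch of how such a change could look. {{SketchSourceTask}} and {{SketchRecord}} are hypothetical, heavily simplified stand-ins for {{SourceTask}} and {{SourceRecord}}, and {{commitOffsets(...)}} is only the name proposed in this comment, not an existing Connect API. Because {{SourceTask}} is an abstract class, adding a non-abstract method with a default body should not break subclasses that were compiled against the old version.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitOffsetsSketch {

    // Hypothetical stand-in for org.apache.kafka.connect.source.SourceRecord.
    static final class SketchRecord {
        final String sourceOffset;

        SketchRecord(String sourceOffset) {
            this.sourceOffset = sourceOffset;
        }
    }

    // Hypothetical, heavily simplified stand-in for SourceTask: only the commit hooks are modeled.
    abstract static class SketchSourceTask {
        /**
         * Existing hook, kept so already-compiled tasks still link and run.
         *
         * @deprecated implement {@link #commitOffsets(SketchRecord)} instead.
         */
        @Deprecated
        public void commit() throws InterruptedException {
            // No-op by default: tasks are not required to implement this hook.
        }

        /**
         * Proposed hook (name taken from the discussion, not an existing API): invoked once the
         * offsets up to and including {@code lastFlushed} have been written. By default it
         * delegates to the old {@link #commit()}, so legacy tasks keep their current behavior.
         */
        public void commitOffsets(SketchRecord lastFlushed) throws InterruptedException {
            commit();
        }
    }

    // A task written against the old API: it only knows about commit().
    static final class LegacyTask extends SketchSourceTask {
        final List<String> calls = new ArrayList<>();

        @Override
        public void commit() {
            calls.add("commit()");
        }
    }

    // A task that opts in to the new hook and learns exactly which offsets were flushed.
    static final class UpdatedTask extends SketchSourceTask {
        final List<String> calls = new ArrayList<>();

        @Override
        public void commitOffsets(SketchRecord lastFlushed) {
            calls.add("commitOffsets(" + lastFlushed.sourceOffset + ")");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SketchRecord flushed = new SketchRecord("offset-42");
        LegacyTask legacy = new LegacyTask();
        UpdatedTask updated = new UpdatedTask();

        // The framework would only ever call the new hook.
        legacy.commitOffsets(flushed);
        updated.commitOffsets(flushed);

        System.out.println(legacy.calls);  // [commit()]
        System.out.println(updated.calls); // [commitOffsets(offset-42)]
    }
}
```

With this shape, the framework only ever calls the new hook: legacy tasks still see their {{commit()}} invoked exactly as before, while updated tasks are told which record's offsets were actually flushed.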
Note that we should definitely have thorough JavaDoc for both {{commit()}} and {{commitOffsets(SourceRecord)}} that explains what each does by default and how implementations can take advantage of it.

Again, I *think* your analysis is correct, but I need to spend time looking into this and hopefully get others to confirm it as well. In the meantime, it might be worth updating your patch and, as [~yuzhih...@gmail.com] mentioned, creating a pull request with your changes based on the {{trunk}} branch. One advantage of maintaining binary compatibility is that we can more easily backport the changes to older branches, though just because we can doesn't mean we will. If the committers do backport and hit any non-trivial merge conflicts, they'll ask for a separate, dedicated PR for the older branch(es).

Thanks again for all your work on this!

> Connect: When SourceTask.commit it is possible not everthing from
> SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> I am not looking at the very latest code, so the "problem" may have been corrected
> recently. If so, I apologize. I found the "problem" by code inspection alone,
> so I may be wrong. I have not had the time to write tests to confirm.
> According to the java-doc on SourceTask.commit:
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> As I read this, when the commit method is called, the SourceTask developer is
> "told" that everything returned from poll up until "now" has been sent/stored,
> both the outgoing messages and the associated connect-offsets. Looking at
> the implementation, it also seems that this is what it tries to
> "guarantee/achieve".
> But as I read the code, this is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running
> WorkerSourceTask.execute.
> * Committer-thread: From time to time, SourceTaskOffsetCommitter is scheduled
> to call WorkerSourceTask.commitOffsets (from a different thread).
> The two threads synchronize (on the WorkerSourceTask object) in sendRecords and
> commitOffsets respectively, preventing the task-thread from adding to
> outstandingMessages and offsetWriter while the committer-thread is marking what
> has to be flushed in the offsetWriter and waiting for outstandingMessages to
> be empty. This means that the offsets committed will be consistent with what
> has been sent out, but not necessarily with what has been polled. At least I do
> not see why the following is not possible:
> * The task-thread polls something from task.poll.
> * Before the task-thread gets to add (all) the polled records to
> outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks
> in and does its committing, preventing the task-thread from adding the polled
> records to outstandingMessages and offsetWriter.
> * Consistency will not have been compromised, but the committer-thread will end
> up calling task.commit (via WorkerSourceTask.commitSourceTask) before the
> records just polled from task.poll have been sent or the corresponding
> connector-offsets flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the java-doc of SourceTask.commit to something like the following
> (which I do believe is true):
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> * or fix the "problem" so that it actually does what the java-doc says :-)
> If I am not right, of course I apologize for the inconvenience. I would
> appreciate an explanation of where my code inspection is wrong, and why it
> works even though I cannot see it. I do not expect such an explanation,
> though.
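Following the suggestion to reproduce this with mocks rather than real timing, the interleaving described in the report can be forced deterministically with latches. The sketch below is a hypothetical model, not Kafka's WorkerSourceTask: {{ModelWorkerTask}}, its lists (standing in for outstandingMessages and the offsetWriter), and {{runForcedInterleaving()}} are illustrative names only. It shows that if poll() returns outside the lock and the committer acquires the lock before sendRecords does, the commit hook runs while the polled records are not yet tracked for flushing. It does not by itself prove that the real implementation behaves this way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class CommitRaceSketch {

    // Hypothetical model of the relevant WorkerSourceTask state; NOT the real class.
    static final class ModelWorkerTask {
        private final List<Integer> polled = new ArrayList<>();  // what poll() returned
        private final List<Integer> tracked = new ArrayList<>(); // stands in for outstandingMessages + offsetWriter
        final List<String> commitLog = new ArrayList<>();

        // Task thread: records are polled without holding the task lock.
        List<Integer> poll(int... offsets) {
            List<Integer> batch = new ArrayList<>();
            for (int offset : offsets) {
                batch.add(offset);
            }
            synchronized (this) {
                polled.addAll(batch); // recorded only so the commit log can show the gap
            }
            return batch;
        }

        // Task thread: takes the lock to register the batch for sending and offset tracking.
        synchronized void sendRecords(List<Integer> batch) {
            tracked.addAll(batch);
        }

        // Committer thread: can only flush what sendRecords registered, then calls the task's commit hook.
        synchronized void commitOffsets() {
            commitLog.add("commit() called; polled=" + polled + " flushed=" + tracked);
        }
    }

    // Forces the interleaving from the report: poll() returns, the committer commits, then sendRecords runs.
    static String runForcedInterleaving() throws InterruptedException {
        ModelWorkerTask task = new ModelWorkerTask();
        CountDownLatch pollReturned = new CountDownLatch(1);
        CountDownLatch commitDone = new CountDownLatch(1);

        Thread taskThread = new Thread(() -> {
            List<Integer> batch = task.poll(1, 2, 3);
            pollReturned.countDown();
            try {
                commitDone.await(); // the committer wins the race for the lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            task.sendRecords(batch);
        });

        Thread committerThread = new Thread(() -> {
            try {
                pollReturned.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            task.commitOffsets();
            commitDone.countDown();
        });

        taskThread.start();
        committerThread.start();
        taskThread.join();
        committerThread.join();
        return task.commitLog.get(0);
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints: commit() called; polled=[1, 2, 3] flushed=[]
        System.out.println(runForcedInterleaving());
    }
}
```

A real test would swap these lists for a mock offset writer and producer, but the latch pattern for pinning down the ordering would stay the same.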