[ https://issues.apache.org/jira/browse/KAFKA-5716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16171249#comment-16171249 ]
Per Steffensen commented on KAFKA-5716:
---------------------------------------

FWIW

bq. WorkerSourceTask doesn't have synchronization around at least one call to commitSourceTask (which invokes the SourceTask.commit()) method so you may call this while the main thread is invoking some other method on the connector.

{{commitSourceTask}} is only called by {{commitOffsets}}, which is only (intended to be?) called by {{SourceTaskOffsetCommitter}}. So there is no need for synchronization to prevent it from being called several times concurrently. (Besides that, {{commitOffsets}} is also called by the main thread when {{execute}} finishes.) Given the way synchronization is done in {{commitOffsets}`, I believe it works fine for {{SourceTaskOffsetCommitter}} to call it while the main thread is calling other methods on the connector. It provides these semantics: {{task.commit()}} is called when offsets have been written and flushed, and the offsets that have been written and flushed will always include everything from a given {{task.poll}} (and all previous polls), BUT NOT NECESSARILY FROM THE LAST (FEW) POLL(S). It is all or nothing per poll. The only problem I see is that the flushed offsets do not necessarily include offsets for records from the last (few) poll(s). That is what KAFKA-5716 is about.

bq. Correcting the code would be tough

I think it is a matter of telling {{task.commit}} which polled records have had their offset changes included in the offset write and which have not. It could probably be done by just passing "the last record that has been included in the offset write", which indirectly indicates the last poll that has been included. Please see the attached patch. {{SourceTask}} would then change to:

{code}
public void commit(SourceRecord lastPolledRecordWithOffsetsWritten) throws InterruptedException {
    commit();
}

public void commit() throws InterruptedException {
    // This space intentionally left blank.
}
{code}

The JavaDoc would need updating as well, maybe including deprecation of {{commit()}}, because it is ambiguous.

bq. I'd propose a different solution: let's just remove the method

Uhhh, please do not. Then the {{SourceTask}} implementation will have absolutely no clue when offsets have been written and flushed. {{commitRecord}} will not help. The point at which Kafka acknowledges outgoing records (triggering {{commitRecord}}) is completely independent of the point at which record offsets are written to offset storage (triggering {{commit}}).

bq. Given its current state, it seems unlikely anyone is actually using this functionality anyway

I am! And I couldn't live without it :-) I cannot afford to acknowledge data going into my source connector before the corresponding outgoing records AND their offset changes have been written, flushed and acknowledged. Today I pretend that everything polled has been written to offset storage when {{task.commit()}} is called. That is not always entirely true, but it is close. Not knowing anything about when offsets have been written to offset storage would definitely leave me blind.

bq. If someone did want this functionality, we likely should just add a new {{commit(Map<SourcePartition, SourceOffset>)}} variant

That is a good alternative to the {{commit(SourceRecord lastPolledRecordWithOffsetsWritten)}} variant I suggested.

bq. It'd be interesting to know if the method is overridden in any connectors that we know about in the wild.

I hope it often is, because it is the only clue you have about when offsets have been written to offset storage. If you are actually using connect-offsets (and you probably are, since you are using Connect), your system is probably incorrect unless you take advantage of {{commit()}}. You cannot use {{commitRecord}} for that.

bq. If we just wanted a short term fix, we could definitely update the javadoc to make it clear what's actually happening and that this probably isn't what you want.

That would be easy :-) But it would not leave the {{SourceTask}} implementor in any better position. It would still be undefined which polled records had their offsets written and which had not. This small change would help:

{code}
public void commit(SourceRecord lastPolledRecordWithOffsetsWritten) throws InterruptedException {
    commit();
}

public void commit() throws InterruptedException {
    // This space intentionally left blank.
}
{code}

The {{commit(Map<SourcePartition, SourceOffset>)}} variant you suggest would help as well.

> Connect: When SourceTask.commit it is possible not everthing from SourceTask.poll has been sent
> -----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5716
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5716
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Per Steffensen
>            Priority: Minor
>         Attachments: KAFKA-5716.patch
>
>
> Not looking at the very latest code, so the "problem" may have been corrected recently. If so, I apologize. I found the "problem" by code inspection alone, so I may be wrong. I have not had the time to write tests to confirm.
> According to the Javadoc on SourceTask.commit
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}. This
> method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically. This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> As I read this, when the commit method is called, the SourceTask developer is "told" that everything returned from poll up until "now" has been sent/stored, both the outgoing messages and the associated connect-offsets.
> Looking at the implementation, it also seems that this is what it tries to "guarantee/achieve".
> But as I read the code, it is not necessarily true.
> The following threads are involved:
> * Task-thread: WorkerSourceTask has its own thread running WorkerSourceTask.execute.
> * Committer-thread: From time to time, SourceTaskOffsetCommitter is scheduled to call WorkerSourceTask.commitOffsets (from a different thread).
> The two threads synchronize (on the WorkerSourceTask object) in sendRecords and commitOffsets respectively. This prevents the task-thread from adding to outstandingMessages and offsetWriter while the committer-thread is marking what has to be flushed in the offsetWriter and waiting for outstandingMessages to be empty. This means that the offsets committed will be consistent with what has been sent out, but not necessarily with what has been polled. At least I do not see why the following is not possible:
> * Task-thread polls something from task.poll.
> * Before the task-thread gets to add (all) the polled records to outstandingMessages and offsetWriter in sendRecords, the committer-thread kicks in and does its committing, meanwhile preventing the task-thread from adding the polled records to outstandingMessages and offsetWriter.
> * Consistency will not have been compromised, but the committer-thread will end up calling task.commit (via WorkerSourceTask.commitSourceTask) even though the records just polled from task.poll have not been sent and the corresponding connector offsets have not been flushed.
> If I am right, I guess there are two ways to fix it:
> * Either change the Javadoc of SourceTask.commit to something like this (which I do believe is true):
> {quote}
> Commit the offsets, up to the offsets that have been returned by \{@link #poll()}
> *and confirmed by a call to \{@link #commitRecord(SourceRecord)}*.
> This method should block until the commit is complete.
> SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
> automatically.
> This hook is provided for systems that also need to store offsets internally
> in their own system.
> {quote}
> * or fix the "problem" so that it actually does what the Javadoc says :-)
> If I am not right, of course I apologize for the inconvenience. I would appreciate an explanation of where my code inspection is incorrect, and why it works even though I cannot see it. I do not expect such an explanation, though.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
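What follows is a minimal, self-contained Java sketch of the interleaving discussed above. It rests on several loudly stated assumptions. {{WorkerModel}}, {{Rec}} and {{Kafka5716Sketch}} are hypothetical stand-ins, not Kafka Connect classes. The two threads are simulated by calling methods in a fixed order on a single thread. {{commitNotifications}} plays the role of the argument to the proposed {{commit(SourceRecord lastPolledRecordWithOffsetsWritten)}} variant.

The sketch shows a commit that happens after poll #2 has returned but before poll #2's records reach the offset writer. As a result, the flushed offsets stop at the last record of poll #1, and that is exactly the boundary the proposed argument would report to the task.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the KAFKA-5716 interleaving. None of these classes are
// Kafka Connect APIs; they only mimic the roles of WorkerSourceTask.sendRecords,
// WorkerSourceTask.commitOffsets and the proposed
// commit(SourceRecord lastPolledRecordWithOffsetsWritten) argument.
class Kafka5716Sketch {
    public static void main(String[] args) {
        WorkerModel worker = new WorkerModel();
        List<Rec> poll1 = List.of(new Rec("p0", 1), new Rec("p0", 2));
        List<Rec> poll2 = List.of(new Rec("p0", 3));

        worker.sendRecords(poll1);   // task thread: poll #1 fully handed to the writer
        // task thread: poll #2 has returned, but the committer grabs the lock first
        worker.commitOffsets();
        worker.sendRecords(poll2);   // poll #2 only reaches the writer afterwards

        System.out.println(worker.flushed());             // {p0=2}
        System.out.println(worker.commitNotifications);   // [p0@2]
    }
}

// Hypothetical stand-in for a SourceRecord: a source partition and an offset.
final class Rec {
    final String partition;
    final long offset;

    Rec(String partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public String toString() {
        return partition + "@" + offset;
    }
}

// Hypothetical stand-in for the worker side. Both methods lock the same object,
// so each poll's records are added all-or-nothing relative to a commit, as the
// comment above describes.
final class WorkerModel {
    private final Map<String, Long> pendingOffsets = new HashMap<>();  // handed to the writer, not yet flushed
    private final Map<String, Long> flushedOffsets = new HashMap<>();  // "flushed" to offset storage
    private Rec lastRecordInWriter;                                    // last record whose offset reached the writer
    final List<Rec> commitNotifications = new ArrayList<>();           // what the task would be told on each commit

    // Task thread: add every record from one poll to the offset writer.
    synchronized void sendRecords(List<Rec> polled) {
        for (Rec r : polled) {
            pendingOffsets.put(r.partition, r.offset);
            lastRecordInWriter = r;
        }
    }

    // Committer thread: flush what the writer holds now, then notify the task
    // with the last record included (null if nothing was ever written).
    synchronized void commitOffsets() {
        flushedOffsets.putAll(pendingOffsets);
        pendingOffsets.clear();
        commitNotifications.add(lastRecordInWriter);
    }

    // Snapshot of the flushed offsets, i.e. what a Map-based commit variant could receive.
    synchronized Map<String, Long> flushed() {
        return new HashMap<>(flushedOffsets);
    }
}
```

A second {{commitOffsets()}} call after {{sendRecords(poll2)}} would flush p0=3 and report p0@3. The flushed-offsets snapshot corresponds roughly to what the suggested {{commit(Map<SourcePartition, SourceOffset>)}} variant would hand to the task. The "last record" notification corresponds to the {{commit(SourceRecord ...)}} variant.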