[ 
https://issues.apache.org/jira/browse/KAFKA-5731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16128198#comment-16128198
 ] 

ASF GitHub Bot commented on KAFKA-5731:
---------------------------------------

Github user rhauch closed the pull request at:

    https://github.com/apache/kafka/pull/3672


> Connect WorkerSinkTask out of order offset commit can lead to inconsistent 
> state
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-5731
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5731
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Jason Gustafson
>            Assignee: Randall Hauch
>             Fix For: 0.11.0.1
>
>
> In Connect's WorkerSinkTask, we do sequence number validation to ensure that 
> offset commits are handled in the right order 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L199).
>  
> Unfortunately, for asynchronous commits, the {{lastCommittedOffsets}} field 
> is overwritten regardless of this sequence check, as long as the response had 
> no error 
> (https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L284):
> {code:java}
>             OffsetCommitCallback cb = new OffsetCommitCallback() {
>                 @Override
>                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
>                     if (error == null) {
>                         lastCommittedOffsets = offsets;
>                     }
>                     onCommitCompleted(error, seqno);
>                 }
>             };
> {code}
> Hence, if a commit response arrives out of order, the internal state becomes 
> inconsistent. To fix this, we should only overwrite {{lastCommittedOffsets}} 
> after sequence validation, as part of the {{onCommitCompleted(...)}} method.
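
The proposed fix can be illustrated with a minimal, self-contained sketch. This is not the actual patch: the class name CommitTracker, the beginCommit() helper, and the use of Map<String, Long> in place of Map<TopicPartition, OffsetAndMetadata> are assumptions made so the example runs without Kafka on the classpath. The only point it shows is that the committed offsets are stored after the sequence number has been validated, not before.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the commit bookkeeping in WorkerSinkTask.
// Partitions are plain strings and offsets are plain longs (an assumption for brevity).
class CommitTracker {
    // Sequence number of the most recently started commit.
    private int commitSeqno = 0;
    private Map<String, Long> lastCommittedOffsets = new HashMap<>();

    // Starts a new asynchronous commit and returns its sequence number.
    int beginCommit() {
        return ++commitSeqno;
    }

    // Called when a commit response arrives. The offsets are recorded only if
    // the response belongs to the latest commit and carries no error.
    // Returns true if lastCommittedOffsets was updated.
    boolean onCommitCompleted(Exception error, int seqno, Map<String, Long> offsets) {
        if (seqno != commitSeqno) {
            // Stale response from an earlier commit: leave the state untouched.
            return false;
        }
        if (error != null) {
            return false;
        }
        lastCommittedOffsets = new HashMap<>(offsets);
        return true;
    }

    // Read-only view of the offsets recorded by the last accepted commit.
    Map<String, Long> lastCommittedOffsets() {
        return Collections.unmodifiableMap(lastCommittedOffsets);
    }
}
```

With this ordering, a late response for an older commit cannot replace offsets recorded by a newer one, because the update happens inside the same method that performs the sequence check.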



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
