[ https://issues.apache.org/jira/browse/KAFKA-5273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16020408#comment-16020408 ]
ASF GitHub Bot commented on KAFKA-5273:
---------------------------------------

GitHub user apurvam opened a pull request:

    https://github.com/apache/kafka/pull/3119

    KAFKA-5273: Make KafkaConsumer.committed query the server for all partitions

    Before this patch, the consumer returned the cached offsets for partitions in its current assignment. This worked as long as all offset commits went through the consumer.

    With KIP-98, offsets can be committed transactionally through the producer. Relying on cached positions in the consumer therefore returns incorrect information: because those commits go through the producer, the consumer's cache is never updated.

    Hence we need to change the `KafkaConsumer.committed` method to always query the server for the last committed offset, so that it returns the correct information every time.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apurvam/kafka KAFKA-5273-kafkaconsumer-committed-should-always-hit-server

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/3119.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3119

----
commit 17eb7eab70a40e3d4208a56463bb418350f80950
Author: Apurva Mehta <apu...@confluent.io>
Date:   2017-05-22T23:36:38Z

    Make KafkaConsumer.committed hit the server for all partitions, even those in its current assignment

----


> KafkaConsumer.committed() should get latest committed offsets from the server
> -----------------------------------------------------------------------------
>
>                 Key: KAFKA-5273
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5273
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: clients, core, producer
>            Reporter: Apurva Mehta
>            Assignee: Apurva Mehta
>            Priority: Blocker
>              Labels: exactly-once
>             Fix For: 0.11.0.0
>
>
> Currently, `KafkaConsumer.committed(topicPartition)` returns the current
> position of the consumer for that partition if the consumer has been
> assigned the partition. Otherwise, it looks up the committed position on
> the server.
> With the new producer `sendOffsetsToTransaction` API, the offsets for an
> assigned partition can be committed through the producer. The consumer does
> not update its cached view in that case, so it subsequently returns a stale
> committed offset for its assigned partition.
> We should either update the consumer's cache when offsets are committed
> through the producer, or drop the cache entirely and always look up the
> committed offset on the server. Either way, the `committed` method will
> always return the latest committed offset for any partition.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
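The stale-cache problem and the server-lookup fix can be illustrated with a small in-memory model. This is a hypothetical sketch, not Kafka code: `OffsetStore`, `ModelConsumer`, `ModelProducer`, and `commitOffsetsInTransaction` are invented names that only mimic the data flow (the last one stands in for the real `KafkaProducer.sendOffsetsToTransaction` followed by `commitTransaction`). The `useCache` flag switches between the pre-patch behavior (answer from the consumer's cache for assigned partitions) and the patched behavior (always ask the server).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of KAFKA-5273. None of these classes are Kafka APIs;
// they only mimic who writes committed offsets and where committed() reads them from.
public class CommittedOffsetDemo {

    // Stands in for the broker-side store of committed offsets (the "server").
    static class OffsetStore {
        private final Map<String, Long> committed = new HashMap<>();

        void commit(String partition, long offset) {
            committed.put(partition, offset);
        }

        Long fetch(String partition) {
            return committed.get(partition);
        }
    }

    // Stands in for the consumer. useCache = true models the pre-patch behavior,
    // useCache = false models the patched behavior (always ask the server).
    static class ModelConsumer {
        private final OffsetStore server;
        private final boolean useCache;
        private final Set<String> assignment = new HashSet<>();
        private final Map<String, Long> cache = new HashMap<>();

        ModelConsumer(OffsetStore server, boolean useCache) {
            this.server = server;
            this.useCache = useCache;
        }

        void assign(String partition) {
            assignment.add(partition);
        }

        // Commits made through the consumer update both the server and the local cache.
        void commitSync(String partition, long offset) {
            server.commit(partition, offset);
            cache.put(partition, offset);
        }

        Long committed(String partition) {
            if (useCache && assignment.contains(partition) && cache.containsKey(partition)) {
                return cache.get(partition); // may be stale if someone else committed
            }
            return server.fetch(partition);
        }
    }

    // Stands in for the transactional producer: its commits never touch the consumer's cache.
    static class ModelProducer {
        private final OffsetStore server;

        ModelProducer(OffsetStore server) {
            this.server = server;
        }

        void commitOffsetsInTransaction(String partition, long offset) {
            server.commit(partition, offset);
        }
    }

    // Returns {value reported by committed(), value actually stored on the server}.
    static long[] run(boolean useCache) {
        OffsetStore server = new OffsetStore();
        ModelConsumer consumer = new ModelConsumer(server, useCache);
        ModelProducer producer = new ModelProducer(server);

        consumer.assign("orders-0");
        consumer.commitSync("orders-0", 10L);                 // cache and server both hold 10
        producer.commitOffsetsInTransaction("orders-0", 25L); // only the server moves to 25

        return new long[] {consumer.committed("orders-0"), server.fetch("orders-0")};
    }

    public static void main(String[] args) {
        long[] before = run(true);
        long[] after = run(false);
        System.out.println("pre-patch:  committed() = " + before[0] + ", server = " + before[1]);
        System.out.println("post-patch: committed() = " + after[0] + ", server = " + after[1]);
    }
}
```

In the cached variant `committed()` reports 10 while the server holds 25; in the server-lookup variant both are 25. In this model the producer holds no reference to the consumer, so it has no way to refresh the cache, which is why dropping the cache (the option the pull request takes) is the simpler of the two fixes proposed in the issue.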