[ https://issues.apache.org/jira/browse/KAFKA-4746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15858709#comment-15858709 ]
Grant Henke commented on KAFKA-4746:
------------------------------------

I just mean that often, when working with a compacted topic, you read from the start of the topic every time your process restarts to see or rebuild "the current state". But you are right, that is a bit of an overstatement. There are likely cases where a process commits an offset to try to resume where it left off, well aware that the offsets could have been cleaned since the last commit.

As I understand it, before KIP-58/KAFKA-1981 it was a race against the log cleaner whether the committed offset was still valid. Committing the offset also doesn't do anything to help ensure you didn't miss an offset that was cleaned while your application was not processing. KIP-58/KAFKA-1981 fixed that by ensuring some time passes before cleaning, via min.compaction.lag.ms/min.compaction.lag.bytes/min.compaction.lag.messages.

> Offsets can be committed for the offsets topic
> ----------------------------------------------
>
>                 Key: KAFKA-4746
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4746
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.0
>            Reporter: Grant Henke
>
> Though this is likely rare and I don't suspect too many people would try to do
> this, we should prevent users from committing offsets for the offsets topic
> into the offsets topic. This would essentially create an infinite loop in any
> consumer consuming from that topic. Also, committing offsets for a compacted
> topic likely doesn't make sense anyway.
> Here is a quick failing test I wrote to see if this guard exists:
> {code:title=OffsetCommitTest.scala|borderStyle=solid}
>   @Test
>   def testOffsetTopicOffsetCommit() {
>     val topic1 = "__consumer_offsets"
>     // Commit an offset
>     val expectedReplicaAssignment = Map(0 -> List(1))
>     val commitRequest = OffsetCommitRequest(
>       groupId = group,
>       requestInfo = immutable.Map(TopicAndPartition(topic1, 0) ->
>         OffsetAndMetadata(offset=42L)),
>       versionId = 2
>     )
>     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
>     assertEquals(Errors.INVALID_TOPIC_EXCEPTION.code,
>       commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
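
For illustration only, here is a minimal sketch of the kind of guard the issue description asks for: reject any commit whose partition belongs to the offsets topic and accept everything else. It is self-contained and does not use Kafka's classes. `OffsetCommitGuard`, `TopicPartition`, `CommitStatus`, `Accepted`, `InvalidTopic` and `validate` are all hypothetical names invented for this sketch, not the broker's API, and `InvalidTopic` only stands in for the `INVALID_TOPIC_EXCEPTION` error the test above expects.

```scala
object OffsetCommitGuard {
  // Name of the internal offsets topic, as used in the failing test.
  val OffsetsTopic = "__consumer_offsets"

  // Hypothetical stand-in for the broker's topic/partition type.
  final case class TopicPartition(topic: String, partition: Int)

  // Hypothetical per-partition outcome of a commit request.
  sealed trait CommitStatus
  case object Accepted extends CommitStatus
  case object InvalidTopic extends CommitStatus

  // Assumption: requestInfo maps each partition to the offset being committed.
  // Partitions of the offsets topic are rejected; all others are accepted.
  def validate(requestInfo: Map[TopicPartition, Long]): Map[TopicPartition, CommitStatus] =
    requestInfo.map { case (tp, _) =>
      val status: CommitStatus =
        if (tp.topic == OffsetsTopic) InvalidTopic else Accepted
      tp -> status
    }
}
```

Calling `OffsetCommitGuard.validate` with a request that mixes `__consumer_offsets` and an ordinary topic marks only the former as `InvalidTopic`, so the rest of the commit can still go through. Where such a check would actually live in the broker is left open here.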