[ 
https://issues.apache.org/jira/browse/KAFKA-4746?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15858709#comment-15858709
 ] 

Grant Henke commented on KAFKA-4746:
------------------------------------

I just mean that often when working with a compacted topic you read from the 
start of the topic every time your process restarts to see or rebuild "the 
current state". 
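
To make that restart pattern concrete, here is a minimal sketch that replays a changelog from its first record and folds it into a map. It is only an illustration: it uses an in-memory Seq instead of a real consumer, Record and rebuildState are hypothetical names, and a value of None stands in for a tombstone.

```scala
// Hypothetical stand-in for a record read from a compacted topic.
// value = None models a tombstone (a delete marker for the key).
final case class Record(offset: Long, key: String, value: Option[String])

object RebuildState {
  // Replay the log from the first record: later records overwrite earlier
  // ones for the same key, and tombstones remove the key.
  def rebuildState(log: Seq[Record]): Map[String, String] =
    log.sortBy(_.offset).foldLeft(Map.empty[String, String]) { (state, rec) =>
      rec.value match {
        case Some(v) => state + (rec.key -> v)
        case None    => state - rec.key
      }
    }

  def main(args: Array[String]): Unit = {
    val log = Seq(
      Record(0L, "a", Some("1")),
      Record(1L, "b", Some("2")),
      Record(2L, "a", Some("3")),
      Record(3L, "b", None)
    )
    println(rebuildState(log))
  }
}
```

Because the fold only ever needs the latest value per key, it gives the same result whether or not the cleaner has already removed older records, which is why no committed offset is needed for this pattern.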

But you are right, that is a bit of an overstatement. There are likely cases 
where a process commits an offset to try to resume where it left off, while 
being well aware that the offsets could have been cleaned since the last 
commit. As I understand it, before KIP-58/KAFKA-1981 it was a race against the 
log cleaner whether the committed offset was still valid or not. Committing 
the offset also does nothing to ensure you didn't miss an offset that was 
cleaned while your application was not processing. 

KIP-58/KAFKA-1981 fixed that by ensuring some time passes before cleaning, via 
min.compaction.lag.ms/min.compaction.lag.bytes/min.compaction.lag.messages.
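
As a rough way to picture the time-based lag, the toy model below deduplicates only the entries that are older than the lag and leaves newer ones untouched. This is not the broker's log cleaner: Entry, compact, nowMs and minLagMs are hypothetical names, timestamps are assumed to increase with offset, and tombstones and segment boundaries are ignored.

```scala
// Hypothetical log entry; timestampMs is assumed to increase with offset.
final case class Entry(offset: Long, key: String, timestampMs: Long)

object CompactionLag {
  // Simplified model: entries at least minLagMs old form the cleanable head
  // and are reduced to the latest entry per key; younger entries (the tail)
  // are kept exactly as written.
  def compact(log: Seq[Entry], nowMs: Long, minLagMs: Long): Seq[Entry] = {
    val sorted = log.sortBy(_.offset)
    val (head, tail) = sorted.span(e => nowMs - e.timestampMs >= minLagMs)
    val lastOffsetPerKey: Map[String, Long] =
      head.groupBy(_.key).map { case (k, es) => k -> es.map(_.offset).max }
    head.filter(e => lastOffsetPerKey(e.key) == e.offset) ++ tail
  }
}
```

In this model a consumer that resumes from an offset still inside the lag window sees every record after it, which is the guarantee a committed offset needs in order to be useful on a compacted topic.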

> Offsets can be committed for the offsets topic
> ----------------------------------------------
>
>                 Key: KAFKA-4746
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4746
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.9.0.0
>            Reporter: Grant Henke
>
> Though this is likely rare and I don't suspect too many people would try to 
> do this, we should prevent users from committing offsets for the offsets 
> topic into the offsets topic. This would essentially create an infinite loop 
> in any consumer consuming from that topic. Also, committing offsets for a 
> compacted topic likely doesn't make sense anyway. 
> Here is a quick failing test I wrote to see if this guard exists:
> {code:title=OffsetCommitTest.scala|borderStyle=solid}
>   @Test
>   def testOffsetTopicOffsetCommit() {
>     val topic1 = "__consumer_offsets"
>     // Commit an offset
>     val expectedReplicaAssignment = Map(0 -> List(1))
>     val commitRequest = OffsetCommitRequest(
>       groupId = group,
>       requestInfo = immutable.Map(TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L)),
>       versionId = 2
>     )
>     val commitResponse = simpleConsumer.commitOffsets(commitRequest)
>     assertEquals(Errors.INVALID_TOPIC_EXCEPTION.code, commitResponse.commitStatus.get(TopicAndPartition(topic1, 0)).get)
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
