[
https://issues.apache.org/jira/browse/KAFKA-8069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jason Gustafson resolved KAFKA-8069.
------------------------------------
Resolution: Fixed
Fix Version/s: 2.1.2
2.0.2
> Committed offsets get cleaned up right after the coordinator loading them
> back from __consumer_offsets in broker with old inter-broker protocol version
> (< 2.2)
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-8069
> URL: https://issues.apache.org/jira/browse/KAFKA-8069
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 2.1.0, 2.2.0, 2.1.1, 2.1.2, 2.2.1
> Reporter: Zhanxiang (Patrick) Huang
> Assignee: Zhanxiang (Patrick) Huang
> Priority: Critical
> Fix For: 2.2.0, 2.0.2, 2.1.2
>
>
> After the 2.1 release, if the broker hasn't been upgraded to the latest
> inter-broker protocol version, the committed offsets stored in the
> __consumer_offsets topic will get cleaned up much earlier than they should be
> when the offsets are loaded back from the __consumer_offsets topic by the
> GroupCoordinator, which happens during leadership transition or after a
> broker bounce.
> TL;DR
> For the V1 on-disk format of __consumer_offsets, we have the *expireTimestamp*
> field, and if the inter-broker protocol (IBP) version is prior to 2.1 (prior to
> [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets])
> for a Kafka 2.1 broker, the logic for finding expired offsets looks like:
> {code:java}
> def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
>   offsets.filter {
>     case (topicPartition, commitRecordMetadataAndOffset) =>
>       ... && {
>         commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
>           case None =>
>             // current version with no per partition retention
>             currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
>           case Some(expireTimestamp) =>
>             // older versions with explicit expire_timestamp field => old expiration semantics is used
>             currentTimestamp >= expireTimestamp
>         }
>       }
>   }
>   ...
> }
> {code}
> The expireTimestamp in the on-disk offset record can only be set when storing
> the committed offset in the __consumer_offsets topic. But the GroupCoordinator
> also keeps an in-memory representation of the expireTimestamp (see the code
> above), which can be set in the following two cases:
> # Upon the GroupCoordinator receiving an OffsetCommitRequest, the
> expireTimestamp is set using the following logic:
> {code:java}
> expireTimestamp = offsetCommitRequest.retentionTime match {
>   case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
>   case retentionTime => Some(currentTimestamp + retentionTime)
> }
> {code}
> In all the latest client versions, the consumer will send the
> OffsetCommitRequest with DEFAULT_RETENTION_TIME, so the expireTimestamp will
> always be None in this case. *This means any committed offset set in this
> case will always hit "case None" in getExpiredOffsets(...) when the
> coordinator does the cleanup, which is correct.*
> # Upon the GroupCoordinator loading the committed offsets stored in the
> __consumer_offsets topic from disk, the expireTimestamp is set using the
> following logic if IBP < 2.1:
> {code:java}
> val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
> {code}
> and the logic to persist the expireTimestamp is:
> {code:java}
> // OffsetCommitRequest.DEFAULT_TIMESTAMP = -1
> value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
> {code}
> Since the in-memory expireTimestamp will always be None in our case, as
> mentioned in case 1, we will always store -1 on disk. Therefore, when the
> offset is loaded from the __consumer_offsets topic, the in-memory
> expireTimestamp will always be set to -1. *This means any committed offset
> set in this case will always hit "case Some(expireTimestamp)" in
> getExpiredOffsets(...) when the coordinator does the cleanup, which means we
> will always expire the committed offset on the first expiration check
> (shortly after it is loaded from the __consumer_offsets topic)*.
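> The round-trip above can be condensed into a small sketch (sentinel handling only; the real code reads and writes the full V1 schema):

```scala
// Hedged sketch of the None -> -1 -> Some(-1) round-trip that triggers the bug.
val DefaultTimestamp: Long = -1L // OffsetCommitRequest.DEFAULT_TIMESTAMP

// Commit path: modern clients yield an in-memory expireTimestamp of None,
// which is persisted as the -1 sentinel.
val inMemory: Option[Long] = None
val onDisk: Long = inMemory.getOrElse(DefaultTimestamp)

// Load path with IBP < 2.1: the raw field is wrapped as Some(-1) instead of
// being translated back to None.
val reloaded: Option[Long] = Some(onDisk)

// First expiration check after loading: currentTimestamp >= -1 is trivially true.
val expiredOnFirstCheck = reloaded.exists(expire => System.currentTimeMillis() >= expire)
```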
> I am able to reproduce this bug on my local box with one broker using 2.*, 1.*
> and 0.11.* consumers. The consumer sees a null committed offset after the
> broker is bounced.
> This bug was introduced by [PR-5690|https://github.com/apache/kafka/pull/5690]
> in the Kafka 2.1 release, and the fix is straightforward: set the
> expireTimestamp to None if it is -1 in the on-disk format.
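> For reference, a minimal sketch of the fix under those assumptions (the helper name here is hypothetical, not the actual patched method):

```scala
// Sketch of the fix: translate the -1 sentinel back to None when loading
// offsets from __consumer_offsets, so the "case None" path is taken again.
val DefaultTimestamp: Long = -1L // OffsetCommitRequest.DEFAULT_TIMESTAMP

def loadedExpireTimestamp(rawExpireTimestamp: Long): Option[Long] =
  if (rawExpireTimestamp == DefaultTimestamp) None
  else Some(rawExpireTimestamp)
```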
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)