[ 
https://issues.apache.org/jira/browse/KAFKA-15035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Cantero updated KAFKA-15035:
--------------------------------
    Summary: Consumer offsets can be deleted if kafka does not detect a 
consumer as empty  (was: Consumer offsets can be deleted immediately if kafka 
does not detect a consumer as empty)

> Consumer offsets can be deleted if kafka does not detect a consumer as empty
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-15035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15035
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.7.2
>            Reporter: Sam Cantero
>            Priority: Major
>
> We recently encountered a scenario where a consumer group had its 
> committed offsets deleted a few minutes (around 3 minutes) after the consumer 
> became inactive (the underlying node went away).
> As per 
> [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets],
> committed offsets for an active (i.e., running) consumer group should not be 
> deleted. If the consumer group becomes inactive, {+}the deletion of 
> committed offsets does not occur immediately{+}. Instead, the committed 
> offsets are removed only if the group remains inactive (Empty) for at least 
> the duration specified by 
> [offsets.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].
> In our case {{offsets.retention.minutes}} is set to 7 days and the consumer 
> was inactive for only 5 minutes, so the deletion should not have occurred.
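A minimal sketch of the KIP-211 rule for an Empty group, assuming retention is measured from the moment the group became Empty. The helper `empty_group_offsets_expired` is hypothetical and is not Kafka's coordinator code; it only shows the arithmetic behind the expectation above (7 days is 10080 minutes, far more than 5 minutes of inactivity).

```python
# Simplified model of the KIP-211 rule for an Empty group: offsets expire only
# once the group has been Empty for at least offsets.retention.minutes.
# Illustrative sketch only, not Kafka's actual implementation.

MINUTE_MS = 60 * 1000


def empty_group_offsets_expired(now_ms: int, empty_since_ms: int,
                                retention_minutes: int) -> bool:
    """Return True if an Empty group's offsets are past the retention window.

    empty_since_ms stands for the time the group became Empty
    (the group's last state-change timestamp in KIP-211 terms).
    """
    retention_ms = retention_minutes * MINUTE_MS
    return now_ms - empty_since_ms >= retention_ms


# The reporter's numbers: 7-day retention, group inactive for 5 minutes.
RETENTION_MINUTES = 7 * 24 * 60  # 10080 minutes
became_empty_ms = 0
five_minutes_later_ms = 5 * MINUTE_MS

print(empty_group_offsets_expired(five_minutes_later_ms, became_empty_ms,
                                  RETENTION_MINUTES))  # False
```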
> KIP-496 was later introduced to address the following limitation of KIP-211:
> {quote}When a consumer subscription changes, we are still left with the 
> committed offsets of the previous subscription. These will never be cleaned 
> up as long as the group remains active. We were aware of this problem in 
> KIP-211, but the solution was not implemented because the coordinator is 
> presently agnostic to join group metadata and we were unclear about the 
> compatibility implications of changing that.
> {quote}
> However, this introduced a regression, as explained in 
> https://issues.apache.org/jira/browse/KAFKA-13636.
> {quote}The group coordinator might delete invalid offsets during a group 
> rebalance. During a rebalance, the coordinator is relying on the last commit 
> timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state 
> modification timestamp ({_}currentStateTimestamp{_}) to detect expired 
> offsets.
> {quote}
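To make the KAFKA-13636 description concrete, here is a hypothetical model (not the coordinator's implementation) of how the choice of base timestamp changes the outcome for the same offset. The timestamps are illustrative values, not taken from the incident: an offset last committed 8 days ago in a group whose state changed 5 minutes ago.

```python
# Hypothetical model of the KAFKA-13636 description: the retention window can be
# measured from the last commit or from the last group state change.
MINUTE_MS = 60 * 1000
DAY_MS = 24 * 60 * MINUTE_MS
RETENTION_MS = 7 * DAY_MS  # offsets.retention.minutes = 7 days


def is_expired(now_ms: int, base_ms: int, retention_ms: int = RETENTION_MS) -> bool:
    """Treat an offset as expired once retention_ms has elapsed since base_ms."""
    return now_ms - base_ms >= retention_ms


now = 30 * DAY_MS                          # arbitrary reference time
commit_timestamp = now - 8 * DAY_MS        # last commit 8 days ago (illustrative)
current_state_timestamp = now - 5 * MINUTE_MS  # state changed 5 minutes ago

# Measuring from the last commit expires the offset...
print(is_expired(now, commit_timestamp))         # True
# ...while measuring from the last state change keeps it.
print(is_expired(now, current_state_timestamp))  # False
```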
> This implies that Kafka uses two approaches to offset expiration:
>  * The deletion timer starts when a consumer group enters the Empty 
> state (i.e., not running). Once the timer exceeds the 
> {{offsets.retention.minutes}} threshold, the committed offsets are deleted.
>  * If a consumer group is running (i.e., not in the Empty state), committed 
> offsets for topics the group is no longer subscribed to are deleted once 
> their last commit is older than {{offsets.retention.minutes}}.
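The second approach can be sketched as a filter over committed offsets. This is a simplified model under stated assumptions: `CommittedOffset` and `expired_for_running_group` are hypothetical names, the retention check uses only the last commit timestamp, and pending commits and per-offset expire timestamps are ignored. The usage lines show one consequence of the model: if the coordinator sees an empty set of subscribed topics, every offset older than the retention window qualifies. Whether that happened in this incident is an assumption, not something the logs confirm.

```python
from dataclasses import dataclass
from typing import List, Set

MINUTE_MS = 60 * 1000
DAY_MS = 24 * 60 * MINUTE_MS


@dataclass
class CommittedOffset:
    # Hypothetical record type for this sketch, not a Kafka class.
    topic: str
    partition: int
    offset: int
    commit_timestamp_ms: int


def expired_for_running_group(offsets: List[CommittedOffset],
                              subscribed_topics: Set[str],
                              now_ms: int,
                              retention_ms: int) -> List[CommittedOffset]:
    """Offsets of a running group that this simplified model would expire.

    An offset qualifies when its topic is not in the group's subscription and
    its last commit is at least retention_ms old.
    """
    return [
        o for o in offsets
        if o.topic not in subscribed_topics
        and now_ms - o.commit_timestamp_ms >= retention_ms
    ]


retention_ms = 7 * DAY_MS        # offsets.retention.minutes = 7 days
now = 30 * DAY_MS                # arbitrary reference time
eight_days_ago = now - 8 * DAY_MS
offsets = [
    CommittedOffset("orders", 0, 42, eight_days_ago),  # hypothetical topics
    CommittedOffset("legacy", 0, 7, eight_days_ago),
]

# Only the topic the group no longer subscribes to is expired...
print([o.topic for o in expired_for_running_group(offsets, {"orders"}, now, retention_ms)])
# ['legacy']
# ...but with an empty subscription set, every old offset qualifies.
print([o.topic for o in expired_for_running_group(offsets, set(), now, retention_ms)])
# ['orders', 'legacy']
```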
> However, KAFKA-13636 specifically states that this situation can occur 
> during a group rebalance. In our scenario, I observed that the affected 
> consumer group did not transition into the Empty state, and a "Stabilized" 
> log line was emitted. I'm uncertain whether this log line indicates that the 
> rebalance had concluded at that point. Why Kafka did not detect this 
> consumer group as Empty remains unclear.
> *Logs*
> {noformat}
> 01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg 
> has failed, removing it from the group
> 01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in 
> state PreparingRebalance with old generation 432 (__consumer_offsets-16) 
> (reason: removing member consumer-mycg-1-uuid on heartbeat expiration)
> 01:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg 
> has failed, removing it from the group
> 01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 
> (__consumer_offsets-16)
> 01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group 
> mycg for generation 433{noformat}
> This suggests that Kafka might have followed the second approach, which would 
> explain why it deleted the offsets roughly 2.5 minutes later:
> {noformat}
> 01:33:17 am - [GroupMetadataManager brokerId=1] Removed 285 expired offsets 
> in 8 milliseconds.{noformat}
> For reference, a regular consumer join/startup produces logs like the 
> following: the group is stabilized and the assignment is received from the leader.
> {noformat}
>  [GroupCoordinator 0]: Preparing to rebalance group mycg in state 
> PreparingRebalance with old generation 6 (__consumer_offsets-22) (reason: 
> Adding new member consumer-mycg-1-2b8ba689-fbaa-4829-82f5-dd2ed1d89d86 with 
> group instance id None) (kafka.coordinator.group.GroupCoordinator)
>  
> [GroupCoordinator 0]: Stabilized group mycg generation 7 
> (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Assignment received from leader for group mycg for 
> generation 7 (kafka.coordinator.group.GroupCoordinator){noformat}
> For reference, a regular consumer leave/shutdown produces logs like the 
> following. Note how the consumer group moves into the Empty state.
> {noformat}
> [GroupCoordinator 0]: Member[group.instance.id None, member.id 
> consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7] in group mycg has left, 
> removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Preparing to rebalance group mycg in state 
> PreparingRebalance with old generation 8 (__consumer_offsets-22) (reason: 
> removing member consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7 on 
> LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Group mycg with generation 9 is now empty 
> (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator){noformat}
>  
>  
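To compare the three log excerpts mechanically, a small scanner can list the coordinator events for a group and check whether it ever logged the "is now empty" transition. The event names are labels invented for this sketch, and the regular expressions are written against the log lines quoted above; other Kafka versions may word these messages differently. The scanner expects one unwrapped log record per line.

```python
import re
from typing import Iterable, List

# Event labels are hypothetical; regexes match the GroupCoordinator messages
# quoted in this issue (wording in other Kafka versions may differ).
EVENT_PATTERNS = [
    ("member_failed", re.compile(r"in group (?P<group>\S+) has failed")),
    ("member_left", re.compile(r"in group (?P<group>\S+) has left")),
    ("preparing_rebalance", re.compile(r"Preparing to rebalance group (?P<group>\S+)")),
    ("stabilized", re.compile(r"Stabilized group (?P<group>\S+) generation")),
    ("assignment_received",
     re.compile(r"Assignment received from leader for group (?P<group>\S+)")),
    ("empty", re.compile(r"Group (?P<group>\S+) with generation \d+ is now empty")),
]


def group_events(lines: Iterable[str], group: str) -> List[str]:
    """Return the coordinator events logged for `group`, in log order."""
    events = []
    for line in lines:
        for name, pattern in EVENT_PATTERNS:
            match = pattern.search(line)
            if match and match.group("group") == group:
                events.append(name)
                break  # at most one event per log line
    return events


incident = [
    "01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg "
    "has failed, removing it from the group",
    "01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in state "
    "PreparingRebalance with old generation 432 (__consumer_offsets-16)",
    "01:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg "
    "has failed, removing it from the group",
    "01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 "
    "(__consumer_offsets-16)",
    "01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group "
    "mycg for generation 433",
]

events = group_events(incident, "mycg")
print(events)
# ['member_failed', 'preparing_rebalance', 'member_failed', 'stabilized', 'assignment_received']
print("empty" in events)  # False: the group never logged the Empty transition
```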



--
This message was sent by Atlassian Jira
(v8.20.10#820010)