[ 
https://issues.apache.org/jira/browse/KAFKA-15035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sam Cantero updated KAFKA-15035:
--------------------------------
    Summary: Consumer offsets can be deleted immediately if kafka does not 
detect a consumer as empty  (was: Consumer offsets can be deleted immediately 
if kafka does not detect a consumer as dead)

> Consumer offsets can be deleted immediately if kafka does not detect a 
> consumer as empty
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15035
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.7.2
>            Reporter: Sam Cantero
>            Priority: Major
>
> We recently encountered a scenario in which a consumer group's committed 
> offsets were deleted almost immediately (around 3 minutes) after the consumer 
> became inactive.
> As per 
> [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets],
>  committed offsets for an active (i.e., running) consumer group should not be 
> deleted. However, if a consumer becomes inactive, {+}the deletion of 
> committed offsets will not occur immediately{+}. Instead, the committed 
> offsets will only be removed if the consumer remains inactive for at least 
> the duration specified by 
> [offsets.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].
> In our case {{offsets.retention.minutes}} is set to 7 days and the consumer 
> was only inactive for 5 minutes, so deletion should not have occurred.
> Reading KIP-211 further, we find the following sentence:
> {quote}If a group consumer unsubscribes from a topic but continues to consume 
> from other subscribed topics, the offset information of that unsubscribed 
> topic’s partitions should be deleted at the appropriate time.
> {quote}
> And later on:
> {quote}If there are partitions the group has offset for but no longer 
> consumes from, and offsets.retention.minutes has passed since their last 
> commit timestamp, the corresponding offsets will be removed from the offset 
> cache
> {quote}
> It is implied, though {*}+this is what I want to confirm in this ticket+{*}, 
> that Kafka employs two approaches for offset expiration:
>  * The deletion timer is activated when a consumer group enters the Empty 
> state (i.e., not running). Once the timer exceeds the 
> {{offsets.retention.minutes}} threshold, the committed offsets are deleted.
>  * If a consumer group is in a "running" state (i.e., not in the Empty 
> state) but no longer consumes from topics whose committed offsets are older 
> than the {{offsets.retention.minutes}} duration, those committed offsets are 
> deleted.
> Note that the second approach only takes into account the timestamp of the 
> last committed offset.
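
The two expiration paths described above can be sketched as follows. This is
a simplified model of the semantics as the reporter understands them, not the
broker's actual code: the class, method, and parameter names are all
hypothetical, and the topic name and timestamps in main() are made up for
illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

// Hypothetical model of the two expiration paths; not Kafka's implementation.
final class OffsetExpirationSketch {

    /** One committed offset entry (hypothetical shape). */
    static final class CommittedOffset {
        final String topic;
        final int partition;
        final Instant commitTime;

        CommittedOffset(String topic, int partition, Instant commitTime) {
            this.topic = topic;
            this.partition = partition;
            this.commitTime = commitTime;
        }
    }

    /** Returns the offsets that would be expired at time {@code now}. */
    static List<CommittedOffset> expired(
            boolean groupIsEmpty,
            Instant emptySince,            // only read when groupIsEmpty is true
            Set<String> subscribedTopics,
            List<CommittedOffset> offsets,
            Duration retention,            // stands in for offsets.retention.minutes
            Instant now) {
        List<CommittedOffset> result = new ArrayList<>();
        for (CommittedOffset o : offsets) {
            if (groupIsEmpty) {
                // Path 1: the timer starts when the group becomes Empty.
                if (!emptySince.plus(retention).isAfter(now)) {
                    result.add(o);
                }
            } else if (!subscribedTopics.contains(o.topic)) {
                // Path 2: group is not Empty but no longer subscribed to the
                // topic; only the last commit timestamp is considered.
                if (!o.commitTime.plus(retention).isAfter(now)) {
                    result.add(o);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2023-01-08T00:00:00Z");
        Duration retention = Duration.ofDays(7);
        List<CommittedOffset> offsets = Collections.singletonList(
                new CommittedOffset("orders", 0, now.minus(Duration.ofDays(8))));

        // Group Empty for 3 minutes: the timer has barely started.
        int viaEmptyPath = expired(true, now.minus(Duration.ofMinutes(3)),
                Collections.emptySet(), offsets, retention, now).size();

        // Group not Empty and not subscribed: the old commit alone decides.
        int viaUnsubscribedPath = expired(false, null,
                Collections.emptySet(), offsets, retention, now).size();

        System.out.println("expired via Empty path: " + viaEmptyPath);
        System.out.println("expired via unsubscribed path: " + viaUnsubscribedPath);
    }
}
```

In this model, a group that is not Empty can lose offsets whose last commit is
older than the retention period no matter how recently its consumers stopped,
which is the behaviour the ticket asks to have confirmed.
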
> Throughout this event, the affected consumer group didn’t transition into 
> the Empty state. Based on the Kafka logs, the consumer group was not detected 
> as Empty, indicating that Kafka still considered the consumer to be running. 
> It’s unclear why Kafka didn’t detect this consumer group as Empty.
> {noformat}
> 01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg 
> has failed, removing it from the group
> 01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in 
> state PreparingRebalance with old generation 432 (__consumer_offsets-16) 
> (reason: removing member consumer-mycg-1-uuid on heartbeat expiration)
> 1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg 
> has failed, removing it from the group
> 01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 
> (__consumer_offsets-16)
> 01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group 
> mycg for generation 433{noformat}
> This suggests that Kafka might have followed the second approach, which 
> would explain why it deleted the offsets about 3 minutes later.
> {noformat}
> 1:33:17 am - 
> [GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8 
> milliseconds.{noformat}
> For reference, the logs for a regular consumer join/startup look like this. 
> The group is stabilised and the assignment from the leader is received.
> {noformat}
>  [GroupCoordinator 0]: Preparing to rebalance group mycg in state 
> PreparingRebalance with old generation 6 (__consumer_offsets-22) (reason: 
> Adding new member consumer-mycg-1-2b8ba689-fbaa-4829-82f5-dd2ed1d89d86 with 
> group instance id None) (kafka.coordinator.group.GroupCoordinator)
>  
> [GroupCoordinator 0]: Stabilized group mycg generation 7 
> (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Assignment received from leader for group mycg for 
> generation 7 (kafka.coordinator.group.GroupCoordinator){noformat}
> For reference, the logs for a regular consumer leave/shutdown look like 
> this. Note how the consumer group moves into the Empty state.
>  
> {noformat}
> [GroupCoordinator 0]: Member[group.instance.id None, member.id 
> consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7] in group mycg has left, 
> removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Preparing to rebalance group mycg in state 
> PreparingRebalance with old generation 8 (__consumer_offsets-22) (reason: 
> removing member consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7 on 
> LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Group mycg with generation 9 is now empty 
> (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator){noformat}
>  
> As another piece of information, the consumer's underlying node experienced 
> an unclean termination, which could have played a role in Kafka's failure to 
> identify the consumer group as inactive.
> In summary, when Kafka's expiration semantics for committed offsets are 
> combined with its failure to detect that a consumer is dead, committed 
> offsets can be deleted long before {{offsets.retention.minutes}} has elapsed 
> since the consumer stopped.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
