[ https://issues.apache.org/jira/browse/KAFKA-15035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sam Cantero updated KAFKA-15035:
--------------------------------
    Summary: Consumer offsets can be deleted immediately if kafka does not detect a consumer as empty  (was: Consumer offsets can be deleted immediately if kafka does not detect a consumer as dead)

> Consumer offsets can be deleted immediately if kafka does not detect a consumer as empty
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15035
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.7.2
>            Reporter: Sam Cantero
>            Priority: Major
>
> We recently encountered a scenario in which a consumer group had its committed offsets deleted almost immediately (around 3 minutes) after the consumer became inactive.
>
> As per [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets], committed offsets for an active (i.e., running) consumer group should not be deleted. If a consumer group becomes inactive, {+}the deletion of committed offsets will not occur immediately{+}. Instead, the committed offsets are removed only if the group remains inactive for at least the duration specified by [offsets.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].
>
> In our case {{offsets.retention.minutes}} is set to 7 days and the consumer was inactive for only 5 minutes, so the deletion should not have occurred.
>
> Looking further into KIP-211, we find the following sentence:
> {quote}If a group consumer unsubscribes from a topic but continues to consume from other subscribed topics, the offset information of that unsubscribed topic’s partitions should be deleted at the appropriate time.
> {quote}
> And later on:
> {quote}If there are partitions the group has offset for but no longer consumes from, and offsets.retention.minutes has passed since their last commit timestamp, the corresponding offsets will be removed from the offset cache
> {quote}
> It is implied, though {*}+this is what I want to confirm in this ticket+{*}, that Kafka uses two approaches to offset expiration:
> * The deletion timer starts when a consumer group enters the Empty state (i.e., it is not running). Once the timer exceeds the {{offsets.retention.minutes}} threshold, the committed offsets are deleted.
> * If a consumer group is running (i.e., not in the Empty state) but no longer consumes from some topics, the committed offsets for those topics are deleted once their last commit is older than {{offsets.retention.minutes}}.
>
> Note that the second approach takes into account only the timestamp of the last committed offset.
>
> Throughout this event, the affected consumer group never transitioned into the Empty state. Based on the Kafka logs, the group was not detected as Empty, which indicates that, from Kafka's perspective, the consumer was still running. It's unclear why Kafka didn't detect this consumer group as Empty.
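To make the two approaches concrete, here is a minimal Python model of them as this ticket reads KIP-211. It is a sketch under stated assumptions, not Kafka's GroupMetadata logic: `GroupView`, `expired_topics`, and the topic names `orders` and `payments` are hypothetical, and all timestamps are plain milliseconds. Under this model, a group that has been Empty for only 5 minutes keeps its offsets with a 7-day retention, whereas a running group loses the offsets of any topic it no longer subscribes to as soon as that topic's last commit is older than the retention window.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional, Set

MINUTE_MS = 60 * 1000
DAY_MS = 24 * 60 * MINUTE_MS


@dataclass
class GroupView:
    """Hypothetical, simplified coordinator view of one consumer group."""
    is_empty: bool                                                # True once the group has no members
    empty_since_ms: Optional[int] = None                          # when the group entered Empty
    subscribed_topics: Set[str] = field(default_factory=set)      # topics current members subscribe to
    last_commit_ms: Dict[str, int] = field(default_factory=dict)  # topic -> last commit timestamp


def expired_topics(group: GroupView, now_ms: int, retention_ms: int) -> Set[str]:
    """Topics whose committed offsets would be deleted under the two approaches
    described in this ticket (a model only, not Kafka's implementation)."""
    if group.is_empty:
        # Approach 1: the retention clock starts when the group becomes Empty,
        # and then all of the group's offsets expire together.
        if group.empty_since_ms is not None and now_ms - group.empty_since_ms >= retention_ms:
            return set(group.last_commit_ms)
        return set()
    # Approach 2: the group is running, so only topics it no longer consumes
    # from can expire, judged solely by their last commit timestamp.
    return {
        topic
        for topic, committed_at in group.last_commit_ms.items()
        if topic not in group.subscribed_topics and now_ms - committed_at >= retention_ms
    }


if __name__ == "__main__":
    retention = 7 * DAY_MS  # offsets.retention.minutes = 7 days, as in this ticket
    now = 30 * DAY_MS       # arbitrary "current time" for the example

    # Empty for only 5 minutes: nothing expires.
    idle = GroupView(is_empty=True, empty_since_ms=now - 5 * MINUTE_MS,
                     last_commit_ms={"orders": now - 5 * MINUTE_MS})
    print(expired_topics(idle, now, retention))  # -> set()

    # Running group that no longer subscribes to "orders", last committed 8 days ago.
    running = GroupView(is_empty=False, subscribed_topics={"payments"},
                        last_commit_ms={"orders": now - 8 * DAY_MS, "payments": now - 8 * DAY_MS})
    print(expired_topics(running, now, retention))  # -> {'orders'}
```

In this model the time a group spends running never resets anything for an unsubscribed topic; only the subscription set and the last commit timestamp decide. The ticket does not state the actual commit timestamps involved, so the 8-day figure above is illustrative.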
> {noformat}
> 01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg has failed, removing it from the group
> 01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in state PreparingRebalance with old generation 432 (__consumer_offsets-16) (reason: removing member consumer-mycg-1-uuid on heartbeat expiration)
> 1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg has failed, removing it from the group
> 01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433 (__consumer_offsets-16)
> 01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group mycg for generation 433{noformat}
> This suggests that Kafka may have applied the second approach, which would explain why it deleted the offsets about 3 minutes later.
> {noformat}
> 1:33:17 am - [GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8 milliseconds.{noformat}
> For reference, the logs of a regular consumer join/startup look like this: the group is stabilized and the assignment from the leader is received.
> {noformat}
> [GroupCoordinator 0]: Preparing to rebalance group mycg in state PreparingRebalance with old generation 6 (__consumer_offsets-22) (reason: Adding new member consumer-mycg-1-2b8ba689-fbaa-4829-82f5-dd2ed1d89d86 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Stabilized group mycg generation 7 (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Assignment received from leader for group mycg for generation 7 (kafka.coordinator.group.GroupCoordinator){noformat}
> For reference, the logs of a regular consumer leave/shutdown look like this. Note how the consumer group moves into the Empty state.
> > {noformat} > [GroupCoordinator 0]: Member[group.instance.id None, member.id > consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7] in group mycg has left, > removing it from the group (kafka.coordinator.group.GroupCoordinator) > [GroupCoordinator 0]: Preparing to rebalance group mycg in state > PreparingRebalance with old generation 8 (__consumer_offsets-22) (reason: > removing member consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7 on > LeaveGroup) (kafka.coordinator.group.GroupCoordinator) > [GroupCoordinator 0]: Group mycg with generation 9 is now empty > (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator){noformat} > > As another piece of information, the consumer's underlying node experienced > an unclean termination, which could have played a role in Kafka's failure to > identify the consumer group as inactive. > In summary, when combining Kafka's expiration semantics of committed offsets > with Kafka's failure to detect a consumer in a dead state, it is possible for > committed offsets to be deleted. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)