[ https://issues.apache.org/jira/browse/KAFKA-15035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sam Cantero updated KAFKA-15035:
--------------------------------
    Summary: Consumer offsets can be deleted if kafka does not detect a consumer as empty  (was: Consumer offsets can be deleted immediately if kafka does not detect a consumer as empty)

> Consumer offsets can be deleted if kafka does not detect a consumer as empty
> ----------------------------------------------------------------------------
>
>                 Key: KAFKA-15035
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15035
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.7.2
>            Reporter: Sam Cantero
>            Priority: Major
>
> We've recently encountered a scenario where a consumer group had its
> committed offsets deleted a few minutes (around 3 minutes) after the consumer
> became inactive (the underlying node went away).
> As per
> [KIP-211|https://cwiki.apache.org/confluence/display/KAFKA/KIP-211%3A+Revise+Expiration+Semantics+of+Consumer+Group+Offsets],
> committed offsets for an active (i.e. running) consumer group should not be
> deleted. However, if a consumer becomes inactive, {+}the deletion of
> committed offsets will not occur immediately{+}. Instead, the committed
> offsets will only be removed if the consumer remains inactive for at least
> the duration specified by
> [offsets.retention.minutes|https://kafka.apache.org/documentation/#brokerconfigs_offsets.retention.minutes].
> In our case {{offsets.retention.minutes}} is set to 7 days and the consumer
> was only inactive for 5 minutes, so deletion should not have occurred.
> Later on, KIP-496 was introduced to fix the following issue in KIP-211:
> {quote}When a consumer subscription changes, we are still left with the
> committed offsets of the previous subscription. These will never be cleaned
> up as long as the group remains active.
> We were aware of this problem in
> KIP-211, but the solution was not implemented because the coordinator is
> presently agnostic to join group metadata and we were unclear about the
> compatibility implications of changing that.
> {quote}
> However, this introduced a regression, as explained in
> https://issues.apache.org/jira/browse/KAFKA-13636.
> {quote}The group coordinator might delete invalid offsets during a group
> rebalance. During a rebalance, the coordinator is relying on the last commit
> timestamp ({_}offsetAndMetadata.commitTimestamp{_}) instead of the last state
> modification timestamp ({_}currentStateTimestamp{_}) to detect expired
> offsets.
> {quote}
> This implies that Kafka employs two approaches for offset expiration:
> * The deletion timer is activated when a consumer group enters the Empty
> state (i.e., not running). Once the timer exceeds the
> {{offsets.retention.minutes}} threshold, the committed offsets are deleted.
> * If a consumer group is in a "running" state (i.e., not in the Empty state)
> but is no longer subscribed to a topic whose committed offsets are older than
> the {{offsets.retention.minutes}} duration, those committed offsets are
> deleted.
> However, KAFKA-13636 specifically states that this situation
> could occur during a group rebalance. In my particular scenario, I have
> observed that the affected consumer group did not transition into the Empty
> state, and I have encountered a "Stabilized" log line. I'm uncertain whether
> this log line indicates that the rebalance had concluded at that point or
> not. The reason why Kafka did not detect this consumer group as Empty remains
> unclear.
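
The two expiration rules listed above can be modelled roughly as follows. This is a minimal Python sketch for illustration only, not the broker's code: GroupSnapshot, expired_topics, the "orders" topic and all timestamps are hypothetical. It assumes the first rule measures from the moment the group became Empty (currentStateTimestamp) and the second from the last commit (offsetAndMetadata.commitTimestamp), as the KAFKA-13636 quote suggests.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set

MINUTE_MS = 60 * 1000
DAY_MS = 24 * 60 * MINUTE_MS
RETENTION_MS = 7 * DAY_MS  # the 7-day retention mentioned in this report


@dataclass
class GroupSnapshot:
    """Hypothetical, simplified view of a consumer group (not a Kafka class)."""
    state: str                       # "Empty", "Stable", "PreparingRebalance", ...
    state_changed_ms: Optional[int]  # stands in for currentStateTimestamp
    subscribed_topics: Set[str]      # topics the current members subscribe to
    last_commit_ms: Dict[str, int]   # topic -> stands in for commitTimestamp


def expired_topics(group: GroupSnapshot, now_ms: int,
                   retention_ms: int = RETENTION_MS) -> Set[str]:
    """Return the topics whose committed offsets expire under the two rules."""
    expired = set()
    for topic, commit_ms in group.last_commit_ms.items():
        if group.state == "Empty":
            # Rule 1: the timer starts when the group became Empty
            # (fall back to the commit time if that moment is unknown).
            if group.state_changed_ms is not None:
                base_ms = group.state_changed_ms
            else:
                base_ms = commit_ms
        elif topic not in group.subscribed_topics:
            # Rule 2: the group is not Empty but no longer subscribes to the
            # topic, so the timer starts at the last commit.
            base_ms = commit_ms
        else:
            continue  # still subscribed: offsets are kept
        if now_ms - base_ms >= retention_ms:
            expired.add(topic)
    return expired


now = 30 * DAY_MS                          # arbitrary "current" time
old_commit = {"orders": now - 8 * DAY_MS}  # assumed: last commit 8 days ago

# Group detected as Empty 5 minutes ago: rule 1 keeps the offsets.
empty = GroupSnapshot("Empty", now - 5 * MINUTE_MS, set(), old_commit)
kept_when_empty = expired_topics(empty, now)

# Group not detected as Empty and with no subscription recorded: rule 2 applies.
not_empty = GroupSnapshot("PreparingRebalance", now - 5 * MINUTE_MS, set(), old_commit)
dropped_when_not_empty = expired_topics(not_empty, now)
```

Under these assumptions, a group that is seen as Empty keeps its offsets for the full retention period, whereas a group that is not seen as Empty and has no recorded subscription for a topic can lose offsets whose last commit is older than the retention period, even if its members failed only minutes ago. Whether that is what happened in this incident is not established by the logs.
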
> *Logs*
> {noformat}
> 01:30:47 am - [GroupCoordinator 1]: Member consumer-mycg-1-uuid in group mycg
> has failed, removing it from the group
> 01:30:47 am - [GroupCoordinator 1]: Preparing to rebalance group mycg in
> state PreparingRebalance with old generation 432 (__consumer_offsets-16)
> (reason: removing member consumer-mycg-1-uuid on heartbeat expiration)
> 1:30:50 am - [GroupCoordinator 1]: Member consumer-mycg-2-uuid in group mycg
> has failed, removing it from the group
> 01:30:50 am - [GroupCoordinator 1]: Stabilized group mycg generation 433
> (__consumer_offsets-16)
> 01:30:50 am - [GroupCoordinator 1]: Assignment received from leader for group
> mycg for generation 433{noformat}
> This suggests that Kafka might have followed the second approach, which would
> explain why it deleted the offsets about 3 minutes later.
> {noformat}
> 1:33:17 am -
> [GroupMetadataManager brokerId=1] Removed 285 expired offsets in 8
> milliseconds.{noformat}
> For reference, regular consumer join/startup logs look like this. The
> group is stabilised and the assignment from the leader is received.
> {noformat}
> [GroupCoordinator 0]: Preparing to rebalance group mycg in state
> PreparingRebalance with old generation 6 (__consumer_offsets-22) (reason:
> Adding new member consumer-mycg-1-2b8ba689-fbaa-4829-82f5-dd2ed1d89d86 with
> group instance id None) (kafka.coordinator.group.GroupCoordinator)
>
> [GroupCoordinator 0]: Stabilized group mycg generation 7
> (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Assignment received from leader for group mycg for
> generation 7 (kafka.coordinator.group.GroupCoordinator){noformat}
> For reference, regular consumer leave/shutdown logs look like this. NOTE
> how the consumer group moves into the Empty state.
> {noformat}
> [GroupCoordinator 0]: Member[group.instance.id None, member.id
> consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7] in group mycg has left,
> removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Preparing to rebalance group mycg in state
> PreparingRebalance with old generation 8 (__consumer_offsets-22) (reason:
> removing member consumer-mycg-1-eb77a142-5b64-476e-bc3d-2731c9b811a7 on
> LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
> [GroupCoordinator 0]: Group mycg with generation 9 is now empty
> (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator){noformat}
>

--
This message was sent by Atlassian Jira
(v8.20.10#820010)