dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1347292051
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -72,6 +72,7 @@ import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.record.BrokerCompressionType; import org.apache.kafka.server.util.FutureUtils; +import org.apache.kafka.server.util.KafkaScheduler; Review Comment: nit: This could be removed. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java: ########## @@ -330,5 +330,5 @@ void onNewMetadataImage( /** * Shutdown the group coordinator. */ - void shutdown(); + void shutdown() throws InterruptedException; Review Comment: nit: Do we still need this? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -417,6 +442,40 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( return offsetMetadataManager.deleteOffsets(request); } + /** + * For each group, remove all expired offsets. If all offsets for the group is removed and the group is eligible + * for deletion, delete the group. + * + * @return The list of tombstones (offset commit and group metadata) to append. + */ + public CoordinatorResult<Void, Record> cleanupGroupMetadata() { + List<Record> records = new ArrayList<>(); + Set<String> groupsWithEmptyOffsets = new HashSet<>(); + groupMetadataManager.groupIds() + .forEach(groupId -> offsetMetadataManager.cleanupExpiredOffsets(groupId, records, config.offsetsRetentionMs) + .ifPresent(groupsWithEmptyOffsets::add)); + + groupsWithEmptyOffsets.forEach(groupId -> groupMetadataManager.maybeDeleteGroup(groupId, records)); Review Comment: Is there a reason why we need to add the group ids to groupsWithEmptyOffsets? I wonder if we could directly delete the group after deleting the offsets. Is it possible? The advantage is that it groups all the tombstone of a group. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } + /** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ + public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return Optional.of(groupId); + } + try { + Group group = groupMetadataManager.group(groupId); + ExpirationCondition expirationCondition = group.expirationCondition(); + Set<TopicPartition> expiredPartitions = new HashSet<>(); + long currentTimestamp = time.milliseconds(); + AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); + offsetsByTopic.forEach((topic, partitions) -> { + if (!expirationCondition.subscribedTopics.contains(topic)) { + partitions.forEach((partition, offsetAndMetadata) -> { + OptionalLong expireTimestampMs = offsetAndMetadata.expireTimestampMs; + if (expireTimestampMs.isPresent()) { + // Older versions with explicit expire_timestamp field => old expiration semantics is used + if (currentTimestamp >= expireTimestampMs.getAsLong()) { + expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); + } + // Current version with no per partition retention + } else if (currentTimestamp - expirationCondition.baseTimestamp.apply(offsetAndMetadata) >= offsetsRetentionMs) { + expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); + } else { + hasAllOffsetsExpired.set(false); + } + }); + } + }); + log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions); + + if (hasAllOffsetsExpired.get()) { + // All offsets were expired for this group. Remove the group. + return Optional.of(groupId); + } + + } catch (GroupIdNotFoundException e) { + // groups in offsets should exist. + log.warn("GroupId {} should exist.", groupId); + } Review Comment: Given that the group should exist based on how we use this method, I think that we could remove the try..catch and just let the exception go to the caller. We don't need to catch it. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -898,6 +904,53 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isEligibleForDeletion() { + return isInState(EMPTY) && generationId > 0; Review Comment: Are you sure about the `generationId > 0` condition here? I cannot find it. There may be some confusion with [this one](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala#L888). I think that this optimization does not apply to us because we need a record to remove the group from the timeline data structure. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } + /** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ + public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return Optional.of(groupId); + } + try { + Group group = groupMetadataManager.group(groupId); + ExpirationCondition expirationCondition = group.expirationCondition(); + Set<TopicPartition> expiredPartitions = new HashSet<>(); + long currentTimestamp = time.milliseconds(); + AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); + offsetsByTopic.forEach((topic, partitions) -> { + if (!expirationCondition.subscribedTopics.contains(topic)) { + partitions.forEach((partition, offsetAndMetadata) -> { + OptionalLong expireTimestampMs = offsetAndMetadata.expireTimestampMs; + if (expireTimestampMs.isPresent()) { + // Older versions with explicit expire_timestamp field => old expiration semantics is used + if (currentTimestamp >= expireTimestampMs.getAsLong()) { + expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); + } + // Current version with no per partition retention + } else if (currentTimestamp - expirationCondition.baseTimestamp.apply(offsetAndMetadata) >= offsetsRetentionMs) { + expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); + } else { + hasAllOffsetsExpired.set(false); + } + }); + } + }); + log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions); + + if (hasAllOffsetsExpired.get()) { + // All offsets were expired for this group. Remove the group. + return Optional.of(groupId); + } + + } catch (GroupIdNotFoundException e) { + // groups in offsets should exist. + log.warn("GroupId {} should exist.", groupId); + } + + return Optional.empty(); + } + + /** + * Add an offset commit tombstone record for the group. + * + * @param groupId The group id. + * @param topic The topic name. + * @param partition The partition. + * @param records The list of records to append the tombstone. + * + * @return The topic partition of the corresponding tombstone. + */ + private TopicPartition addOffsetCommitTombstone( + String groupId, + String topic, + int partition, + List<Record> records + ) { + records.add(RecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition)); + TopicPartition tp = new TopicPartition(topic, partition); + log.trace("[GroupId {}] Removing expired offset and metadata for {}", groupId, tp); Review Comment: nit: We could replace `{}` by `{}-{}` and `tp` by `topic, partition`. This avoid having to allocate a TopicPartition just for the logging. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -417,6 +442,40 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets( return offsetMetadataManager.deleteOffsets(request); } + /** + * For each group, remove all expired offsets. If all offsets for the group is removed and the group is eligible + * for deletion, delete the group. + * + * @return The list of tombstones (offset commit and group metadata) to append. + */ + public CoordinatorResult<Void, Record> cleanupGroupMetadata() { + List<Record> records = new ArrayList<>(); + Set<String> groupsWithEmptyOffsets = new HashSet<>(); + groupMetadataManager.groupIds() + .forEach(groupId -> offsetMetadataManager.cleanupExpiredOffsets(groupId, records, config.offsetsRetentionMs) Review Comment: Would it be better to pass the config object to the offset metadata manager so that it can figure out the retention? This would be more aligned with the group metadata manager. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } + /** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ + public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return Optional.of(groupId); + } + try { + Group group = groupMetadataManager.group(groupId); + ExpirationCondition expirationCondition = group.expirationCondition(); + Set<TopicPartition> expiredPartitions = new HashSet<>(); + long currentTimestamp = time.milliseconds(); + AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); + offsetsByTopic.forEach((topic, partitions) -> { + if (!expirationCondition.subscribedTopics.contains(topic)) { + partitions.forEach((partition, offsetAndMetadata) -> { + OptionalLong expireTimestampMs = offsetAndMetadata.expireTimestampMs; + if (expireTimestampMs.isPresent()) { + // Older versions with explicit expire_timestamp field => old expiration semantics is used + if (currentTimestamp >= expireTimestampMs.getAsLong()) { + expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); + } + // Current version with no per partition retention + } else if (currentTimestamp - expirationCondition.baseTimestamp.apply(offsetAndMetadata) >= offsetsRetentionMs) { + expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); + } else { + hasAllOffsetsExpired.set(false); + } + }); + } + }); + log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredPartitions); + + if (hasAllOffsetsExpired.get()) { + // All offsets were expired for this group. Remove the group. + return Optional.of(groupId); + } + + } catch (GroupIdNotFoundException e) { + // groups in offsets should exist. + log.warn("GroupId {} should exist.", groupId); + } + + return Optional.empty(); Review Comment: The group is is known to the caller, right? How about returning a boolean? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } + /** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ + public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return Optional.of(groupId); + } + try { + Group group = groupMetadataManager.group(groupId); + ExpirationCondition expirationCondition = group.expirationCondition(); + Set<TopicPartition> expiredPartitions = new HashSet<>(); + long currentTimestamp = time.milliseconds(); + AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); + offsetsByTopic.forEach((topic, partitions) -> { + if (!expirationCondition.subscribedTopics.contains(topic)) { Review Comment: We added `isSubscribedToTopic` to the `Group` interface in https://github.com/apache/kafka/pull/14408. We can probably re-use it here. What do you think? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -3104,6 +3117,10 @@ private boolean isProtocolInconsistent( && !groupProtocolTypeOrName.equals(protocolTypeOrName); } + public Set<String> groupIds() { + return this.groups.keySet(); Review Comment: nit: Should we wrap it with Collection.unmodifiableSet? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -61,6 +67,29 @@ * handling as well as during the initial loading of the records from the partitions. */ public class OffsetMetadataManager { + + /** + * An offset is considered expired based on different factors, such as the state of the group + * and/or the GroupMetadata record version (for generic groups). This class is used to check + * how offsets for the group should be expired. + */ + public static class ExpirationCondition { + /** + * Given an offset metadata, return the base timestamp that will be used to calculate expiration. + */ + final Function<OffsetAndMetadata, Long> baseTimestamp; + + /** + * The set of subscribed topics a group should check against for an offset commit to be eligible for expiration. + */ + final Set<String> subscribedTopics; + + public ExpirationCondition(Function<OffsetAndMetadata, Long> baseTimestamp, Set<String> subscribedTopics) { + this.baseTimestamp = baseTimestamp; + this.subscribedTopics = subscribedTopics; + } + } Review Comment: As we don't need `subscribedTopics` here anymore, I think that we could simplify this. How about the following? ``` interface OffsetExpirationCondition { boolean isOffsetExpired(OffsetAndMetadata offset, long offsetsRetentionMs); } ``` With this, you can define it with a lambda. This would be pretty convenient, I suppose. ``` (offset, offsetsRetentionMs) -> { .... return true/false; } ``` ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -544,6 +573,81 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets( .setTopics(topicResponses); } + /** + * Remove expired offsets for group. + * + * @param groupId The group id. + * @param records The list of records to populate with offset commit tombstone records. + * @param offsetsRetentionMs The offset retention in milliseconds. + * + * @return The group id if the group no longer has any offsets remaining, empty otherwise. + */ + public Optional<String> cleanupExpiredOffsets(String groupId, List<Record> records, long offsetsRetentionMs) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsetsByGroup.get(groupId); + if (offsetsByTopic == null) { + return Optional.of(groupId); + } + try { + Group group = groupMetadataManager.group(groupId); + ExpirationCondition expirationCondition = group.expirationCondition(); + Set<TopicPartition> expiredPartitions = new HashSet<>(); + long currentTimestamp = time.milliseconds(); + AtomicBoolean hasAllOffsetsExpired = new AtomicBoolean(true); + offsetsByTopic.forEach((topic, partitions) -> { + if (!expirationCondition.subscribedTopics.contains(topic)) { + partitions.forEach((partition, offsetAndMetadata) -> { + OptionalLong expireTimestampMs = offsetAndMetadata.expireTimestampMs; + if (expireTimestampMs.isPresent()) { + // Older versions with explicit expire_timestamp field => old expiration semantics is used + if (currentTimestamp >= expireTimestampMs.getAsLong()) { + expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); + } Review Comment: Don't we need an else branch with `hasAllOffsetsExpired.set(false);` here? As a general advice, it would be great if we could wrap the logic to decide whether an offset is expired or not in a lambda/method. This would allow us to write something like the following here: ``` if (isOffsetExpired(...)) { expiredPartitions.add(addOffsetCommitTombstone(groupId, topic, partition, records)); } else { hasAllOffsetsExpired.set(false); } ``` This would handle all the cases. What do you think? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -898,6 +904,53 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isEligibleForDeletion() { + return isInState(EMPTY) && generationId > 0; + } + + /** + * Return the expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * See {@link org.apache.kafka.coordinator.group.OffsetMetadataManager.ExpirationCondition} + * + * @return The expiration condition. + */ + @Override + public OffsetMetadataManager.ExpirationCondition expirationCondition() { + if (protocolType.isPresent()) { + if (isInState(EMPTY)) { + // No consumer exists in the group => + // - If current state timestamp exists and retention period has passed since group became Empty, + // expire all offsets with no pending offset commit; + // - If there is no current state timestamp (old group metadata schema) and retention period has passed + // since the last commit timestamp, expire the offset + return new OffsetMetadataManager.ExpirationCondition( + offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs), + Collections.emptySet()); + } else if (ConsumerProtocol.PROTOCOL_TYPE.equals(protocolType.get()) && + subscribedTopics.isPresent() && isInState(STABLE)) { + // Consumers exist in the group and group is Stable => + // - If the group is aware of the subscribed topics and retention period had passed since the + // last commit timestamp, expire the offset. offset with pending offset commit are not + // expired + return new OffsetMetadataManager.ExpirationCondition( + offsetAndMetadata -> offsetAndMetadata.commitTimestampMs, + subscribedTopics.orElse(Collections.emptySet())); + } + } else { + // protocolType is None => standalone (simple) consumer, that uses Kafka for offset storage only + // expire offsets with no pending offset commit that retention period has passed since their last commit + return new OffsetMetadataManager.ExpirationCondition( + offsetAndMetadata -> offsetAndMetadata.commitTimestampMs, + Collections.emptySet()); + } + + // If none of the conditions above are met, do not expire any offsets. + return new OffsetMetadataManager.ExpirationCondition(offsetAndMetadata -> Long.MAX_VALUE, Collections.emptySet()); Review Comment: It is a tad annoying that we have to iterate on all offsets in this case. I wonder if we could return an Optional. When it is define, it means that offsets could be deleted so we have to check all of them. When it is not define, we could just skip it. This seems to be a worthwhile optimization. What do you think? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -898,6 +904,53 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isEligibleForDeletion() { + return isInState(EMPTY) && generationId > 0; + } + + /** + * Return the expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * See {@link org.apache.kafka.coordinator.group.OffsetMetadataManager.ExpirationCondition} + * + * @return The expiration condition. + */ + @Override + public OffsetMetadataManager.ExpirationCondition expirationCondition() { + if (protocolType.isPresent()) { + if (isInState(EMPTY)) { + // No consumer exists in the group => + // - If current state timestamp exists and retention period has passed since group became Empty, + // expire all offsets with no pending offset commit; + // - If there is no current state timestamp (old group metadata schema) and retention period has passed + // since the last commit timestamp, expire the offset + return new OffsetMetadataManager.ExpirationCondition( + offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs), + Collections.emptySet()); + } else if (ConsumerProtocol.PROTOCOL_TYPE.equals(protocolType.get()) && + subscribedTopics.isPresent() && isInState(STABLE)) { Review Comment: nit: Could we align subscribedTopics with ConsumerProtocol on the previous line? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -898,6 +904,53 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isEligibleForDeletion() { + return isInState(EMPTY) && generationId > 0; + } + + /** + * Return the expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * See {@link org.apache.kafka.coordinator.group.OffsetMetadataManager.ExpirationCondition} + * + * @return The expiration condition. + */ + @Override + public OffsetMetadataManager.ExpirationCondition expirationCondition() { + if (protocolType.isPresent()) { + if (isInState(EMPTY)) { + // No consumer exists in the group => + // - If current state timestamp exists and retention period has passed since group became Empty, + // expire all offsets with no pending offset commit; + // - If there is no current state timestamp (old group metadata schema) and retention period has passed + // since the last commit timestamp, expire the offset + return new OffsetMetadataManager.ExpirationCondition( + offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs), + Collections.emptySet()); + } else if (ConsumerProtocol.PROTOCOL_TYPE.equals(protocolType.get()) && Review Comment: nit: We could reuse `usesConsumerGroupProtocol` here. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -898,6 +904,53 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isEligibleForDeletion() { + return isInState(EMPTY) && generationId > 0; + } + + /** + * Return the expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * See {@link org.apache.kafka.coordinator.group.OffsetMetadataManager.ExpirationCondition} + * + * @return The expiration condition. + */ + @Override + public OffsetMetadataManager.ExpirationCondition expirationCondition() { + if (protocolType.isPresent()) { + if (isInState(EMPTY)) { + // No consumer exists in the group => + // - If current state timestamp exists and retention period has passed since group became Empty, + // expire all offsets with no pending offset commit; + // - If there is no current state timestamp (old group metadata schema) and retention period has passed + // since the last commit timestamp, expire the offset + return new OffsetMetadataManager.ExpirationCondition( + offsetAndMetadata -> currentStateTimestamp.orElse(offsetAndMetadata.commitTimestampMs), + Collections.emptySet()); + } else if (ConsumerProtocol.PROTOCOL_TYPE.equals(protocolType.get()) && + subscribedTopics.isPresent() && isInState(STABLE)) { + // Consumers exist in the group and group is Stable => + // - If the group is aware of the subscribed topics and retention period had passed since the + // last commit timestamp, expire the offset. offset with pending offset commit are not + // expired + return new OffsetMetadataManager.ExpirationCondition( + offsetAndMetadata -> offsetAndMetadata.commitTimestampMs, + subscribedTopics.orElse(Collections.emptySet())); + } + } else { + // protocolType is None => standalone (simple) consumer, that uses Kafka for offset storage only + // expire offsets with no pending offset commit that retention period has passed since their last commit + return new OffsetMetadataManager.ExpirationCondition( + offsetAndMetadata -> offsetAndMetadata.commitTimestampMs, + Collections.emptySet()); + } + + // If none of the conditions above are met, do not expire any offsets. + return new OffsetMetadataManager.ExpirationCondition(offsetAndMetadata -> Long.MAX_VALUE, Collections.emptySet()); + Review Comment: nit: empty line. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroup.java: ########## @@ -898,6 +904,53 @@ public void createGroupTombstoneRecords(List<Record> records) { records.add(RecordHelpers.newGroupMetadataTombstoneRecord(groupId())); } + @Override + public boolean isEligibleForDeletion() { + return isInState(EMPTY) && generationId > 0; + } + + /** + * Return the expiration condition to be used for this group. This is based on several factors + * such as the group state, the protocol type, and the GroupMetadata record version. + * See {@link org.apache.kafka.coordinator.group.OffsetMetadataManager.ExpirationCondition} + * + * @return The expiration condition. + */ + @Override + public OffsetMetadataManager.ExpirationCondition expirationCondition() { Review Comment: nit: `offsetExpirationCondition`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org