dajac commented on code in PR #14467: URL: https://github.com/apache/kafka/pull/14467#discussion_r1340783778
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########

@@ -190,6 +191,11 @@ public GroupCoordinatorService build() {
      */
     private final CoordinatorRuntime<GroupCoordinatorShard, Record> runtime;

+    /**
+     * The scheduler that periodically deletes expired group metadata.
+     */
+    private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "group-metadata-expiration-");

Review Comment:
We already have a timer, so we should use it instead. Thinking about this, I think the best approach would be to schedule the expiration for each shard independently in CoordinatorShard::onLoaded. From there, we can access the `CoordinatorTimer`. Would this work?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########

@@ -400,6 +408,14 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
         return offsetMetadataManager.deleteOffsets(request);
     }

+    public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+        List<Record> records = new ArrayList<>();
+        offsetMetadataManager.cleanupOffsetMetadata(records, config.offsetsRetentionMs);
+        groupMetadataManager.cleanupGroupMetadata(records);

Review Comment:
It is a tad annoying that we have to iterate over all the groups with offsets in `cleanupOffsetMetadata` and then over the groups again in `cleanupGroupMetadata`. Would it be possible to iterate over all groups only once? It is also annoying that we have to iterate over all the groups at all, because it won't scale well. We would need to build indices to avoid this, but I am not sure it is worth it. Have you already considered this?
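The per-shard scheduling idea from the first comment could look roughly like the sketch below. All names here (the shape of `CoordinatorTimer`, the `Shard` class, the interval constant) are simplified stand-ins for illustration, not Kafka's actual interfaces: the shard schedules its first expiration pass when it is loaded and re-arms itself after every pass, so each shard drives its own cleanup through the timer instead of a shared `KafkaScheduler`.

```java
// Hypothetical stand-in for the CoordinatorTimer mentioned in the review;
// the real interface in Kafka differs.
interface CoordinatorTimer {
    void schedule(String key, long delayMs, Runnable operation);
}

// Illustrative shard: schedules its first expiration pass in onLoaded and
// reschedules itself after each pass, so each shard drives its own cleanup.
class Shard {
    private static final long EXPIRATION_INTERVAL_MS = 600_000L; // assumed interval

    private final CoordinatorTimer timer;

    Shard(CoordinatorTimer timer) {
        this.timer = timer;
    }

    // Called when the shard becomes the active coordinator for its partitions.
    void onLoaded() {
        scheduleNextExpiration();
    }

    private void scheduleNextExpiration() {
        timer.schedule("group-metadata-expiration", EXPIRATION_INTERVAL_MS, () -> {
            cleanupGroupMetadata();
            scheduleNextExpiration(); // re-arm for the next pass
        });
    }

    void cleanupGroupMetadata() {
        // would expire offsets and delete empty groups for this shard
    }
}
```

A nice property of this shape is lifecycle handling: cancelling the timer key when the shard is unloaded stops the loop without any extra scheduler bookkeeping.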
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########

@@ -653,7 +676,17 @@ private void removeOffset(
         if (partitionOffsets.isEmpty())
             topicOffsets.remove(topic);

-        if (topicOffsets.isEmpty())
+        if (topicOffsets.isEmpty()) {
             offsetsByGroup.remove(groupId);
+            // A group in offset metadata manager should always exist in the group metadata manager.
+            // In the case the write operation fails, the generic group will remain Dead.
+            // This is okay since it follows the old coordinator behavior; delete expired offsets and
+            // transition group to Dead even if append fails. One caveat is that groups that just transitioned
+            // to Dead will be deleted in the next expiration cycle.
+            Group group = groupMetadataManager.group(groupId);
+            if (group.isEmpty()) {
+                group.transitionToDead();

Review Comment:
I actually wonder if we need this after all. If the deletion fails, the group will be restored to its previous state when the operation is reverted in the timeline data structure.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########

@@ -400,6 +408,14 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
         return offsetMetadataManager.deleteOffsets(request);
     }

+    public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+        List<Record> records = new ArrayList<>();
+        offsetMetadataManager.cleanupOffsetMetadata(records, config.offsetsRetentionMs);
+        groupMetadataManager.cleanupGroupMetadata(records);
+
+        return new CoordinatorResult<>(records, null);

Review Comment:
The number of records here is potentially unlimited. Therefore, we could hit the max message size. If we do, the expiration process would basically fail and never recover. Have you thought about how we could handle this?
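On the max-message-size concern, one common way out is to split the tombstones into bounded batches and append each batch as its own write, so a single oversized batch can never wedge the expiration loop. The sketch below is a generic illustration of that idea, not Kafka's actual handling; it sizes batches by record count for simplicity, whereas a real implementation would need to size by serialized bytes.

```java
import java.util.ArrayList;
import java.util.List;

class RecordChunker {
    // Splits a potentially unbounded list of records into batches of at most
    // maxRecordsPerBatch, so each batch can be appended as a separate write
    // and stay under the max message size. Counting records is a stand-in
    // for tracking the serialized size of each batch.
    static <T> List<List<T>> chunk(List<T> records, int maxRecordsPerBatch) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxRecordsPerBatch) {
            int end = Math.min(i + maxRecordsPerBatch, records.size());
            batches.add(new ArrayList<>(records.subList(i, end)));
        }
        return batches;
    }
}
```

With this shape, a failed append only loses one batch of tombstones; the remaining expired entries are still in the timeline state and get picked up by the next expiration pass.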
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########

@@ -641,6 +643,26 @@ public List<Record> createGroupTombstoneRecords() {
         );
     }

+    @Override
+    public boolean isEmpty() {
+        return state() == ConsumerGroupState.EMPTY;
+    }
+
+    @Override
+    public void transitionToDead() {
+        state.set(ConsumerGroupState.DEAD);
+    }
+
+    @Override
+    public boolean eligibleForExpiration() {
+        return state() == ConsumerGroupState.DEAD;
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> expiredOffsets(long offsetsRetentionMs, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsets) {
+        return null;

Review Comment:
Why is this null?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########

@@ -536,6 +538,27 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }

+    public void cleanupOffsetMetadata(List<Record> records, long offsetsRetentionMs) {
+        // TODO: get only committed offset groups?
+        offsetsByGroup.forEach((groupId, offsetsByTopic) -> {
+            try {
+                Group group = groupMetadataManager.group(groupId);
+                Map<TopicPartition, OffsetAndMetadata> expiredOffsets = group.expiredOffsets(offsetsRetentionMs, offsetsByTopic);

Review Comment:
Is there a reason why we need to pass `offsetsByTopic` to the group? Would it be possible to just ask the group if its offsets could be expired, and then expire them if so? This would follow the pattern that we have established in @dongnuo123's PR.

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########

@@ -3100,6 +3100,15 @@ void validateDeleteGroup(String groupId) throws ApiException {
         group.validateDeleteGroup();
     }

+    public void cleanupGroupMetadata(List<Record> records) {
+        // TODO: committed offset?
+        groups.values().forEach(group -> {
+            if (group.eligibleForExpiration()) {
+                records.addAll(group.createGroupTombstoneRecords());
+            }
+        });

Review Comment:
It is a bit annoying that we have to iterate over all the groups to find the ones eligible for expiration. Have you thought about alternatives?

##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########

@@ -536,6 +538,27 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }

+    public void cleanupOffsetMetadata(List<Record> records, long offsetsRetentionMs) {
+        // TODO: get only committed offset groups?
+        offsetsByGroup.forEach((groupId, offsetsByTopic) -> {
+            try {
+                Group group = groupMetadataManager.group(groupId);
+                Map<TopicPartition, OffsetAndMetadata> expiredOffsets = group.expiredOffsets(offsetsRetentionMs, offsetsByTopic);
+                log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredOffsets.keySet());
+                expiredOffsets.forEach((topicPartition, offsetAndMetadata) -> {

Review Comment:
Is `expiredOffsets` really necessary? It seems that we could directly generate the required records instead of building the map first, no?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
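The "iterate only once" comments could be addressed by folding offset expiration and group deletion into a single pass over the groups: expire each group's offsets first, and if that leaves the group eligible for deletion, append its tombstones in the same pass. The sketch below only illustrates the control flow with deliberately simplified stand-ins (`Group` here is a hypothetical interface and records are plain strings, not Kafka's `Record` type).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class CleanupSketch {
    // Hypothetical group abstraction for illustration; Kafka's Group
    // interface looks different.
    interface Group {
        String groupId();
        // Records deleting this group's offsets that exceeded the retention.
        List<String> expiredOffsetTombstones(long retentionMs);
        // True if, after offset expiration, nothing keeps the group alive.
        boolean isEligibleForDeletion();
    }

    // Single pass: expire offsets and, where the group is left empty,
    // emit its group-level tombstones in the same iteration.
    static List<String> cleanupGroupMetadata(Map<String, Group> groups, long retentionMs) {
        List<String> records = new ArrayList<>();
        groups.values().forEach(group -> {
            records.addAll(group.expiredOffsetTombstones(retentionMs));
            if (group.isEligibleForDeletion()) {
                records.add("group-tombstone:" + group.groupId());
            }
        });
        return records;
    }
}
```

This still scans every group, so it does not answer the scalability concern by itself; avoiding the full scan would need an index (for example, groups keyed by their oldest offset commit timestamp), which is the trade-off the review asks about.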