dajac commented on code in PR #14467:
URL: https://github.com/apache/kafka/pull/14467#discussion_r1340783778


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -190,6 +191,11 @@ public GroupCoordinatorService build() {
      */
     private final CoordinatorRuntime<GroupCoordinatorShard, Record> runtime;
 
+    /**
+     * The scheduler that periodically deletes expired group metadata.
+     */
+    private final KafkaScheduler scheduler = new KafkaScheduler(1, true, "group-metadata-expiration-");

Review Comment:
   We already have a timer, so we should rather use it. Thinking about this, I think the best approach would be to schedule the expiration for each shard independently in `CoordinatorShard::onLoaded`. From there, we can access the `CoordinatorTimer`. Would this work?
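The suggested pattern could look roughly like the sketch below. Note that `Timer` here is a simplified stand-in, not the real `CoordinatorTimer` API, and the interval constant is a placeholder; this only illustrates the shape of scheduling a recurring expiration pass from `onLoaded` and re-arming it after each run.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: `Timer` is a hypothetical stand-in for CoordinatorTimer.
public class ShardExpirationSketch {
    interface Timer {
        // Schedules `task` to run after `delayMs`; the key makes it identifiable/cancellable.
        void schedule(String key, long delayMs, Runnable task);
    }

    static final long EXPIRATION_INTERVAL_MS = 600_000L; // placeholder interval

    final Timer timer;
    final List<String> firedTasks = new ArrayList<>();

    ShardExpirationSketch(Timer timer) {
        this.timer = timer;
    }

    // Called when the shard finishes loading; arms the first expiration pass.
    void onLoaded() {
        scheduleNextExpiration();
    }

    void scheduleNextExpiration() {
        timer.schedule("group-metadata-expiration", EXPIRATION_INTERVAL_MS, () -> {
            firedTasks.add("cleanupGroupMetadata");
            // Re-arm so the expiration keeps running periodically on this shard.
            scheduleNextExpiration();
        });
    }
}
```

Because the task is scheduled per shard, an unloaded shard simply stops re-arming, which avoids a process-wide scheduler outliving the shard.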



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -400,6 +408,14 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
         return offsetMetadataManager.deleteOffsets(request);
     }
 
+    public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+        List<Record> records = new ArrayList<>();
+        offsetMetadataManager.cleanupOffsetMetadata(records, config.offsetsRetentionMs);
+        groupMetadataManager.cleanupGroupMetadata(records);

Review Comment:
   It is a tad annoying that we have to iterate over all the groups with offsets in `cleanupOffsetMetadata` and then over the groups again in `cleanupGroupMetadata`. Would it be possible to iterate over all groups only once?
   
   Iterating over all the groups is also a concern in itself because it won't scale well. We would need to build indices to avoid this, but I am not sure whether it is worth it. Have you already considered this?
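A single-pass variant could fold both checks into one iteration over the groups, as in the hypothetical sketch below. The `GroupState` record and string "records" are stand-ins for the real group state and `Record` types; the point is only that one scan can emit both offset tombstones and group tombstones.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical single-pass cleanup: one iteration produces both kinds of tombstones.
public class SinglePassCleanup {
    record GroupState(boolean hasExpiredOffsets, boolean eligibleForDeletion) {}

    static List<String> cleanup(Map<String, GroupState> groups) {
        List<String> records = new ArrayList<>();
        groups.forEach((groupId, state) -> {
            if (state.hasExpiredOffsets()) {
                records.add("offset-tombstone:" + groupId);
            }
            if (state.eligibleForDeletion()) {
                records.add("group-tombstone:" + groupId);
            }
        });
        return records;
    }
}
```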



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -653,7 +676,17 @@ private void removeOffset(
         if (partitionOffsets.isEmpty())
             topicOffsets.remove(topic);
 
-        if (topicOffsets.isEmpty())
+        if (topicOffsets.isEmpty()) {
             offsetsByGroup.remove(groupId);
+            // A group in offset metadata manager should always exist in the group metadata manager.
+            // In the case the write operation fails, the generic group will remain Dead.
+            // This is okay since it follows the old coordinator behavior; delete expired offsets and
+            // transition group to Dead even if append fails. One caveat is that groups that just
+            // transitioned to Dead will be deleted in the next expiration cycle.
+            Group group = groupMetadataManager.group(groupId);
+            if (group.isEmpty()) {
+                group.transitionToDead();

Review Comment:
   I actually wonder if we need this after all. If the deletion fails, the 
group will be restored at its previous state when the operation is reverted in 
the timeline data structure.
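The revert behavior the comment relies on can be illustrated with a toy snapshot map: a failed append rolls the state back to the pre-operation snapshot, so no explicit compensating transition is needed. Kafka's real `SnapshotRegistry`/`TimelineHashMap` are far more sophisticated; this is only a conceptual sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Toy illustration of snapshot-and-revert; not the real timeline data structures.
public class TimelineSketch {
    final Map<String, String> state = new HashMap<>();
    Map<String, String> snapshot;

    // Capture the state before applying a speculative operation.
    void takeSnapshot() {
        snapshot = new HashMap<>(state);
    }

    // Restore the captured state when the write fails.
    void revertToSnapshot() {
        state.clear();
        state.putAll(snapshot);
    }
}
```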



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -400,6 +408,14 @@ public CoordinatorResult<OffsetDeleteResponseData, Record> deleteOffsets(
         return offsetMetadataManager.deleteOffsets(request);
     }
 
+    public CoordinatorResult<Void, Record> cleanupGroupMetadata() {
+        List<Record> records = new ArrayList<>();
+        offsetMetadataManager.cleanupOffsetMetadata(records, config.offsetsRetentionMs);
+        groupMetadataManager.cleanupGroupMetadata(records);
+
+        return new CoordinatorResult<>(records, null);

Review Comment:
   The number of records here is potentially unlimited. Therefore, we could hit 
the max message size. If we do, the expiration process would basically fail and 
never recover. Have you thought about how we could handle this?
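One way to keep a single oversized append from wedging the expiration pass is to split the accumulated records into bounded chunks and commit each chunk as its own write. The batch-size criterion here (a record count) is a placeholder; a real implementation would more likely bound serialized bytes against the max message size.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: chunk an unbounded record list into bounded batches.
public class RecordBatcher {
    static <T> List<List<T>> split(List<T> records, int maxPerBatch) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < records.size(); i += maxPerBatch) {
            // subList is a view; each batch covers a bounded slice of the input.
            batches.add(records.subList(i, Math.min(i + maxPerBatch, records.size())));
        }
        return batches;
    }
}
```

Each batch would then be committed as a separate `CoordinatorResult`, so a failure affects only one chunk and the next cycle can retry the remainder.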



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -641,6 +643,26 @@ public List<Record> createGroupTombstoneRecords() {
         );
     }
 
+    @Override
+    public boolean isEmpty() {
+        return state() == ConsumerGroupState.EMPTY;
+    }
+
+    @Override
+    public void transitionToDead() {
+        state.set(ConsumerGroupState.DEAD);
+    }
+
+    @Override
+    public boolean eligibleForExpiration() {
+        return state() == ConsumerGroupState.DEAD;
+    }
+
+    @Override
+    public Map<TopicPartition, OffsetAndMetadata> expiredOffsets(long offsetsRetentionMs, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsets) {
+        return null;

Review Comment:
   Why is this null?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -536,6 +538,27 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    public void cleanupOffsetMetadata(List<Record> records, long offsetsRetentionMs) {
+        // TODO: get only committed offset groups?
+        offsetsByGroup.forEach((groupId, offsetsByTopic) -> {
+            try {
+                Group group = groupMetadataManager.group(groupId);
+                Map<TopicPartition, OffsetAndMetadata> expiredOffsets = group.expiredOffsets(offsetsRetentionMs, offsetsByTopic);

Review Comment:
   Is there a reason why we need to pass `offsetsByTopic` to the group? Would it be possible to just ask the group whether its offsets could be expired, and then expire them if possible? This would follow the pattern established in @dongnuo123's PR.
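The inversion being suggested could look like the sketch below: the manager keeps ownership of the offsets and the iteration, while the group only answers an eligibility question. All names here (`offsetExpirationAllowed`, the timestamp map) are made up for illustration and do not reflect the actual interfaces in the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical shape: the group decides eligibility, the manager does the iteration.
public class EncapsulatedExpiration {
    interface Group {
        // True if the group's state allows expiring an offset committed at this timestamp.
        boolean offsetExpirationAllowed(long commitTimestampMs, long nowMs, long retentionMs);
    }

    static List<String> expire(Group group, Map<String, Long> commitTimestamps,
                               long nowMs, long retentionMs) {
        List<String> tombstones = new ArrayList<>();
        commitTimestamps.forEach((partition, committedAt) -> {
            if (group.offsetExpirationAllowed(committedAt, nowMs, retentionMs)) {
                tombstones.add(partition);
            }
        });
        return tombstones;
    }
}
```

The benefit is that the group no longer needs to see the manager's internal `TimelineHashMap` layout.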



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3100,6 +3100,15 @@ void validateDeleteGroup(String groupId) throws ApiException {
         group.validateDeleteGroup();
     }
 
+    public void cleanupGroupMetadata(List<Record> records) {
+        // TODO: committed offset?
+        groups.values().forEach(group -> {
+            if (group.eligibleForExpiration()) {
+                records.addAll(group.createGroupTombstoneRecords());
+            }
+        });

Review Comment:
   It is a bit annoying that we have to iterate over all the groups to find the 
one eligible for expiration. Have you thought about alternatives?
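One possible alternative is an index maintained at transition time: whenever a group becomes eligible for expiration, its id is added to a set, and cleanup drains that set instead of scanning the whole groups map. This is a sketch of the idea only; keeping such an index consistent with the timeline data structures would be the real cost to weigh.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch: track eligible group ids at transition time to avoid a full scan.
public class ExpirationIndex {
    final Set<String> eligible = new HashSet<>();

    // Called from the state transition that makes a group deletable.
    void markEligible(String groupId) {
        eligible.add(groupId);
    }

    // Cleanup consumes only the indexed groups instead of iterating all of them.
    List<String> drainTombstones() {
        List<String> records = new ArrayList<>(eligible);
        eligible.clear();
        return records;
    }
}
```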



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -536,6 +538,27 @@ public OffsetFetchResponseData.OffsetFetchResponseGroup fetchAllOffsets(
             .setTopics(topicResponses);
     }
 
+    public void cleanupOffsetMetadata(List<Record> records, long offsetsRetentionMs) {
+        // TODO: get only committed offset groups?
+        offsetsByGroup.forEach((groupId, offsetsByTopic) -> {
+            try {
+                Group group = groupMetadataManager.group(groupId);
+                Map<TopicPartition, OffsetAndMetadata> expiredOffsets = group.expiredOffsets(offsetsRetentionMs, offsetsByTopic);
+                log.debug("[GroupId {}] Expiring offsets: {}", groupId, expiredOffsets.keySet());
+                expiredOffsets.forEach((topicPartition, offsetAndMetadata) -> {

Review Comment:
   Is expiredOffsets really necessary? It seems that we could directly generate 
the required records instead of building it, no?
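"Generating records directly" could mean appending a tombstone the moment an offset is found expired, skipping the intermediate `expiredOffsets` map entirely. The names and string encoding of records below are placeholders for illustration, not the PR's actual record helpers.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch: emit tombstones inline during iteration, without an intermediate map.
public class DirectRecordGeneration {
    static void appendExpiredOffsetTombstones(String groupId,
                                              Map<String, Long> commitTimestamps,
                                              Predicate<Long> isExpired,
                                              List<String> records) {
        commitTimestamps.forEach((partition, commitTimestamp) -> {
            if (isExpired.test(commitTimestamp)) {
                // Append directly to the shared records list as we go.
                records.add("offset-commit-tombstone:" + groupId + ":" + partition);
            }
        });
    }
}
```

This saves one allocation and one pass per group, at the cost of losing the single debug line that logs all expired partitions together.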



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
