This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1293658ccaa KAFKA-19163: Avoid deleting groups with pending transactional offsets (#19496)
1293658ccaa is described below

commit 1293658ccaa5ada0f1e12ae78cb395bf81f43be0
Author: Sean Quah <sq...@confluent.io>
AuthorDate: Tue May 13 13:10:26 2025 +0100

    KAFKA-19163: Avoid deleting groups with pending transactional offsets (#19496)
    
    When a group has pending transactional offsets but no committed offsets,
    we can accidentally delete it while cleaning up expired offsets. Add a
    check to avoid this case.
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../coordinator/group/OffsetMetadataManager.java   |  5 +--
 .../group/OffsetMetadataManagerTest.java           | 36 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 2 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 54de475d4bf..f946af2ff62 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -1000,13 +1000,14 @@ public class OffsetMetadataManager {
      * @param groupId The group id.
      * @param records The list of records to populate with offset commit 
tombstone records.
      *
-     * @return True if no offsets exist or if all offsets expired, false 
otherwise.
+     * @return True if no offsets exist after expiry and no pending 
transactional offsets exist,
+     *         false otherwise.
      */
     public boolean cleanupExpiredOffsets(String groupId, 
List<CoordinatorRecord> records) {
         TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> 
offsetsByTopic =
             offsets.offsetsByGroup.get(groupId);
         if (offsetsByTopic == null) {
-            return true;
+            return !openTransactions.contains(groupId);
         }
 
         // We expect the group to exist.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 382b2a9b0e5..d0925bb4c21 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -2662,6 +2662,42 @@ public class OffsetMetadataManagerTest {
         assertEquals(List.of(), records);
     }
 
+    @Test
+    public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly() 
{
+        GroupMetadataManager groupMetadataManager = 
mock(GroupMetadataManager.class);
+        Group group = mock(Group.class);
+
+        OffsetMetadataManagerTestContext context = new 
OffsetMetadataManagerTestContext.Builder()
+            .withGroupMetadataManager(groupMetadataManager)
+            .withOffsetsRetentionMinutes(1)
+            .build();
+
+        long commitTimestamp = context.time.milliseconds();
+
+        context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
+        context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, 
commitTimestamp + 500);
+
+        context.time.sleep(Duration.ofMinutes(1).toMillis());
+
+        when(groupMetadataManager.group("group-id")).thenReturn(group);
+        when(group.offsetExpirationCondition()).thenReturn(Optional.of(
+            new OffsetExpirationConditionImpl(offsetAndMetadata -> 
offsetAndMetadata.commitTimestampMs)));
+        when(group.isSubscribedToTopic("foo")).thenReturn(false);
+
+        // foo-0 is expired, but the group is not deleted because it has 
pending transactional offset commits.
+        List<CoordinatorRecord> expectedRecords = List.of(
+            
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 
0)
+        );
+        List<CoordinatorRecord> records = new ArrayList<>();
+        assertFalse(context.cleanupExpiredOffsets("group-id", records));
+        assertEquals(expectedRecords, records);
+
+        // No offsets are expired, and the group is still not deleted because 
it has pending transactional offset commits.
+        records = new ArrayList<>();
+        assertFalse(context.cleanupExpiredOffsets("group-id", records));
+        assertEquals(List.of(), records);
+    }
+
     private static OffsetFetchResponseData.OffsetFetchResponsePartitions 
mkOffsetPartitionResponse(
         int partition,
         long offset,

Reply via email to