This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 1293658ccaa KAFKA-19163: Avoid deleting groups with pending transactional offsets (#19496) 1293658ccaa is described below commit 1293658ccaa5ada0f1e12ae78cb395bf81f43be0 Author: Sean Quah <sq...@confluent.io> AuthorDate: Tue May 13 13:10:26 2025 +0100 KAFKA-19163: Avoid deleting groups with pending transactional offsets (#19496) When a group has pending transactional offsets but no committed offsets, we can accidentally delete it while cleaning up expired offsets. Add a check to avoid this case. Reviewers: David Jacot <dja...@confluent.io> --- .../coordinator/group/OffsetMetadataManager.java | 5 +-- .../group/OffsetMetadataManagerTest.java | 36 ++++++++++++++++++++++ 2 files changed, 39 insertions(+), 2 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java index 54de475d4bf..f946af2ff62 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java @@ -1000,13 +1000,14 @@ public class OffsetMetadataManager { * @param groupId The group id. * @param records The list of records to populate with offset commit tombstone records. * - * @return True if no offsets exist or if all offsets expired, false otherwise. + * @return True if no offsets exist after expiry and no pending transactional offsets exist, + * false otherwise. */ public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> records) { TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> offsetsByTopic = offsets.offsetsByGroup.get(groupId); if (offsetsByTopic == null) { - return true; + return !openTransactions.contains(groupId); } // We expect the group to exist. diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java index 382b2a9b0e5..d0925bb4c21 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java @@ -2662,6 +2662,42 @@ public class OffsetMetadataManagerTest { assertEquals(List.of(), records); } + @Test + public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly() { + GroupMetadataManager groupMetadataManager = mock(GroupMetadataManager.class); + Group group = mock(Group.class); + + OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder() + .withGroupMetadataManager(groupMetadataManager) + .withOffsetsRetentionMinutes(1) + .build(); + + long commitTimestamp = context.time.milliseconds(); + + context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp); + context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500); + + context.time.sleep(Duration.ofMinutes(1).toMillis()); + + when(groupMetadataManager.group("group-id")).thenReturn(group); + when(group.offsetExpirationCondition()).thenReturn(Optional.of( + new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs))); + when(group.isSubscribedToTopic("foo")).thenReturn(false); + + // foo-0 is expired, but the group is not deleted beacuse it has pending transactional offset commits. + List<CoordinatorRecord> expectedRecords = List.of( + GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0) + ); + List<CoordinatorRecord> records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(expectedRecords, records); + + // No offsets are expired, and the group is still not deleted because it has pending transactional offset commits. + records = new ArrayList<>(); + assertFalse(context.cleanupExpiredOffsets("group-id", records)); + assertEquals(List.of(), records); + } + private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse( int partition, long offset,