This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f621a635c15 KAFKA-19570: Implement offline migration for streams 
groups (#20288)
f621a635c15 is described below

commit f621a635c1546246b80ae5d4624241aa4b58fd39
Author: Lucas Brutschy <lbruts...@confluent.io>
AuthorDate: Tue Aug 26 10:05:30 2025 +0200

    KAFKA-19570: Implement offline migration for streams groups (#20288)
    
    Offline migration essentially preserves offsets and nothing else. So
    effectively write tombstones for classic group type when a streams
    heartbeat is sent to with the group ID of an empty classic group, and
    write tombstones for the streams group type when a classic consumer
    attempts to join with a group ID of an empty streams group.
    
    Reviewers: Bill Bejeck <bbej...@apache.org>, Sean Quah
     <sq...@confluent.io>, Dongnuo Lyu <d...@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.java    |  48 ++++++-
 .../group/GroupMetadataManagerTest.java            | 150 +++++++++++++++++++++
 2 files changed, 195 insertions(+), 3 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 44380427284..f87af4897a7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -833,19 +833,28 @@ public class GroupMetadataManager {
      * Gets or creates a streams group without updating the groups map.
      * The group will be materialized during the replay.
      *
+     * If there is an empty classic consumer group of the same name, it will 
be deleted and a new streams
+     * group will be created.
+     *
      * @param groupId           The group ID.
+     * @param records           The record list to which the group tombstones 
are written
+     *                          if the group is empty and is a classic group.
      *
      * @return A StreamsGroup.
      *
      * Package private for testing.
      */
     StreamsGroup getOrCreateStreamsGroup(
-        String groupId
+        String groupId,
+        List<CoordinatorRecord> records
     ) {
         Group group = groups.get(groupId);
 
         if (group == null) {
             return new StreamsGroup(logContext, snapshotRegistry, groupId, 
metrics);
+        } else if (maybeDeleteEmptyClassicGroup(group, records)) {
+            log.info("[GroupId {}] Converted the empty classic group to a 
streams group.", groupId);
+            return new StreamsGroup(logContext, snapshotRegistry, groupId, 
metrics);
         } else {
             return castToStreamsGroup(group);
         }
@@ -1871,7 +1880,7 @@ public class GroupMetadataManager {
         boolean isJoining = memberEpoch == 0;
         StreamsGroup group;
         if (isJoining) {
-            group = getOrCreateStreamsGroup(groupId);
+            group = getOrCreateStreamsGroup(groupId, records);
             throwIfStreamsGroupIsFull(group);
         } else {
             group = getStreamsGroupOrThrow(groupId);
@@ -6066,7 +6075,11 @@ public class GroupMetadataManager {
                 // classicGroupJoinToConsumerGroup takes the join requests to 
non-empty consumer groups.
                 // The empty consumer groups should be converted to classic 
groups in classicGroupJoinToClassicGroup.
                 return classicGroupJoinToConsumerGroup((ConsumerGroup) group, 
context, request, responseFuture);
-            } else if (group.type() == CONSUMER || group.type() == CLASSIC) {
+            } else if (group.type() == CONSUMER || group.type() == CLASSIC || 
group.type() == STREAMS && group.isEmpty()) {
+                // classicGroupJoinToClassicGroup accepts:
+                // - classic groups
+                // - empty streams groups
+                // - empty consumer groups
                 return classicGroupJoinToClassicGroup(context, request, 
responseFuture);
             } else {
                 // Group exists but it's not a consumer group
@@ -6107,6 +6120,8 @@ public class GroupMetadataManager {
         ClassicGroup group;
         if (maybeDeleteEmptyConsumerGroup(groupId, records)) {
             log.info("[GroupId {}] Converted the empty consumer group to a 
classic group.", groupId);
+        } else if (maybeDeleteEmptyStreamsGroup(groupId, records)) {
+            log.info("[GroupId {}] Converted the empty streams group to a 
classic group.", groupId);
         }
         boolean isNewGroup = !groups.containsKey(groupId);
         try {
@@ -8398,6 +8413,13 @@ public class GroupMetadataManager {
         return group != null && group.type() == CONSUMER && group.isEmpty();
     }
 
+    /**
+     * @return true if the group is an empty streams group.
+     */
+    private static boolean isEmptyStreamsGroup(Group group) {
+        return group != null && group.type() == STREAMS && group.isEmpty();
+    }
+
     /**
      * Write tombstones for the group if it's empty and is a classic group.
      *
@@ -8435,6 +8457,26 @@ public class GroupMetadataManager {
         }
         return false;
     }
+    
+    /**
+     * Delete and write tombstones for the group if it's empty and is a 
streams group.
+     *
+     * @param groupId The group id to be deleted.
+     * @param records The list of records to delete the group.
+     *
+     * @return true if the group is an empty streams group.
+     */
+    private boolean maybeDeleteEmptyStreamsGroup(String groupId, 
List<CoordinatorRecord> records) {
+        Group group = groups.get(groupId, Long.MAX_VALUE);
+        if (isEmptyStreamsGroup(group)) {
+            // Add tombstones for the previous streams group. The tombstones 
won't actually be
+            // replayed because its coordinator result has a non-null 
appendFuture.
+            createGroupTombstoneRecords(group, records);
+            removeGroup(groupId);
+            return true;
+        }
+        return false;
+    }
 
     /**
      * Checks whether the given protocol type or name in the request is 
inconsistent with the group's.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3f1e49b955d..efe2ad96435 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -18633,6 +18633,156 @@ public class GroupMetadataManagerTest {
         assertNull(result.response().data().partitionsByUserEndpoint());
     }
 
+    @Test
+    public void testStreamsGroupHeartbeatWithNonEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+
+        String subtopology1 = "subtopology1";
+        String fooTopicName = "foo";
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time
+        );
+        
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup,
 classicGroup.groupAssignment()));
+
+        
context.groupMetadataManager.getOrMaybeCreateClassicGroup(classicGroupId, 
false).transitionTo(PREPARING_REBALANCE);
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.streamsGroupHeartbeat(
+                new StreamsGroupHeartbeatRequestData()
+                    .setGroupId(classicGroupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(12000)
+                    .setTopology(topology)
+                    .setActiveTasks(List.of())
+                    .setStandbyTasks(List.of())
+                    .setWarmupTasks(List.of())));
+    }
+
+    @Test
+    public void testStreamsGroupHeartbeatWithEmptyClassicGroup() {
+        String classicGroupId = "classic-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        String fooTopicName = "foo";
+        String subtopology1 = "subtopology1";
+        Topology topology = new Topology().setSubtopologies(List.of(
+            new 
Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName))
+        ));
+
+        MockTaskAssignor assignor = new MockTaskAssignor("sticky");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroupTaskAssignors(List.of(assignor))
+            .build();
+        ClassicGroup classicGroup = new ClassicGroup(
+            new LogContext(),
+            classicGroupId,
+            EMPTY,
+            context.time
+        );
+        
context.replay(GroupCoordinatorRecordHelpers.newGroupMetadataRecord(classicGroup,
 classicGroup.groupAssignment()));
+
+        CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> 
result = context.streamsGroupHeartbeat(
+            new StreamsGroupHeartbeatRequestData()
+                .setGroupId(classicGroupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(12000)
+                .setTopology(topology)
+                .setActiveTasks(List.of())
+                .setStandbyTasks(List.of())
+                .setWarmupTasks(List.of()));
+
+        StreamsGroupMember expectedMember = 
StreamsGroupMember.Builder.withDefaults(memberId)
+            
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+            .setMemberEpoch(1)
+            .setPreviousMemberEpoch(0)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setAssignedTasks(TasksTuple.EMPTY)
+            .setTasksPendingRevocation(TasksTuple.EMPTY)
+            .setRebalanceTimeoutMs(12000)
+            .setTopologyEpoch(0)
+            .build();
+
+        assertEquals(Errors.NONE.code(), result.response().data().errorCode());
+        assertEquals(
+            List.of(
+                
GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId),
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, 
expectedMember),
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, 
topology),
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(classicGroupId, 1, 
0),
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId,
 memberId, TasksTuple.EMPTY),
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId,
 1),
+                
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId,
 expectedMember)
+            ),
+            result.records()
+        );
+        assertEquals(
+            Group.GroupType.STREAMS,
+            context.groupMetadataManager.streamsGroup(classicGroupId).type()
+        );
+    }
+
+    @Test
+    public void testClassicGroupJoinWithEmptyStreamsGroup() throws Exception {
+        String streamsGroupId = "streams-group-id";
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroup(new StreamsGroupBuilder(streamsGroupId, 10))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(streamsGroupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request, true);
+
+        List<CoordinatorRecord> expectedRecords = List.of(
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochTombstoneRecord(streamsGroupId),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochTombstoneRecord(streamsGroupId),
+            
StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecordTombstone(streamsGroupId)
+        );
+
+        assertEquals(Errors.MEMBER_ID_REQUIRED.code(), 
joinResult.joinFuture.get().errorCode());
+        assertEquals(expectedRecords, joinResult.records.subList(0, 
expectedRecords.size()));
+        assertEquals(
+            Group.GroupType.CLASSIC,
+            
context.groupMetadataManager.getOrMaybeCreateClassicGroup(streamsGroupId, 
false).type()
+        );
+    }
+
+    @Test
+    public void testClassicGroupJoinWithNonEmptyStreamsGroup() throws 
Exception {
+        String streamsGroupId = "streams-group-id";
+        String memberId = Uuid.randomUuid().toString();
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withStreamsGroup(new StreamsGroupBuilder(streamsGroupId, 10)
+                .withMember(StreamsGroupMember.Builder.withDefaults(memberId)
+                    
.setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .build()))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(streamsGroupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+        assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), 
joinResult.joinFuture.get().errorCode());
+    }
+
     @Test
     public void testConsumerGroupDynamicConfigs() {
         String groupId = "fooup";

Reply via email to