This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55d1e3823b7 KAFKA-20254: Fix streams group creation failing when simple classic group exists (#21641)
55d1e3823b7 is described below

commit 55d1e3823b76590649cbe584cb906e330ca59fcc
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Mar 6 10:36:21 2026 +0100

    KAFKA-20254: Fix streams group creation failing when simple classic group exists (#21641)
    
    During replay of the __consumer_offsets topic, offset commit records can
    appear before streams group records, for example after log compaction.
    When OffsetMetadataManager replays an offset commit for a group that
    doesn't exist yet, it automatically creates a simple ClassicGroup to
    hold the offsets. When the streams group records are subsequently
    replayed, getOrMaybeCreatePersistedStreamsGroup fails with "Group X is
    not a streams group" because it does not handle simple classic groups.
    
    This adds handling for simple classic groups in
    getOrMaybeCreatePersistedStreamsGroup, matching the existing pattern in
    getOrMaybeCreatePersistedConsumerGroup. Simple classic groups have no
    backing group metadata records in __consumer_offsets (only offset commit
    records), so the in-memory group can safely be replaced.
    
    Reviewers: Matthias J. Sax <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  9 ++++++++
 .../group/GroupMetadataManagerTest.java            | 27 ++++++++++++++++++++++
 2 files changed, 36 insertions(+)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 4494a666aef..fa45d1c6e8a 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -1020,6 +1020,15 @@ public class GroupMetadataManager {
             return streamsGroup;
         } else if (group.type() == STREAMS) {
             return (StreamsGroup) group;
+        } else if (group.type() == CLASSIC && ((ClassicGroup) 
group).isSimpleGroup()) {
+            // If the group is a simple classic group, it was automatically 
created to hold committed
+            // offsets when no group-metadata-backed group existed. Simple 
classic groups do not have
+            // any GroupMetadataKey/Value records in the __consumer_offsets 
topic, only offset commit
+            // records, so the in-memory group can be safely replaced here. 
Without this, replaying
+            // streams group records after offset commit records would not 
work.
+            StreamsGroup streamsGroup = new StreamsGroup(logContext, 
snapshotRegistry, groupId);
+            groups.put(groupId, streamsGroup);
+            return streamsGroup;
         } else {
             // We don't support upgrading/downgrading between protocols at the 
moment, so
             // we throw an exception if a group exists with the wrong type.
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index a094c2dc356..91147aed051 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20703,6 +20703,33 @@ public class GroupMetadataManagerTest {
         assertEquals(member, 
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("member"));
     }
 
+    @Test
+    public void testReplayStreamsGroupMemberMetadataWithSimpleClassicGroup() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        // A simple classic group is created when replaying offset commits 
without a group.
+        // This simulates the scenario where offset commit records are 
replayed before streams
+        // group records after log compaction has cleaned up the group 
metadata tombstone.
+        context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);
+
+        StreamsGroupMember member = new StreamsGroupMember.Builder("member")
+            .setClientId("clientid")
+            .setClientHost("clienthost")
+            .setRackId("rackid")
+            .setInstanceId("instanceid")
+            .setRebalanceTimeoutMs(1000)
+            .setTopologyEpoch(10)
+            .setProcessId("processid")
+            .setUserEndpoint(new Endpoint().setHost("localhost").setPort(9999))
+            .setClientTags(Map.of("key", "value"))
+            .build();
+
+        // The simple classic group should be replaced by a streams group.
+        
context.replay(StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord("foo",
 member));
+        assertEquals(member, 
context.groupMetadataManager.streamsGroup("foo").getMemberOrThrow("member"));
+    }
+
     @Test
     public void testReplayStreamsGroupMemberMetadataTombstoneNotExisting() {
         GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()

Reply via email to