This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0d0956b207 KAFKA-17425: Improve coexistence of consumer and share 
groups (#17039)
b0d0956b207 is described below

commit b0d0956b20735575b7d4d870c2838912f4006940
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Sep 3 19:46:15 2024 +0100

    KAFKA-17425: Improve coexistence of consumer and share groups (#17039)
    
    This PR ensures that using the various group RPCs work properly when issued 
against the wrong type of group, such as DescribeConsumerGroups for a share 
group, or ConsumerGroupHeartbeat for a share group. There are no changes to the 
RPC error codes required.
    
    The significant code changes are:
    
    Making sure that the group coordinator does not assume that only classic 
and consumer groups exist. This was the cause of a ClassCastException when 
ConsumerGroupHeartbeat was being used against a share group.
    Making sure that committing offsets to a share group fails with 
GroupIdNotFoundException rather than java.lang.UnsupportedOperation. This was 
the cause of a name collision between a share group and a consumer group when 
using kafka-consumer-groups.sh --reset-offsets which inadvertently created a 
consumer group of the same name.
    
     Reviewers: Manikumar Reddy <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |  37 +++-
 .../coordinator/group/modern/share/ShareGroup.java |   7 +-
 .../group/GroupMetadataManagerTest.java            | 235 +++++++++++++++++++++
 .../group/modern/share/ShareGroupTest.java         |   7 +-
 4 files changed, 272 insertions(+), 14 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 506203b9b83..4816f780ab6 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -790,7 +790,7 @@ public class GroupMetadataManager {
         } else {
             if (group.type() == CONSUMER) {
                 return (ConsumerGroup) group;
-            } else if (createIfNotExists && 
validateOnlineUpgrade((ClassicGroup) group)) {
+            } else if (createIfNotExists && group.type() == CLASSIC && 
validateOnlineUpgrade((ClassicGroup) group)) {
                 return convertToConsumerGroup((ClassicGroup) group, records);
             } else {
                 throw new GroupIdNotFoundException(String.format("Group %s is 
not a consumer group.",
@@ -3863,10 +3863,21 @@ public class GroupMetadataManager {
         CompletableFuture<JoinGroupResponseData> responseFuture
     ) {
         Group group = groups.get(request.groupId(), Long.MAX_VALUE);
-        if (group != null && group.type() == CONSUMER && !group.isEmpty()) {
-            // classicGroupJoinToConsumerGroup takes the join requests to 
non-empty consumer groups.
-            // The empty consumer groups should be converted to classic groups 
in classicGroupJoinToClassicGroup.
-            return classicGroupJoinToConsumerGroup((ConsumerGroup) group, 
context, request, responseFuture);
+        if (group != null) {
+            if (group.type() == CONSUMER && !group.isEmpty()) {
+                // classicGroupJoinToConsumerGroup takes the join requests to 
non-empty consumer groups.
+                // The empty consumer groups should be converted to classic 
groups in classicGroupJoinToClassicGroup.
+                return classicGroupJoinToConsumerGroup((ConsumerGroup) group, 
context, request, responseFuture);
+            } else if (group.type() == CONSUMER || group.type() == CLASSIC) {
+                return classicGroupJoinToClassicGroup(context, request, 
responseFuture);
+            } else {
+                // Group exists but it's not a consumer group
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(UNKNOWN_MEMBER_ID)
+                    .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code())
+                );
+                return EMPTY_RESULT;
+            }
         } else {
             return classicGroupJoinToClassicGroup(context, request, 
responseFuture);
         }
@@ -5087,8 +5098,12 @@ public class GroupMetadataManager {
 
         if (group.type() == CLASSIC) {
             return classicGroupSyncToClassicGroup((ClassicGroup) group, 
context, request, responseFuture);
-        } else {
+        } else if (group.type() == CONSUMER) {
             return classicGroupSyncToConsumerGroup((ConsumerGroup) group, 
context, request, responseFuture);
+        } else {
+            responseFuture.complete(new SyncGroupResponseData()
+                .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+            return EMPTY_RESULT;
         }
     }
 
@@ -5355,8 +5370,12 @@ public class GroupMetadataManager {
 
         if (group.type() == CLASSIC) {
             return classicGroupHeartbeatToClassicGroup((ClassicGroup) group, 
context, request);
-        } else {
+        } else if (group.type() == CONSUMER) {
             return classicGroupHeartbeatToConsumerGroup((ConsumerGroup) group, 
context, request);
+        } else {
+            throw new UnknownMemberIdException(
+                String.format("Group %s not found.", request.groupId())
+            );
         }
     }
 
@@ -5536,8 +5555,10 @@ public class GroupMetadataManager {
 
         if (group.type() == CLASSIC) {
             return classicGroupLeaveToClassicGroup((ClassicGroup) group, 
context, request);
-        } else {
+        } else if (group.type() == CONSUMER) {
             return classicGroupLeaveToConsumerGroup((ConsumerGroup) group, 
context, request);
+        } else {
+            throw new UnknownMemberIdException(String.format("Group %s not 
found.", request.groupId()));
         }
     }
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index f54b6559b91..b7c0c433842 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group.modern.share;
 
 import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.protocol.Errors;
@@ -184,7 +185,7 @@ public class ShareGroup extends 
ModernGroup<ShareGroupMember> {
         boolean isTransactional,
         short apiVersion
     ) {
-        throw new UnsupportedOperationException("validateOffsetCommit is not 
supported for Share Groups.");
+        throw new GroupIdNotFoundException(String.format("Group %s is not a 
consumer group.", groupId));
     }
 
     @Override
@@ -193,12 +194,12 @@ public class ShareGroup extends 
ModernGroup<ShareGroupMember> {
         int memberEpoch,
         long lastCommittedOffset
     ) {
-        throw new UnsupportedOperationException("validateOffsetFetch is not 
supported for Share Groups.");
+        throw new GroupIdNotFoundException(String.format("Group %s is not a 
consumer group.", groupId));
     }
 
     @Override
     public void validateOffsetDelete() {
-        throw new UnsupportedOperationException("validateOffsetDelete is not 
supported for Share Groups.");
+        throw new GroupIdNotFoundException(String.format("Group %s is not a 
consumer group.", groupId));
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index b500bf57add..22ae88aaf53 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -14285,6 +14285,241 @@ public class GroupMetadataManagerTest {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testConsumerGroupHeartbeatOnShareGroup() {
+        String groupId = "group-foo";
+        String memberId = Uuid.randomUuid().toString();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .withMetadataImage(MetadataImage.EMPTY)
+            .withShareGroup(new ShareGroupBuilder(groupId, 1)
+                .withMember(new ShareGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(1)
+                    .setPreviousMemberEpoch(0)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .build())
+                .withAssignment(memberId, mkAssignment())
+                .withAssignmentEpoch(1))
+            .build();
+
+        assertThrows(GroupIdNotFoundException.class, () -> 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberEpoch(0)
+                .setServerAssignor("range")
+                .setRebalanceTimeoutMs(5000)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList())));
+    }
+
+    @Test
+    public void testClassicGroupJoinOnShareGroup() throws Exception {
+        String groupId = "group-foo";
+        String memberId = Uuid.randomUuid().toString();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .withMetadataImage(MetadataImage.EMPTY)
+            .withShareGroup(new ShareGroupBuilder(groupId, 1)
+                .withMember(new ShareGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(1)
+                    .setPreviousMemberEpoch(0)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .build())
+                .withAssignment(memberId, mkAssignment())
+                .withAssignmentEpoch(1))
+            .build();
+
+        JoinGroupRequestData request = new 
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withProtocolType("consumer")
+            .withProtocols(new JoinGroupRequestProtocolCollection(0))
+            .build();
+
+        GroupMetadataManagerTestContext.JoinResult joinResult = 
context.sendClassicGroupJoin(request);
+        assertTrue(joinResult.joinFuture.isDone());
+        assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(), 
joinResult.joinFuture.get().errorCode());
+    }
+
+    @Test
+    public void testClassicGroupSyncOnShareGroup() throws Exception {
+        String groupId = "group-foo";
+        String memberId = Uuid.randomUuid().toString();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .withMetadataImage(MetadataImage.EMPTY)
+            .withShareGroup(new ShareGroupBuilder(groupId, 1)
+                .withMember(new ShareGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(1)
+                    .setPreviousMemberEpoch(0)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .build())
+                .withAssignment(memberId, mkAssignment())
+                .withAssignmentEpoch(1))
+            .build();
+
+        SyncGroupRequestData request = new 
GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+            .withGroupId(groupId)
+            .withGenerationId(1)
+            .withMemberId(memberId)
+            .build();
+
+        GroupMetadataManagerTestContext.SyncResult syncResult = 
context.sendClassicGroupSync(request);
+
+        assertTrue(syncResult.records.isEmpty());
+        assertTrue(syncResult.syncFuture.isDone());
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
syncResult.syncFuture.get().errorCode());
+    }
+
+    @Test
+    public void testClassicGroupLeaveOnShareGroup() throws Exception {
+        String groupId = "group-foo";
+        String memberId = Uuid.randomUuid().toString();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .withMetadataImage(MetadataImage.EMPTY)
+                .withShareGroup(new ShareGroupBuilder(groupId, 1)
+                    .withMember(new ShareGroupMember.Builder(memberId)
+                        .setState(MemberState.STABLE)
+                        .setMemberEpoch(1)
+                        .setPreviousMemberEpoch(0)
+                        .setClientId(DEFAULT_CLIENT_ID)
+                        .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                        
.setSubscribedTopicNames(Collections.singletonList("foo"))
+                        .build())
+                .withAssignment(memberId, mkAssignment())
+                .withAssignmentEpoch(1))
+                .build();
+
+        assertThrows(UnknownMemberIdException.class, () -> 
context.sendClassicGroupLeave(
+            new LeaveGroupRequestData()
+            .setGroupId(groupId)
+            .setMembers(Collections.singletonList(
+                new MemberIdentity()
+                    .setMemberId(memberId)))));
+    }
+
+    @Test
+    public void testConsumerGroupDescribeOnShareGroup() {
+        String groupId = "group-foo";
+        String memberId = Uuid.randomUuid().toString();
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .withMetadataImage(MetadataImage.EMPTY)
+            .withShareGroup(new ShareGroupBuilder(groupId, 1)
+                .withMember(new ShareGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(1)
+                    .setPreviousMemberEpoch(0)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .build())
+                .withAssignment(memberId, mkAssignment())
+                .withAssignmentEpoch(1))
+            .build();
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = 
Collections.singletonList(
+            new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId)
+                .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+        );
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(Collections.singletonList(groupId));
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testShareGroupHeartbeatOnConsumerGroup() {
+        String groupId = "group-foo";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with one static member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setState(MemberState.STABLE)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assertThrows(GroupIdNotFoundException.class, () ->
+            context.shareGroupHeartbeat(
+                new ShareGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(Uuid.randomUuid().toString())
+                    .setMemberEpoch(1)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+    }
+
+    @Test
+    public void testShareGroupDescribeOnConsumerGroup() {
+        String groupId = "group-foo";
+        String memberId = Uuid.randomUuid().toString();
+
+        int epoch = 10;
+        String topicName = "topicName";
+        ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+            .setSubscribedTopicNames(Collections.singletonList(topicName))
+            .setServerAssignorName("assignorName");
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withConsumerGroupAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, epoch)
+                .withMember(memberBuilder.build()))
+            .build();
+
+        List<ShareGroupDescribeResponseData.DescribedGroup> expected = 
Collections.singletonList(
+            new ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId)
+                .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+        );
+
+        List<ShareGroupDescribeResponseData.DescribedGroup> actual = 
context.sendShareGroupDescribe(Collections.singletonList(groupId));
+        assertEquals(expected, actual);
+    }
+
     private static void checkJoinGroupResponse(
         JoinGroupResponseData expectedResponse,
         JoinGroupResponseData actualResponse,
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index 0d84d0c0da4..e53a9dac910 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.coordinator.group.modern.share;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
@@ -551,7 +552,7 @@ public class ShareGroupTest {
     @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
     public void testValidateOffsetCommit(short version) {
         ShareGroup shareGroup = createShareGroup("group-foo");
-        assertThrows(UnsupportedOperationException.class, () ->
+        assertThrows(GroupIdNotFoundException.class, () ->
             shareGroup.validateOffsetCommit(null, null, -1, false, version));
     }
 
@@ -581,14 +582,14 @@ public class ShareGroupTest {
     @Test
     public void testValidateOffsetFetch() {
         ShareGroup shareGroup = createShareGroup("group-foo");
-        assertThrows(UnsupportedOperationException.class, () ->
+        assertThrows(GroupIdNotFoundException.class, () ->
             shareGroup.validateOffsetFetch(null, -1, -1));
     }
 
     @Test
     public void testValidateOffsetDelete() {
         ShareGroup shareGroup = createShareGroup("group-foo");
-        assertThrows(UnsupportedOperationException.class, 
shareGroup::validateOffsetDelete);
+        assertThrows(GroupIdNotFoundException.class, 
shareGroup::validateOffsetDelete);
     }
 
     @Test

Reply via email to