This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b0d0956b207 KAFKA-17425: Improve coexistence of consumer and share
groups (#17039)
b0d0956b207 is described below
commit b0d0956b20735575b7d4d870c2838912f4006940
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Sep 3 19:46:15 2024 +0100
KAFKA-17425: Improve coexistence of consumer and share groups (#17039)
This PR ensures that the various group RPCs behave correctly when issued
against the wrong type of group, such as DescribeConsumerGroups for a share
group, or ConsumerGroupHeartbeat for a share group. No changes to the RPC
error codes are required.
The significant code changes are:
Making sure that the group coordinator does not assume that only classic
and consumer groups exist. This was the cause of a ClassCastException when
ConsumerGroupHeartbeat was being used against a share group.
Making sure that committing offsets to a share group fails with
GroupIdNotFoundException rather than java.lang.UnsupportedOperationException.
This was the cause of a name collision between a share group and a consumer
group when using kafka-consumer-groups.sh --reset-offsets, which inadvertently
created a consumer group of the same name.
Reviewers: Manikumar Reddy <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 37 +++-
.../coordinator/group/modern/share/ShareGroup.java | 7 +-
.../group/GroupMetadataManagerTest.java | 235 +++++++++++++++++++++
.../group/modern/share/ShareGroupTest.java | 7 +-
4 files changed, 272 insertions(+), 14 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 506203b9b83..4816f780ab6 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -790,7 +790,7 @@ public class GroupMetadataManager {
} else {
if (group.type() == CONSUMER) {
return (ConsumerGroup) group;
- } else if (createIfNotExists &&
validateOnlineUpgrade((ClassicGroup) group)) {
+ } else if (createIfNotExists && group.type() == CLASSIC &&
validateOnlineUpgrade((ClassicGroup) group)) {
return convertToConsumerGroup((ClassicGroup) group, records);
} else {
throw new GroupIdNotFoundException(String.format("Group %s is
not a consumer group.",
@@ -3863,10 +3863,21 @@ public class GroupMetadataManager {
CompletableFuture<JoinGroupResponseData> responseFuture
) {
Group group = groups.get(request.groupId(), Long.MAX_VALUE);
- if (group != null && group.type() == CONSUMER && !group.isEmpty()) {
- // classicGroupJoinToConsumerGroup takes the join requests to
non-empty consumer groups.
- // The empty consumer groups should be converted to classic groups
in classicGroupJoinToClassicGroup.
- return classicGroupJoinToConsumerGroup((ConsumerGroup) group,
context, request, responseFuture);
+ if (group != null) {
+ if (group.type() == CONSUMER && !group.isEmpty()) {
+ // classicGroupJoinToConsumerGroup takes the join requests to
non-empty consumer groups.
+ // The empty consumer groups should be converted to classic
groups in classicGroupJoinToClassicGroup.
+ return classicGroupJoinToConsumerGroup((ConsumerGroup) group,
context, request, responseFuture);
+ } else if (group.type() == CONSUMER || group.type() == CLASSIC) {
+ return classicGroupJoinToClassicGroup(context, request,
responseFuture);
+ } else {
+ // Group exists but it's not a consumer group
+ responseFuture.complete(new JoinGroupResponseData()
+ .setMemberId(UNKNOWN_MEMBER_ID)
+ .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code())
+ );
+ return EMPTY_RESULT;
+ }
} else {
return classicGroupJoinToClassicGroup(context, request,
responseFuture);
}
@@ -5087,8 +5098,12 @@ public class GroupMetadataManager {
if (group.type() == CLASSIC) {
return classicGroupSyncToClassicGroup((ClassicGroup) group,
context, request, responseFuture);
- } else {
+ } else if (group.type() == CONSUMER) {
return classicGroupSyncToConsumerGroup((ConsumerGroup) group,
context, request, responseFuture);
+ } else {
+ responseFuture.complete(new SyncGroupResponseData()
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+ return EMPTY_RESULT;
}
}
@@ -5355,8 +5370,12 @@ public class GroupMetadataManager {
if (group.type() == CLASSIC) {
return classicGroupHeartbeatToClassicGroup((ClassicGroup) group,
context, request);
- } else {
+ } else if (group.type() == CONSUMER) {
return classicGroupHeartbeatToConsumerGroup((ConsumerGroup) group,
context, request);
+ } else {
+ throw new UnknownMemberIdException(
+ String.format("Group %s not found.", request.groupId())
+ );
}
}
@@ -5536,8 +5555,10 @@ public class GroupMetadataManager {
if (group.type() == CLASSIC) {
return classicGroupLeaveToClassicGroup((ClassicGroup) group,
context, request);
- } else {
+ } else if (group.type() == CONSUMER) {
return classicGroupLeaveToConsumerGroup((ConsumerGroup) group,
context, request);
+ } else {
+ throw new UnknownMemberIdException(String.format("Group %s not
found.", request.groupId()));
}
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index f54b6559b91..b7c0c433842 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.protocol.Errors;
@@ -184,7 +185,7 @@ public class ShareGroup extends
ModernGroup<ShareGroupMember> {
boolean isTransactional,
short apiVersion
) {
- throw new UnsupportedOperationException("validateOffsetCommit is not
supported for Share Groups.");
+ throw new GroupIdNotFoundException(String.format("Group %s is not a
consumer group.", groupId));
}
@Override
@@ -193,12 +194,12 @@ public class ShareGroup extends
ModernGroup<ShareGroupMember> {
int memberEpoch,
long lastCommittedOffset
) {
- throw new UnsupportedOperationException("validateOffsetFetch is not
supported for Share Groups.");
+ throw new GroupIdNotFoundException(String.format("Group %s is not a
consumer group.", groupId));
}
@Override
public void validateOffsetDelete() {
- throw new UnsupportedOperationException("validateOffsetDelete is not
supported for Share Groups.");
+ throw new GroupIdNotFoundException(String.format("Group %s is not a
consumer group.", groupId));
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index b500bf57add..22ae88aaf53 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -14285,6 +14285,241 @@ public class GroupMetadataManagerTest {
context.assertNoRebalanceTimeout(groupId, memberId);
}
+ @Test
+ public void testConsumerGroupHeartbeatOnShareGroup() {
+ String groupId = "group-foo";
+ String memberId = Uuid.randomUuid().toString();
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .withMetadataImage(MetadataImage.EMPTY)
+ .withShareGroup(new ShareGroupBuilder(groupId, 1)
+ .withMember(new ShareGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .build())
+ .withAssignment(memberId, mkAssignment())
+ .withAssignmentEpoch(1))
+ .build();
+
+ assertThrows(GroupIdNotFoundException.class, () ->
context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberEpoch(0)
+ .setServerAssignor("range")
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setTopicPartitions(Collections.emptyList())));
+ }
+
+ @Test
+ public void testClassicGroupJoinOnShareGroup() throws Exception {
+ String groupId = "group-foo";
+ String memberId = Uuid.randomUuid().toString();
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .withMetadataImage(MetadataImage.EMPTY)
+ .withShareGroup(new ShareGroupBuilder(groupId, 1)
+ .withMember(new ShareGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .build())
+ .withAssignment(memberId, mkAssignment())
+ .withAssignmentEpoch(1))
+ .build();
+
+ JoinGroupRequestData request = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocolType("consumer")
+ .withProtocols(new JoinGroupRequestProtocolCollection(0))
+ .build();
+
+ GroupMetadataManagerTestContext.JoinResult joinResult =
context.sendClassicGroupJoin(request);
+ assertTrue(joinResult.joinFuture.isDone());
+ assertEquals(Errors.INCONSISTENT_GROUP_PROTOCOL.code(),
joinResult.joinFuture.get().errorCode());
+ }
+
+ @Test
+ public void testClassicGroupSyncOnShareGroup() throws Exception {
+ String groupId = "group-foo";
+ String memberId = Uuid.randomUuid().toString();
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .withMetadataImage(MetadataImage.EMPTY)
+ .withShareGroup(new ShareGroupBuilder(groupId, 1)
+ .withMember(new ShareGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .build())
+ .withAssignment(memberId, mkAssignment())
+ .withAssignmentEpoch(1))
+ .build();
+
+ SyncGroupRequestData request = new
GroupMetadataManagerTestContext.SyncGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withGenerationId(1)
+ .withMemberId(memberId)
+ .build();
+
+ GroupMetadataManagerTestContext.SyncResult syncResult =
context.sendClassicGroupSync(request);
+
+ assertTrue(syncResult.records.isEmpty());
+ assertTrue(syncResult.syncFuture.isDone());
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.code(),
syncResult.syncFuture.get().errorCode());
+ }
+
+ @Test
+ public void testClassicGroupLeaveOnShareGroup() throws Exception {
+ String groupId = "group-foo";
+ String memberId = Uuid.randomUuid().toString();
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .withMetadataImage(MetadataImage.EMPTY)
+ .withShareGroup(new ShareGroupBuilder(groupId, 1)
+ .withMember(new ShareGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+
.setSubscribedTopicNames(Collections.singletonList("foo"))
+ .build())
+ .withAssignment(memberId, mkAssignment())
+ .withAssignmentEpoch(1))
+ .build();
+
+ assertThrows(UnknownMemberIdException.class, () ->
context.sendClassicGroupLeave(
+ new LeaveGroupRequestData()
+ .setGroupId(groupId)
+ .setMembers(Collections.singletonList(
+ new MemberIdentity()
+ .setMemberId(memberId)))));
+ }
+
+ @Test
+ public void testConsumerGroupDescribeOnShareGroup() {
+ String groupId = "group-foo";
+ String memberId = Uuid.randomUuid().toString();
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("share");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .withMetadataImage(MetadataImage.EMPTY)
+ .withShareGroup(new ShareGroupBuilder(groupId, 1)
+ .withMember(new ShareGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(1)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(Collections.singletonList("foo"))
+ .build())
+ .withAssignment(memberId, mkAssignment())
+ .withAssignmentEpoch(1))
+ .build();
+
+ List<ConsumerGroupDescribeResponseData.DescribedGroup> expected =
Collections.singletonList(
+ new ConsumerGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ );
+
+ List<ConsumerGroupDescribeResponseData.DescribedGroup> actual =
context.sendConsumerGroupDescribe(Collections.singletonList(groupId));
+ assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testShareGroupHeartbeatOnConsumerGroup() {
+ String groupId = "group-foo";
+ // Use a static member id as it makes the test easier.
+ String memberId1 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+ // Consumer group with one static member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroupAssignors(Collections.singletonList(assignor))
+ .withMetadataImage(new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build())
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setInstanceId(memberId1)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(9)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ assertThrows(GroupIdNotFoundException.class, () ->
+ context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(Uuid.randomUuid().toString())
+ .setMemberEpoch(1)
+ .setSubscribedTopicNames(Arrays.asList("foo", "bar"))));
+ }
+
+ @Test
+ public void testShareGroupDescribeOnConsumerGroup() {
+ String groupId = "group-foo";
+ String memberId = Uuid.randomUuid().toString();
+
+ int epoch = 10;
+ String topicName = "topicName";
+ ConsumerGroupMember.Builder memberBuilder = new
ConsumerGroupMember.Builder(memberId)
+ .setSubscribedTopicNames(Collections.singletonList(topicName))
+ .setServerAssignorName("assignorName");
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withConsumerGroupAssignors(Collections.singletonList(assignor))
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, epoch)
+ .withMember(memberBuilder.build()))
+ .build();
+
+ List<ShareGroupDescribeResponseData.DescribedGroup> expected =
Collections.singletonList(
+ new ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ );
+
+ List<ShareGroupDescribeResponseData.DescribedGroup> actual =
context.sendShareGroupDescribe(Collections.singletonList(groupId));
+ assertEquals(expected, actual);
+ }
+
private static void checkJoinGroupResponse(
JoinGroupResponseData expectedResponse,
JoinGroupResponseData actualResponse,
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
index 0d84d0c0da4..e53a9dac910 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/share/ShareGroupTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.modern.share;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
@@ -551,7 +552,7 @@ public class ShareGroupTest {
@ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
public void testValidateOffsetCommit(short version) {
ShareGroup shareGroup = createShareGroup("group-foo");
- assertThrows(UnsupportedOperationException.class, () ->
+ assertThrows(GroupIdNotFoundException.class, () ->
shareGroup.validateOffsetCommit(null, null, -1, false, version));
}
@@ -581,14 +582,14 @@ public class ShareGroupTest {
@Test
public void testValidateOffsetFetch() {
ShareGroup shareGroup = createShareGroup("group-foo");
- assertThrows(UnsupportedOperationException.class, () ->
+ assertThrows(GroupIdNotFoundException.class, () ->
shareGroup.validateOffsetFetch(null, -1, -1));
}
@Test
public void testValidateOffsetDelete() {
ShareGroup shareGroup = createShareGroup("group-foo");
- assertThrows(UnsupportedOperationException.class,
shareGroup::validateOffsetDelete);
+ assertThrows(GroupIdNotFoundException.class,
shareGroup::validateOffsetDelete);
}
@Test