This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0f4dbf7fd76 KAFKA-19720: Regex subscription should be empty for
classic members joining mixed group (#20626)
0f4dbf7fd76 is described below
commit 0f4dbf7fd76d0fb29fec1e3588399d67628d278f
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Sat Nov 15 06:02:58 2025 -0500
KAFKA-19720: Regex subscription should be empty for classic members joining
mixed group (#20626)
Previously, the assignment was not recomputed when a consumer member
with a regex subscription was replaced by a classic member, which has
no regex subscription.
This PR sets the regex subscription to empty during the replacement.
Reviewers: Sean Quah <[email protected]>, David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 15 +-
.../group/GroupMetadataManagerTest.java | 297 +++++++++++++++++++++
2 files changed, 311 insertions(+), 1 deletion(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index f31d2c9a06d..946fe7f0f24 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -2543,6 +2543,7 @@ public class GroupMetadataManager {
.maybeUpdateRebalanceTimeoutMs(ofSentinel(request.rebalanceTimeoutMs()))
.maybeUpdateServerAssignorName(Optional.empty())
.maybeUpdateSubscribedTopicNames(Optional.ofNullable(subscription.topics()))
+ .setSubscribedTopicRegex("") // Regex subscription is not
supported for classic member.
.setClientId(context.clientId())
.setClientHost(context.clientAddress().toString())
.setClassicMemberMetadata(
@@ -2551,13 +2552,25 @@ public class GroupMetadataManager {
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(protocols)))
.build();
- boolean bumpGroupEpoch = hasMemberSubscriptionChanged(
+ boolean hasMemberSubscriptionChanged = hasMemberSubscriptionChanged(
groupId,
member,
updatedMember,
records
);
+ // Maybe create tombstone for the regex if the joining member replaces
a static member
+ // with regex subscription.
+ UpdateRegularExpressionsResult updateRegularExpressionsResult =
maybeUpdateRegularExpressions(
+ context,
+ group,
+ member,
+ updatedMember,
+ records
+ );
+
+ boolean bumpGroupEpoch = hasMemberSubscriptionChanged ||
updateRegularExpressionsResult.regexUpdated();
+
if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
// The subscription metadata is updated in two cases:
// 1) The member has updated its subscriptions;
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6f0a4048de1..ef90ba0b631 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -21486,6 +21486,303 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void
testConsumerMemberWithRegexReplacedByClassicMemberWithSameSubscription() {
+ String groupId = "fooup";
+ String instanceId = "instance-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+ Uuid barTopicId = Uuid.randomUuid();
+ String barTopicName = "bar";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .addTopic(barTopicId, barTopicName, 1)
+ .buildCoordinatorMetadataImage(12345L);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+
+ // Member 1 is a static member with both regex and topic name
subscription
+ // Member 2 uses topic name subscription.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("bar*")
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withAssignmentEpoch(10)
+ .withResolvedRegularExpression("bar*", new
ResolvedRegularExpression(
+ Set.of(barTopicName), 0L, 0L)))
+ .build();
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+ group.setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
+
+ // Member 1 is replaced by a classic member with the same instance id.
+ JoinGroupRequestProtocolCollection joinProtocols = new
JoinGroupRequestProtocolCollection();
+ joinProtocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ List.of(fooTopicName)
+ ))))
+ );
+ JoinGroupRequestData joinRequest = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(instanceId)
+ .withRebalanceTimeoutMs(5000)
+ .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocols(joinProtocols)
+ .build();
+ GroupMetadataManagerTestContext.JoinResult result =
context.sendClassicGroupJoin(joinRequest);
+
+ ConsumerGroupMember newMember1 = group.staticMember(instanceId);
+
+ ConsumerGroupMember expectedCopiedMember = new
ConsumerGroupMember.Builder(newMember1.memberId())
+ .setState(MemberState.STABLE)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("bar*") // Still uses regex subscription.
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2),
+ mkTopicAssignment(barTopicId, 0)))
+ .build();
+
+ ConsumerGroupMember expectedMember1 = new
ConsumerGroupMember.Builder(newMember1.memberId())
+ .setState(MemberState.STABLE)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("") // empty regex subscription
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(Map.of()) // empty assignment
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(500)
+
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(joinRequest.protocols()))
+ )
+ .build();
+
+ List<List<CoordinatorRecord>> expectedRecords = List.of(
+ // The previous member is deleted.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1)),
+ // The previous member is replaced by the new one.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedCopiedMember)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
expectedCopiedMember.memberId(), expectedCopiedMember.assignedPartitions())),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedCopiedMember)),
+ // The member subscription is updated.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1)),
+ // The regex is tombstoned.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"bar*")),
+ // The group epoch is bumped.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ )))),
+ // The target assignment is updated.
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
expectedMember1.memberId(), Map.of()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, Map.of())
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+ // The member assignment is updated.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1))
+ );
+
+ assertUnorderedRecordsEquals(
+ expectedRecords,
+ result.records
+ );
+ }
+
+ @Test
+ public void
testConsumerMemberWithRegexReplacedByClassicMemberWithChangedSubscription() {
+ String groupId = "fooup";
+ String instanceId = "instance-id";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .buildCoordinatorMetadataImage(12345L);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+
+ // Member 1 is a static member with regex subscription and
+ // Member 2 uses topic name subscription.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setInstanceId(instanceId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build())
+ .withMember(new ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of(fooTopicName))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .withAssignment(memberId2, mkAssignment(
+ mkTopicAssignment(fooTopicId, 3, 4, 5)))
+ .withAssignmentEpoch(10)
+ .withResolvedRegularExpression("foo*", new
ResolvedRegularExpression(
+ Set.of(fooTopicName), 0L, 0L)))
+ .build();
+ ConsumerGroup group =
context.groupMetadataManager.consumerGroup(groupId);
+ group.setMetadataRefreshDeadline(Long.MAX_VALUE, 10);
+
+ // Member 1 is replaced by a classic member with the same instance id.
+ JoinGroupRequestProtocolCollection joinProtocols = new
JoinGroupRequestProtocolCollection();
+ joinProtocols.add(new JoinGroupRequestData.JoinGroupRequestProtocol()
+ .setName("range")
+
.setMetadata(Utils.toArray(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ List.of()
+ ))))
+ );
+ JoinGroupRequestData joinRequest = new
GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
+ .withGroupId(groupId)
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId(instanceId)
+ .withRebalanceTimeoutMs(5000)
+ .withProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .withProtocols(joinProtocols)
+ .build();
+ GroupMetadataManagerTestContext.JoinResult result =
context.sendClassicGroupJoin(joinRequest);
+
+ ConsumerGroupMember newMember1 = group.staticMember(instanceId);
+
+ ConsumerGroupMember expectedCopiedMember = new
ConsumerGroupMember.Builder(newMember1.memberId())
+ .setState(MemberState.STABLE)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(0)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*") // Still uses regex subscription.
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)))
+ .build();
+
+ ConsumerGroupMember expectedMember1 = new
ConsumerGroupMember.Builder(newMember1.memberId())
+ .setState(MemberState.STABLE)
+ .setInstanceId(instanceId)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(0)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("") // empty regex subscription
+ .setServerAssignorName("range")
+ .setAssignedPartitions(Map.of()) // empty assignment
+ .setClassicMemberMetadata(
+ new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata()
+ .setSessionTimeoutMs(500)
+
.setSupportedProtocols(ConsumerGroupMember.classicProtocolListFromJoinRequestProtocolCollection(joinRequest.protocols()))
+ )
+ .build();
+
+ List<List<CoordinatorRecord>> expectedRecords = List.of(
+ // The previous member is deleted.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId,
memberId1)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId,
memberId1)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId,
memberId1)),
+ // The previous member is replaced by the new one.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedCopiedMember)),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
expectedCopiedMember.memberId(), expectedCopiedMember.assignedPartitions())),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedCopiedMember)),
+ // The member subscription is updated.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1)),
+ // The regex is tombstoned.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*")),
+ // The group epoch is bumped.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11,
computeGroupHash(Map.of(
+ fooTopicName, computeTopicHash(fooTopicName, metadataImage)
+ )))),
+ // The target assignment is updated.
+ List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
expectedMember1.memberId(), Map.of()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, Map.of())
+ ),
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11)),
+ // The member assignment is updated.
+
List.of(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1))
+ );
+
+ assertUnorderedRecordsEquals(
+ expectedRecords,
+ result.records
+ );
+ }
+
@Test
public void
testConsumerGroupMemberJoinsWithRegexWithTopicAuthorizationFailure() {
String groupId = "fooup";