This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 474bd6f7f18 KAFKA-19899: Bumping group epoch when member regex
subscription changes from non empty to empty (#21075)
474bd6f7f18 is described below
commit 474bd6f7f182fde1fce583f560f74c3d56b3da1b
Author: Anton Vasanth <[email protected]>
AuthorDate: Thu Dec 4 18:09:55 2025 +0530
KAFKA-19899: Bumping group epoch when member regex subscription changes
from non empty to empty (#21075)
This patch fixes a case in GroupMetadataManager where a member’s
regular expression subscription transitions from a non-empty regex to
an empty regex. In this scenario, maybeUpdateRegularExpressions should
return true so that the group epoch is bumped.
[https://issues.apache.org/jira/projects/KAFKA/issues/KAFKA-19899?filter=allissues](url)
Target branch: 4.1
Reviewers: Sean Quah <[email protected]>, David Jacot
<[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 2 +
.../group/GroupMetadataManagerTest.java | 81 ++++++++++++++++++++++
2 files changed, 83 insertions(+)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 53b5f4ba1d5..5b20c4ba4ba 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -3149,6 +3149,8 @@ public class GroupMetadataManager {
// by bumping the group epoch.
bumpGroupEpoch =
group.resolvedRegularExpression(newSubscribedTopicRegex).isPresent();
}
+ } else if (isNotEmpty(oldSubscribedTopicRegex)) {
+ bumpGroupEpoch = true;
}
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 135752fec61..3c876284515 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -20790,6 +20790,87 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void testConsumerGroupMemberClearsRegex() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 6)
+ .build(12345L);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ assignor.prepareGroupAssignment(new GroupAssignment(Map.of()));
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+ .withMember(new ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(10)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("foo*")
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .build())
+ .withAssignment(memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+ .withAssignmentEpoch(10))
+ .build();
+
+ // Member 1 updates its new regular expression.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result = context.consumerGroupHeartbeat(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(10)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("")
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of()));
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(11)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of())
+ ),
+ result.response()
+ );
+
+ ConsumerGroupMember expectedMember1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(11)
+ .setPreviousMemberEpoch(10)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicRegex("")
+ .setServerAssignorName("range")
+ .build();
+
+ List<CoordinatorRecord> expectedRecords = List.of(
+
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
+ // previous expression is deleted
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*"),
+ GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
11, 0),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, Map.of()),
+
GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
11),
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
@Test
public void
testConsumerMemberWithRegexReplacedByClassicMemberWithSameSubscription() {
String groupId = "fooup";