This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b8692f3158e MINOR: Update previous member epoch when members
unsubscribe from topic (#21325)
b8692f3158e is described below
commit b8692f3158eb784994e1dcd3282e039c8068ffdd
Author: Sean Quah <[email protected]>
AuthorDate: Mon Jan 19 15:26:12 2026 +0000
MINOR: Update previous member epoch when members unsubscribe from topic
(#21325)
Update the previous member epoch when unassigning partitions from a
member due to them unsubscribing from a topic.
Reviewers: David Jacot <[email protected]>
---
.../group/modern/consumer/CurrentAssignmentBuilder.java | 14 ++++++++++++--
.../kafka/coordinator/group/GroupMetadataManagerTest.java | 6 ++++--
.../modern/consumer/CurrentAssignmentBuilderTest.java | 10 ++++++++--
3 files changed, 24 insertions(+), 6 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
index cf1d3ddbec4..badedefdee5 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
@@ -197,7 +197,10 @@ public class CurrentAssignmentBuilder {
member.assignedPartitions()
);
} else if (hasSubscriptionChanged) {
- return
updateCurrentAssignment(member.assignedPartitions());
+ return updateCurrentAssignment(
+ member.memberEpoch(),
+ member.assignedPartitions()
+ );
} else {
return member;
}
@@ -214,7 +217,10 @@ public class CurrentAssignmentBuilder {
// owns any of the revoked partitions. If it does, we cannot
progress.
if
(ownsRevokedPartitions(member.partitionsPendingRevocation())) {
if (hasSubscriptionChanged) {
- return
updateCurrentAssignment(member.assignedPartitions());
+ return updateCurrentAssignment(
+ member.memberEpoch(),
+ member.assignedPartitions()
+ );
} else {
return member;
}
@@ -284,10 +290,12 @@ public class CurrentAssignmentBuilder {
* Updates the current assignment, removing any partitions that are not
part of the subscribed topics.
* This method is a lot faster than running the full reconciliation logic
in computeNextAssignment.
*
+ * @param memberEpoch The epoch of the member to use.
* @param memberAssignedPartitions The assigned partitions of the member
to use.
* @return A new ConsumerGroupMember.
*/
private ConsumerGroupMember updateCurrentAssignment(
+ int memberEpoch,
Map<Uuid, Set<Integer>> memberAssignedPartitions
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
@@ -329,6 +337,7 @@ public class CurrentAssignmentBuilder {
if (!newPartitionsPendingRevocation.isEmpty() &&
ownsRevokedPartitions(newPartitionsPendingRevocation)) {
return new ConsumerGroupMember.Builder(member)
.setState(MemberState.UNREVOKED_PARTITIONS)
+ .updateMemberEpoch(memberEpoch)
.setAssignedPartitions(newAssignedPartitions)
.setPartitionsPendingRevocation(newPartitionsPendingRevocation)
.build();
@@ -340,6 +349,7 @@ public class CurrentAssignmentBuilder {
// reconciliation logic should handle the case where the member
has revoked all its
// partitions pending revocation.
return new ConsumerGroupMember.Builder(member)
+ .updateMemberEpoch(memberEpoch)
.setAssignedPartitions(newAssignedPartitions)
.build();
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 5ebeb53e3d2..922521ec91b 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -22153,7 +22153,7 @@ public class GroupMetadataManagerTest {
expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
.setState(MemberState.STABLE)
.setMemberEpoch(1)
- .setPreviousMemberEpoch(0)
+ .setPreviousMemberEpoch(1)
.setClientId(DEFAULT_CLIENT_ID)
.setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
.setRebalanceTimeoutMs(5000)
@@ -22165,7 +22165,9 @@ public class GroupMetadataManagerTest {
// The member subscription is updated.
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
expectedMember1),
// The previous regex is deleted.
-
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*")
+
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
"foo*"),
+ // The previous member epoch is updated.
+
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
expectedMember1)
);
assertRecordsEquals(expectedRecords, result.records());
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
index 751107471e5..69ef7330d90 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
@@ -821,9 +821,10 @@ public class CurrentAssignmentBuilderTest {
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(memberEpoch)
- .setPreviousMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch - 1)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
+ // Topic 1 is assigned, but no longer in the subscription.
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
@@ -845,6 +846,9 @@ public class CurrentAssignmentBuilderTest {
new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(expectedMemberEpoch)
+ // The previous member epoch is updated in all cases tested
here,
+ // including the case where assigned partitions are removed
due to a subscription
+ // change, regardless of whether the member epoch is advanced.
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
@@ -879,9 +883,10 @@ public class CurrentAssignmentBuilderTest {
ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
.setState(MemberState.STABLE)
.setMemberEpoch(memberEpoch)
- .setPreviousMemberEpoch(memberEpoch)
+ .setPreviousMemberEpoch(memberEpoch - 1)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(
+ // Topic 1 is assigned, but no longer in the subscription.
mkTopicAssignment(topicId1, 1, 2, 3),
mkTopicAssignment(topicId2, 4, 5, 6)))
.build();
@@ -906,6 +911,7 @@ public class CurrentAssignmentBuilderTest {
new ConsumerGroupMember.Builder("member")
.setState(MemberState.UNREVOKED_PARTITIONS)
.setMemberEpoch(expectedMemberEpoch)
+ // The previous member epoch is updated in all cases.
.setPreviousMemberEpoch(memberEpoch)
.setSubscribedTopicNames(List.of(topic2))
.setAssignedPartitions(mkAssignment(