This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b8692f3158e MINOR: Update previous member epoch when members 
unsubscribe from topic (#21325)
b8692f3158e is described below

commit b8692f3158eb784994e1dcd3282e039c8068ffdd
Author: Sean Quah <[email protected]>
AuthorDate: Mon Jan 19 15:26:12 2026 +0000

    MINOR: Update previous member epoch when members unsubscribe from topic 
(#21325)
    
    Update the previous member epoch when unassigning partitions from a
    member due to them unsubscribing from a topic.
    
    Reviewers: David Jacot <[email protected]>
---
 .../group/modern/consumer/CurrentAssignmentBuilder.java    | 14 ++++++++++++--
 .../kafka/coordinator/group/GroupMetadataManagerTest.java  |  6 ++++--
 .../modern/consumer/CurrentAssignmentBuilderTest.java      | 10 ++++++++--
 3 files changed, 24 insertions(+), 6 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
index cf1d3ddbec4..badedefdee5 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java
@@ -197,7 +197,10 @@ public class CurrentAssignmentBuilder {
                         member.assignedPartitions()
                     );
                 } else if (hasSubscriptionChanged) {
-                    return 
updateCurrentAssignment(member.assignedPartitions());
+                    return updateCurrentAssignment(
+                        member.memberEpoch(),
+                        member.assignedPartitions()
+                    );
                 } else {
                     return member;
                 }
@@ -214,7 +217,10 @@ public class CurrentAssignmentBuilder {
                 // owns any of the revoked partitions. If it does, we cannot 
progress.
                 if 
(ownsRevokedPartitions(member.partitionsPendingRevocation())) {
                     if (hasSubscriptionChanged) {
-                        return 
updateCurrentAssignment(member.assignedPartitions());
+                        return updateCurrentAssignment(
+                            member.memberEpoch(),
+                            member.assignedPartitions()
+                        );
                     } else {
                         return member;
                     }
@@ -284,10 +290,12 @@ public class CurrentAssignmentBuilder {
      * Updates the current assignment, removing any partitions that are not 
part of the subscribed topics.
      * This method is a lot faster than running the full reconciliation logic 
in computeNextAssignment.
      *
+     * @param memberEpoch               The epoch of the member to use.
      * @param memberAssignedPartitions  The assigned partitions of the member 
to use.
      * @return A new ConsumerGroupMember.
      */
     private ConsumerGroupMember updateCurrentAssignment(
+        int memberEpoch,
         Map<Uuid, Set<Integer>> memberAssignedPartitions
     ) {
         Set<Uuid> subscribedTopicIds = subscribedTopicIds();
@@ -329,6 +337,7 @@ public class CurrentAssignmentBuilder {
         if (!newPartitionsPendingRevocation.isEmpty() && 
ownsRevokedPartitions(newPartitionsPendingRevocation)) {
             return new ConsumerGroupMember.Builder(member)
                 .setState(MemberState.UNREVOKED_PARTITIONS)
+                .updateMemberEpoch(memberEpoch)
                 .setAssignedPartitions(newAssignedPartitions)
                 .setPartitionsPendingRevocation(newPartitionsPendingRevocation)
                 .build();
@@ -340,6 +349,7 @@ public class CurrentAssignmentBuilder {
             // reconciliation logic should handle the case where the member 
has revoked all its
             // partitions pending revocation.
             return new ConsumerGroupMember.Builder(member)
+                .updateMemberEpoch(memberEpoch)
                 .setAssignedPartitions(newAssignedPartitions)
                 .build();
         }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 5ebeb53e3d2..922521ec91b 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -22153,7 +22153,7 @@ public class GroupMetadataManagerTest {
         expectedMember1 = new ConsumerGroupMember.Builder(memberId1)
             .setState(MemberState.STABLE)
             .setMemberEpoch(1)
-            .setPreviousMemberEpoch(0)
+            .setPreviousMemberEpoch(1)
             .setClientId(DEFAULT_CLIENT_ID)
             .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
             .setRebalanceTimeoutMs(5000)
@@ -22165,7 +22165,9 @@ public class GroupMetadataManagerTest {
             // The member subscription is updated.
             
GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, 
expectedMember1),
             // The previous regex is deleted.
-            
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*")
+            
GroupCoordinatorRecordHelpers.newConsumerGroupRegularExpressionTombstone(groupId,
 "foo*"),
+            // The previous member epoch is updated.
+            
GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, 
expectedMember1)
         );
 
         assertRecordsEquals(expectedRecords, result.records());
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
index 751107471e5..69ef7330d90 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilderTest.java
@@ -821,9 +821,10 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setState(MemberState.STABLE)
             .setMemberEpoch(memberEpoch)
-            .setPreviousMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch - 1)
             .setSubscribedTopicNames(List.of(topic2))
             .setAssignedPartitions(mkAssignment(
+                // Topic 1 is assigned, but no longer in the subscription.
                 mkTopicAssignment(topicId1, 1, 2, 3),
                 mkTopicAssignment(topicId2, 4, 5, 6)))
             .build();
@@ -845,6 +846,9 @@ public class CurrentAssignmentBuilderTest {
             new ConsumerGroupMember.Builder("member")
                 .setState(MemberState.STABLE)
                 .setMemberEpoch(expectedMemberEpoch)
+                // The previous member epoch is updated in all cases tested 
here,
+                // including the case where assigned partitions are removed 
due to a subscription
+                // change, regardless of whether the member epoch is advanced.
                 .setPreviousMemberEpoch(memberEpoch)
                 .setSubscribedTopicNames(List.of(topic2))
                 .setAssignedPartitions(mkAssignment(
@@ -879,9 +883,10 @@ public class CurrentAssignmentBuilderTest {
         ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
             .setState(MemberState.STABLE)
             .setMemberEpoch(memberEpoch)
-            .setPreviousMemberEpoch(memberEpoch)
+            .setPreviousMemberEpoch(memberEpoch - 1)
             .setSubscribedTopicNames(List.of(topic2))
             .setAssignedPartitions(mkAssignment(
+                // Topic 1 is assigned, but no longer in the subscription.
                 mkTopicAssignment(topicId1, 1, 2, 3),
                 mkTopicAssignment(topicId2, 4, 5, 6)))
             .build();
@@ -906,6 +911,7 @@ public class CurrentAssignmentBuilderTest {
             new ConsumerGroupMember.Builder("member")
                 .setState(MemberState.UNREVOKED_PARTITIONS)
                 .setMemberEpoch(expectedMemberEpoch)
+                // The previous member epoch is updated in all cases.
                 .setPreviousMemberEpoch(memberEpoch)
                 .setSubscribedTopicNames(List.of(topic2))
                 .setAssignedPartitions(mkAssignment(

Reply via email to