This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ccfc37d3743 KAFKA-20269 [1/N]: Refactor assignment update for delayed 
consumer assignments (#21694)
ccfc37d3743 is described below

commit ccfc37d3743649128bcd606ad1d954ac76d12fcf
Author: Sean Quah <[email protected]>
AuthorDate: Tue Mar 10 14:53:46 2026 +0000

    KAFKA-20269 [1/N]: Refactor assignment update for delayed consumer 
assignments (#21694)
    
    Refactor the consumer target assignment update method to return both the
    target assignment epoch and target assignment. When assignment batching
    or assignment offload are implemented, the target assignment update
    method may return the last target assignment, depending on timings and
    the group coordinator config.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 84 +++++++++++-----------
 1 file changed, 43 insertions(+), 41 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 1967089ebd5..0b8b9325442 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -276,6 +276,21 @@ public class GroupMetadataManager {
         }
     }
 
+    private record UpdateTargetAssignmentResult<T>(
+        int targetAssignmentEpoch,
+        T targetAssignment
+    ) {
+        private static UpdateTargetAssignmentResult<Assignment> 
fromLastTargetAssignment(
+            ConsumerGroup group,
+            ConsumerGroupMember member
+        ) {
+            return new UpdateTargetAssignmentResult<>(
+                group.assignmentEpoch(),
+                group.targetAssignment(member.memberId(), member.instanceId())
+            );
+        }
+    }
+
     public static class Builder {
         private LogContext logContext = null;
         private SnapshotRegistry snapshotRegistry = null;
@@ -2394,23 +2409,14 @@ public class GroupMetadataManager {
 
         // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch. The delta between
         // the existing and the new target assignment is persisted to the 
partition.
-        final int targetAssignmentEpoch;
-        final Assignment targetAssignment;
-
-        if (groupEpoch > group.assignmentEpoch()) {
-            targetAssignment = updateTargetAssignment(
-                group,
-                groupEpoch,
-                member,
-                updatedMember,
-                subscriptionType,
-                records
-            );
-            targetAssignmentEpoch = groupEpoch;
-        } else {
-            targetAssignmentEpoch = group.assignmentEpoch();
-            targetAssignment = 
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
-        }
+        UpdateTargetAssignmentResult<Assignment> updateTargetAssignmentResult 
= maybeUpdateTargetAssignment(
+            group,
+            groupEpoch,
+            member,
+            updatedMember,
+            subscriptionType,
+            records
+        );
 
         // 3. Reconcile the member's assignment with the target assignment if 
the member is not
         // fully reconciled yet.
@@ -2418,8 +2424,8 @@ public class GroupMetadataManager {
             groupId,
             updatedMember,
             group::currentPartitionEpoch,
-            targetAssignmentEpoch,
-            targetAssignment,
+            updateTargetAssignmentResult.targetAssignmentEpoch(),
+            updateTargetAssignmentResult.targetAssignment(),
             group.resolvedRegularExpressions(),
             // Force consistency with the subscription when the subscription 
has changed.
             hasSubscriptionChanged,
@@ -2613,31 +2619,22 @@ public class GroupMetadataManager {
 
             // 2. Update the target assignment if the group epoch is larger 
than the target assignment epoch.
             // The delta between the existing and the new target assignment is 
persisted to the partition.
-            final int targetAssignmentEpoch;
-            final Assignment targetAssignment;
-
-            if (groupEpoch > group.assignmentEpoch()) {
-                targetAssignment = updateTargetAssignment(
-                    group,
-                    groupEpoch,
-                    member,
-                    updatedMember,
-                    subscriptionType,
-                    records
-                );
-                targetAssignmentEpoch = groupEpoch;
-            } else {
-                targetAssignmentEpoch = group.assignmentEpoch();
-                targetAssignment = 
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
-            }
+            UpdateTargetAssignmentResult<Assignment> 
updateTargetAssignmentResult = maybeUpdateTargetAssignment(
+                group,
+                groupEpoch,
+                member,
+                updatedMember,
+                subscriptionType,
+                records
+            );
 
             // 3. Reconcile the member's assignment with the target assignment 
if the member is not fully reconciled yet.
             updatedMember = maybeReconcile(
                 groupId,
                 updatedMember,
                 group::currentPartitionEpoch,
-                targetAssignmentEpoch,
-                targetAssignment,
+                updateTargetAssignmentResult.targetAssignmentEpoch(),
+                updateTargetAssignmentResult.targetAssignment(),
                 group.resolvedRegularExpressions(),
                 // Force consistency with the subscription when the 
subscription has changed.
                 bumpGroupEpoch,
@@ -3808,7 +3805,7 @@ public class GroupMetadataManager {
      * @param records          The list to accumulate any new records.
      * @return The new target assignment.
      */
-    private Assignment updateTargetAssignment(
+    private UpdateTargetAssignmentResult<Assignment> 
maybeUpdateTargetAssignment(
         ConsumerGroup group,
         int groupEpoch,
         ConsumerGroupMember member,
@@ -3816,6 +3813,11 @@ public class GroupMetadataManager {
         SubscriptionType subscriptionType,
         List<CoordinatorRecord> records
     ) {
+        if (group.assignmentEpoch() >= groupEpoch) {
+            // The assignment is up to date.
+            return 
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
+        }
+
         String preferredServerAssignor = group.computePreferredServerAssignor(
             member,
             updatedMember
@@ -3857,9 +3859,9 @@ public class GroupMetadataManager {
 
             MemberAssignment newMemberAssignment = 
assignmentResult.targetAssignment().get(updatedMember.memberId());
             if (newMemberAssignment != null) {
-                return new Assignment(newMemberAssignment.partitions());
+                return new UpdateTargetAssignmentResult<>(groupEpoch, new 
Assignment(newMemberAssignment.partitions()));
             } else {
-                return Assignment.EMPTY;
+                return new UpdateTargetAssignmentResult<>(groupEpoch, 
Assignment.EMPTY);
             }
         } catch (PartitionAssignorException ex) {
             String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",

Reply via email to