This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3dd5dff307e KAFKA-20269 [2/N]: Refactor assignment update for delayed 
share assignments (#21695)
3dd5dff307e is described below

commit 3dd5dff307e3196e5c927d7f02f9616c2f06d509
Author: Sean Quah <[email protected]>
AuthorDate: Tue Mar 10 17:59:24 2026 +0000

    KAFKA-20269 [2/N]: Refactor assignment update for delayed share assignments 
(#21695)
    
    Refactor the share target assignment update method to return both the
    target assignment epoch and target assignment. When assignment batching
    or assignment offload are implemented, the target assignment update
    method may return the last target assignment, depending on timings and
    the group coordinator config.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 48 ++++++++++++----------
 1 file changed, 27 insertions(+), 21 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 0b8b9325442..694097f3ffa 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -289,6 +289,16 @@ public class GroupMetadataManager {
                 group.targetAssignment(member.memberId(), member.instanceId())
             );
         }
+
+        private static UpdateTargetAssignmentResult<Assignment> 
fromLastTargetAssignment(
+            ShareGroup group,
+            ShareGroupMember member
+        ) {
+            return new UpdateTargetAssignmentResult<>(
+                group.assignmentEpoch(),
+                group.targetAssignment(member.memberId())
+            );
+        }
     }
 
     public static class Builder {
@@ -2772,30 +2782,21 @@ public class GroupMetadataManager {
 
         // 2. Update the target assignment if the group epoch is larger than 
the target assignment epoch. The delta between
         // the existing and the new target assignment is persisted to the 
partition.
-        final int targetAssignmentEpoch;
-        final Assignment targetAssignment;
-
-        if (groupEpoch > group.assignmentEpoch()) {
-            targetAssignment = updateTargetAssignment(
-                group,
-                groupEpoch,
-                updatedMember,
-                subscriptionType,
-                records
-            );
-            targetAssignmentEpoch = groupEpoch;
-        } else {
-            targetAssignmentEpoch = group.assignmentEpoch();
-            targetAssignment = 
group.targetAssignment(updatedMember.memberId());
-        }
+        UpdateTargetAssignmentResult<Assignment> updateTargetAssignmentResult 
= maybeUpdateTargetAssignment(
+            group,
+            groupEpoch,
+            updatedMember,
+            subscriptionType,
+            records
+        );
 
         // 3. Reconcile the member's assignment with the target assignment if 
the member is not
         // fully reconciled yet.
         updatedMember = maybeReconcile(
             groupId,
             updatedMember,
-            targetAssignmentEpoch,
-            targetAssignment,
+            updateTargetAssignmentResult.targetAssignmentEpoch(),
+            updateTargetAssignmentResult.targetAssignment(),
             // Force consistency with the subscription when the subscription 
has changed.
             bumpGroupEpoch,
             records
@@ -3881,13 +3882,18 @@ public class GroupMetadataManager {
      * @param records          The list to accumulate any new records.
      * @return The new target assignment.
      */
-    private Assignment updateTargetAssignment(
+    private UpdateTargetAssignmentResult<Assignment> 
maybeUpdateTargetAssignment(
         ShareGroup group,
         int groupEpoch,
         ShareGroupMember updatedMember,
         SubscriptionType subscriptionType,
         List<CoordinatorRecord> records
     ) {
+        if (group.assignmentEpoch() >= groupEpoch) {
+            // The assignment is up to date.
+            return 
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
+        }
+
         try {
             Map<Uuid, Set<Integer>> initializedTopicPartitions = 
shareGroupStatePartitionMetadata.containsKey(group.groupId()) ?
                 
stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics())
 :
@@ -3921,9 +3927,9 @@ public class GroupMetadataManager {
 
             MemberAssignment newMemberAssignment = 
assignmentResult.targetAssignment().get(updatedMember.memberId());
             if (newMemberAssignment != null) {
-                return new Assignment(newMemberAssignment.partitions());
+                return new UpdateTargetAssignmentResult<>(groupEpoch, new 
Assignment(newMemberAssignment.partitions()));
             } else {
-                return Assignment.EMPTY;
+                return new UpdateTargetAssignmentResult<>(groupEpoch, 
Assignment.EMPTY);
             }
         } catch (PartitionAssignorException ex) {
             String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",

Reply via email to