This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff010a402d1 KAFKA-20269 [3/N]: Refactor assignment update for delayed streams assignments (#21696)
ff010a402d1 is described below

commit ff010a402d1148be06e00da4b1b3d4c95fdae698
Author: Sean Quah <[email protected]>
AuthorDate: Wed Mar 11 16:20:52 2026 +0000

    KAFKA-20269 [3/N]: Refactor assignment update for delayed streams assignments (#21696)
    
    Refactor the streams target assignment update method to return both the
    target assignment epoch and the target assignment. When assignment
    batching or assignment offload is implemented, the target assignment
    update method may return the last target assignment, depending on timing
    and the group coordinator config.
    
    Reviewers: Lucas Brutschy <[email protected]>, David Jacot
     <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    | 94 +++++++++++++---------
 1 file changed, 58 insertions(+), 36 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 3d7ecdbfe2c..ffcd909a2b9 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -299,6 +299,23 @@ public class GroupMetadataManager {
                 group.targetAssignment(member.memberId())
             );
         }
+
+        private static UpdateTargetAssignmentResult<TasksTuple> 
fromLastTargetAssignment(
+            StreamsGroup group,
+            Optional<StreamsGroupMember> member
+        ) {
+            if (member.isPresent()) {
+                return new UpdateTargetAssignmentResult<>(
+                    group.assignmentEpoch(),
+                    group.targetAssignment(member.get().memberId())
+                );
+            } else {
+                return new UpdateTargetAssignmentResult<>(
+                    group.assignmentEpoch(),
+                    TasksTuple.EMPTY
+                );
+            }
+        }
     }
 
     public static class Builder {
@@ -2086,36 +2103,16 @@ public class GroupMetadataManager {
         // 4. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
         // replaces an existing static member.
         // The delta between the existing and the new target assignment is 
persisted to the partition.
-        int targetAssignmentEpoch;
-        TasksTuple targetAssignment;
-        if (groupEpoch > group.assignmentEpoch()) {
-            boolean initialDelayActive = 
timer.isScheduled(streamsInitialRebalanceKey(groupId));
-            if (initialDelayActive) {
-                // During initial rebalance delay, return empty assignment to 
first joining members.
-                targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
-                targetAssignment = TasksTuple.EMPTY;
-
-                returnedStatus.add(
-                    new Status()
-                        
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
-                        .setStatusDetail("Assignment delayed due to the 
configured initial rebalance delay.")
-                );
-            } else {
-                targetAssignment = updateStreamsTargetAssignment(
-                    group,
-                    groupEpoch,
-                    Optional.of(updatedMember),
-                    updatedConfiguredTopology,
-                    metadataImage,
-                    records,
-                    currentAssignmentConfigs
-                );
-                targetAssignmentEpoch = groupEpoch;
-            }
-        } else {
-            targetAssignmentEpoch = group.assignmentEpoch();
-            targetAssignment = 
group.targetAssignment(updatedMember.memberId());
-        }
+        UpdateTargetAssignmentResult<TasksTuple> updateTargetAssignmentResult 
= maybeUpdateStreamsTargetAssignment(
+            group,
+            groupEpoch,
+            Optional.of(updatedMember),
+            updatedConfiguredTopology,
+            metadataImage,
+            records,
+            Optional.of(returnedStatus),
+            currentAssignmentConfigs
+        );
 
         // 5. Reconcile the member's assignment with the target assignment if 
the member is not
         // fully reconciled yet.
@@ -2125,8 +2122,8 @@ public class GroupMetadataManager {
             group::currentActiveTaskProcessId,
             group::currentStandbyTaskProcessIds,
             group::currentWarmupTaskProcessIds,
-            targetAssignmentEpoch,
-            targetAssignment,
+            updateTargetAssignmentResult.targetAssignmentEpoch(),
+            updateTargetAssignmentResult.targetAssignment(),
             ownedActiveTasks,
             ownedStandbyTasks,
             ownedWarmupTasks,
@@ -3940,17 +3937,38 @@ public class GroupMetadataManager {
      * @param updatedMember        The updated member (optional).
      * @param metadataImage        The metadata image.
      * @param records              The list to accumulate any new records.
+     * @param returnedStatus       A mutable collection of status to be 
returned in the response.
      * @return The new target assignment for the updated member, or EMPTY if 
no member specified.
      */
-    private TasksTuple updateStreamsTargetAssignment(
+    private UpdateTargetAssignmentResult<TasksTuple> 
maybeUpdateStreamsTargetAssignment(
         StreamsGroup group,
         int groupEpoch,
         Optional<StreamsGroupMember> updatedMember,
         ConfiguredTopology configuredTopology,
         CoordinatorMetadataImage metadataImage,
         List<CoordinatorRecord> records,
+        Optional<List<Status>> returnedStatus,
         Map<String, String> assignmentConfigs
     ) {
+        boolean initialDelayActive = 
timer.isScheduled(streamsInitialRebalanceKey(group.groupId()));
+        if (initialDelayActive) {
+            returnedStatus.ifPresent(statusList -> statusList.add(
+                new Status()
+                    
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+                    .setStatusDetail("Assignment delayed due to the configured 
initial rebalance delay.")
+            ));
+
+            return new UpdateTargetAssignmentResult<>(
+                group.assignmentEpoch(),
+                TasksTuple.EMPTY
+            );
+        }
+
+        if (group.assignmentEpoch() >= groupEpoch) {
+            // The assignment is up to date.
+            return 
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
+        }
+
         TaskAssignor assignor = streamsGroupAssignor(group.groupId());
         try {
             org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder 
assignmentResultBuilder =
@@ -3986,8 +4004,11 @@ public class GroupMetadataManager {
 
             records.addAll(assignmentResult.records());
 
-            return updatedMember.map(member -> 
assignmentResult.targetAssignment().get(member.memberId()))
-                .orElse(TasksTuple.EMPTY);
+            return new UpdateTargetAssignmentResult<>(
+                groupEpoch,
+                updatedMember.map(member -> 
assignmentResult.targetAssignment().get(member.memberId()))
+                    .orElse(TasksTuple.EMPTY)
+            );
         } catch (TaskAssignorException ex) {
             String msg = String.format("Failed to compute a new target 
assignment for epoch %d: %s",
                 groupEpoch, ex.getMessage());
@@ -4024,13 +4045,14 @@ public class GroupMetadataManager {
             }
 
             List<CoordinatorRecord> records = new ArrayList<>();
-            updateStreamsTargetAssignment(
+            maybeUpdateStreamsTargetAssignment(
                 group,
                 group.groupEpoch(),
                 Optional.empty(),
                 group.configuredTopology().get(),
                 metadataImage,
                 records,
+                Optional.empty(),
                 group.lastAssignmentConfigs()
             );
 

Reply via email to