This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3dd5dff307e KAFKA-20269 [2/N]: Refactor assignment update for delayed
share assignments (#21695)
3dd5dff307e is described below
commit 3dd5dff307e3196e5c927d7f02f9616c2f06d509
Author: Sean Quah <[email protected]>
AuthorDate: Tue Mar 10 17:59:24 2026 +0000
KAFKA-20269 [2/N]: Refactor assignment update for delayed share assignments
(#21695)
Refactor the share target assignment update method to return both the
target assignment epoch and target assignment. When assignment batching
or assignment offload are implemented, the target assignment update
method may return the last target assignment, depending on timings and
the group coordinator config.
Reviewers: David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 48 ++++++++++++----------
1 file changed, 27 insertions(+), 21 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 0b8b9325442..694097f3ffa 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -289,6 +289,16 @@ public class GroupMetadataManager {
group.targetAssignment(member.memberId(), member.instanceId())
);
}
+
+ private static UpdateTargetAssignmentResult<Assignment>
fromLastTargetAssignment(
+ ShareGroup group,
+ ShareGroupMember member
+ ) {
+ return new UpdateTargetAssignmentResult<>(
+ group.assignmentEpoch(),
+ group.targetAssignment(member.memberId())
+ );
+ }
}
public static class Builder {
@@ -2772,30 +2782,21 @@ public class GroupMetadataManager {
// 2. Update the target assignment if the group epoch is larger than
the target assignment epoch. The delta between
// the existing and the new target assignment is persisted to the
partition.
- final int targetAssignmentEpoch;
- final Assignment targetAssignment;
-
- if (groupEpoch > group.assignmentEpoch()) {
- targetAssignment = updateTargetAssignment(
- group,
- groupEpoch,
- updatedMember,
- subscriptionType,
- records
- );
- targetAssignmentEpoch = groupEpoch;
- } else {
- targetAssignmentEpoch = group.assignmentEpoch();
- targetAssignment =
group.targetAssignment(updatedMember.memberId());
- }
+ UpdateTargetAssignmentResult<Assignment> updateTargetAssignmentResult
= maybeUpdateTargetAssignment(
+ group,
+ groupEpoch,
+ updatedMember,
+ subscriptionType,
+ records
+ );
// 3. Reconcile the member's assignment with the target assignment if
the member is not
// fully reconciled yet.
updatedMember = maybeReconcile(
groupId,
updatedMember,
- targetAssignmentEpoch,
- targetAssignment,
+ updateTargetAssignmentResult.targetAssignmentEpoch(),
+ updateTargetAssignmentResult.targetAssignment(),
// Force consistency with the subscription when the subscription
has changed.
bumpGroupEpoch,
records
@@ -3881,13 +3882,18 @@ public class GroupMetadataManager {
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
- private Assignment updateTargetAssignment(
+ private UpdateTargetAssignmentResult<Assignment>
maybeUpdateTargetAssignment(
ShareGroup group,
int groupEpoch,
ShareGroupMember updatedMember,
SubscriptionType subscriptionType,
List<CoordinatorRecord> records
) {
+ if (group.assignmentEpoch() >= groupEpoch) {
+ // The assignment is up to date.
+ return
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
+ }
+
try {
Map<Uuid, Set<Integer>> initializedTopicPartitions =
shareGroupStatePartitionMetadata.containsKey(group.groupId()) ?
stripInitValue(shareGroupStatePartitionMetadata.get(group.groupId()).initializedTopics())
:
@@ -3921,9 +3927,9 @@ public class GroupMetadataManager {
MemberAssignment newMemberAssignment =
assignmentResult.targetAssignment().get(updatedMember.memberId());
if (newMemberAssignment != null) {
- return new Assignment(newMemberAssignment.partitions());
+ return new UpdateTargetAssignmentResult<>(groupEpoch, new
Assignment(newMemberAssignment.partitions()));
} else {
- return Assignment.EMPTY;
+ return new UpdateTargetAssignmentResult<>(groupEpoch,
Assignment.EMPTY);
}
} catch (PartitionAssignorException ex) {
String msg = String.format("Failed to compute a new target
assignment for epoch %d: %s",