This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ccfc37d3743 KAFKA-20269 [1/N]: Refactor assignment update for delayed
consumer assignments (#21694)
ccfc37d3743 is described below
commit ccfc37d3743649128bcd606ad1d954ac76d12fcf
Author: Sean Quah <[email protected]>
AuthorDate: Tue Mar 10 14:53:46 2026 +0000
KAFKA-20269 [1/N]: Refactor assignment update for delayed consumer
assignments (#21694)
Refactor the consumer target assignment update method to return both the
target assignment epoch and target assignment. When assignment batching
or assignment offload are implemented, the target assignment update
method may return the last target assignment, depending on timings and
the group coordinator config.
Reviewers: David Jacot <[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 84 +++++++++++-----------
1 file changed, 43 insertions(+), 41 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 1967089ebd5..0b8b9325442 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -276,6 +276,21 @@ public class GroupMetadataManager {
}
}
+ private record UpdateTargetAssignmentResult<T>(
+ int targetAssignmentEpoch,
+ T targetAssignment
+ ) {
+ private static UpdateTargetAssignmentResult<Assignment>
fromLastTargetAssignment(
+ ConsumerGroup group,
+ ConsumerGroupMember member
+ ) {
+ return new UpdateTargetAssignmentResult<>(
+ group.assignmentEpoch(),
+ group.targetAssignment(member.memberId(), member.instanceId())
+ );
+ }
+ }
+
public static class Builder {
private LogContext logContext = null;
private SnapshotRegistry snapshotRegistry = null;
@@ -2394,23 +2409,14 @@ public class GroupMetadataManager {
// 2. Update the target assignment if the group epoch is larger than
the target assignment epoch. The delta between
// the existing and the new target assignment is persisted to the
partition.
- final int targetAssignmentEpoch;
- final Assignment targetAssignment;
-
- if (groupEpoch > group.assignmentEpoch()) {
- targetAssignment = updateTargetAssignment(
- group,
- groupEpoch,
- member,
- updatedMember,
- subscriptionType,
- records
- );
- targetAssignmentEpoch = groupEpoch;
- } else {
- targetAssignmentEpoch = group.assignmentEpoch();
- targetAssignment =
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
- }
+ UpdateTargetAssignmentResult<Assignment> updateTargetAssignmentResult
= maybeUpdateTargetAssignment(
+ group,
+ groupEpoch,
+ member,
+ updatedMember,
+ subscriptionType,
+ records
+ );
// 3. Reconcile the member's assignment with the target assignment if
the member is not
// fully reconciled yet.
@@ -2418,8 +2424,8 @@ public class GroupMetadataManager {
groupId,
updatedMember,
group::currentPartitionEpoch,
- targetAssignmentEpoch,
- targetAssignment,
+ updateTargetAssignmentResult.targetAssignmentEpoch(),
+ updateTargetAssignmentResult.targetAssignment(),
group.resolvedRegularExpressions(),
// Force consistency with the subscription when the subscription
has changed.
hasSubscriptionChanged,
@@ -2613,31 +2619,22 @@ public class GroupMetadataManager {
// 2. Update the target assignment if the group epoch is larger
than the target assignment epoch.
// The delta between the existing and the new target assignment is
persisted to the partition.
- final int targetAssignmentEpoch;
- final Assignment targetAssignment;
-
- if (groupEpoch > group.assignmentEpoch()) {
- targetAssignment = updateTargetAssignment(
- group,
- groupEpoch,
- member,
- updatedMember,
- subscriptionType,
- records
- );
- targetAssignmentEpoch = groupEpoch;
- } else {
- targetAssignmentEpoch = group.assignmentEpoch();
- targetAssignment =
group.targetAssignment(updatedMember.memberId(), updatedMember.instanceId());
- }
+ UpdateTargetAssignmentResult<Assignment>
updateTargetAssignmentResult = maybeUpdateTargetAssignment(
+ group,
+ groupEpoch,
+ member,
+ updatedMember,
+ subscriptionType,
+ records
+ );
// 3. Reconcile the member's assignment with the target assignment
if the member is not fully reconciled yet.
updatedMember = maybeReconcile(
groupId,
updatedMember,
group::currentPartitionEpoch,
- targetAssignmentEpoch,
- targetAssignment,
+ updateTargetAssignmentResult.targetAssignmentEpoch(),
+ updateTargetAssignmentResult.targetAssignment(),
group.resolvedRegularExpressions(),
// Force consistency with the subscription when the
subscription has changed.
bumpGroupEpoch,
@@ -3808,7 +3805,7 @@ public class GroupMetadataManager {
* @param records The list to accumulate any new records.
* @return The new target assignment.
*/
- private Assignment updateTargetAssignment(
+ private UpdateTargetAssignmentResult<Assignment>
maybeUpdateTargetAssignment(
ConsumerGroup group,
int groupEpoch,
ConsumerGroupMember member,
@@ -3816,6 +3813,11 @@ public class GroupMetadataManager {
SubscriptionType subscriptionType,
List<CoordinatorRecord> records
) {
+ if (group.assignmentEpoch() >= groupEpoch) {
+ // The assignment is up to date.
+ return
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
+ }
+
String preferredServerAssignor = group.computePreferredServerAssignor(
member,
updatedMember
@@ -3857,9 +3859,9 @@ public class GroupMetadataManager {
MemberAssignment newMemberAssignment =
assignmentResult.targetAssignment().get(updatedMember.memberId());
if (newMemberAssignment != null) {
- return new Assignment(newMemberAssignment.partitions());
+ return new UpdateTargetAssignmentResult<>(groupEpoch, new
Assignment(newMemberAssignment.partitions()));
} else {
- return Assignment.EMPTY;
+ return new UpdateTargetAssignmentResult<>(groupEpoch,
Assignment.EMPTY);
}
} catch (PartitionAssignorException ex) {
String msg = String.format("Failed to compute a new target
assignment for epoch %d: %s",