This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ff010a402d1 KAFKA-20269 [3/N]: Refactor assignment update for delayed
streams assignments (#21696)
ff010a402d1 is described below
commit ff010a402d1148be06e00da4b1b3d4c95fdae698
Author: Sean Quah <[email protected]>
AuthorDate: Wed Mar 11 16:20:52 2026 +0000
KAFKA-20269 [3/N]: Refactor assignment update for delayed streams
assignments (#21696)
Refactor the streams target assignment update method to return both the
target assignment epoch and the target assignment. When assignment batching
or assignment offload is implemented, the target assignment update
method may return the last target assignment, depending on timings and
the group coordinator config.
Reviewers: Lucas Brutschy <[email protected]>, David Jacot
<[email protected]>
---
.../coordinator/group/GroupMetadataManager.java | 94 +++++++++++++---------
1 file changed, 58 insertions(+), 36 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 3d7ecdbfe2c..ffcd909a2b9 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -299,6 +299,23 @@ public class GroupMetadataManager {
group.targetAssignment(member.memberId())
);
}
+
+ private static UpdateTargetAssignmentResult<TasksTuple>
fromLastTargetAssignment(
+ StreamsGroup group,
+ Optional<StreamsGroupMember> member
+ ) {
+ if (member.isPresent()) {
+ return new UpdateTargetAssignmentResult<>(
+ group.assignmentEpoch(),
+ group.targetAssignment(member.get().memberId())
+ );
+ } else {
+ return new UpdateTargetAssignmentResult<>(
+ group.assignmentEpoch(),
+ TasksTuple.EMPTY
+ );
+ }
+ }
}
public static class Builder {
@@ -2086,36 +2103,16 @@ public class GroupMetadataManager {
// 4. Update the target assignment if the group epoch is larger than
the target assignment epoch or a static member
// replaces an existing static member.
// The delta between the existing and the new target assignment is
persisted to the partition.
- int targetAssignmentEpoch;
- TasksTuple targetAssignment;
- if (groupEpoch > group.assignmentEpoch()) {
- boolean initialDelayActive =
timer.isScheduled(streamsInitialRebalanceKey(groupId));
- if (initialDelayActive) {
- // During initial rebalance delay, return empty assignment to
first joining members.
- targetAssignmentEpoch = Math.max(1, group.assignmentEpoch());
- targetAssignment = TasksTuple.EMPTY;
-
- returnedStatus.add(
- new Status()
-
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
- .setStatusDetail("Assignment delayed due to the
configured initial rebalance delay.")
- );
- } else {
- targetAssignment = updateStreamsTargetAssignment(
- group,
- groupEpoch,
- Optional.of(updatedMember),
- updatedConfiguredTopology,
- metadataImage,
- records,
- currentAssignmentConfigs
- );
- targetAssignmentEpoch = groupEpoch;
- }
- } else {
- targetAssignmentEpoch = group.assignmentEpoch();
- targetAssignment =
group.targetAssignment(updatedMember.memberId());
- }
+ UpdateTargetAssignmentResult<TasksTuple> updateTargetAssignmentResult
= maybeUpdateStreamsTargetAssignment(
+ group,
+ groupEpoch,
+ Optional.of(updatedMember),
+ updatedConfiguredTopology,
+ metadataImage,
+ records,
+ Optional.of(returnedStatus),
+ currentAssignmentConfigs
+ );
// 5. Reconcile the member's assignment with the target assignment if
the member is not
// fully reconciled yet.
@@ -2125,8 +2122,8 @@ public class GroupMetadataManager {
group::currentActiveTaskProcessId,
group::currentStandbyTaskProcessIds,
group::currentWarmupTaskProcessIds,
- targetAssignmentEpoch,
- targetAssignment,
+ updateTargetAssignmentResult.targetAssignmentEpoch(),
+ updateTargetAssignmentResult.targetAssignment(),
ownedActiveTasks,
ownedStandbyTasks,
ownedWarmupTasks,
@@ -3940,17 +3937,38 @@ public class GroupMetadataManager {
* @param updatedMember The updated member (optional).
* @param metadataImage The metadata image.
* @param records The list to accumulate any new records.
+ * @param returnedStatus A mutable collection of status to be
returned in the response.
* @return The new target assignment for the updated member, or EMPTY if
no member specified.
*/
- private TasksTuple updateStreamsTargetAssignment(
+ private UpdateTargetAssignmentResult<TasksTuple>
maybeUpdateStreamsTargetAssignment(
StreamsGroup group,
int groupEpoch,
Optional<StreamsGroupMember> updatedMember,
ConfiguredTopology configuredTopology,
CoordinatorMetadataImage metadataImage,
List<CoordinatorRecord> records,
+ Optional<List<Status>> returnedStatus,
Map<String, String> assignmentConfigs
) {
+ boolean initialDelayActive =
timer.isScheduled(streamsInitialRebalanceKey(group.groupId()));
+ if (initialDelayActive) {
+ returnedStatus.ifPresent(statusList -> statusList.add(
+ new Status()
+
.setStatusCode(StreamsGroupHeartbeatResponse.Status.ASSIGNMENT_DELAYED.code())
+ .setStatusDetail("Assignment delayed due to the configured
initial rebalance delay.")
+ ));
+
+ return new UpdateTargetAssignmentResult<>(
+ group.assignmentEpoch(),
+ TasksTuple.EMPTY
+ );
+ }
+
+ if (group.assignmentEpoch() >= groupEpoch) {
+ // The assignment is up to date.
+ return
UpdateTargetAssignmentResult.fromLastTargetAssignment(group, updatedMember);
+ }
+
TaskAssignor assignor = streamsGroupAssignor(group.groupId());
try {
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder
assignmentResultBuilder =
@@ -3986,8 +4004,11 @@ public class GroupMetadataManager {
records.addAll(assignmentResult.records());
- return updatedMember.map(member ->
assignmentResult.targetAssignment().get(member.memberId()))
- .orElse(TasksTuple.EMPTY);
+ return new UpdateTargetAssignmentResult<>(
+ groupEpoch,
+ updatedMember.map(member ->
assignmentResult.targetAssignment().get(member.memberId()))
+ .orElse(TasksTuple.EMPTY)
+ );
} catch (TaskAssignorException ex) {
String msg = String.format("Failed to compute a new target
assignment for epoch %d: %s",
groupEpoch, ex.getMessage());
@@ -4024,13 +4045,14 @@ public class GroupMetadataManager {
}
List<CoordinatorRecord> records = new ArrayList<>();
- updateStreamsTargetAssignment(
+ maybeUpdateStreamsTargetAssignment(
group,
group.groupEpoch(),
Optional.empty(),
group.configuredTopology().get(),
metadataImage,
records,
+ Optional.empty(),
group.lastAssignmentConfigs()
);