jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1267441961
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance(
group.stateAsString() + " when the unknown static member " +
request.groupInstanceId() + " rejoins.");
}
+ return maybeCompleteJoinPhase(group);
+ }
+
+ public CoordinatorResult<Void, Record> genericGroupSync(
+ RequestContext context,
+ SyncGroupRequestData request,
+ CompletableFuture<SyncGroupResponseData> responseFuture
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ String groupId = request.groupId();
+ String memberId = request.memberId();
+ GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+ Optional<Errors> errorOpt = validateSyncGroup(group, request);
+ if (errorOpt.isPresent()) {
+ responseFuture.complete(new SyncGroupResponseData()
+ .setErrorCode(errorOpt.get().code()));
+
+ } else if (group.isInState(EMPTY)) {
+ responseFuture.complete(new SyncGroupResponseData()
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+ } else if (group.isInState(PREPARING_REBALANCE)) {
+ responseFuture.complete(new SyncGroupResponseData()
+ .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+ } else if (group.isInState(COMPLETING_REBALANCE)) {
+ group.member(memberId).setAwaitingSyncFuture(responseFuture);
+ removePendingSyncMember(group, request.memberId());
+
+ // If this is the leader, then we can attempt to persist state and transition to stable
+ if (group.isLeader(memberId)) {
+ log.info("Assignment received from leader {} for group {} for generation {}. " +
+ "The group has {} members, {} of which are static.",
+ memberId, groupId, group.generationId(),
+ group.size(), group.allStaticMemberIds().size());
+
+ // Fill all members with corresponding assignment. Reset members not specified in
+ // the assignment to empty assignments.
+ Map<String, byte[]> assignments = new HashMap<>();
+ request.assignments()
+ .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment()));
+
+ Set<String> membersWithMissingAssignment = new HashSet<>();
+ group.allMembers().forEach(member -> {
+ byte[] assignment = assignments.get(member.memberId());
+ if (assignment != null) {
+ member.setAssignment(assignment);
+ } else {
+ membersWithMissingAssignment.add(member.memberId());
+ member.setAssignment(new byte[0]);
+ }
+ });
Review Comment:
I thought it would be safe because
`resetAndPropagateAssignmentWithError(group, error);` resets all members to
an empty assignment if the commit fails.
Let's say that another request comes in before the commit fails. The only
place we return a member's assignment to the client is when we receive a sync
group request in the Stable state. So we never expose the updated assignment
until the write succeeds and the group transitions to Stable.
Do you think this is too brittle?
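To make the invariant concrete, here is a minimal, self-contained sketch of the argument. It is not the code in this PR: `AssignmentVisibilitySketch`, `Group`, `applyLeaderAssignment`, `onWriteComplete`, and `syncFollower` are hypothetical names, and the move to `PREPARING_REBALANCE` on failure is an assumption made only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of the reasoning above; not the GroupMetadataManager code.
public class AssignmentVisibilitySketch {
    enum State { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    static final class Group {
        State state = State.COMPLETING_REBALANCE;
        final Map<String, byte[]> assignments = new HashMap<>();

        Group(String... memberIds) {
            for (String id : memberIds) {
                assignments.put(id, new byte[0]);
            }
        }

        // Leader sync: assignments are mutated in memory before the write completes.
        // Members missing from the leader's map get an empty assignment.
        void applyLeaderAssignment(Map<String, byte[]> fromLeader) {
            assignments.replaceAll((id, old) -> fromLeader.getOrDefault(id, new byte[0]));
        }

        // Completion callback for the (simulated) log append.
        void onWriteComplete(boolean success) {
            if (success) {
                state = State.STABLE;
            } else {
                // Mirrors the idea of resetting every member to an empty assignment.
                assignments.replaceAll((id, old) -> new byte[0]);
                // Assumption for this sketch: a failed write triggers a new rebalance.
                state = State.PREPARING_REBALANCE;
            }
        }

        // The only read path: an assignment is returned only in the STABLE state.
        Optional<byte[]> syncFollower(String memberId) {
            if (state != State.STABLE) {
                return Optional.empty();
            }
            return Optional.ofNullable(assignments.get(memberId));
        }
    }

    public static void main(String[] args) {
        Group group = new Group("leader", "follower");
        Map<String, byte[]> fromLeader = new HashMap<>();
        fromLeader.put("follower", new byte[] {1});

        group.applyLeaderAssignment(fromLeader);
        // The write is still in flight, so nothing is visible to a follower yet.
        System.out.println("visible before commit: "
            + group.syncFollower("follower").isPresent());

        group.onWriteComplete(true);
        System.out.println("visible after commit: "
            + group.syncFollower("follower").isPresent());
    }
}
```

The safety argument holds only as long as the single read path stays gated on the Stable state; any new code path that reads `assignments` in another state would see the uncommitted values, which is where the brittleness concern comes from.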