dajac commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1269398879
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2643,9 +2652,175 @@ private CoordinatorResult<Void, Record> updateStaticMemberAndRebalance(
group.stateAsString() + " when the unknown static member " +
request.groupInstanceId() + " rejoins.");
}
+ return maybeCompleteJoinPhase(group);
+ }
+
+ public CoordinatorResult<Void, Record> genericGroupSync(
+ RequestContext context,
+ SyncGroupRequestData request,
+ CompletableFuture<SyncGroupResponseData> responseFuture
+ ) throws UnknownMemberIdException, GroupIdNotFoundException {
+ String groupId = request.groupId();
+ String memberId = request.memberId();
+ GenericGroup group = getOrMaybeCreateGenericGroup(groupId, false);
+ Optional<Errors> errorOpt = validateSyncGroup(group, request);
+ if (errorOpt.isPresent()) {
+ responseFuture.complete(new SyncGroupResponseData()
+ .setErrorCode(errorOpt.get().code()));
+
+ } else if (group.isInState(EMPTY)) {
+ responseFuture.complete(new SyncGroupResponseData()
+ .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()));
+
+ } else if (group.isInState(PREPARING_REBALANCE)) {
+ responseFuture.complete(new SyncGroupResponseData()
+ .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code()));
+
+ } else if (group.isInState(COMPLETING_REBALANCE)) {
+ group.member(memberId).setAwaitingSyncFuture(responseFuture);
+ removePendingSyncMember(group, request.memberId());
+
+ // If this is the leader, then we can attempt to persist state and transition to stable
+ if (group.isLeader(memberId)) {
+ log.info("Assignment received from leader {} for group {} for generation {}. " +
+ "The group has {} members, {} of which are static.",
+ memberId, groupId, group.generationId(),
+ group.size(), group.allStaticMemberIds().size());
+
+ // Fill all members with corresponding assignment. Reset members not specified in
+ // the assignment to empty assignments.
+ Map<String, byte[]> assignments = new HashMap<>();
+ request.assignments()
+ .forEach(assignment -> assignments.put(assignment.memberId(), assignment.assignment()));
+
+ Set<String> membersWithMissingAssignment = new HashSet<>();
+ group.allMembers().forEach(member -> {
+ byte[] assignment = assignments.get(member.memberId());
+ if (assignment != null) {
+ member.setAssignment(assignment);
+ } else {
+ membersWithMissingAssignment.add(member.memberId());
+ member.setAssignment(new byte[0]);
+ }
+ });
Review Comment:
I am not sure. I lean towards keeping the implementation as it was to avoid
any unwanted side effects.
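
For reference, the assignment-filling step in the hunk above can be sketched in isolation. This is a minimal standalone sketch, not the PR's code: the class `AssignmentFill` and the method `fillAssignments` are hypothetical names, and member state is modeled as a plain `Map` instead of `GenericGroup` members. It only illustrates the behavior under discussion, namely that members missing from the leader's assignment list are reset to an empty assignment and recorded.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the logic in genericGroupSync; not part of Kafka.
final class AssignmentFill {

    // Copies each provided assignment into `result`. Members without a
    // provided assignment get an empty byte array, and their IDs are returned.
    static Set<String> fillAssignments(
        List<String> memberIds,
        Map<String, byte[]> provided,
        Map<String, byte[]> result
    ) {
        Set<String> membersWithMissingAssignment = new HashSet<>();
        for (String memberId : memberIds) {
            byte[] assignment = provided.get(memberId);
            if (assignment != null) {
                result.put(memberId, assignment);
            } else {
                membersWithMissingAssignment.add(memberId);
                result.put(memberId, new byte[0]);
            }
        }
        return membersWithMissingAssignment;
    }

    public static void main(String[] args) {
        // The leader only supplies an assignment for m1.
        Map<String, byte[]> provided = new HashMap<>();
        provided.put("m1", new byte[] {1, 2});

        Map<String, byte[]> result = new HashMap<>();
        Set<String> missing =
            fillAssignments(Arrays.asList("m1", "m2"), provided, result);

        System.out.println("Members reset to empty assignment: " + missing);
    }
}
```

Keeping the explicit null check and the empty-array reset, as the existing implementation does, means every member ends up with a non-null assignment regardless of what the leader sends.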