CalvinConfluent commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1246995181


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -874,4 +1087,1362 @@ public void replay(
             consumerGroup.updateMember(newMember);
         }
     }
+
+    /**
+     * Replays GroupMetadataKey/Value to update the soft state of
+     * the generic group.
+     *
+     * @param key   A GroupMetadataKey key.
+     * @param value A GroupMetadataValue record.
+     */
+    public void replay(
+        GroupMetadataKey key,
+        GroupMetadataValue value,
+        short version
+    ) {
+        String groupId = key.group();
+
+        if (value == null)  {
+            // Tombstone. Group should not be added.
+        } else {
+            List<GenericGroupMember> loadedMembers = new ArrayList<>();
+            for (GroupMetadataValue.MemberMetadata member : value.members()) {
+                int rebalanceTimeout = version == 0 ? member.sessionTimeout() 
: member.rebalanceTimeout();
+
+                JoinGroupRequestProtocolCollection supportedProtocols = new 
JoinGroupRequestProtocolCollection();
+                supportedProtocols.add(new JoinGroupRequestProtocol()
+                    .setName(value.protocol())
+                    .setMetadata(member.subscription()));
+
+                GenericGroupMember loadedMember = new GenericGroupMember(
+                    member.memberId(),
+                    Optional.ofNullable(member.groupInstanceId()),
+                    member.clientId(),
+                    member.clientHost(),
+                    rebalanceTimeout,
+                    member.sessionTimeout(),
+                    value.protocolType(),
+                    supportedProtocols,
+                    member.assignment()
+                );
+
+                loadedMembers.add(loadedMember);
+            }
+
+            String protocolType = value.protocolType();
+
+            GenericGroup genericGroup = new GenericGroup(
+                this.logContext,
+                groupId,
+                loadedMembers.isEmpty() ? EMPTY : STABLE,
+                time,
+                value.generation(),
+                protocolType == null || protocolType.isEmpty() ? 
Optional.empty() : Optional.of(protocolType),
+                Optional.ofNullable(value.protocol()),
+                Optional.ofNullable(value.leader()),
+                value.currentStateTimestamp() == -1 ? Optional.empty() : 
Optional.of(value.currentStateTimestamp())
+            );
+
+            loadedMembers.forEach(member -> {
+                genericGroup.add(member, null);
+                log.info("Loaded member {} in group {} with generation {}.",
+                    member.memberId(), groupId, genericGroup.generationId());
+            });
+
+            genericGroup.setSubscribedTopics(
+                genericGroup.computeSubscribedTopics()
+            );
+        }
+    }
+
+    /**
+     * Handle a JoinGroupRequest.
+     *
+     * @param context The request context.
+     * @param request The actual JoinGroup request.
+     *
+     * @return The result that contains records to append if the join group 
phase completes.
+     */
+    public CoordinatorResult<Void, Record> genericGroupJoin(
+        RequestContext context,
+        JoinGroupRequestData request,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        CoordinatorResult<Void, Record> result = EMPTY_RESULT;
+
+        String groupId = request.groupId();
+        String memberId = request.memberId();
+        int sessionTimeoutMs = request.sessionTimeoutMs();
+
+        if (sessionTimeoutMs < genericGroupMinSessionTimeoutMs ||
+            sessionTimeoutMs > genericGroupMaxSessionTimeoutMs
+        ) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(memberId)
+                .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code())
+            );
+        } else {
+            boolean isUnknownMember = memberId.equals(UNKNOWN_MEMBER_ID);
+            // Group is created if it does not exist and the member id is 
UNKNOWN. if member
+            // is specified but group does not exist, request is rejected with 
GROUP_ID_NOT_FOUND
+            GenericGroup group;
+            try {
+                group = getOrMaybeCreateGenericGroup(groupId, isUnknownMember);
+            } catch (Throwable t) {
+                responseFuture.complete(new JoinGroupResponseData()
+                    .setMemberId(memberId)
+                    .setErrorCode(Errors.forException(t).code())
+                );
+                return EMPTY_RESULT;
+            }
+
+            if (group.isNew()) {
+                // If a group was newly created, we need to append records to 
the log
+                // to commit the group to the timeline datastructure.
+
+                CompletableFuture<Void> appendFuture = new 
CompletableFuture<>();
+                appendFuture.whenComplete((__, t) -> {
+                    if (t != null) {
+                        // We failed to write the empty group metadata. This 
will revert the snapshot, removing
+                        // the newly created group.
+                        log.warn("Failed to write empty metadata for group {}: 
{}", group.groupId(), t.getMessage());
+                    }
+                });
+
+                List<Record> records = Collections.singletonList(
+                    RecordHelpers.newGroupMetadataRecord(group, 
metadataImage.features().metadataVersion())
+                );
+
+                result = new CoordinatorResult<>(records, appendFuture);
+                genericGroupJoinMember(context, request, group, 
isUnknownMember, responseFuture);
+            } else {
+                result = genericGroupJoinMember(context, request, group, 
isUnknownMember, responseFuture);
+            }
+        }
+        return result;
+    }
+
+    private CoordinatorResult<Void, Record> genericGroupJoinMember(
+        RequestContext context,
+        JoinGroupRequestData request,
+        GenericGroup group,
+        boolean isUnknownMember,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        CoordinatorResult<Void, Record> result = EMPTY_RESULT;
+        String joinReason = request.reason();
+        String memberId = request.memberId();
+        if (joinReason == null || joinReason.isEmpty()) {
+            joinReason = "not provided";
+        }
+
+        if (!acceptJoiningMember(group, memberId)) {
+            group.remove(memberId);
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(UNKNOWN_MEMBER_ID)
+                .setErrorCode(Errors.GROUP_MAX_SIZE_REACHED.code())
+            );
+
+        } else if (isUnknownMember) {
+            result = genericGroupJoinNewMember(
+                context,
+                request,
+                group,
+                joinReason,
+                responseFuture
+            );
+        } else {
+            result = genericGroupJoinExistingMember(
+                context,
+                request,
+                group,
+                joinReason,
+                responseFuture
+            );
+        }
+
+        tryCompleteJoin(group);
+        return result;
+    }
+
+    private CoordinatorResult<Void, Record> tryCompleteJoin(
+        GenericGroup group
+    ) {
+        // Attempt to complete join group phase. We do not complete
+        // the join group phase if this is the initial rebalance.
+        if (group.isInState(PREPARING_REBALANCE) &&
+            group.hasAllMembersJoined() &&
+            group.generationId() != 0
+        ) {
+            return completeGenericGroupJoin(group);
+        }
+        return EMPTY_RESULT;
+    }
+
+    /**
+     * Handle a new member generic group join.
+     *
+     * @param context         The request context.
+     * @param request         The join group request.
+     * @param group           The group to add the member.
+     * @param joinReason      The client reason for the join request.
+     * @param responseFuture  The response future to complete.
+     *
+     * @return The coordinator result that will be appended to the log.
+     */
+    private CoordinatorResult<Void, Record> genericGroupJoinNewMember(
+        RequestContext context,
+        JoinGroupRequestData request,
+        GenericGroup group,
+        String joinReason,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+
+        if (group.isInState(DEAD)) {
+            // if the group is marked as dead, it means some other thread has 
just removed the group
+            // from the coordinator metadata; it is likely that the group has 
migrated to some other
+            // coordinator OR the group is in a transient unstable phase. Let 
the member retry
+            // finding the correct coordinator and rejoin.
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(UNKNOWN_MEMBER_ID)
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            );
+        } else if (!group.supportsProtocols(request.protocolType(), 
request.protocols())) {
+            responseFuture.complete(new JoinGroupResponseData()
+                .setMemberId(UNKNOWN_MEMBER_ID)
+                .setErrorCode(Errors.INCONSISTENT_GROUP_PROTOCOL.code())
+            );
+        } else {
+            Optional<String> groupInstanceId = 
Optional.ofNullable(request.groupInstanceId());
+            String newMemberId = group.generateMemberId(context.clientId(), 
groupInstanceId);
+
+            if (groupInstanceId.isPresent()) {
+                return genericGroupJoinNewStaticMember(
+                    context,
+                    request,
+                    group,
+                    newMemberId,
+                    joinReason,
+                    responseFuture
+                );
+            } else {
+                return genericGroupJoinNewDynamicMember(
+                    context,
+                    request,
+                    group,
+                    newMemberId,
+                    joinReason,
+                    responseFuture
+                );
+            }
+        }
+
+        return EMPTY_RESULT;
+    }
+
+    /**
+     * Handle new static member join. If there was an existing member id for 
the group instance id,
+     * replace that member. Otherwise, add the member and rebalance.
+     *
+     * @param context         The request context.
+     * @param request         The join group request.
+     * @param group           The group to add the member.
+     * @param newMemberId     The newly generated member id.
+     * @param joinReason      The client reason for the join request.
+     * @param responseFuture  The response future to complete.
+     *
+     * @return The coordinator result that will be appended to the log.
+     */
+    private CoordinatorResult<Void, Record> genericGroupJoinNewStaticMember(
+        RequestContext context,
+        JoinGroupRequestData request,
+        GenericGroup group,
+        String newMemberId,
+        String joinReason,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        String groupInstanceId = request.groupInstanceId();
+        String existingMemberId = group.staticMemberId(groupInstanceId);
+        if (existingMemberId != null) {
+            log.info("Static member with groupInstanceId={} and unknown member 
id joins " +
+                    "group {} in {} state. Replacing previously mapped member 
{} with this groupInstanceId.",
+                groupInstanceId, group.groupId(), group.currentState(), 
existingMemberId);
+
+            return updateStaticMemberAndRebalance(
+                context,
+                request,
+                group,
+                existingMemberId,
+                newMemberId,
+                joinReason,
+                responseFuture
+            );
+
+        } else {
+            log.info("Static member with groupInstanceId={} and unknown member 
id joins " +
+                    "group {} in {} state. Created a new member id {} for this 
member and added to the group.",
+                groupInstanceId, group.groupId(), group.currentState(), 
newMemberId);
+
+            return addMemberAndRebalance(context, request, group, newMemberId, 
joinReason, responseFuture);
+        }
+    }
+
+    /**
+     * Handle a new dynamic member join. If the member id field is required, 
the group metadata manager
+     * will add the new member id to the pending members and respond with 
MEMBER_ID_REQUIRED along with
+     * the new member id for the client to join with.
+     *
+     * Otherwise, add the new member to the group and rebalance.
+     *
+     * @param context         The request context.
+     * @param request         The join group request.
+     * @param group           The group to add the member.
+     * @param newMemberId     The newly generated member id.
+     * @param joinReason      The client reason for the join request.
+     * @param responseFuture  The response future to complete.
+     *
+     * @return The coordinator result that will be appended to the log.
+     */
+    private CoordinatorResult<Void, Record> genericGroupJoinNewDynamicMember(
+        RequestContext context,
+        JoinGroupRequestData request,
+        GenericGroup group,
+        String newMemberId,
+        String joinReason,
+        CompletableFuture<JoinGroupResponseData> responseFuture
+    ) {
+        boolean requireKnownMemberId = context.apiVersion() >= 4;
+
+        if (requireKnownMemberId) {
+            // If member id required, register the member in the pending 
member list and send
+            // back a response to call for another join group request with 
allocated member id.
+            log.info("Dynamic member with unknown member id joins group {} in 
{} state. " +
+                    "Created a new member id {} and requesting the member to 
rejoin with this id.",
+                group.groupId(), group.currentState(), newMemberId);
+
+            group.addPendingMember(newMemberId);

Review Comment:
   Do we need to handle the IllegalStateException (when the member id is not known) 
and complete the responseFuture here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to