dajac commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1348699907


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -107,6 +107,11 @@ public static class DeadlineAndEpoch {
      */
     private final TimelineHashMap<String, ConsumerGroupMember> members;
 
+    /**
+     * The static group members.
+     */
+    private final Map<String, String> staticMembers;

Review Comment:
   I suppose that this must be a `TimelineHashMap`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -564,6 +564,22 @@ private void throwIfNotNull(
         }
     }
 
+    /**
+     * Throws an InvalidRequestException if the value is null.
+     *
+     * @param value The value.
+     * @param error The error message.
+     * @throws InvalidRequestException
+     */
+    private void throwIfNull(
+            Object value,
+            String error

Review Comment:
   nit: Indentation should be 4 spaces.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId          The member id.
+     * @param instanceId        The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+            String memberId,
+            String instanceId,
+            boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member;
+        String existingMemberId = staticMemberId(instanceId);
+        if (!createIfNotExists) {
+            // The member joined with a non-zero epoch but we haven't registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            }
+            // We can't create a member at this point. If the 2 member-ids don't match,
+            // we will throw an error.
+            if (!existingMemberId.equals(memberId)) {
+                throw Errors.FENCED_INSTANCE_ID.exception();
+            }
+            member = getOrMaybeCreateMember(memberId, false);
+        } else {
+            // No existing member found against this instance id. Creating new.
+            if (existingMemberId == null) {
+                member = getOrMaybeCreateMember(memberId, true);
+                staticMembers.put(instanceId, memberId);
+                return member;
+            } else {
+                // Get the details of the existing member
+                ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false);
+                int currentMemberEpoch = existingMember.memberEpoch();
+                // A new member with a used instance id joined but the previous member using the same instance id
+                // hasn't requested leaving the group.
+                if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) {
+                    throw Errors.UNRELEASED_INSTANCE_ID.exception();
+                }
+                // A new static member is trying to take the place of a departed static member. We will
+                // provide the assignments of the old member to the new one.
+                member = new ConsumerGroupMember.Builder(memberId, existingMember)
+                        .setMemberEpoch(existingMember.targetMemberEpoch())
+                        .setPreviousMemberEpoch(0)
+                        .setTargetMemberEpoch(existingMember.targetMemberEpoch())
+                        .build();
+                updateMember(member);
+                staticMembers.put(instanceId, memberId);

Review Comment:
   As said previously, the state should not be updated here but in replay.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -750,7 +770,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+        final ConsumerGroupMember member = instanceId == null ?
+                group.getOrMaybeCreateMember(memberId, createIfNotExists) :
+                group.getOrMaybeCreateStaticMember(memberId, instanceId, createIfNotExists);

Review Comment:
   nit: indentation.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -908,23 +936,40 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
-
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
+        ConsumerGroupMember member = memberEpoch == -2 ?
+                group.getOrMaybeCreateStaticMember(memberId, instanceId, false) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        // The departing member is a static one. We don't need to fence this member because it is
+        // expected to come back within session timeout
+        if (memberEpoch == -2) {

Review Comment:
   I would prefer to have a separate method for the static leave group.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1127,7 +1176,9 @@ public void replay(
         Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());
 
         if (value != null) {
-            ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
+            ConsumerGroupMember oldMember = value.instanceId() != null ?
+                    consumerGroup.getOrMaybeCreateStaticMember(memberId, value.instanceId(), true) :
+                    consumerGroup.getOrMaybeCreateMember(memberId, true);
             consumerGroup.updateMember(new ConsumerGroupMember.Builder(oldMember)

Review Comment:
   I think that we need to update the static id mapping in `updateMember` and 
`removeMember` here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId          The member id.
+     * @param instanceId        The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+            String memberId,
+            String instanceId,
+            boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member;
+        String existingMemberId = staticMemberId(instanceId);
+        if (!createIfNotExists) {
+            // The member joined with a non-zero epoch but we haven't registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();

Review Comment:
   Could we add a custom message to all the exceptions raised in this method?
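
   A minimal sketch of what this could look like. To keep it self-contained, 
`GroupError` is a hypothetical stand-in for the `Errors` enum, and the class 
name, method names and message wording are all assumptions rather than code 
from this PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the Errors enum used in the diff.
enum GroupError {
    UNKNOWN_MEMBER_ID,
    FENCED_INSTANCE_ID;

    // Builds an exception that carries a caller-supplied message.
    RuntimeException exception(String message) {
        return new IllegalStateException(name() + ": " + message);
    }
}

class StaticMemberLookup {
    // instance id -> member id
    private final Map<String, String> staticMembers = new HashMap<>();

    void register(String instanceId, String memberId) {
        staticMembers.put(instanceId, memberId);
    }

    // Returns the registered member id, or throws with a descriptive message.
    String memberIdOrThrow(String instanceId, String memberId) {
        String existing = staticMembers.get(instanceId);
        if (existing == null) {
            throw GroupError.UNKNOWN_MEMBER_ID.exception(
                "Instance id " + instanceId + " is not registered in the group.");
        }
        if (!existing.equals(memberId)) {
            throw GroupError.FENCED_INSTANCE_ID.exception(
                "Instance id " + instanceId + " is owned by member " + existing
                    + ", not by member " + memberId + ".");
        }
        return existing;
    }
}
```

   With messages like these, the logs show which instance id and member id 
were involved without having to reproduce the request.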



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId          The member id.
+     * @param instanceId        The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+            String memberId,
+            String instanceId,
+            boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member;
+        String existingMemberId = staticMemberId(instanceId);
+        if (!createIfNotExists) {
+            // The member joined with a non-zero epoch but we haven't registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            }
+            // We can't create a member at this point. If the 2 member-ids don't match,
+            // we will throw an error.
+            if (!existingMemberId.equals(memberId)) {
+                throw Errors.FENCED_INSTANCE_ID.exception();
+            }
+            member = getOrMaybeCreateMember(memberId, false);
+        } else {
+            // No existing member found against this instance id. Creating new.
+            if (existingMemberId == null) {
+                member = getOrMaybeCreateMember(memberId, true);
+                staticMembers.put(instanceId, memberId);

Review Comment:
   The state should not be updated like this. All the updates are handled in 
the `replay()` methods.
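
   A minimal sketch of the pattern, under the assumption that the request path 
only validates and emits records while `replay()` is the single place that 
mutates the mapping. `StaticMemberRecord` and `GroupStateSketch` are 
hypothetical names used for illustration; they are not the coordinator's 
real types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical record describing "instance id is now owned by member id".
final class StaticMemberRecord {
    final String instanceId;
    final String memberId;

    StaticMemberRecord(String instanceId, String memberId) {
        this.instanceId = instanceId;
        this.memberId = memberId;
    }
}

class GroupStateSketch {
    // instance id -> member id
    private final Map<String, String> staticMembers = new HashMap<>();

    // Request path: decides which records are needed but leaves the state untouched.
    List<StaticMemberRecord> handleJoin(String instanceId, String memberId) {
        List<StaticMemberRecord> records = new ArrayList<>();
        if (!memberId.equals(staticMembers.get(instanceId))) {
            records.add(new StaticMemberRecord(instanceId, memberId));
        }
        return records;
    }

    // Replay path: the only method that changes the mapping.
    void replay(StaticMemberRecord record) {
        staticMembers.put(record.instanceId, record.memberId);
    }

    String staticMemberId(String instanceId) {
        return staticMembers.get(instanceId);
    }
}
```

   The benefit of this split is that the in-memory state is derived only from 
records, so it can be rebuilt by replaying them.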



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId          The member id.
+     * @param instanceId        The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+            String memberId,
+            String instanceId,
+            boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member;
+        String existingMemberId = staticMemberId(instanceId);
+        if (!createIfNotExists) {
+            // The member joined with a non-zero epoch but we haven't registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            }
+            // We can't create a member at this point. If the 2 member-ids don't match,
+            // we will throw an error.
+            if (!existingMemberId.equals(memberId)) {
+                throw Errors.FENCED_INSTANCE_ID.exception();
+            }
+            member = getOrMaybeCreateMember(memberId, false);
+        } else {
+            // No existing member found against this instance id. Creating new.
+            if (existingMemberId == null) {
+                member = getOrMaybeCreateMember(memberId, true);
+                staticMembers.put(instanceId, memberId);
+                return member;
+            } else {
+                // Get the details of the existing member
+                ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false);
+                int currentMemberEpoch = existingMember.memberEpoch();
+                // A new member with a used instance id joined but the previous member using the same instance id
+                // hasn't requested leaving the group.
+                if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) {

Review Comment:
   I wonder if we really need the second part of the condition here. What was 
your thinking about it?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId          The member id.
+     * @param instanceId        The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+            String memberId,
+            String instanceId,
+            boolean createIfNotExists
+    ) {
+        ConsumerGroupMember member;
+        String existingMemberId = staticMemberId(instanceId);
+        if (!createIfNotExists) {
+            // The member joined with a non-zero epoch but we haven't registered this static member
+            // This could be an unknown member for the coordinator.
+            if (existingMemberId == null) {
+                throw Errors.UNKNOWN_MEMBER_ID.exception();
+            }
+            // We can't create a member at this point. If the 2 member-ids don't match,
+            // we will throw an error.
+            if (!existingMemberId.equals(memberId)) {
+                throw Errors.FENCED_INSTANCE_ID.exception();
+            }
+            member = getOrMaybeCreateMember(memberId, false);
+        } else {
+            // No existing member found against this instance id. Creating new.
+            if (existingMemberId == null) {
+                member = getOrMaybeCreateMember(memberId, true);
+                staticMembers.put(instanceId, memberId);
+                return member;
+            } else {
+                // Get the details of the existing member
+                ConsumerGroupMember existingMember = getOrMaybeCreateMember(existingMemberId, false);
+                int currentMemberEpoch = existingMember.memberEpoch();
+                // A new member with a used instance id joined but the previous member using the same instance id
+                // hasn't requested leaving the group.
+                if (currentMemberEpoch != -2 && !existingMemberId.equals(memberId)) {
+                    throw Errors.UNRELEASED_INSTANCE_ID.exception();
+                }
+                // A new static member is trying to take the place of a departed static member. We will
+                // provide the assignments of the old member to the new one.
+                member = new ConsumerGroupMember.Builder(memberId, existingMember)
+                        .setMemberEpoch(existingMember.targetMemberEpoch())
+                        .setPreviousMemberEpoch(0)
+                        .setTargetMemberEpoch(existingMember.targetMemberEpoch())
+                        .build();

Review Comment:
   This is a bit weird because you pass `existingMember` to the builder and 
then you still have to override other fields. Would it be better to do `new 
ConsumerGroupMember.Builder(existingMember)` and then override the fields? I 
think that we only need to set the new member id.
   
   nit: The indentation should be four spaces.
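
   A minimal sketch of the copy-then-override shape suggested above. 
`MemberSketch` is a hypothetical, heavily reduced stand-in for 
`ConsumerGroupMember`; in particular, the `setMemberId` setter is an 
assumption and may not exist on the real builder.

```java
// Hypothetical, reduced model of a group member with a copying builder.
final class MemberSketch {
    final String memberId;
    final int memberEpoch;
    final int targetMemberEpoch;

    private MemberSketch(Builder builder) {
        this.memberId = builder.memberId;
        this.memberEpoch = builder.memberEpoch;
        this.targetMemberEpoch = builder.targetMemberEpoch;
    }

    static final class Builder {
        private String memberId;
        private int memberEpoch;
        private int targetMemberEpoch;

        Builder(String memberId) {
            this.memberId = memberId;
        }

        // Copy constructor: starts from every field of an existing member.
        Builder(MemberSketch other) {
            this.memberId = other.memberId;
            this.memberEpoch = other.memberEpoch;
            this.targetMemberEpoch = other.targetMemberEpoch;
        }

        Builder setMemberId(String memberId) {
            this.memberId = memberId;
            return this;
        }

        Builder setMemberEpoch(int memberEpoch) {
            this.memberEpoch = memberEpoch;
            return this;
        }

        Builder setTargetMemberEpoch(int targetMemberEpoch) {
            this.targetMemberEpoch = targetMemberEpoch;
            return this;
        }

        MemberSketch build() {
            return new MemberSketch(this);
        }
    }
}
```

   A replacement member would then be built as 
`new MemberSketch.Builder(existing).setMemberId(newId).build()`, overriding 
only the field that actually differs.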



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroup.java:
##########
@@ -285,6 +302,64 @@ public ConsumerGroupMember getOrMaybeCreateMember(
         return member;
     }
 
+    /**
+     * Gets or creates a static member.
+     *
+     * @param memberId          The member id.
+     * @param instanceId        The group instance id.
+     * @param createIfNotExists Booleans indicating whether the member must be
+     *                          created if it does not exist.
+     *
+     * @return A ConsumerGroupMember.
+     */
+    public ConsumerGroupMember getOrMaybeCreateStaticMember(
+            String memberId,
+            String instanceId,
+            boolean createIfNotExists

Review Comment:
   nit: Indentation should be four spaces.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -790,6 +812,12 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
                     groupId, memberId, updatedMember.subscribedTopicRegex());
                 bumpGroupEpoch = true;
             }
+        } else {
+            // A new static member replaces an older one with the same instance id, assignments etc.
+            // We will create a new member subscription record for this new member.
+            if (instanceId != null) {
+                records.add(newMemberSubscriptionRecord(groupId, updatedMember));
+            }

Review Comment:
   This does not seem correct because we will write a record whenever the 
member is not updated and we have an instance id. I think that it would be 
better to capture the fact that we have a new static member in the condition at 
L801.
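
   A minimal sketch of the difference, assuming that the enclosing `if` tests 
whether the member has changed (that condition is not visible in this hunk). 
The class, method and parameter names are hypothetical.

```java
// Hypothetical helper contrasting the two conditions for writing a subscription record.
class SubscriptionRecordCondition {
    // Behaviour of the diff: the else branch also writes for every unchanged static member.
    static boolean asWritten(boolean memberChanged, String instanceId) {
        return memberChanged || instanceId != null;
    }

    // Suggested behaviour: write only on a change or when a static member was replaced.
    static boolean suggested(boolean memberChanged, boolean staticMemberReplaced) {
        return memberChanged || staticMemberReplaced;
    }
}
```

   For an unchanged static member that was not replaced, `asWritten` returns 
true while `suggested` returns false, which is the unnecessary write described 
above.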



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -908,23 +936,40 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
      * Handles leave request from a consumer group member.
      * @param groupId       The group id from the request.
      * @param memberId      The member id from the request.
+     * @param memberEpoch   The member epoch from the request.
      *
      * @return A Result containing the ConsumerGroupHeartbeat response and
      *         a list of records to update the state machine.
      */
     private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupLeave(
         String groupId,
-        String memberId
+        String instanceId,
+        String memberId,
+        int memberEpoch
     ) throws ApiException {
         ConsumerGroup group = getOrMaybeCreateConsumerGroup(groupId, false);
-        ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, false);
-
-        log.info("[GroupId " + groupId + "] Member " + memberId + " left the consumer group.");
+        ConsumerGroupMember member = memberEpoch == -2 ?
+                group.getOrMaybeCreateStaticMember(memberId, instanceId, false) :
+                group.getOrMaybeCreateMember(memberId, false);
 
-        List<Record> records = consumerGroupFenceMember(group, member);
+        List<Record> records = new ArrayList<>();
+        // The departing member is a static one. We don't need to fence this member because it is
+        // expected to come back within session timeout
+        if (memberEpoch == -2) {
+            log.info("[GroupId {}] Member {} with instance id {} is a static member and will not be fenced from the group",
+                    groupId, memberId, member.instanceId());
+            // We will write a member epoch of -2 for this departing static member.
+            ConsumerGroupMember leavingStaticMember = new ConsumerGroupMember.Builder(member)
+                    .setMemberEpoch(-2)
+                    .build();

Review Comment:
   nit: indentation.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -750,7 +770,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+        final ConsumerGroupMember member = instanceId == null ?
+                group.getOrMaybeCreateMember(memberId, createIfNotExists) :
+                group.getOrMaybeCreateStaticMember(memberId, instanceId, createIfNotExists);
         throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
 
         if (memberEpoch == 0) {

Review Comment:
   I just thought about something else. When a static member is replaced, we 
need to write records to erase the state of the previous member.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -750,7 +770,9 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGr
 
         // Get or create the member.
         if (memberId.isEmpty()) memberId = Uuid.randomUuid().toString();
-        final ConsumerGroupMember member = group.getOrMaybeCreateMember(memberId, createIfNotExists);
+        final ConsumerGroupMember member = instanceId == null ?
+                group.getOrMaybeCreateMember(memberId, createIfNotExists) :
+                group.getOrMaybeCreateStaticMember(memberId, instanceId, createIfNotExists);
         throwIfMemberEpochIsInvalid(member, memberEpoch, ownedTopicPartitions);
 
         if (memberEpoch == 0) {

Review Comment:
   I wonder if we could log something here as well when a static member is 
replaced.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1084,10 +1129,14 @@ public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGro
     ) throws ApiException {
         throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
 
-        if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
+        if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH || request.memberEpoch() == -2) {

Review Comment:
   nit: Should we introduce a constant for -2 as well?
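
   A minimal sketch of such a constant. The name 
`LEAVE_GROUP_STATIC_MEMBER_EPOCH` and the `MemberEpochs` holder class are 
assumptions for illustration, and the value -1 for `LEAVE_GROUP_MEMBER_EPOCH` 
is assumed as well since it is not shown in this diff.

```java
// Hypothetical holder for the special member epochs used by the heartbeat handler.
final class MemberEpochs {
    // Assumed value: a member that leaves the group for good.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    // Replaces the bare -2 literal: a static member that leaves temporarily.
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    private MemberEpochs() { }

    // True when the request epoch signals any kind of leave.
    static boolean isLeaveRequest(int memberEpoch) {
        return memberEpoch == LEAVE_GROUP_MEMBER_EPOCH
            || memberEpoch == LEAVE_GROUP_STATIC_MEMBER_EPOCH;
    }
}
```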



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1127,7 +1176,9 @@ public void replay(
         Set<String> oldSubscribedTopicNames = new HashSet<>(consumerGroup.subscribedTopicNames());
 
         if (value != null) {
-            ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, true);
+            ConsumerGroupMember oldMember = value.instanceId() != null ?
+                    consumerGroup.getOrMaybeCreateStaticMember(memberId, value.instanceId(), true) :
+                    consumerGroup.getOrMaybeCreateMember(memberId, true);

Review Comment:
   We don't need to use getOrMaybeCreateStaticMember here as we only want to 
look up the member by its id.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.