This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 59e59fc545b MINOR: Add LEAVE_GROUP_EPOCH to GroupMetadataManager 
(#14463)
59e59fc545b is described below

commit 59e59fc545b17c2893120d616ea4ed170af730b8
Author: Kirk True <[email protected]>
AuthorDate: Wed Oct 4 03:09:16 2023 -0700

    MINOR: Add LEAVE_GROUP_EPOCH to GroupMetadataManager (#14463)
    
    Replacing the use of a hardcoded -1 with a constant 
(`LEAVE_GROUP_MEMBER_EPOCH`) that provides more clarity. Since static members 
also have a magic number (-2), this will motivate future commits to use 
constants instead of hardcoded values.
    
    Reviewers: Sagar Rao <[email protected]>, David Jacot 
<[email protected]>
---
 .../common/requests/ConsumerGroupHeartbeatRequest.java   |  5 +++++
 .../kafka/coordinator/group/GroupMetadataManager.java    | 16 ++++++++--------
 .../coordinator/group/GroupMetadataManagerTest.java      |  9 +++++----
 3 files changed, 18 insertions(+), 12 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
index 6e42111670a..1b56c5b91c1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
@@ -26,6 +26,11 @@ import java.nio.ByteBuffer;
 
 public class ConsumerGroupHeartbeatRequest extends AbstractRequest {
 
+    /**
+     * A member epoch of <code>-1</code> means that the member wants to leave 
the group.
+     */
+    public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
+
     public static class Builder extends 
AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> {
         private final ConsumerGroupHeartbeatRequestData data;
 
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 7588c598dcc..9ab0457aa11 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -101,6 +101,7 @@ import static 
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
 import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
 import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
 import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
+import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
 import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
 import static org.apache.kafka.coordinator.group.Group.GroupType.GENERIC;
@@ -579,7 +580,7 @@ public class GroupMetadataManager {
         throwIfEmptyString(request.rackId(), "RackId can't be empty.");
         throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex 
is not supported yet.");
 
-        if (request.memberEpoch() > 0 || request.memberEpoch() == -1) {
+        if (request.memberEpoch() > 0 || request.memberEpoch() == 
LEAVE_GROUP_MEMBER_EPOCH) {
             throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
         } else if (request.memberEpoch() == 0) {
             if (request.rebalanceTimeoutMs() == -1) {
@@ -923,7 +924,7 @@ public class GroupMetadataManager {
         List<Record> records = consumerGroupFenceMember(group, member);
         return new CoordinatorResult<>(records, new 
ConsumerGroupHeartbeatResponseData()
             .setMemberId(memberId)
-            .setMemberEpoch(-1));
+            .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
     }
 
     /**
@@ -1083,8 +1084,7 @@ public class GroupMetadataManager {
     ) throws ApiException {
         throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
 
-        if (request.memberEpoch() == -1) {
-            // -1 means that the member wants to leave the group.
+        if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
             return consumerGroupLeave(
                 request.groupId(),
                 request.memberId()
@@ -1133,7 +1133,7 @@ public class GroupMetadataManager {
                 .build());
         } else {
             ConsumerGroupMember oldMember = 
consumerGroup.getOrMaybeCreateMember(memberId, false);
-            if (oldMember.memberEpoch() != -1) {
+            if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
                 throw new IllegalStateException("Received a tombstone record 
to delete member " + memberId
                     + " but did not receive 
ConsumerGroupCurrentMemberAssignmentValue tombstone.");
             }
@@ -1354,9 +1354,9 @@ public class GroupMetadataManager {
             consumerGroup.updateMember(newMember);
         } else {
             ConsumerGroupMember newMember = new 
ConsumerGroupMember.Builder(oldMember)
-                .setMemberEpoch(-1)
-                .setPreviousMemberEpoch(-1)
-                .setTargetMemberEpoch(-1)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+                .setTargetMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
                 .setAssignedPartitions(Collections.emptyMap())
                 .setPartitionsPendingRevocation(Collections.emptyMap())
                 .setPartitionsPendingAssignment(Collections.emptyMap())
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6bf08f10934..3a3db59855e 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -124,6 +124,7 @@ import java.util.stream.Stream;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
 import static 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
+import static 
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
 import static 
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@@ -1859,7 +1860,7 @@ public class GroupMetadataManagerTest {
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId2)
-                .setMemberEpoch(-1)
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
                 .setRebalanceTimeoutMs(5000)
                 .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
                 .setTopicPartitions(Collections.emptyList()));
@@ -1867,7 +1868,7 @@ public class GroupMetadataManagerTest {
         assertResponseEquals(
             new ConsumerGroupHeartbeatResponseData()
                 .setMemberId(memberId2)
-                .setMemberEpoch(-1),
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
             result.response()
         );
 
@@ -3225,8 +3226,8 @@ public class GroupMetadataManagerTest {
             new ConsumerGroupHeartbeatRequestData()
                 .setGroupId(groupId)
                 .setMemberId(memberId)
-                .setMemberEpoch(-1));
-        assertEquals(-1, result.response().memberEpoch());
+                .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
+        assertEquals(LEAVE_GROUP_MEMBER_EPOCH, 
result.response().memberEpoch());
 
         // Verify that there are no timers.
         context.assertNoSessionTimeout(groupId, memberId);

Reply via email to