This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 59e59fc545b MINOR: Add LEAVE_GROUP_EPOCH to GroupMetadataManager
(#14463)
59e59fc545b is described below
commit 59e59fc545b17c2893120d616ea4ed170af730b8
Author: Kirk True <[email protected]>
AuthorDate: Wed Oct 4 03:09:16 2023 -0700
MINOR: Add LEAVE_GROUP_EPOCH to GroupMetadataManager (#14463)
Replacing the use a hardcoded -1 with a constant (`LEAVE_GROUP_EPOCH`) that
provides more clarity. Since static members also have a magic number (-2), this
will motivate future commits to use constants instead of hardcoded values.
Reviewers: Sagar Rao <[email protected]>, David Jacot
<[email protected]>
---
.../common/requests/ConsumerGroupHeartbeatRequest.java | 5 +++++
.../kafka/coordinator/group/GroupMetadataManager.java | 16 ++++++++--------
.../coordinator/group/GroupMetadataManagerTest.java | 9 +++++----
3 files changed, 18 insertions(+), 12 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
index 6e42111670a..1b56c5b91c1 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java
@@ -26,6 +26,11 @@ import java.nio.ByteBuffer;
public class ConsumerGroupHeartbeatRequest extends AbstractRequest {
+ /**
+ * A member epoch of <code>-1</code> means that the member wants to leave
the group.
+ */
+ public static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
+
public static class Builder extends
AbstractRequest.Builder<ConsumerGroupHeartbeatRequest> {
private final ConsumerGroupHeartbeatRequestData data;
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 7588c598dcc..9ab0457aa11 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -101,6 +101,7 @@ import static
org.apache.kafka.common.protocol.Errors.COORDINATOR_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.ILLEGAL_GENERATION;
import static org.apache.kafka.common.protocol.Errors.NOT_COORDINATOR;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_SERVER_ERROR;
+import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
import static org.apache.kafka.coordinator.group.Group.GroupType.CONSUMER;
import static org.apache.kafka.coordinator.group.Group.GroupType.GENERIC;
@@ -579,7 +580,7 @@ public class GroupMetadataManager {
throwIfEmptyString(request.rackId(), "RackId can't be empty.");
throwIfNotNull(request.subscribedTopicRegex(), "SubscribedTopicRegex
is not supported yet.");
- if (request.memberEpoch() > 0 || request.memberEpoch() == -1) {
+ if (request.memberEpoch() > 0 || request.memberEpoch() ==
LEAVE_GROUP_MEMBER_EPOCH) {
throwIfEmptyString(request.memberId(), "MemberId can't be empty.");
} else if (request.memberEpoch() == 0) {
if (request.rebalanceTimeoutMs() == -1) {
@@ -923,7 +924,7 @@ public class GroupMetadataManager {
List<Record> records = consumerGroupFenceMember(group, member);
return new CoordinatorResult<>(records, new
ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId)
- .setMemberEpoch(-1));
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
}
/**
@@ -1083,8 +1084,7 @@ public class GroupMetadataManager {
) throws ApiException {
throwIfConsumerGroupHeartbeatRequestIsInvalid(request);
- if (request.memberEpoch() == -1) {
- // -1 means that the member wants to leave the group.
+ if (request.memberEpoch() == LEAVE_GROUP_MEMBER_EPOCH) {
return consumerGroupLeave(
request.groupId(),
request.memberId()
@@ -1133,7 +1133,7 @@ public class GroupMetadataManager {
.build());
} else {
ConsumerGroupMember oldMember =
consumerGroup.getOrMaybeCreateMember(memberId, false);
- if (oldMember.memberEpoch() != -1) {
+ if (oldMember.memberEpoch() != LEAVE_GROUP_MEMBER_EPOCH) {
throw new IllegalStateException("Received a tombstone record
to delete member " + memberId
+ " but did not receive
ConsumerGroupCurrentMemberAssignmentValue tombstone.");
}
@@ -1354,9 +1354,9 @@ public class GroupMetadataManager {
consumerGroup.updateMember(newMember);
} else {
ConsumerGroupMember newMember = new
ConsumerGroupMember.Builder(oldMember)
- .setMemberEpoch(-1)
- .setPreviousMemberEpoch(-1)
- .setTargetMemberEpoch(-1)
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+ .setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
+ .setTargetMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setAssignedPartitions(Collections.emptyMap())
.setPartitionsPendingRevocation(Collections.emptyMap())
.setPartitionsPendingAssignment(Collections.emptyMap())
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 6bf08f10934..3a3db59855e 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -124,6 +124,7 @@ import java.util.stream.Stream;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import static
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
+import static
org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
import static
org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
import static
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
@@ -1859,7 +1860,7 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId2)
- .setMemberEpoch(-1)
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setRebalanceTimeoutMs(5000)
.setSubscribedTopicNames(Arrays.asList("foo", "bar"))
.setTopicPartitions(Collections.emptyList()));
@@ -1867,7 +1868,7 @@ public class GroupMetadataManagerTest {
assertResponseEquals(
new ConsumerGroupHeartbeatResponseData()
.setMemberId(memberId2)
- .setMemberEpoch(-1),
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH),
result.response()
);
@@ -3225,8 +3226,8 @@ public class GroupMetadataManagerTest {
new ConsumerGroupHeartbeatRequestData()
.setGroupId(groupId)
.setMemberId(memberId)
- .setMemberEpoch(-1));
- assertEquals(-1, result.response().memberEpoch());
+ .setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH));
+ assertEquals(LEAVE_GROUP_MEMBER_EPOCH,
result.response().memberEpoch());
// Verify that there are no timers.
context.assertNoSessionTimeout(groupId, memberId);