This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0e1e68db8da KAFKA-19233: Add tests for duplicate heartbeat request
handling (#21319)
0e1e68db8da is described below
commit 0e1e68db8da571fe3c08bdfc29c24dd8cc477997
Author: David Jacot <[email protected]>
AuthorDate: Sat Jan 17 09:46:49 2026 +0100
KAFKA-19233: Add tests for duplicate heartbeat request handling (#21319)
This patch adds tests to validate that duplicate full heartbeat requests
are handled idempotently in all member states (STABLE,
UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS).
Reviewers: Dongnuo Lyu <[email protected]>, Lianet Magrans
<[email protected]>
---
.../server/ConsumerGroupHeartbeatRequestTest.scala | 334 +++++++++++++++++++
.../group/GroupMetadataManagerTest.java | 369 +++++++++++++++++++++
2 files changed, 703 insertions(+)
diff --git
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 0d54ca6e25c..6707ee94b7f 100644
---
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -764,4 +764,338 @@ class ConsumerGroupHeartbeatRequestTest(cluster:
ClusterInstance) extends GroupC
assertEquals(expectedRejoinResponse, rejoinResponse.data)
}
+
+ @ClusterTest
+ def testDuplicateFullHeartbeatInStableState(): Unit = {
+ createOffsetsTopic()
+
+ val memberId = Uuid.randomUuid().toString
+ val groupId = "test-duplicate-stable-grp"
+
+ // Create topic first so member gets assignment immediately.
+ val topicId = createTopic(topic = "foo", numPartitions = 3)
+
+ // Join the group.
+ val request = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response: ConsumerGroupHeartbeatResponse = null
+ val expectedAssignment = new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+ TestUtils.waitUntilTrue(() => {
+ response = connectAndReceive[ConsumerGroupHeartbeatResponse](request)
+ response.data.errorCode == Errors.NONE.code &&
+ response.data.assignment == expectedAssignment
+ }, msg = s"Could not get assignment. Last response $response.")
+
+ val stableEpoch = response.data.memberEpoch
+
+ // Send full heartbeat request.
+ val fullRequest = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(stableEpoch)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+ ).build()
+
+ val firstResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest)
+
+ val expectedFirstResponse = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId)
+ .setMemberEpoch(stableEpoch)
+ .setHeartbeatIntervalMs(firstResponse.data.heartbeatIntervalMs)
+ .setAssignment(expectedAssignment)
+
+ assertEquals(expectedFirstResponse, firstResponse.data)
+
+ // Send duplicate heartbeat request.
+ val duplicateResponse =
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest)
+
+ // Verify duplicate produces same response.
+ assertEquals(expectedFirstResponse, duplicateResponse.data)
+ }
+
+ @ClusterTest
+ def testDuplicateFullHeartbeatWhileWaitingForPartitions(): Unit = {
+ createOffsetsTopic()
+
+ val memberId1 = Uuid.randomUuid().toString
+ val memberId2 = Uuid.randomUuid().toString
+ val groupId = "test-duplicate-waiting-grp"
+
+ // Create topic.
+ val topicId = createTopic(topic = "foo", numPartitions = 2)
+
+ // Member 1 joins and gets all partitions.
+ val request1 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response1: ConsumerGroupHeartbeatResponse = null
+ val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+ TestUtils.waitUntilTrue(() => {
+ response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1)
+ response1.data.errorCode == Errors.NONE.code &&
+ response1.data.assignment == allPartitions
+ }, msg = s"Member 1 could not get assignment. Last response $response1.")
+
+ // Member 2 joins, triggering rebalance. Member 2 will wait for Member 1
to release partitions.
+ val request2 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response2: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2)
+ response2.data.errorCode == Errors.NONE.code
+ }, msg = s"Member 2 could not join. Last response $response2.")
+
+ val member2Epoch = response2.data.memberEpoch
+
+ // Member 2 sends full heartbeat while waiting for partitions from Member
1.
+ val fullRequest2 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(member2Epoch)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ val firstResponse2 =
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest2)
+
+ val expectedFirstResponse2 = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId2)
+ .setMemberEpoch(member2Epoch)
+ .setHeartbeatIntervalMs(firstResponse2.data.heartbeatIntervalMs)
+ .setAssignment(firstResponse2.data.assignment)
+
+ assertEquals(expectedFirstResponse2, firstResponse2.data)
+
+ // Send duplicate heartbeat request.
+ val duplicateResponse2 =
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest2)
+
+ // Verify duplicate produces same response.
+ assertEquals(expectedFirstResponse2, duplicateResponse2.data)
+ }
+
+ @ClusterTest
+ def testDuplicateFullHeartbeatDuringRevocation(): Unit = {
+ createOffsetsTopic()
+
+ val memberId1 = Uuid.randomUuid().toString
+ val memberId2 = Uuid.randomUuid().toString
+ val groupId = "test-duplicate-revocation-grp"
+
+ // Create topic.
+ val topicId = createTopic(topic = "foo", numPartitions = 2)
+
+ // Member 1 joins and gets all partitions.
+ val request1 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response1: ConsumerGroupHeartbeatResponse = null
+ val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+ TestUtils.waitUntilTrue(() => {
+ response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1)
+ response1.data.errorCode == Errors.NONE.code &&
+ response1.data.assignment == allPartitions
+ }, msg = s"Member 1 could not get assignment. Last response $response1.")
+
+ val member1Epoch = response1.data.memberEpoch
+
+ // Member 2 joins, triggering rebalance.
+ val request2 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response2: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2)
+ response2.data.errorCode == Errors.NONE.code
+ }, msg = s"Member 2 could not join. Last response $response2.")
+
+ // Member 1 sends full heartbeat (still reporting all partitions).
+ val fullRequest1 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(member1Epoch)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1).asJava)).asJava)
+ ).build()
+
+ val firstResponse1 =
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1)
+
+ val expectedFirstResponse1 = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId1)
+ .setMemberEpoch(firstResponse1.data.memberEpoch)
+ .setHeartbeatIntervalMs(firstResponse1.data.heartbeatIntervalMs)
+ .setAssignment(firstResponse1.data.assignment)
+
+ assertEquals(expectedFirstResponse1, firstResponse1.data)
+
+ // Send duplicate heartbeat request.
+ val duplicateResponse1 =
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1)
+
+ // Verify duplicate produces same response.
+ assertEquals(expectedFirstResponse1, duplicateResponse1.data)
+ }
+
+ @ClusterTest
+ def testDuplicateFullHeartbeatWithRevocationAck(): Unit = {
+ createOffsetsTopic()
+
+ val memberId1 = Uuid.randomUuid().toString
+ val memberId2 = Uuid.randomUuid().toString
+ val groupId = "test-duplicate-revocation-ack-grp"
+
+ // Create topic.
+ val topicId = createTopic(topic = "foo", numPartitions = 2)
+
+ // Member 1 joins and gets all partitions.
+ val request1 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response1: ConsumerGroupHeartbeatResponse = null
+ val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+ TestUtils.waitUntilTrue(() => {
+ response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1)
+ response1.data.errorCode == Errors.NONE.code &&
+ response1.data.assignment == allPartitions
+ }, msg = s"Member 1 could not get assignment. Last response $response1.")
+
+ val member1InitialEpoch = response1.data.memberEpoch
+
+ // Member 2 joins, triggering rebalance.
+ val request2 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId2)
+ .setMemberEpoch(0)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List.empty.asJava)
+ ).build()
+
+ var response2: ConsumerGroupHeartbeatResponse = null
+ TestUtils.waitUntilTrue(() => {
+ response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2)
+ response2.data.errorCode == Errors.NONE.code
+ }, msg = s"Member 2 could not join. Last response $response2.")
+
+ // Member 1 sends heartbeat acknowledging revocation (only reporting
partition 0).
+ val ackRequest1 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(member1InitialEpoch)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0).asJava)).asJava)
+ ).build()
+
+ val ackResponse1 =
connectAndReceive[ConsumerGroupHeartbeatResponse](ackRequest1)
+ assertEquals(Errors.NONE.code, ackResponse1.data.errorCode)
+
+ val member1NewEpoch = ackResponse1.data.memberEpoch
+
+ // Member 1 sends full heartbeat with new epoch.
+ val fullRequest1 = new ConsumerGroupHeartbeatRequest.Builder(
+ new ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(member1NewEpoch)
+ .setRebalanceTimeoutMs(5 * 60 * 1000)
+ .setSubscribedTopicNames(List("foo").asJava)
+ .setTopicPartitions(List(new
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(topicId)
+ .setPartitions(List[Integer](0).asJava)).asJava)
+ ).build()
+
+ val firstResponse1 =
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1)
+
+ val expectedFirstResponse1 = new ConsumerGroupHeartbeatResponseData()
+ .setErrorCode(Errors.NONE.code)
+ .setMemberId(memberId1)
+ .setMemberEpoch(firstResponse1.data.memberEpoch)
+ .setHeartbeatIntervalMs(firstResponse1.data.heartbeatIntervalMs)
+ .setAssignment(firstResponse1.data.assignment)
+
+ assertEquals(expectedFirstResponse1, firstResponse1.data)
+
+ // Send duplicate heartbeat request.
+ val duplicateResponse1 =
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1)
+
+ // Verify duplicate produces same response.
+ assertEquals(expectedFirstResponse1, duplicateResponse1.data)
+ }
}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index b6a2b6e5607..6b25e42475f 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -694,6 +694,375 @@ public class GroupMetadataManagerTest {
);
}
+ @Test
+ public void testDuplicateFullHeartbeatInStableState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in STABLE state with epoch 100.
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.STABLE)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Create full request with current epoch.
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))));
+
+ // First heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))))),
+ result1.response()
+ );
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+ }
+
+ @Test
+ public void testDuplicateFullHeartbeatInUnrevokedPartitionsState() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+ // Target assignment is [0, 1], but member still owns [0, 1, 2].
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1)))
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
101, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Create full request with current epoch. Member still reports owning
all partitions.
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1, 2))));
+
+ // First heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))))),
+ result1.response()
+ );
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+ }
+
+ @Test
+ public void testDuplicateFullHeartbeatInUnreleasedPartitionsState() {
+ String groupId = "fooup";
+ String memberId1 = Uuid.randomUuid().toString();
+ String memberId2 = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member 1 is in UNRELEASED_PARTITIONS state with epoch 100.
+ // Member 1 has [0] assigned but target is [0, 1, 2].
+ // Member 2 still owns [1, 2] and needs to revoke them.
+ ConsumerGroupMember member1 = new
ConsumerGroupMember.Builder(memberId1)
+ .setState(MemberState.UNRELEASED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0)))
+ .build();
+
+ ConsumerGroupMember member2 = new
ConsumerGroupMember.Builder(memberId2)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(99)
+ .setPreviousMemberEpoch(98)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment())
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 1,
2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member1));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member2));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
100, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId1, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1, 2)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId2, mkAssignment()));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
100));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member1));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member2));
+
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId1));
+
+ // Create full request with current epoch.
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId1)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0))));
+
+ // First heartbeat. Member is UNRELEASED_PARTITIONS so response
includes current assignment.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId1)
+ .setMemberEpoch(100)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0))))),
+ result1.response()
+ );
+
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId1));
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.UNRELEASED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId1));
+ }
+
+ @Test
+ public void testDuplicateFullHeartbeatWithRevocationAck() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+ Uuid fooTopicId = Uuid.randomUuid();
+ String fooTopicName = "foo";
+
+ CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(fooTopicId, fooTopicName, 3)
+ .addRacks()
+ .buildCoordinatorMetadataImage();
+
+ long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+ MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG,
List.of(assignor))
+ .withMetadataImage(metadataImage)
+ .build();
+
+ // Member is in UNREVOKED_PARTITIONS state with epoch 100.
+ // Target assignment is [0, 1], member needs to revoke [2].
+ ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+ .setState(MemberState.UNREVOKED_PARTITIONS)
+ .setMemberEpoch(100)
+ .setPreviousMemberEpoch(99)
+ .setRebalanceTimeoutMs(5000)
+ .setClientId(DEFAULT_CLIENT_ID)
+ .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignorName("range")
+ .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId,
0, 1)))
+
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+ .build();
+
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
member));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
101, computeGroupHash(Map.of(
+ fooTopicName, fooTopicHash
+ ))));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
memberId, mkAssignment(
+ mkTopicAssignment(fooTopicId, 0, 1)
+ )));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
101));
+
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
member));
+
+ assertEquals(MemberState.UNREVOKED_PARTITIONS,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Create full request acknowledging revocation (only owns [0, 1]).
+ ConsumerGroupHeartbeatRequestData fullRequest = new
ConsumerGroupHeartbeatRequestData()
+ .setGroupId(groupId)
+ .setMemberId(memberId)
+ .setMemberEpoch(100)
+ .setRebalanceTimeoutMs(5000)
+ .setSubscribedTopicNames(List.of("foo"))
+ .setServerAssignor("range")
+ .setTopicPartitions(List.of(
+ new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))));
+
+ // First heartbeat acknowledges revocation and transitions to STABLE.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result1 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ assertResponseEquals(
+ new ConsumerGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(101)
+ .setHeartbeatIntervalMs(5000)
+ .setAssignment(new
ConsumerGroupHeartbeatResponseData.Assignment()
+ .setTopicPartitions(List.of(
+ new
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+ .setTopicId(fooTopicId)
+ .setPartitions(List.of(0, 1))))),
+ result1.response()
+ );
+
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+
+ // Duplicate heartbeat.
+ CoordinatorResult<ConsumerGroupHeartbeatResponseData,
CoordinatorRecord> result2 =
+ context.consumerGroupHeartbeat(fullRequest);
+
+ // Verify duplicate produces same response with no records.
+ assertResponseEquals(result1.response(), result2.response());
+ assertEquals(List.of(), result2.records());
+ assertEquals(MemberState.STABLE,
context.consumerGroupMemberState(groupId, memberId));
+ }
+
@Test
public void testShareGroupMemberCanRejoinWithEpochZero() {
String groupId = "fooup";