This is an automated email from the ASF dual-hosted git repository. clolov pushed a commit to branch 4.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit c6b400cf77c4c9d1fd8b2ce772f2dc8bbcf2945e Author: David Jacot <[email protected]> AuthorDate: Sat Jan 17 09:46:49 2026 +0100 KAFKA-19233: Add tests for duplicate heartbeat request handling (#21319) This patch adds tests to validate that duplicate full heartbeat requests are handled idempotently in all member states (STABLE, UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS). Reviewers: Dongnuo Lyu <[email protected]>, Lianet Magrans <[email protected]> --- .../server/ConsumerGroupHeartbeatRequestTest.scala | 334 +++++++++++++++++++ .../group/GroupMetadataManagerTest.java | 369 +++++++++++++++++++++ 2 files changed, 703 insertions(+) diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index ae2bb4186ec..ea45d386ec7 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -851,4 +851,338 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) extends GroupC assertEquals(expectedRejoinResponse, rejoinResponse.data) } + + @ClusterTest + def testDuplicateFullHeartbeatInStableState(): Unit = { + createOffsetsTopic() + + val memberId = Uuid.randomUuid().toString + val groupId = "test-duplicate-stable-grp" + + // Create topic first so member gets assignment immediately. + val topicId = createTopic(topic = "foo", numPartitions = 3) + + // Join the group. + val request = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + var response: ConsumerGroupHeartbeatResponse = null + val expectedAssignment = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + + TestUtils.waitUntilTrue(() => { + response = connectAndReceive[ConsumerGroupHeartbeatResponse](request) + response.data.errorCode == Errors.NONE.code && + response.data.assignment == expectedAssignment + }, msg = s"Could not get assignment. Last response $response.") + + val stableEpoch = response.data.memberEpoch + + // Send full heartbeat request. + val fullRequest = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(stableEpoch) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List(new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1, 2).asJava)).asJava) + ).build() + + val firstResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest) + + val expectedFirstResponse = new ConsumerGroupHeartbeatResponseData() + .setErrorCode(Errors.NONE.code) + .setMemberId(memberId) + .setMemberEpoch(stableEpoch) + .setHeartbeatIntervalMs(firstResponse.data.heartbeatIntervalMs) + .setAssignment(expectedAssignment) + + assertEquals(expectedFirstResponse, firstResponse.data) + + // Send duplicate heartbeat request. + val duplicateResponse = connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest) + + // Verify duplicate produces same response. + assertEquals(expectedFirstResponse, duplicateResponse.data) + } + + @ClusterTest + def testDuplicateFullHeartbeatWhileWaitingForPartitions(): Unit = { + createOffsetsTopic() + + val memberId1 = Uuid.randomUuid().toString + val memberId2 = Uuid.randomUuid().toString + val groupId = "test-duplicate-waiting-grp" + + // Create topic. + val topicId = createTopic(topic = "foo", numPartitions = 2) + + // Member 1 joins and gets all partitions. + val request1 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + var response1: ConsumerGroupHeartbeatResponse = null + val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1).asJava)).asJava) + + TestUtils.waitUntilTrue(() => { + response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1) + response1.data.errorCode == Errors.NONE.code && + response1.data.assignment == allPartitions + }, msg = s"Member 1 could not get assignment. Last response $response1.") + + // Member 2 joins, triggering rebalance. Member 2 will wait for Member 1 to release partitions. + val request2 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + var response2: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2) + response2.data.errorCode == Errors.NONE.code + }, msg = s"Member 2 could not join. Last response $response2.") + + val member2Epoch = response2.data.memberEpoch + + // Member 2 sends full heartbeat while waiting for partitions from Member 1. + val fullRequest2 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(member2Epoch) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + val firstResponse2 = connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest2) + + val expectedFirstResponse2 = new ConsumerGroupHeartbeatResponseData() + .setErrorCode(Errors.NONE.code) + .setMemberId(memberId2) + .setMemberEpoch(member2Epoch) + .setHeartbeatIntervalMs(firstResponse2.data.heartbeatIntervalMs) + .setAssignment(firstResponse2.data.assignment) + + assertEquals(expectedFirstResponse2, firstResponse2.data) + + // Send duplicate heartbeat request. + val duplicateResponse2 = connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest2) + + // Verify duplicate produces same response. + assertEquals(expectedFirstResponse2, duplicateResponse2.data) + } + + @ClusterTest + def testDuplicateFullHeartbeatDuringRevocation(): Unit = { + createOffsetsTopic() + + val memberId1 = Uuid.randomUuid().toString + val memberId2 = Uuid.randomUuid().toString + val groupId = "test-duplicate-revocation-grp" + + // Create topic. + val topicId = createTopic(topic = "foo", numPartitions = 2) + + // Member 1 joins and gets all partitions. + val request1 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + var response1: ConsumerGroupHeartbeatResponse = null + val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1).asJava)).asJava) + + TestUtils.waitUntilTrue(() => { + response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1) + response1.data.errorCode == Errors.NONE.code && + response1.data.assignment == allPartitions + }, msg = s"Member 1 could not get assignment. Last response $response1.") + + val member1Epoch = response1.data.memberEpoch + + // Member 2 joins, triggering rebalance. + val request2 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + var response2: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2) + response2.data.errorCode == Errors.NONE.code + }, msg = s"Member 2 could not join. Last response $response2.") + + // Member 1 sends full heartbeat (still reporting all partitions). + val fullRequest1 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(member1Epoch) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List(new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1).asJava)).asJava) + ).build() + + val firstResponse1 = connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1) + + val expectedFirstResponse1 = new ConsumerGroupHeartbeatResponseData() + .setErrorCode(Errors.NONE.code) + .setMemberId(memberId1) + .setMemberEpoch(firstResponse1.data.memberEpoch) + .setHeartbeatIntervalMs(firstResponse1.data.heartbeatIntervalMs) + .setAssignment(firstResponse1.data.assignment) + + assertEquals(expectedFirstResponse1, firstResponse1.data) + + // Send duplicate heartbeat request. + val duplicateResponse1 = connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1) + + // Verify duplicate produces same response. + assertEquals(expectedFirstResponse1, duplicateResponse1.data) + } + + @ClusterTest + def testDuplicateFullHeartbeatWithRevocationAck(): Unit = { + createOffsetsTopic() + + val memberId1 = Uuid.randomUuid().toString + val memberId2 = Uuid.randomUuid().toString + val groupId = "test-duplicate-revocation-ack-grp" + + // Create topic. + val topicId = createTopic(topic = "foo", numPartitions = 2) + + // Member 1 joins and gets all partitions. + val request1 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + var response1: ConsumerGroupHeartbeatResponse = null + val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List(new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0, 1).asJava)).asJava) + + TestUtils.waitUntilTrue(() => { + response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1) + response1.data.errorCode == Errors.NONE.code && + response1.data.assignment == allPartitions + }, msg = s"Member 1 could not get assignment. Last response $response1.") + + val member1InitialEpoch = response1.data.memberEpoch + + // Member 2 joins, triggering rebalance. + val request2 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List.empty.asJava) + ).build() + + var response2: ConsumerGroupHeartbeatResponse = null + TestUtils.waitUntilTrue(() => { + response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2) + response2.data.errorCode == Errors.NONE.code + }, msg = s"Member 2 could not join. Last response $response2.") + + // Member 1 sends heartbeat acknowledging revocation (only reporting partition 0). + val ackRequest1 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(member1InitialEpoch) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List(new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0).asJava)).asJava) + ).build() + + val ackResponse1 = connectAndReceive[ConsumerGroupHeartbeatResponse](ackRequest1) + assertEquals(Errors.NONE.code, ackResponse1.data.errorCode) + + val member1NewEpoch = ackResponse1.data.memberEpoch + + // Member 1 sends full heartbeat with new epoch. + val fullRequest1 = new ConsumerGroupHeartbeatRequest.Builder( + new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(member1NewEpoch) + .setRebalanceTimeoutMs(5 * 60 * 1000) + .setSubscribedTopicNames(List("foo").asJava) + .setTopicPartitions(List(new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(topicId) + .setPartitions(List[Integer](0).asJava)).asJava) + ).build() + + val firstResponse1 = connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1) + + val expectedFirstResponse1 = new ConsumerGroupHeartbeatResponseData() + .setErrorCode(Errors.NONE.code) + .setMemberId(memberId1) + .setMemberEpoch(firstResponse1.data.memberEpoch) + .setHeartbeatIntervalMs(firstResponse1.data.heartbeatIntervalMs) + .setAssignment(firstResponse1.data.assignment) + + assertEquals(expectedFirstResponse1, firstResponse1.data) + + // Send duplicate heartbeat request. + val duplicateResponse1 = connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1) + + // Verify duplicate produces same response. + assertEquals(expectedFirstResponse1, duplicateResponse1.data) + } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 274f0946ca6..da84f0b397d 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -694,6 +694,375 @@ public class GroupMetadataManagerTest { ); } + @Test + public void testDuplicateFullHeartbeatInStableState() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .buildCoordinatorMetadataImage(); + + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + // Member is in STABLE state with epoch 100. + ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.STABLE) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setRebalanceTimeoutMs(5000) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build(); + + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2) + ))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 100)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member)); + + assertEquals(MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId)); + + // Create full request with current epoch. + ConsumerGroupHeartbeatRequestData fullRequest = new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(100) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignor("range") + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2)))); + + // First heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = + context.consumerGroupHeartbeat(fullRequest); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(100) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2))))), + result1.response() + ); + + // Duplicate heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = + context.consumerGroupHeartbeat(fullRequest); + + // Verify duplicate produces same response with no records. + assertResponseEquals(result1.response(), result2.response()); + assertEquals(List.of(), result2.records()); + assertEquals(MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId)); + } + + @Test + public void testDuplicateFullHeartbeatInUnrevokedPartitionsState() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .buildCoordinatorMetadataImage(); + + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + // Member is in UNREVOKED_PARTITIONS state with epoch 100. + // Target assignment is [0, 1], but member still owns [0, 1, 2]. + ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setRebalanceTimeoutMs(5000) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1))) + .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2))) + .build(); + + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 101, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1) + ))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 101)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member)); + + assertEquals(MemberState.UNREVOKED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId)); + + // Create full request with current epoch. Member still reports owning all partitions. + ConsumerGroupHeartbeatRequestData fullRequest = new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(100) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignor("range") + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1, 2)))); + + // First heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = + context.consumerGroupHeartbeat(fullRequest); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(100) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1))))), + result1.response() + ); + + assertEquals(MemberState.UNREVOKED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId)); + + // Duplicate heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = + context.consumerGroupHeartbeat(fullRequest); + + // Verify duplicate produces same response with no records. + assertResponseEquals(result1.response(), result2.response()); + assertEquals(List.of(), result2.records()); + assertEquals(MemberState.UNREVOKED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId)); + } + + @Test + public void testDuplicateFullHeartbeatInUnreleasedPartitionsState() { + String groupId = "fooup"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .buildCoordinatorMetadataImage(); + + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + // Member 1 is in UNRELEASED_PARTITIONS state with epoch 100. + // Member 1 has [0] assigned but target is [0, 1, 2]. + // Member 2 still owns [1, 2] and needs to revoke them. + ConsumerGroupMember member1 = new ConsumerGroupMember.Builder(memberId1) + .setState(MemberState.UNRELEASED_PARTITIONS) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setRebalanceTimeoutMs(5000) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0))) + .build(); + + ConsumerGroupMember member2 = new ConsumerGroupMember.Builder(memberId2) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(99) + .setPreviousMemberEpoch(98) + .setRebalanceTimeoutMs(5000) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment()) + .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 1, 2))) + .build(); + + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member1)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member2)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 100, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId1, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2) + ))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId2, mkAssignment())); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 100)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member1)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member2)); + + assertEquals(MemberState.UNRELEASED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId1)); + + // Create full request with current epoch. + ConsumerGroupHeartbeatRequestData fullRequest = new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(100) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignor("range") + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0)))); + + // First heartbeat. Member is UNRELEASED_PARTITIONS so response includes current assignment. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = + context.consumerGroupHeartbeat(fullRequest); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId1) + .setMemberEpoch(100) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0))))), + result1.response() + ); + + assertEquals(MemberState.UNRELEASED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId1)); + + // Duplicate heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = + context.consumerGroupHeartbeat(fullRequest); + + // Verify duplicate produces same response with no records. + assertResponseEquals(result1.response(), result2.response()); + assertEquals(List.of(), result2.records()); + assertEquals(MemberState.UNRELEASED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId1)); + } + + @Test + public void testDuplicateFullHeartbeatWithRevocationAck() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + CoordinatorMetadataImage metadataImage = new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 3) + .addRacks() + .buildCoordinatorMetadataImage(); + + long fooTopicHash = computeTopicHash(fooTopicName, metadataImage); + + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, List.of(assignor)) + .withMetadataImage(metadataImage) + .build(); + + // Member is in UNREVOKED_PARTITIONS state with epoch 100. + // Target assignment is [0, 1], member needs to revoke [2]. + ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId) + .setState(MemberState.UNREVOKED_PARTITIONS) + .setMemberEpoch(100) + .setPreviousMemberEpoch(99) + .setRebalanceTimeoutMs(5000) + .setClientId(DEFAULT_CLIENT_ID) + .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 0, 1))) + .setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2))) + .build(); + + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId, member)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 101, computeGroupHash(Map.of( + fooTopicName, fooTopicHash + )))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId, memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1) + ))); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId, 101)); + context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId, member)); + + assertEquals(MemberState.UNREVOKED_PARTITIONS, context.consumerGroupMemberState(groupId, memberId)); + + // Create full request acknowledging revocation (only owns [0, 1]). + ConsumerGroupHeartbeatRequestData fullRequest = new ConsumerGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(100) + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(List.of("foo")) + .setServerAssignor("range") + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatRequestData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1)))); + + // First heartbeat acknowledges revocation and transitions to STABLE. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result1 = + context.consumerGroupHeartbeat(fullRequest); + + assertResponseEquals( + new ConsumerGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(101) + .setHeartbeatIntervalMs(5000) + .setAssignment(new ConsumerGroupHeartbeatResponseData.Assignment() + .setTopicPartitions(List.of( + new ConsumerGroupHeartbeatResponseData.TopicPartitions() + .setTopicId(fooTopicId) + .setPartitions(List.of(0, 1))))), + result1.response() + ); + + assertEquals(MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId)); + + // Duplicate heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> result2 = + context.consumerGroupHeartbeat(fullRequest); + + // Verify duplicate produces same response with no records. + assertResponseEquals(result1.response(), result2.response()); + assertEquals(List.of(), result2.records()); + assertEquals(MemberState.STABLE, context.consumerGroupMemberState(groupId, memberId)); + } + @Test public void testShareGroupMemberCanRejoinWithEpochZero() { String groupId = "fooup";
