This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e1e68db8da KAFKA-19233: Add tests for duplicate heartbeat request 
handling (#21319)
0e1e68db8da is described below

commit 0e1e68db8da571fe3c08bdfc29c24dd8cc477997
Author: David Jacot <[email protected]>
AuthorDate: Sat Jan 17 09:46:49 2026 +0100

    KAFKA-19233: Add tests for duplicate heartbeat request handling (#21319)
    
    This patch adds tests to validate that duplicate full heartbeat requests
    are handled idempotently in all member states (STABLE,
    UNREVOKED_PARTITIONS, UNRELEASED_PARTITIONS).
    
    Reviewers: Dongnuo Lyu <[email protected]>, Lianet Magrans
     <[email protected]>
---
 .../server/ConsumerGroupHeartbeatRequestTest.scala | 334 +++++++++++++++++++
 .../group/GroupMetadataManagerTest.java            | 369 +++++++++++++++++++++
 2 files changed, 703 insertions(+)

diff --git 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
index 0d54ca6e25c..6707ee94b7f 100644
--- 
a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala
@@ -764,4 +764,338 @@ class ConsumerGroupHeartbeatRequestTest(cluster: 
ClusterInstance) extends GroupC
 
     assertEquals(expectedRejoinResponse, rejoinResponse.data)
   }
+
+  @ClusterTest
+  def testDuplicateFullHeartbeatInStableState(): Unit = { // A member holding its whole assignment sends the same full heartbeat twice; both responses must be equal.
+    createOffsetsTopic()
+
+    val memberId = Uuid.randomUuid().toString
+    val groupId = "test-duplicate-stable-grp"
+
+    // Create the topic first so the member gets its assignment immediately.
+    val topicId = createTopic(topic = "foo", numPartitions = 3)
+
+    // Join the group at epoch 0 with no owned partitions; retried below until partitions 0-2 are assigned.
+    val request = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    var response: ConsumerGroupHeartbeatResponse = null
+    val expectedAssignment = new 
ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+
+    TestUtils.waitUntilTrue(() => {
+      response = connectAndReceive[ConsumerGroupHeartbeatResponse](request)
+      response.data.errorCode == Errors.NONE.code &&
+        response.data.assignment == expectedAssignment
+    }, msg = s"Could not get assignment. Last response $response.")
+
+    val stableEpoch = response.data.memberEpoch
+
+    // Send a full heartbeat request at the epoch just returned, reporting partitions 0-2.
+    val fullRequest = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId)
+        .setMemberEpoch(stableEpoch)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List(new 
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+          .setTopicId(topicId)
+          .setPartitions(List[Integer](0, 1, 2).asJava)).asJava)
+    ).build()
+
+    val firstResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest)
+
+    val expectedFirstResponse = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setMemberId(memberId)
+      .setMemberEpoch(stableEpoch)
+      .setHeartbeatIntervalMs(firstResponse.data.heartbeatIntervalMs)
+      .setAssignment(expectedAssignment)
+
+    assertEquals(expectedFirstResponse, firstResponse.data) // The interval is copied from the response; epoch and assignment are pinned independently.
+
+    // Send the very same request object a second time.
+    val duplicateResponse = 
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest)
+
+    // Verify the duplicate produces the same response data as the first request.
+    assertEquals(expectedFirstResponse, duplicateResponse.data)
+  }
+
+  @ClusterTest
+  def testDuplicateFullHeartbeatWhileWaitingForPartitions(): Unit = { // Member 2, joined behind member 1, sends the same full heartbeat twice; both responses must be equal.
+    createOffsetsTopic()
+
+    val memberId1 = Uuid.randomUuid().toString
+    val memberId2 = Uuid.randomUuid().toString
+    val groupId = "test-duplicate-waiting-grp"
+
+    // Create a two-partition topic.
+    val topicId = createTopic(topic = "foo", numPartitions = 2)
+
+    // Member 1 joins at epoch 0 and is retried until it has been assigned both partitions.
+    val request1 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId1)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    var response1: ConsumerGroupHeartbeatResponse = null
+    val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+    TestUtils.waitUntilTrue(() => {
+      response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1)
+      response1.data.errorCode == Errors.NONE.code &&
+        response1.data.assignment == allPartitions
+    }, msg = s"Member 1 could not get assignment. Last response $response1.")
+
+    // Member 2 joins, triggering rebalance. Member 2 will wait for Member 1 
to release partitions.
+    val request2 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId2)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    var response2: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2)
+      response2.data.errorCode == Errors.NONE.code
+    }, msg = s"Member 2 could not join. Last response $response2.")
+
+    val member2Epoch = response2.data.memberEpoch
+
+    // Member 2 sends full heartbeat while waiting for partitions from Member 
1.
+    val fullRequest2 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId2)
+        .setMemberEpoch(member2Epoch)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    val firstResponse2 = 
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest2)
+
+    val expectedFirstResponse2 = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setMemberId(memberId2)
+      .setMemberEpoch(member2Epoch)
+      .setHeartbeatIntervalMs(firstResponse2.data.heartbeatIntervalMs)
+      .setAssignment(firstResponse2.data.assignment)
+
+    assertEquals(expectedFirstResponse2, firstResponse2.data) // Interval and assignment are copied from the response, so this only constrains the other fields (e.g. error code, member id, epoch).
+
+    // Send the very same request object a second time.
+    val duplicateResponse2 = 
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest2)
+
+    // Verify the duplicate produces the same response data, including the assignment captured from the first response.
+    assertEquals(expectedFirstResponse2, duplicateResponse2.data)
+  }
+
+  @ClusterTest
+  def testDuplicateFullHeartbeatDuringRevocation(): Unit = { // Member 1 keeps reporting both partitions after member 2 joins; two identical heartbeats must get equal responses.
+    createOffsetsTopic()
+
+    val memberId1 = Uuid.randomUuid().toString
+    val memberId2 = Uuid.randomUuid().toString
+    val groupId = "test-duplicate-revocation-grp"
+
+    // Create a two-partition topic.
+    val topicId = createTopic(topic = "foo", numPartitions = 2)
+
+    // Member 1 joins at epoch 0 and is retried until it has been assigned both partitions.
+    val request1 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId1)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    var response1: ConsumerGroupHeartbeatResponse = null
+    val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+    TestUtils.waitUntilTrue(() => {
+      response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1)
+      response1.data.errorCode == Errors.NONE.code &&
+        response1.data.assignment == allPartitions
+    }, msg = s"Member 1 could not get assignment. Last response $response1.")
+
+    val member1Epoch = response1.data.memberEpoch
+
+    // Member 2 joins, triggering rebalance; retried until a response carries no error.
+    val request2 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId2)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    var response2: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2)
+      response2.data.errorCode == Errors.NONE.code
+    }, msg = s"Member 2 could not join. Last response $response2.")
+
+    // Member 1 sends full heartbeat at its pre-rebalance epoch (still reporting all partitions).
+    val fullRequest1 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId1)
+        .setMemberEpoch(member1Epoch)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List(new 
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+          .setTopicId(topicId)
+          .setPartitions(List[Integer](0, 1).asJava)).asJava)
+    ).build()
+
+    val firstResponse1 = 
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1)
+
+    val expectedFirstResponse1 = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setMemberId(memberId1)
+      .setMemberEpoch(firstResponse1.data.memberEpoch)
+      .setHeartbeatIntervalMs(firstResponse1.data.heartbeatIntervalMs)
+      .setAssignment(firstResponse1.data.assignment)
+
+    assertEquals(expectedFirstResponse1, firstResponse1.data) // Epoch, interval and assignment are copied from the response, so this only constrains the other fields (e.g. error code, member id).
+
+    // Send the very same request object a second time.
+    val duplicateResponse1 = 
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1)
+
+    // Verify the duplicate produces the same response data, including the epoch and assignment captured from the first response.
+    assertEquals(expectedFirstResponse1, duplicateResponse1.data)
+  }
+
+  @ClusterTest
+  def testDuplicateFullHeartbeatWithRevocationAck(): Unit = { // Member 1 reports only partition 0 after member 2 joins, then sends a further full heartbeat twice; both responses must be equal.
+    createOffsetsTopic()
+
+    val memberId1 = Uuid.randomUuid().toString
+    val memberId2 = Uuid.randomUuid().toString
+    val groupId = "test-duplicate-revocation-ack-grp"
+
+    // Create a two-partition topic.
+    val topicId = createTopic(topic = "foo", numPartitions = 2)
+
+    // Member 1 joins at epoch 0 and is retried until it has been assigned both partitions.
+    val request1 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId1)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    var response1: ConsumerGroupHeartbeatResponse = null
+    val allPartitions = new ConsumerGroupHeartbeatResponseData.Assignment()
+      .setTopicPartitions(List(new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+        .setTopicId(topicId)
+        .setPartitions(List[Integer](0, 1).asJava)).asJava)
+
+    TestUtils.waitUntilTrue(() => {
+      response1 = connectAndReceive[ConsumerGroupHeartbeatResponse](request1)
+      response1.data.errorCode == Errors.NONE.code &&
+        response1.data.assignment == allPartitions
+    }, msg = s"Member 1 could not get assignment. Last response $response1.")
+
+    val member1InitialEpoch = response1.data.memberEpoch
+
+    // Member 2 joins, triggering rebalance; retried until a response carries no error.
+    val request2 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId2)
+        .setMemberEpoch(0)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List.empty.asJava)
+    ).build()
+
+    var response2: ConsumerGroupHeartbeatResponse = null
+    TestUtils.waitUntilTrue(() => {
+      response2 = connectAndReceive[ConsumerGroupHeartbeatResponse](request2)
+      response2.data.errorCode == Errors.NONE.code
+    }, msg = s"Member 2 could not join. Last response $response2.")
+
+    // Member 1 sends heartbeat acknowledging revocation (only reporting 
partition 0).
+    val ackRequest1 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId1)
+        .setMemberEpoch(member1InitialEpoch)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List(new 
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+          .setTopicId(topicId)
+          .setPartitions(List[Integer](0).asJava)).asJava)
+    ).build()
+
+    val ackResponse1 = 
connectAndReceive[ConsumerGroupHeartbeatResponse](ackRequest1)
+    assertEquals(Errors.NONE.code, ackResponse1.data.errorCode)
+
+    val member1NewEpoch = ackResponse1.data.memberEpoch
+
+    // Member 1 sends a full heartbeat at the epoch returned by the ack response, again reporting only partition 0.
+    val fullRequest1 = new ConsumerGroupHeartbeatRequest.Builder(
+      new ConsumerGroupHeartbeatRequestData()
+        .setGroupId(groupId)
+        .setMemberId(memberId1)
+        .setMemberEpoch(member1NewEpoch)
+        .setRebalanceTimeoutMs(5 * 60 * 1000)
+        .setSubscribedTopicNames(List("foo").asJava)
+        .setTopicPartitions(List(new 
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+          .setTopicId(topicId)
+          .setPartitions(List[Integer](0).asJava)).asJava)
+    ).build()
+
+    val firstResponse1 = 
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1)
+
+    val expectedFirstResponse1 = new ConsumerGroupHeartbeatResponseData()
+      .setErrorCode(Errors.NONE.code)
+      .setMemberId(memberId1)
+      .setMemberEpoch(firstResponse1.data.memberEpoch)
+      .setHeartbeatIntervalMs(firstResponse1.data.heartbeatIntervalMs)
+      .setAssignment(firstResponse1.data.assignment)
+
+    assertEquals(expectedFirstResponse1, firstResponse1.data) // Epoch, interval and assignment are copied from the response, so this only constrains the other fields (e.g. error code, member id).
+
+    // Send the very same request object a second time.
+    val duplicateResponse1 = 
connectAndReceive[ConsumerGroupHeartbeatResponse](fullRequest1)
+
+    // Verify the duplicate produces the same response data, including the epoch and assignment captured from the first response.
+    assertEquals(expectedFirstResponse1, duplicateResponse1.data)
+  }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index b6a2b6e5607..6b25e42475f 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -694,6 +694,375 @@ public class GroupMetadataManagerTest {
         );
     }
 
+    @Test
+    public void testDuplicateFullHeartbeatInStableState() { // A STABLE member repeating its full heartbeat gets the same response and produces no records.
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in STABLE state with epoch 100 and is assigned partitions [0, 1, 2] of foo.
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.STABLE)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1, 2)))
+            .build();
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 100, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2)
+        )));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
+
+        assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Create full request with current epoch (100), reporting all three partitions.
+        ConsumerGroupHeartbeatRequestData fullRequest = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignor("range")
+            .setTopicPartitions(List.of(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(List.of(0, 1, 2))));
+
+        // First heartbeat. The response is expected at epoch 100 with the full assignment [0, 1, 2].
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1, 2))))),
+            result1.response()
+        );
+
+        // Duplicate heartbeat: the exact same request object is submitted again.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        // Verify the duplicate yields the same response, generates no records, and leaves the member STABLE.
+        assertResponseEquals(result1.response(), result2.response());
+        assertEquals(List.of(), result2.records());
+        assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId));
+    }
+
+    @Test
+    public void testDuplicateFullHeartbeatInUnrevokedPartitionsState() { // A member still reporting a partition pending revocation repeats its heartbeat: same response, no records, state unchanged.
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in UNREVOKED_PARTITIONS state with epoch 100; the group and target assignment epochs replayed below are 101.
+        // Target assignment is [0, 1], but member still owns [0, 1, 2]: partition 2 is pending revocation.
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1)))
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+            .build();
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 101, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1)
+        )));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 101));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
+
+        assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Create full request with current epoch. Member still reports owning 
all partitions.
+        ConsumerGroupHeartbeatRequestData fullRequest = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignor("range")
+            .setTopicPartitions(List.of(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(List.of(0, 1, 2))));
+
+        // First heartbeat. The response is expected to stay at epoch 100 with assignment [0, 1].
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1))))),
+            result1.response()
+        );
+
+        assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Duplicate heartbeat: the exact same request object is submitted again.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        // Verify the duplicate yields the same response, generates no records, and leaves the member in UNREVOKED_PARTITIONS.
+        assertResponseEquals(result1.response(), result2.response());
+        assertEquals(List.of(), result2.records());
+        assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
+    }
+
+    @Test
+    public void testDuplicateFullHeartbeatInUnreleasedPartitionsState() { // A member waiting on partitions held by another member repeats its heartbeat: same response, no records, state unchanged.
+        String groupId = "fooup";
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member 1 is in UNRELEASED_PARTITIONS state with epoch 100.
+        // Member 1 has [0] assigned but target is [0, 1, 2].
+        // Member 2 (epoch 99, UNREVOKED_PARTITIONS) still owns [1, 2] and needs to revoke them.
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setState(MemberState.UNRELEASED_PARTITIONS)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0)))
+            .build();
+
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setMemberEpoch(99)
+            .setPreviousMemberEpoch(98)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment())
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 1, 
2)))
+            .build();
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member1));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member2));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 100, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId1, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1, 2)
+        )));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId2, mkAssignment()));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 100));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member1));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member2));
+
+        assertEquals(MemberState.UNRELEASED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId1));
+
+        // Create full request for member 1 with current epoch (100), reporting only partition 0.
+        ConsumerGroupHeartbeatRequestData fullRequest = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId1)
+            .setMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignor("range")
+            .setTopicPartitions(List.of(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(List.of(0))));
+
+        // First heartbeat. Member is UNRELEASED_PARTITIONS so response 
includes current assignment.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(100)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0))))),
+            result1.response()
+        );
+
+        assertEquals(MemberState.UNRELEASED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId1));
+
+        // Duplicate heartbeat: the exact same request object is submitted again.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        // Verify the duplicate yields the same response, generates no records, and leaves member 1 in UNRELEASED_PARTITIONS.
+        assertResponseEquals(result1.response(), result2.response());
+        assertEquals(List.of(), result2.records());
+        assertEquals(MemberState.UNRELEASED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId1));
+    }
+
+    @Test
+    public void testDuplicateFullHeartbeatWithRevocationAck() { // A member acks its revocation, then the same request (old epoch) is replayed: same response, no records, still STABLE.
+        String groupId = "fooup";
+        String memberId = Uuid.randomUuid().toString();
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        CoordinatorMetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(fooTopicId, fooTopicName, 3)
+            .addRacks()
+            .buildCoordinatorMetadataImage();
+
+        long fooTopicHash = computeTopicHash(fooTopicName, metadataImage);
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            
.withConfig(GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, 
List.of(assignor))
+            .withMetadataImage(metadataImage)
+            .build();
+
+        // Member is in UNREVOKED_PARTITIONS state with epoch 100; the group and target assignment epochs replayed below are 101.
+        // Target assignment is [0, 1], member needs to revoke [2].
+        ConsumerGroupMember member = new ConsumerGroupMember.Builder(memberId)
+            .setState(MemberState.UNREVOKED_PARTITIONS)
+            .setMemberEpoch(100)
+            .setPreviousMemberEpoch(99)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId(DEFAULT_CLIENT_ID)
+            .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(mkTopicAssignment(fooTopicId, 
0, 1)))
+            
.setPartitionsPendingRevocation(mkAssignment(mkTopicAssignment(fooTopicId, 2)))
+            .build();
+
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionRecord(groupId,
 member));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId,
 101, computeGroupHash(Map.of(
+            fooTopicName, fooTopicHash
+        ))));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentRecord(groupId,
 memberId, mkAssignment(
+            mkTopicAssignment(fooTopicId, 0, 1)
+        )));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentEpochRecord(groupId,
 101));
+        
context.replay(GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentRecord(groupId,
 member));
+
+        assertEquals(MemberState.UNREVOKED_PARTITIONS, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Create full request at epoch 100 acknowledging revocation (only owns [0, 1]).
+        ConsumerGroupHeartbeatRequestData fullRequest = new 
ConsumerGroupHeartbeatRequestData()
+            .setGroupId(groupId)
+            .setMemberId(memberId)
+            .setMemberEpoch(100)
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(List.of("foo"))
+            .setServerAssignor("range")
+            .setTopicPartitions(List.of(
+                new ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(List.of(0, 1))));
+
+        // First heartbeat acknowledges revocation and transitions to STABLE; the response carries the new epoch 101.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result1 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(101)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setTopicPartitions(List.of(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(List.of(0, 1))))),
+            result1.response()
+        );
+
+        assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId));
+
+        // Duplicate heartbeat: the same request object, still carrying epoch 100 although the first response returned 101.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, 
CoordinatorRecord> result2 =
+            context.consumerGroupHeartbeat(fullRequest);
+
+        // Verify the duplicate yields the same response (epoch 101), generates no records, and leaves the member STABLE.
+        assertResponseEquals(result1.response(), result2.response());
+        assertEquals(List.of(), result2.records());
+        assertEquals(MemberState.STABLE, 
context.consumerGroupMemberState(groupId, memberId));
+    }
+
     @Test
     public void testShareGroupMemberCanRejoinWithEpochZero() {
         String groupId = "fooup";


Reply via email to