vamossagar12 commented on code in PR #14432:
URL: https://github.com/apache/kafka/pull/14432#discussion_r1370542261


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1908,6 +1909,715 @@ public void testLeavingMemberBumpsGroupEpoch() {
         assertRecordsEquals(expectedRecords, result.records());
     }
 
+    @Test
+    public void testGroupEpochBumpWhenNewStaticMemberJoins() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId1)
+                    .setInstanceId(memberId1)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)))
+                    .build())
+                .withMember(new ConsumerGroupMember.Builder(memberId2)
+                    .setInstanceId(memberId2)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(9)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    // Use zar only here to ensure that metadata needs to be 
recomputed.
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar", 
"zar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)))
+                    .build())
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1),
+                        mkTopicAssignment(barTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2, 3),
+                        mkTopicAssignment(barTopicId, 1)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+            ));
+
+        // Member 3 joins the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setInstanceId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(11)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+            result.response()
+        );
+
+        ConsumerGroupMember expectedMember3 = new 
ConsumerGroupMember.Builder(memberId3)
+            .setMemberEpoch(11)
+            .setInstanceId(memberId3)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(11)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        List<Record> expectedRecords = Arrays.asList(
+            RecordHelpers.newMemberSubscriptionRecord(groupId, 
expectedMember3),
+            RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, new 
HashMap<String, TopicMetadata>() {
+                {
+                    put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                    put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                }
+            }),
+            RecordHelpers.newGroupEpochRecord(groupId, 11),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId1, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1),
+                mkTopicAssignment(barTopicId, 0)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId2, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 2, 3),
+                mkTopicAssignment(barTopicId, 1)
+            )),
+            RecordHelpers.newTargetAssignmentRecord(groupId, memberId3, 
mkAssignment(
+                mkTopicAssignment(fooTopicId, 4, 5),
+                mkTopicAssignment(barTopicId, 2)
+            )),
+            RecordHelpers.newTargetAssignmentEpochRecord(groupId, 11),
+            RecordHelpers.newCurrentAssignmentRecord(groupId, expectedMember3)
+        );
+
+        assertRecordsEquals(expectedRecords.subList(0, 3), 
result.records().subList(0, 3));
+        assertUnorderedListEquals(expectedRecords.subList(3, 6), 
result.records().subList(3, 6));
+        assertRecordsEquals(expectedRecords.subList(6, 8), 
result.records().subList(6, 8));
+    }
+
+    @Test
+    public void testStaticMemberGetsBackAssignmentUponRejoin() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String member2RejoinId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        ConsumerGroupMember member1 = new 
ConsumerGroupMember.Builder(memberId1)
+            .setInstanceId(memberId1)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2),
+                mkTopicAssignment(barTopicId, 0, 1)))
+            .build();
+        ConsumerGroupMember member2 = new 
ConsumerGroupMember.Builder(memberId2)
+            .setInstanceId(memberId2)
+            .setMemberEpoch(10)
+            .setPreviousMemberEpoch(9)
+            .setTargetMemberEpoch(10)
+            .setRebalanceTimeoutMs(5000)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setAssignedPartitions(mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+            .build();
+
+        // Consumer group with two static members.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .addRacks()
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(member1)
+                .withMember(member2)
+                .withAssignment(memberId1, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2),
+                    mkTopicAssignment(barTopicId, 0, 1)))
+                .withAssignment(memberId2, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5),
+                    mkTopicAssignment(barTopicId, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 6, mkMapOfPartitionRacks(6)));
+                        put(barTopicName, new TopicMetadata(barTopicId, 
barTopicName, 3, mkMapOfPartitionRacks(3)));
+                    }
+                }))
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2),
+                        mkTopicAssignment(barTopicId, 0, 1)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                    // When the member rejoins, it gets the same assignments.
+                    put(member2RejoinId, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 3, 4, 5),
+                        mkTopicAssignment(barTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 leaves the consumer group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(-2)
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        // member epoch of the response would be set to -2
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(-2),
+            result.response()
+        );
+
+        ConsumerGroupMember member2UpdatedEpoch = new 
ConsumerGroupMember.Builder(member2)
+            .setMemberEpoch(-2)
+            .build();
+        // The departing static member will have it's epoch set to -2.
+        List<Record> expectedRecords = Collections.singletonList(
+            RecordHelpers.newCurrentAssignmentRecord(groupId, 
member2UpdatedEpoch)
+        );
+
+        assertEquals(result.records(), expectedRecords);
+        // Replay the updated record with -2 epoch so that the group can 
update itself.
+        context.replay(RecordHelpers.newCurrentAssignmentRecord(groupId, 
member2UpdatedEpoch));
+
+        // member 2 rejoins the group with the same instance id
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> 
rejoinResult = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setMemberId(member2RejoinId)
+                .setGroupId(groupId)
+                .setInstanceId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(5000)
+                .setServerAssignor("range")
+                .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(member2RejoinId)
+                .setMemberEpoch(10)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()),
+                rejoinResult.response()
+        );
+
+        ConsumerGroupMember expectedRejoinedMember = new 
ConsumerGroupMember.Builder(member2RejoinId)
+            .setMemberEpoch(10)
+            .setInstanceId(memberId2)
+            .setPreviousMemberEpoch(0)
+            .setTargetMemberEpoch(10)
+            .setClientId("client")
+            .setClientHost("localhost/127.0.0.1")
+            .setRebalanceTimeoutMs(5000)
+            .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+            .setServerAssignorName("range")
+            .setPartitionsPendingAssignment(mkAssignment(

Review Comment:
   @dajac , I realised that even in the static member re-joining case, while 
the group epoch doesn't bump, the partitions would be in pending assignment 
state. I believe, eventually the member would get it's assignments. In that 
case, this state seems correct to me. WDYT? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to