dajac commented on code in PR #13963:
URL: https://github.com/apache/kafka/pull/13963#discussion_r1260159325


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -2402,6 +2448,584 @@ public void testOnNewMetadataImage() {
         assertEquals(image, context.groupMetadataManager.image());
     }
 
+    @Test
+    public void testSessionTimeoutLifecycle() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(90000)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session timeout.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Session timer is rescheduled on second heartbeat.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(result.response().memberEpoch()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session timeout.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time.
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Session timer is cancelled on leave.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId)
+                .setMemberEpoch(-1));
+        assertEquals(-1, result.response().memberEpoch());
+
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId);
+        context.assertNoRevocationTimeout(groupId, memberId);
+    }
+
+    @Test
+    public void testSessionTimeoutExpiration() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(90000)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session timeout.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time past the session timeout.
+        List<MockCoordinatorTimer.ExpiredTimeout<Record>> timeouts = 
context.sleep(45000 + 1);
+
+        // Verify the expired timeout.
+        assertEquals(
+            Collections.singletonList(new 
MockCoordinatorTimer.ExpiredTimeout<Record>(
+                consumerGroupSessionTimeoutKey(groupId, memberId),
+                Arrays.asList(
+                    RecordHelpers.newCurrentAssignmentTombstoneRecord(groupId, 
memberId),
+                    RecordHelpers.newTargetAssignmentTombstoneRecord(groupId, 
memberId),
+                    
RecordHelpers.newMemberSubscriptionTombstoneRecord(groupId, memberId),
+                    RecordHelpers.newGroupSubscriptionMetadataRecord(groupId, 
Collections.emptyMap()),
+                    RecordHelpers.newGroupEpochRecord(groupId, 2)
+                )
+            )),
+            timeouts
+        );
+
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId);
+        context.assertNoRevocationTimeout(groupId, memberId);
+    }
+
+    @Test
+    public void testRevocationTimeoutLifecycle() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+        String memberId3 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 3)
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 1 joins the group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId1)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(90000)
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1, 2))))),
+            result.response()
+        );
+
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Prepare next assignment.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 joins the group.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(90000)
+                .setSubscribedTopicNames(Collections.singletonList("foo"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(2)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setPendingTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(2))))),
+            result.response()
+        );
+
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Member 1 heartbeats and transitions to revoking. The revocation timeout
+        // is scheduled.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(1)
+                .setRebalanceTimeoutMs(90000)
+                .setSubscribedTopicNames(Collections.singletonList("foo")));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1))))),
+            result.response()
+        );
+
+        // Verify that there is a revocation timeout.
+        context.assertRevocationTimeout(groupId, memberId1, 90000);
+
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Prepare next assignment.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2)
+                    )));
+                    put(memberId3, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 1)
+                    )));
+                }
+            }
+        ));
+
+        // Member 3 joins the group.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId3)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(90000)
+                .setSubscribedTopicNames(Collections.singletonList("foo"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId3)
+                .setMemberEpoch(3)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setPendingTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(1))))),
+            result.response()
+        );
+
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Member 1 heartbeats and re-transitions to revoking. The revocation timeout
+        // is re-scheduled.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(1)
+                .setRebalanceTimeoutMs(90000)
+                .setSubscribedTopicNames(Collections.singletonList("foo")));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0))))),
+            result.response()
+        );
+
+        // Verify that there is a revocation timeout. Keep a reference
+        // to the timeout for later.
+        MockCoordinatorTimer.ScheduledTimeout<Record> scheduledTimeout =
+            context.assertRevocationTimeout(groupId, memberId1, 90000);
+
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Member 1 acks the revocation. The revocation timeout is cancelled.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(1)
+                .setRebalanceTimeoutMs(90000)
+                .setSubscribedTopicNames(Collections.singletonList("foo"))
+                .setTopicPartitions(Collections.singletonList(new 
ConsumerGroupHeartbeatRequestData.TopicPartitions()
+                    .setTopicId(fooTopicId)
+                    .setPartitions(Collections.singletonList(0)))));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(3)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0))))),
+            result.response()
+        );
+
+        // Verify that there is no revocation timeout.
+        context.assertNoRevocationTimeout(groupId, memberId1);
+
+        // Execute the scheduled revocation timeout captured earlier to simulate a
+        // stale timeout. This should be a no-op.
+        assertEquals(Collections.emptyList(), 
scheduledTimeout.operation.generateRecords());
+    }
+
+    @Test
+    public void testRevocationTimeoutExpiration() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId1 = Uuid.randomUuid().toString();
+        String memberId2 = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 3)
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 1 joins the group.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result =
+            context.consumerGroupHeartbeat(
+                new ConsumerGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId1)
+                    .setMemberEpoch(0)
+                    .setRebalanceTimeoutMs(10000) // Use timeout smaller than session timeout.
+                    .setSubscribedTopicNames(Collections.singletonList("foo"))
+                    .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId1)
+                .setMemberEpoch(1)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setAssignedTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(0, 1, 2))))),
+            result.response()
+        );
+
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Prepare next assignment.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            new HashMap<String, MemberAssignment>() {
+                {
+                    put(memberId1, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1)
+                    )));
+                    put(memberId2, new MemberAssignment(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 2)
+                    )));
+                }
+            }
+        ));
+
+        // Member 2 joins the group.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId2)
+                .setMemberEpoch(0)
+                .setRebalanceTimeoutMs(10000)
+                .setSubscribedTopicNames(Collections.singletonList("foo"))
+                .setTopicPartitions(Collections.emptyList()));
+
+        assertResponseEquals(
+            new ConsumerGroupHeartbeatResponseData()
+                .setMemberId(memberId2)
+                .setMemberEpoch(2)
+                .setHeartbeatIntervalMs(5000)
+                .setAssignment(new 
ConsumerGroupHeartbeatResponseData.Assignment()
+                    .setPendingTopicPartitions(Arrays.asList(
+                        new 
ConsumerGroupHeartbeatResponseData.TopicPartitions()
+                            .setTopicId(fooTopicId)
+                            .setPartitions(Arrays.asList(2))))),
+            result.response()
+        );
+
+        assertEquals(
+            Collections.emptyList(),
+            context.sleep(result.response().heartbeatIntervalMs())
+        );
+
+        // Member 1 heartbeats and transitions to revoking. The revocation timeout
+        // is scheduled.
+        result = context.consumerGroupHeartbeat(
+            new ConsumerGroupHeartbeatRequestData()
+                .setGroupId(groupId)
+                .setMemberId(memberId1)
+                .setMemberEpoch(1)
+                .setRebalanceTimeoutMs(90000)

Review Comment:
   Updates `testRevocationTimeoutLifecycle` to use different timeout values. 
This ensures that we always use the last one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to