This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ef2941154e2 KAFKA-18931: added a share group session timeout task when group coordinator is loaded (#19173)
ef2941154e2 is described below

commit ef2941154e2904c48295855d529cff34457e4fe1
Author: Chirag Wadhwa <[email protected]>
AuthorDate: Mon Mar 10 21:13:30 2025 +0530

    KAFKA-18931: added a share group session timeout task when group coordinator is loaded (#19173)
    
    This PR adds `scheduleShareGroupSessionTimeout` for all the persisted
    members of a share group when the group coordinator is loaded.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../coordinator/group/GroupMetadataManager.java    |   7 +-
 .../group/GroupMetadataManagerTest.java            | 227 +++++++++++++++++++++
 2 files changed, 233 insertions(+), 1 deletion(-)

diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 4985c013760..fc954754de1 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -4885,7 +4885,12 @@ public class GroupMetadataManager {
                     break;
 
                 case SHARE:
-                    // Nothing for now for the ShareGroup, as no members are persisted.
+                    ShareGroup shareGroup = (ShareGroup) group;
+                    log.info("Loaded share group {} with {} members.", groupId, shareGroup.members().size());
+                    shareGroup.members().forEach((memberId, member) -> {
+                        log.debug("Loaded member {} in share group {}.", memberId, groupId);
+                        scheduleShareGroupSessionTimeout(groupId, memberId);
+                    });
                     break;
 
                 default:
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index f83bd1c267b..6df8e237933 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -3204,6 +3204,188 @@ public class GroupMetadataManagerTest {
         context.assertNoRebalanceTimeout(groupId, memberId);
     }
 
+    @Test
+    public void testOnLoadedSessionTimeoutExpiration() {
+        String groupId = "group";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String memberId = "foo-1";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder("foo-1")
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(9)
+                    .setPreviousMemberEpoch(9)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Let's assume that all the records have been replayed and now
+        // onLoaded is called to signal it.
+        context.groupMetadataManager.onLoaded();
+
+        // All members should have a session timeout in place.
+        assertNotNull(context.timer.timeout(groupSessionTimeoutKey(groupId, memberId)));
+
+        // Advance time past the session timeout.
+        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(45000 + 1);
+
+        // Verify the expired timeout.
+        assertEquals(
+            List.of(
+                new ExpiredTimeout<Void, CoordinatorRecord>(
+                    groupSessionTimeoutKey(groupId, memberId),
+                    new CoordinatorResult<>(
+                        List.of(
+                            GroupCoordinatorRecordHelpers.newConsumerGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
+                            GroupCoordinatorRecordHelpers.newConsumerGroupTargetAssignmentTombstoneRecord(groupId, memberId),
+                            GroupCoordinatorRecordHelpers.newConsumerGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
+                            GroupCoordinatorRecordHelpers.newConsumerGroupSubscriptionMetadataRecord(groupId, Map.of()),
+                            GroupCoordinatorRecordHelpers.newConsumerGroupEpochRecord(groupId, 11)
+                        )
+                    )
+                )
+            ),
+            timeouts
+        );
+
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId);
+    }
+
+    @Test
+    public void testSessionTimeoutExpirationForShareMember() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addRacks()
+                .build())
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Map.of(memberId, new MemberAssignmentImpl(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Session timer is scheduled on first heartbeat.
+        CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> result =
+            context.shareGroupHeartbeat(
+                new ShareGroupHeartbeatRequestData()
+                    .setGroupId(groupId)
+                    .setMemberId(memberId)
+                    .setMemberEpoch(0)
+                    .setSubscribedTopicNames(List.of("foo")));
+        assertEquals(1, result.response().memberEpoch());
+
+        // Verify that there is a session time.
+        context.assertSessionTimeout(groupId, memberId, 45000);
+
+        // Advance time past the session timeout.
+        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(45000 + 1);
+
+        // Verify the expired timeout.
+        assertEquals(
+            List.of(new ExpiredTimeout<Void, CoordinatorRecord>(
+                groupSessionTimeoutKey(groupId, memberId),
+                new CoordinatorResult<>(
+                    List.of(
+                        GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
+                        GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId),
+                        GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
+                        GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()),
+                        GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 2)
+                    )
+                )
+            )),
+            timeouts
+        );
+
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId);
+    }
+
+    @Test
+    public void testOnLoadedSessionTimeoutExpirationForShareMember() {
+        String groupId = "group";
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        String memberId = "foo-1";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withShareGroup(new ShareGroupBuilder(groupId, 10)
+                .withMember(new ShareGroupMember.Builder(memberId)
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(9)
+                    .setPreviousMemberEpoch(9)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Let's assume that all the records have been replayed and now
+        // onLoaded is called to signal it.
+        context.groupMetadataManager.onLoaded();
+
+        // All members should have a session timeout in place.
+        assertNotNull(context.timer.timeout(groupSessionTimeoutKey(groupId, memberId)));
+
+        // Advance time past the session timeout.
+        List<ExpiredTimeout<Void, CoordinatorRecord>> timeouts = context.sleep(45000 + 1);
+
+        // Verify the expired timeout.
+        assertEquals(
+            List.of(
+                new ExpiredTimeout<Void, CoordinatorRecord>(
+                    groupSessionTimeoutKey(groupId, memberId),
+                    new CoordinatorResult<>(
+                        List.of(
+                            GroupCoordinatorRecordHelpers.newShareGroupCurrentAssignmentTombstoneRecord(groupId, memberId),
+                            GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord(groupId, memberId),
+                            GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord(groupId, memberId),
+                            GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord(groupId, Map.of()),
+                            GroupCoordinatorRecordHelpers.newShareGroupEpochRecord(groupId, 11)
+                        )
+                    )
+                )
+            ),
+            timeouts
+        );
+
+        // Verify that there are no timers.
+        context.assertNoSessionTimeout(groupId, memberId);
+    }
+
     @Test
     public void testSessionTimeoutExpirationStaticMember() {
         String groupId = "fooup";
@@ -3679,6 +3861,51 @@ public class GroupMetadataManagerTest {
         assertNotNull(context.timer.timeout(groupRebalanceTimeoutKey("foo", "foo-1")));
     }
 
+    @Test
+    public void testOnLoadedWithShareGroup() {
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+        Uuid barTopicId = Uuid.randomUuid();
+        String barTopicName = "bar";
+
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .addTopic(barTopicId, barTopicName, 3)
+                .build())
+            .withShareGroup(new ShareGroupBuilder("foo", 10)
+                .withMember(new ShareGroupMember.Builder("foo-1")
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(9)
+                    .setPreviousMemberEpoch(9)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withMember(new ShareGroupMember.Builder("foo-2")
+                    .setState(MemberState.STABLE)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setClientId(DEFAULT_CLIENT_ID)
+                    .setClientHost(DEFAULT_CLIENT_ADDRESS.toString())
+                    .setSubscribedTopicNames(List.of("foo"))
+                    .build())
+                .withAssignment("foo-1", mkAssignment(
+                    mkTopicAssignment(fooTopicId, 3, 4, 5)))
+                .withAssignmentEpoch(10))
+            .build();
+
+        // Let's assume that all the records have been replayed and now
+        // onLoaded is called to signal it.
+        context.groupMetadataManager.onLoaded();
+
+        // All members should have a session timeout in place.
+        assertNotNull(context.timer.timeout(groupSessionTimeoutKey("foo", "foo-1")));
+        assertNotNull(context.timer.timeout(groupSessionTimeoutKey("foo", "foo-2")));
+    }
+
     @Test
     public void testUpdateGroupSizeCounter() {
         List<String> groupIds = new ArrayList<>();

Reply via email to