jolshan commented on code in PR #14067:
URL: https://github.com/apache/kafka/pull/14067#discussion_r1275350025


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java:
##########
@@ -537,6 +554,55 @@ public void testGenericGroupOffsetCommitWithRetentionTime() {
         );
     }
 
+    @Test
+    public void testGenericGroupOffsetCommitMaintainsSession() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        // Create a group.
+        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "foo",
+            true
+        );
+
+        // Add member.
+        GenericGroupMember member = mkGenericMember("member", Optional.empty());
+        group.add(member);
+
+        // Transition to next generation.
+        group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
+        group.initNextGeneration();
+        assertEquals(1, group.generationId());
+        group.transitionTo(GenericGroupState.STABLE);
+
+        // Schedule session timeout. This would be normally done when
+        // the group transitions to stable.
+        context.groupMetadataManager.rescheduleGenericGroupMemberHeartbeat(group, member);
+
+        // Advance time by half of the session timeout.
+        assertEquals(Collections.emptyList(), context.sleep(5000 / 2));
+
+        // Commit.
+        context.commitOffset(
+            new OffsetCommitRequestData()
+                .setGroupId("foo")
+                .setMemberId("member")
+                .setGenerationIdOrMemberEpoch(1)
+                .setRetentionTimeMs(1234L)
+                .setTopics(Collections.singletonList(
+                    new OffsetCommitRequestData.OffsetCommitRequestTopic()
+                        .setName("bar")
+                        .setPartitions(Collections.singletonList(
+                            new OffsetCommitRequestData.OffsetCommitRequestPartition()
+                                .setPartitionIndex(0)
+                                .setCommittedOffset(100L)
+                        ))
+                ))
+        );
+
+        // Advance time by half of the session timeout.
+        assertEquals(Collections.emptyList(), context.sleep(5000 / 2));

Review Comment:
   nit: maybe make it clear that no timeouts (the empty list returned by `sleep`) means the group has not expired? And then maybe add one more check for when the group does expire?
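
   A rough, self-contained sketch of the shape this could take. `SessionTimer`, `reschedule`, and this `sleep` are hypothetical stand-ins written only for illustration; they are not the coordinator's or the test context's API. The 5000 ms timeout mirrors the value used in the test above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy model of a per-member session timer (not Kafka code).
class SessionTimer {
    private final long sessionTimeoutMs;
    private final Map<String, Long> deadlines = new HashMap<>();
    private long nowMs = 0L;

    SessionTimer(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Called on a heartbeat or an offset commit: pushes the deadline out.
    void reschedule(String memberId) {
        deadlines.put(memberId, nowMs + sessionTimeoutMs);
    }

    // Advances the clock and returns the members whose session expired.
    List<String> sleep(long ms) {
        nowMs += ms;
        List<String> expired = new ArrayList<>();
        deadlines.entrySet().removeIf(e -> {
            if (e.getValue() <= nowMs) {
                expired.add(e.getKey());
                return true;
            }
            return false;
        });
        return expired;
    }
}

class SessionTimerDemo {
    public static void main(String[] args) {
        SessionTimer timer = new SessionTimer(5000);
        timer.reschedule("member");

        // Half the timeout passes: nothing expired yet.
        System.out.println(timer.sleep(5000 / 2)); // prints []

        // The commit resets the session.
        timer.reschedule("member");

        // Without the commit the member would expire here; with it, it does not.
        System.out.println(timer.sleep(5000 / 2)); // prints []

        // No further activity for a full timeout: the member expires.
        System.out.println(timer.sleep(5000)); // prints [member]
    }
}
```

   The last step is the extra check: once a full session timeout passes with no commit or heartbeat, the expiry shows up in the returned list.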



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
