dajac commented on code in PR #13901: URL: https://github.com/apache/kafka/pull/13901#discussion_r1248097012
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -1932,6 +1961,435 @@ public void testPartitionAssignorExceptionOnRegularHeartbeat() { .setTopicPartitions(Collections.emptyList()))); } + @Test + public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() { + String groupId = "fooup"; + // Use a static member id as it makes the test easier. + String memberId = Uuid.randomUuid().toString(); + + Uuid fooTopicId = Uuid.randomUuid(); + String fooTopicName = "foo"; + + // Create a context with one consumer group containing one member. + MockPartitionAssignor assignor = new MockPartitionAssignor("range"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withAssignors(Collections.singletonList(assignor)) + .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 6) + .build()) + .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10) + .withMember(new ConsumerGroupMember.Builder(memberId) + .setMemberEpoch(10) + .setPreviousMemberEpoch(10) + .setTargetMemberEpoch(10) + .setClientId("client") + .setClientHost("localhost/127.0.0.1") + .setRebalanceTimeoutMs(5000) + .setSubscribedTopicNames(Arrays.asList("foo", "bar")) + .setServerAssignorName("range") + .setAssignedPartitions(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .build()) + .withAssignment(memberId, mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2))) + .withAssignmentEpoch(10) + .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() { + { + // foo only has 3 partitions stored in the metadata but foo has + // 6 partitions the metadata image. + put(fooTopicName, new TopicMetadata(fooTopicId, fooTopicName, 3)); + } + })) + .build(); + + // The metadata refresh flag should be true. + ConsumerGroup consumerGroup = context.groupMetadataManager + .getOrMaybeCreateConsumerGroup(groupId, false); + assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds())); + + // Prepare the assignment result. + assignor.prepareGroupAssignment(new GroupAssignment( + Collections.singletonMap(memberId, new MemberAssignment(mkAssignment( + mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5) + ))) + )); + + // Heartbeat. + CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = context.consumerGroupHeartbeat( Review Comment: gotcha. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org