dajac commented on code in PR #13901:
URL: https://github.com/apache/kafka/pull/13901#discussion_r1248097012


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1932,6 +1961,435 @@ public void 
testPartitionAssignorExceptionOnRegularHeartbeat() {
                     .setTopicPartitions(Collections.emptyList())));
     }
 
+    @Test
+    public void testSubscriptionMetadataRefreshedAfterGroupIsLoaded() {
+        String groupId = "fooup";
+        // Use a static member id as it makes the test easier.
+        String memberId = Uuid.randomUuid().toString();
+
+        Uuid fooTopicId = Uuid.randomUuid();
+        String fooTopicName = "foo";
+
+        // Create a context with one consumer group containing one member.
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroupMetadataRefreshIntervalMs(5 * 60 * 1000)
+            .withMetadataImage(new MetadataImageBuilder()
+                .addTopic(fooTopicId, fooTopicName, 6)
+                .build())
+            .withConsumerGroup(new ConsumerGroupBuilder(groupId, 10)
+                .withMember(new ConsumerGroupMember.Builder(memberId)
+                    .setMemberEpoch(10)
+                    .setPreviousMemberEpoch(10)
+                    .setTargetMemberEpoch(10)
+                    .setClientId("client")
+                    .setClientHost("localhost/127.0.0.1")
+                    .setRebalanceTimeoutMs(5000)
+                    .setSubscribedTopicNames(Arrays.asList("foo", "bar"))
+                    .setServerAssignorName("range")
+                    .setAssignedPartitions(mkAssignment(
+                        mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                    .build())
+                .withAssignment(memberId, mkAssignment(
+                    mkTopicAssignment(fooTopicId, 0, 1, 2)))
+                .withAssignmentEpoch(10)
+                .withSubscriptionMetadata(new HashMap<String, TopicMetadata>() 
{
+                    {
+                        // foo only has 3 partitions stored in the metadata 
but foo has
+                        // 6 partitions in the metadata image.
+                        put(fooTopicName, new TopicMetadata(fooTopicId, 
fooTopicName, 3));
+                    }
+                }))
+            .build();
+
+        // The metadata refresh flag should be true.
+        ConsumerGroup consumerGroup = context.groupMetadataManager
+            .getOrMaybeCreateConsumerGroup(groupId, false);
+        
assertTrue(consumerGroup.hasMetadataExpired(context.time.milliseconds()));
+
+        // Prepare the assignment result.
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.singletonMap(memberId, new 
MemberAssignment(mkAssignment(
+                mkTopicAssignment(fooTopicId, 0, 1, 2, 3, 4, 5)
+            )))
+        ));
+
+        // Heartbeat.
+        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = 
context.consumerGroupHeartbeat(

Review Comment:
   gotcha.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to