showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230868853


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2524,36 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new 
OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, 
"hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, 
offsetAndMetadata);
+        Map<TopicPartition, OffsetAndMetadata> cache = 
coordinator.committedOffsetsCache();
+        assertTrue(cache.isEmpty());
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);

Review Comment:
   I know this is not your change, but please also fix the argument order in
   this `assertEquals` call (the expected value should come first). Thanks.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), 
fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = 
coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertTrue(committedOffsetsCache.isEmpty());
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new 
OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, 
singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = 
coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, 
leaderEpoch, metadata);
+        assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);

Review Comment:
   `assertEquals(1, committedOffsetsCache.size());`



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3142,54 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), 
fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = 
coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertTrue(committedOffsetsCache.isEmpty());
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new 
OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, 
singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = 
coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, 
leaderEpoch, metadata);
+        assertEquals(offsetAndMetadata, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);
+        assertEquals(offsetAndMetadata, committedOffsetsCache.get(t1p));
+
+        // fetch again with t1p + t2p, but will send fetch for t2p since t1p 
is in cache
+        long offsetPartition2 = 50L;
+        String metadataPartition2 = "foobar";
+        Optional<Integer> leaderEpochPartition2 = Optional.of(19909);
+        data = new OffsetFetchResponse.PartitionData(offsetPartition2, 
leaderEpochPartition2,
+                metadataPartition2, Errors.NONE);
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, 
singletonMap(t2p, data)));
+
+        fetchedOffsets = coordinator.fetchCommittedOffsets(new 
HashSet<>(Arrays.asList(t1p, t2p)), time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+
+        assertEquals(fetchedOffsets.size(), 2); // tp1 and tp2 should be 
returned with tp1 coming from cache
+        assertEquals(committedOffsetsCache.size(), 1); // cache size is still 
1 since only tp1 is an owned partition

Review Comment:
   Please fix the parameter order in these `assertEquals` calls: the expected
   value should come first, followed by the actual value.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3818,13 +3899,16 @@ private void gracefulCloseTest(ConsumerCoordinator 
coordinator, boolean shouldLe
             return commitRequest.data().groupId().equals(groupId);
         }, new OffsetCommitResponse(new OffsetCommitResponseData()));
 
+        // add t1p to the committed offset metadata cache, we'll then check 
that the cache is invalidated after revocation which happens during close
+        coordinator.committedOffsetsCache().put(t1p, new 
OffsetAndMetadata(1L));

Review Comment:
   I know you've added a test to verify the revoke case, but it only covers the
   instance-close case. Could we add one more revoke test in `testRejoinGroup`
   for the usual revoke case? Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to