showuon commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1230345113


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -127,6 +127,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     // hold onto request&future for committed offset requests to enable async calls.
     private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;
+    // holds the offset metadata for assigned partitions to reduce remote calls thus speeding up fetching partition metadata
+    private final Map<TopicPartition, OffsetAndMetadata> committedTopicPartitionOffsetsCache;

Review Comment:
   nit: the comment above should mention this is the `committed offset metadata`



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -922,13 +922,9 @@ public void testCommitsFetchedDuringAssign() {
 
         // fetch offset for two topics
         Map<TopicPartition, Long> offsets = new HashMap<>();
-        offsets.put(tp0, offset1);
-        client.prepareResponseFrom(offsetResponse(offsets, Errors.NONE), coordinator);
-        assertEquals(offset1, consumer.committed(Collections.singleton(tp0)).get(tp0).offset());
-
-        offsets.remove(tp0);
         offsets.put(tp1, offset2);

Review Comment:
   Could we add a comment above about why we only need to respond with `tp1, offset2`? Something noting that `tp0` has already been cached by the previous committed-offset fetch.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertEquals(cache.size(), 1);
+        assertEquals(cache.get(t1p), offsetAndMetadata);
+    }
+
+    @Test
+    public void testCommitOffsetMetadataSync() {

Review Comment:
   Thanks for adding the sync test



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertEquals(cache.size(), 1);
+        assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   The `assertEquals` method signature is `assertEquals(int expected, int actual)`. Putting the parameters in the correct order will produce a sensible error message when the assertion fails.
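   For illustration (not part of the PR), a tiny self-contained sketch of why the argument order matters; `AssertOrderDemo` and its local `assertEquals` helper are hypothetical stand-ins mimicking JUnit's failure-message format:
   ```java
   // Hypothetical stand-in for JUnit's assertEquals(expected, actual),
   // showing how swapped arguments produce a misleading failure message.
   public class AssertOrderDemo {
       static void assertEquals(Object expected, Object actual) {
           if (!expected.equals(actual))
               throw new AssertionError("expected: <" + expected + "> but was: <" + actual + ">");
       }

       // Returns the failure message produced for a given argument order.
       static String messageFor(Object expected, Object actual) {
           try {
               assertEquals(expected, actual);
               return "passed";
           } catch (AssertionError e) {
               return e.getMessage();
           }
       }

       public static void main(String[] args) {
           int cacheSize = 2; // suppose the cache unexpectedly holds 2 entries
           // Swapped order: reads as if we expected 2, which is backwards.
           System.out.println("swapped: " + messageFor(cacheSize, 1));
           // Correct order: the message matches the actual intent.
           System.out.println("correct: " + messageFor(1, cacheSize));
       }
   }
   ```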



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertEquals(cache.size(), 1);
+        assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   Also, could we assert cache is empty before we `commitOffsetsAsync`? i.e. 
   ```
   assertTrue(cache.isEmpty());
   coordinator.commitOffsetsAsync(...)
   ...
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -351,6 +359,10 @@ private Exception invokePartitionsRevoked(final SortedSet<TopicPartition> revoke
             final long startMs = time.milliseconds();
             listener.onPartitionsRevoked(revokedPartitions);
             sensors.revokeCallbackSensor.record(time.milliseconds() - startMs);
+            // remove the offset metadata cache for revoked partitions
+            for (TopicPartition revokedPartition: revokedPartitions) {
+                this.committedTopicPartitionOffsetsCache.remove(revokedPartition);
+            }

Review Comment:
   The revoke and lost-partition cases should also be tested, to verify we indeed remove the entries from the cache.
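   As a side note on the per-partition loop in the diff above, the same eviction can be expressed with `Map#keySet().removeAll`; a minimal plain-Java sketch (String/Long stand in for Kafka's `TopicPartition`/`OffsetAndMetadata` types):
   ```java
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   // Minimal sketch of evicting revoked partitions from an offsets cache.
   public class RevokeEvictionDemo {
       static Map<String, Long> evictRevoked(Map<String, Long> cache, Set<String> revoked) {
           // Equivalent to looping over revoked partitions and calling remove() on each:
           // keySet() is a live view, so removing keys removes the map entries.
           cache.keySet().removeAll(revoked);
           return cache;
       }

       public static void main(String[] args) {
           Map<String, Long> cache = new HashMap<>();
           cache.put("topic-0", 100L);
           cache.put("topic-1", 200L);
           evictRevoked(cache, Collections.singleton("topic-0"));
           System.out.println(cache); // only topic-1's entry remains
       }
   }
   ```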



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3137,70 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertEquals(committedOffsetsCache.size(), 0);
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata expected = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(expected, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);
+        assertEquals(expected, committedOffsetsCache.get(t1p));
+    }
+
+    @Test
+    public void testReturningCachedOffsetForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        committedOffsetsCache.put(t1p, offsetAndMetadata);
+

Review Comment:
   additional line



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -3114,6 +3137,70 @@ public void testFetchCommittedOffsets() {
         assertEquals(new OffsetAndMetadata(offset, leaderEpoch, metadata), fetchedOffsets.get(t1p));
     }
 
+    @Test
+    public void testPopulatingOffsetCacheForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+        // committedOffsetsCache should be empty
+        assertEquals(committedOffsetsCache.size(), 0);
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, leaderEpoch,
+                metadata, Errors.NONE);
+
+        client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t1p, data)));
+        subscriptions.assignFromUser(singleton(t1p));
+        Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(singleton(t1p),
+                time.timer(Long.MAX_VALUE));
+
+        assertNotNull(fetchedOffsets);
+        OffsetAndMetadata expected = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        assertEquals(expected, fetchedOffsets.get(t1p));
+
+        // check committedOffsetsCache is populated
+        assertEquals(committedOffsetsCache.size(), 1);
+        assertEquals(expected, committedOffsetsCache.get(t1p));
+    }
+
+    @Test
+    public void testReturningCachedOffsetForAssignedPartition() {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady(time.timer(Long.MAX_VALUE));
+
+        Map<TopicPartition, OffsetAndMetadata> committedOffsetsCache = coordinator.committedOffsetsCache();
+
+        long offset = 500L;
+        String metadata = "blahblah";
+        Optional<Integer> leaderEpoch = Optional.of(15);
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset, leaderEpoch, metadata);
+        committedOffsetsCache.put(t1p, offsetAndMetadata);

Review Comment:
   I'm thinking we can merge this test with the above `testPopulatingOffsetCacheForAssignedPartition` test, so that we don't have to initialize all the data again. Something like this:
   ```
   // check committedOffsetsCache is populated
   assertEquals(committedOffsetsCache.size(), 1);
   assertEquals(expected, committedOffsetsCache.get(t1p));

   client.prepareResponse(offsetFetchResponse(Errors.NONE, singletonMap(t2p, data)));
   // fetch again with t1p + t2p, but it will only send the fetch for t2p since t1p is in the cache
   Map<TopicPartition, OffsetAndMetadata> fetchedOffsets = coordinator.fetchCommittedOffsets(new HashSet<>(Arrays.asList(t1p, t2p)),
           time.timer(Long.MAX_VALUE));

   assertNotNull(fetchedOffsets);
   // return 2 results
   assertEquals(fetchedOffsets.size(), 2);
   // the cache size is still 1
   assertEquals(committedOffsetsCache.size(), 1);
   .....
   ```
   WDYT?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java:
##########
@@ -2521,11 +2521,34 @@ public void testCommitOffsetMetadata() {
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(100L, "hello");
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, offsetAndMetadata);
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
         assertEquals(coordinator.inFlightAsyncCommits.get(), 0);
+        Map<TopicPartition, OffsetAndMetadata> cache = coordinator.committedOffsetsCache();
+        assertEquals(cache.size(), 1);
+        assertEquals(cache.get(t1p), offsetAndMetadata);

Review Comment:
   The same comments apply to the tests below.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
