machi1990 commented on code in PR #13665:
URL: https://github.com/apache/kafka/pull/13665#discussion_r1198690005
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -1007,16 +1047,33 @@ public Map<TopicPartition, OffsetAndMetadata>
fetchCommittedOffsets(final Set<To
if (pendingCommittedOffsetRequest != null) {
future = pendingCommittedOffsetRequest.response;
} else {
- future = sendOffsetFetchRequest(partitions);
- pendingCommittedOffsetRequest = new
PendingCommittedOffsetRequest(partitions, generationForOffsetRequest, future);
+ future = sendOffsetFetchRequest(nonCachedPartitions);
+ pendingCommittedOffsetRequest = new
PendingCommittedOffsetRequest(nonCachedPartitions, generationForOffsetRequest,
future);
}
client.poll(future, timer);
if (future.isDone()) {
pendingCommittedOffsetRequest = null;
if (future.succeeded()) {
- return future.value();
+ Map<TopicPartition, OffsetAndMetadata> freshOffsets =
future.value();
+
+ // update cache for assigned partitions that are not
cached yet
+ for (TopicPartition nonCachedAssignedPartition:
nonCachedAssignedPartitions) {
+ if
(!this.subscriptions.isAssigned(nonCachedAssignedPartition)) {
+ // it is possible that the topic is no longer
assigned when the response is received,
+ // in this case we do not update the cache with
the fresh value
+ continue;
+ }
+
+ OffsetAndMetadata offset =
freshOffsets.get(nonCachedAssignedPartition);
+ if (offset != null) { // it is possible that the
offset and metadata were not fetched
+
this.partitionOffsetsCache.put(nonCachedAssignedPartition, offset);
Review Comment:
Hi @showuon I forgot to mention or call your attention on this line
https://github.com/apache/kafka/blob/401fb417bf60864e6d380f979d268d895c5ad727/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1422
which already updates the cache. Is that what you've in mind?
My thinking is that the data cached during fetch won't get stale quickly due
to that. Let me know if my thinking process is correct. Thanks
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]