Hangleton commented on code in PR #13493:
URL: https://github.com/apache/kafka/pull/13493#discussion_r1165606409


##########
core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala:
##########
@@ -824,14 +823,14 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
     expiredOffsets
   }
 
-  def allOffsets: Map[TopicPartition, OffsetAndMetadata] = offsets.map { case 
(topicPartition, commitRecordMetadataAndOffset) =>
-    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  def allOffsets: Map[TopicIdPartition, OffsetAndMetadata] = offsets.map { 
case (topicPartition, commitRecordMetadataAndOffset) =>
+    (new TopicIdPartition(Uuid.ZERO_UUID, topicPartition), 
commitRecordMetadataAndOffset.offsetAndMetadata)

Review Comment:
   Hi Justine (@jolshan), I pushed a commit with the change to resolve the 
topic id in the group metadata manager when the offsets of all topic-partitions 
known by the coordinator are requested. Apologies for the delay.
   
   I don't really like the code I wrote since it introduces one level of 
indirection on the valid path and one more failure path when a topic id cannot 
be resolved. And the implementation proposed is not very elegant. But it 
follows up on the discussion above. 
   
   When the new schemas for metadata and offset records are used, the topic ids 
will be taken from these records tags. However for all current records, there 
will still be a need for reconciliation. Perhaps this reconciliation should be 
done when the offsets are loaded by the coordinator once the in-memory metadata 
data structure become topic-id-aware.
   
   I will add a few more unit tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to