dajac commented on code in PR #13378: URL: https://github.com/apache/kafka/pull/13378#discussion_r1133929949
########## core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala: ########## @@ -824,9 +827,18 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState def offset(topicPartition: TopicPartition): Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata) + Review Comment: nit: This can be reverted. ########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala: ########## @@ -298,13 +299,14 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends CommitOffsetsOperation { override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = { val tp = new TopicPartition("topic", 0) - val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds())) + val topicId = Uuid.randomUuid() Review Comment: nit: Let's combine these two lines in `new TopicIdPartition...`. Then we can reuse it at L303 as well. ########## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala: ########## @@ -1249,18 +1249,19 @@ class GroupMetadataManagerTest { def testCommitOffset(): Unit = { val memberId = "" val topicPartition = new TopicPartition("foo", 0) + val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), topicPartition) Review Comment: nit: Let's combine them. ########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala: ########## @@ -636,6 +636,7 @@ class GroupCoordinatorAdapterTest { @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT) def testCommitOffsets(version: Short): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) + Review Comment: nit: This can be reverted. 
########## core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala: ########## @@ -388,10 +388,10 @@ private[group] class GroupCoordinatorAdapter( case retentionTimeMs => Some(currentTimeMs + retentionTimeMs) } - val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() + val partitions = new mutable.HashMap[TopicIdPartition, OffsetAndMetadata]() request.topics.forEach { topic => topic.partitions.forEach { partition => - val tp = new TopicPartition(topic.name, partition.partitionIndex) + val tp = new TopicIdPartition(Uuid.ZERO_UUID, new TopicPartition(topic.name, partition.partitionIndex)) Review Comment: nit: There is another constructor which takes the uuid, partition id and topic name. Could we use that one? ########## core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala: ########## @@ -649,26 +649,27 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState } } - def failPendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata): Unit = { + def failPendingOffsetWrite(topicIdPartition: TopicIdPartition, offset: OffsetAndMetadata): Unit = { + val topicPartition = topicIdPartition.topicPartition pendingOffsetCommits.get(topicPartition) match { case Some(pendingOffset) if offset == pendingOffset => pendingOffsetCommits.remove(topicPartition) case _ => } } - def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]): Unit = { + def prepareOffsetCommit(offsets: Map[TopicIdPartition, OffsetAndMetadata]): Unit = { receivedConsumerOffsetCommits = true - pendingOffsetCommits ++= offsets + pendingOffsetCommits ++= offsets.map { case(k, v) => k.topicPartition -> v } Review Comment: nit: I wonder what is best here: using ++= (addAll) or iterating over offsets and using +=. The latter does not create an intermediate collection. What do you think? nit2: `case (k, v)`. We put a space after `case`. 
########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ########## @@ -2559,10 +2559,11 @@ class GroupCoordinatorTest { // The simple offset commit should now fail val tp = new TopicPartition("topic", 0) + val tip = new TopicIdPartition(Uuid.randomUuid(), tp) Review Comment: nit: Let's combine them. ########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ########## @@ -2768,7 +2776,10 @@ class GroupCoordinatorTest { // Both group have pending offset commits. // Marker for only one partition is received. That commit should be materialized while the other should not. - val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0)) + val partitions = List( + new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic1", 0)), + new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic2", 0)) Review Comment: nit: There is an extra space after `,`. ########## core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala: ########## @@ -386,175 +386,269 @@ class GroupMetadataTest { @Test def testOffsetCommit(): Unit = { - val partition = new TopicPartition("foo", 0) + val partition = new TopicIdPartition(Uuid.randomUuid, new TopicPartition("foo", 0)) val offset = offsetAndMetadata(37) val commitRecordOffset = 3 group.prepareOffsetCommit(Map(partition -> offset)) - assertTrue(group.hasOffsets) - assertEquals(None, group.offset(partition)) + assertOffsets( Review Comment: What's the reason for changing all those assertions here? It is usually better to avoid doing a double refactoring like this in the same PR because there is a risk of introducing issues in the tests as well. I would prefer to keep the assertions as they were so that I know they still cover the refactoring that we are doing to introduce the topic id. Note that we could do this refactoring in a separate PR if you believe that it is necessary. 
########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ########## @@ -2571,15 +2572,16 @@ class GroupCoordinatorTest { @Test def testFetchOffsets(): Unit = { + val tip = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("topic", 0)) val tp = new TopicPartition("topic", 0) Review Comment: nit: Ditto. There are other cases in this file. ########## core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala: ########## @@ -160,8 +160,8 @@ class GroupCoordinatorTest { assertEquals(Some(Errors.REBALANCE_IN_PROGRESS), syncGroupResponse) // OffsetCommit - val topicPartition = new TopicPartition("foo", 0) - var offsetCommitErrors = Map.empty[TopicPartition, Errors] + val topicPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)) Review Comment: nit: Use the second constructor. There are other cases in this file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org