This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d01662fb0f7 KAFKA-14793: Propagate Topic Ids to the Group Coordinator 
during Offsets Commit (#13378)
d01662fb0f7 is described below

commit d01662fb0f7bbb43e6a2d22f5ea88b5bf136057c
Author: Alexandre Dupriez <[email protected]>
AuthorDate: Wed Mar 15 11:20:26 2023 +0000

    KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets 
Commit (#13378)
    
    This patch refactors the GroupCoordinator to use TopicIdPartition on the 
offsets commit paths. This change is required by 
https://github.com/apache/kafka/pull/13240 in order to correctly handle topic 
ids.
    
    Reviewers: David Jacot <[email protected]>
---
 .../kafka/coordinator/group/GroupCoordinator.scala |  18 +-
 .../group/GroupCoordinatorAdapter.scala            |  14 +-
 .../kafka/coordinator/group/GroupMetadata.scala    |  36 +-
 .../coordinator/group/GroupMetadataManager.scala   |  32 +-
 .../group/GroupCoordinatorAdapterTest.scala        |  18 +-
 .../group/GroupCoordinatorConcurrencyTest.scala    |  15 +-
 .../coordinator/group/GroupCoordinatorTest.scala   | 393 +++++++++---------
 .../group/GroupMetadataManagerTest.scala           | 453 +++++++++++++--------
 .../coordinator/group/GroupMetadataTest.scala      | 155 ++++---
 9 files changed, 654 insertions(+), 480 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index c52d920b583..d706d35d1d6 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.OffsetAndMetadata
 import kafka.server._
 import kafka.utils.Logging
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.internals.Topic
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
@@ -899,8 +899,8 @@ private[group] class GroupCoordinator(
                              memberId: String,
                              groupInstanceId: Option[String],
                              generationId: Int,
-                             offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
-                             responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit,
+                             offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
+                             responseCallback: immutable.Map[TopicIdPartition, 
Errors] => Unit,
                              requestLocal: RequestLocal = 
RequestLocal.NoCaching): Unit = {
     validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => 
k -> error })
@@ -917,8 +917,8 @@ private[group] class GroupCoordinator(
                           memberId: String,
                           groupInstanceId: Option[String],
                           generationId: Int,
-                          offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit,
+                          offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
+                          responseCallback: immutable.Map[TopicIdPartition, 
Errors] => Unit,
                           requestLocal: RequestLocal = 
RequestLocal.NoCaching): Unit = {
     validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
       case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => 
k -> error })
@@ -956,9 +956,9 @@ private[group] class GroupCoordinator(
                                  generationId: Int,
                                  producerId: Long,
                                  producerEpoch: Short,
-                                 offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
+                                 offsetMetadata: 
immutable.Map[TopicIdPartition, OffsetAndMetadata],
                                  requestLocal: RequestLocal,
-                                 responseCallback: 
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+                                 responseCallback: 
immutable.Map[TopicIdPartition, Errors] => Unit): Unit = {
     group.inLock {
       val validationErrorOpt = validateOffsetCommit(
         group,
@@ -1019,8 +1019,8 @@ private[group] class GroupCoordinator(
                               memberId: String,
                               groupInstanceId: Option[String],
                               generationId: Int,
-                              offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
-                              responseCallback: immutable.Map[TopicPartition, 
Errors] => Unit,
+                              offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
+                              responseCallback: 
immutable.Map[TopicIdPartition, Errors] => Unit,
                               requestLocal: RequestLocal): Unit = {
     group.inLock {
       val validationErrorOpt = validateOffsetCommit(
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 3211c160a07..87dba8b1019 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.group
 import kafka.common.OffsetAndMetadata
 import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
 import kafka.utils.Implicits.MapExtensionMethods
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, 
ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData, 
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, 
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, 
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, 
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData, 
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchRespon [...]
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -357,7 +357,7 @@ private[group] class GroupCoordinatorAdapter(
     val currentTimeMs = time.milliseconds
     val future = new CompletableFuture[OffsetCommitResponseData]()
 
-    def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+    def callback(commitStatus: Map[TopicIdPartition, Errors]): Unit = {
       val response = new OffsetCommitResponseData()
       val byTopics = new mutable.HashMap[String, 
OffsetCommitResponseData.OffsetCommitResponseTopic]()
 
@@ -388,10 +388,10 @@ private[group] class GroupCoordinatorAdapter(
       case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
     }
 
-    val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+    val partitions = new mutable.HashMap[TopicIdPartition, OffsetAndMetadata]()
     request.topics.forEach { topic =>
       topic.partitions.forEach { partition =>
-        val tp = new TopicPartition(topic.name, partition.partitionIndex)
+        val tp = new TopicIdPartition(Uuid.ZERO_UUID, 
partition.partitionIndex, topic.name)
         partitions += tp -> createOffsetAndMetadata(
           currentTimeMs,
           partition.committedOffset,
@@ -424,7 +424,7 @@ private[group] class GroupCoordinatorAdapter(
     val currentTimeMs = time.milliseconds
     val future = new CompletableFuture[TxnOffsetCommitResponseData]()
 
-    def callback(results: Map[TopicPartition, Errors]): Unit = {
+    def callback(results: Map[TopicIdPartition, Errors]): Unit = {
       val response = new TxnOffsetCommitResponseData()
       val byTopics = new mutable.HashMap[String, 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]()
 
@@ -447,10 +447,10 @@ private[group] class GroupCoordinatorAdapter(
       future.complete(response)
     }
 
-    val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+    val partitions = new mutable.HashMap[TopicIdPartition, OffsetAndMetadata]()
     request.topics.forEach { topic =>
       topic.partitions.forEach { partition =>
-        val tp = new TopicPartition(topic.name, partition.partitionIndex)
+        val tp = new TopicIdPartition(Uuid.ZERO_UUID, 
partition.partitionIndex, topic.name)
         partitions += tp -> createOffsetAndMetadata(
           currentTimeMs,
           partition.committedOffset,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 543d7ef8ac6..33ea53d6d25 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -24,7 +24,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
 import 
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.protocol.types.SchemaException
@@ -631,7 +631,8 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
     this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
   }
 
-  def onOffsetCommitAppend(topicPartition: TopicPartition, 
offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset): Unit = {
+  def onOffsetCommitAppend(topicIdPartition: TopicIdPartition, 
offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset): Unit = {
+    val topicPartition = topicIdPartition.topicPartition
     if (pendingOffsetCommits.contains(topicPartition)) {
       if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)
         throw new IllegalStateException("Cannot complete offset commit write 
without providing the metadata of the record " +
@@ -649,26 +650,29 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
     }
   }
 
-  def failPendingOffsetWrite(topicPartition: TopicPartition, offset: 
OffsetAndMetadata): Unit = {
+  def failPendingOffsetWrite(topicIdPartition: TopicIdPartition, offset: 
OffsetAndMetadata): Unit = {
+    val topicPartition = topicIdPartition.topicPartition
     pendingOffsetCommits.get(topicPartition) match {
       case Some(pendingOffset) if offset == pendingOffset => 
pendingOffsetCommits.remove(topicPartition)
       case _ =>
     }
   }
 
-  def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]): 
Unit = {
+  def prepareOffsetCommit(offsets: Map[TopicIdPartition, OffsetAndMetadata]): 
Unit = {
     receivedConsumerOffsetCommits = true
-    pendingOffsetCommits ++= offsets
+    offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+      pendingOffsetCommits += topicIdPartition.topicPartition -> 
offsetAndMetadata
+    }
   }
 
-  def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, 
OffsetAndMetadata]): Unit = {
+  def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicIdPartition, 
OffsetAndMetadata]): Unit = {
     trace(s"TxnOffsetCommit for producer $producerId and group $groupId with 
offsets $offsets is pending")
     receivedTransactionalOffsetCommits = true
     val producerOffsets = 
pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
       mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
 
-    offsets.forKeyValue { (topicPartition, offsetAndMetadata) =>
-      producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None, 
offsetAndMetadata))
+    offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+      producerOffsets.put(topicIdPartition.topicPartition, 
CommitRecordMetadataAndOffset(None, offsetAndMetadata))
     }
   }
 
@@ -679,7 +683,8 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
   /* Remove a pending transactional offset commit if the actual offset commit 
record was not written to the log.
    * We will return an error and the client will retry the request, 
potentially to a different coordinator.
    */
-  def failPendingTxnOffsetCommit(producerId: Long, topicPartition: 
TopicPartition): Unit = {
+  def failPendingTxnOffsetCommit(producerId: Long, topicIdPartition: 
TopicIdPartition): Unit = {
+    val topicPartition = topicIdPartition.topicPartition
     pendingTransactionalOffsetCommits.get(producerId) match {
       case Some(pendingOffsets) =>
         val pendingOffsetCommit = pendingOffsets.remove(topicPartition)
@@ -692,8 +697,9 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
     }
   }
 
-  def onTxnOffsetCommitAppend(producerId: Long, topicPartition: TopicPartition,
+  def onTxnOffsetCommitAppend(producerId: Long, topicIdPartition: 
TopicIdPartition,
                               commitRecordMetadataAndOffset: 
CommitRecordMetadataAndOffset): Unit = {
+    val topicPartition = topicIdPartition.topicPartition
     pendingTransactionalOffsetCommits.get(producerId) match {
       case Some(pendingOffset) =>
         if (pendingOffset.contains(topicPartition)
@@ -827,6 +833,16 @@ private[group] class GroupMetadata(val groupId: String, 
initialState: GroupState
   // visible for testing
   private[group] def offsetWithRecordMetadata(topicPartition: TopicPartition): 
Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)
 
+  // Used for testing
+  private[group] def pendingOffsetCommit(topicIdPartition: TopicIdPartition): 
Option[OffsetAndMetadata] = {
+    pendingOffsetCommits.get(topicIdPartition.topicPartition)
+  }
+
+  // Used for testing
+  private[group] def pendingTxnOffsetCommit(producerId: Long, 
topicIdPartition: TopicIdPartition): Option[CommitRecordMetadataAndOffset] = {
+    
pendingTransactionalOffsetCommits.get(producerId).flatMap(_.get(topicIdPartition.topicPartition))
+  }
+
   def numOffsets: Int = offsets.size
 
   def hasOffsets: Boolean = offsets.nonEmpty || pendingOffsetCommits.nonEmpty 
|| pendingTransactionalOffsetCommits.nonEmpty
diff --git 
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala 
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index b45ee5f4736..2ea0d79662e 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -41,7 +41,7 @@ import 
org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests.{OffsetCommitRequest, 
OffsetFetchResponse}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaException, MessageFormatter, 
TopicPartition}
+import org.apache.kafka.common.{KafkaException, MessageFormatter, 
TopicIdPartition, TopicPartition}
 import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, 
OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0, 
IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
@@ -345,8 +345,8 @@ class GroupMetadataManager(brokerId: Int,
    */
   def storeOffsets(group: GroupMetadata,
                    consumerId: String,
-                   offsetMetadata: immutable.Map[TopicPartition, 
OffsetAndMetadata],
-                   responseCallback: immutable.Map[TopicPartition, Errors] => 
Unit,
+                   offsetMetadata: immutable.Map[TopicIdPartition, 
OffsetAndMetadata],
+                   responseCallback: immutable.Map[TopicIdPartition, Errors] 
=> Unit,
                    producerId: Long = RecordBatch.NO_PRODUCER_ID,
                    producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
                    requestLocal: RequestLocal = RequestLocal.NoCaching): Unit 
= {
@@ -375,8 +375,8 @@ class GroupMetadataManager(brokerId: Int,
           val timestampType = TimestampType.CREATE_TIME
           val timestamp = time.milliseconds()
 
-          val records = filteredOffsetMetadata.map { case (topicPartition, 
offsetAndMetadata) =>
-            val key = GroupMetadataManager.offsetCommitKey(group.groupId, 
topicPartition)
+          val records = filteredOffsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
+            val key = GroupMetadataManager.offsetCommitKey(group.groupId, 
topicIdPartition.topicPartition)
             val value = 
GroupMetadataManager.offsetCommitValue(offsetAndMetadata, 
interBrokerProtocolVersion)
             new SimpleRecord(timestamp, key, value)
           }
@@ -406,11 +406,11 @@ class GroupMetadataManager(brokerId: Int,
             val responseError = group.inLock {
               if (status.error == Errors.NONE) {
                 if (!group.is(Dead)) {
-                  filteredOffsetMetadata.forKeyValue { (topicPartition, 
offsetAndMetadata) =>
+                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
                     if (isTxnOffsetCommit)
-                      group.onTxnOffsetCommitAppend(producerId, 
topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), 
offsetAndMetadata))
+                      group.onTxnOffsetCommitAppend(producerId, 
topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), 
offsetAndMetadata))
                     else
-                      group.onOffsetCommitAppend(topicPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+                      group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
                   }
                 }
 
@@ -422,11 +422,11 @@ class GroupMetadataManager(brokerId: Int,
                 if (!group.is(Dead)) {
                   if (!group.hasPendingOffsetCommitsFromProducer(producerId))
                     removeProducerGroup(producerId, group.groupId)
-                  filteredOffsetMetadata.forKeyValue { (topicPartition, 
offsetAndMetadata) =>
+                  filteredOffsetMetadata.forKeyValue { (topicIdPartition, 
offsetAndMetadata) =>
                     if (isTxnOffsetCommit)
-                      group.failPendingTxnOffsetCommit(producerId, 
topicPartition)
+                      group.failPendingTxnOffsetCommit(producerId, 
topicIdPartition)
                     else
-                      group.failPendingOffsetWrite(topicPartition, 
offsetAndMetadata)
+                      group.failPendingOffsetWrite(topicIdPartition, 
offsetAndMetadata)
                   }
                 }
 
@@ -455,11 +455,11 @@ class GroupMetadataManager(brokerId: Int,
             }
 
             // compute the final error codes for the commit response
-            val commitStatus = offsetMetadata.map { case (topicPartition, 
offsetAndMetadata) =>
+            val commitStatus = offsetMetadata.map { case (topicIdPartition, 
offsetAndMetadata) =>
               if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
-                (topicPartition, responseError)
+                (topicIdPartition, responseError)
               else
-                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
+                (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
             }
 
             // finally trigger the callback logic passed from the API layer
@@ -480,8 +480,8 @@ class GroupMetadataManager(brokerId: Int,
           appendForGroup(group, entries, requestLocal, putCacheCallback)
 
         case None =>
-          val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
-            (topicPartition, Errors.NOT_COORDINATOR)
+          val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
+            (topicIdPartition, Errors.NOT_COORDINATOR)
           }
           responseCallback(commitStatus)
       }
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 6704e2317e6..bcf24db2ecc 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata
 import 
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, 
SyncGroupCallback}
 import kafka.server.RequestLocal
 import kafka.utils.MockTime
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.errors.{InvalidGroupIdException, 
UnsupportedVersionException}
 import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData, 
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, 
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, 
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, 
OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData, 
OffsetFetchResponseData, SyncGroupRequestData, SyncGr [...]
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
@@ -662,8 +662,8 @@ class GroupCoordinatorAdapterTest {
     val future = adapter.commitOffsets(ctx, data, bufferSupplier)
     assertFalse(future.isDone)
 
-    val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit])
+    val capturedCallback: ArgumentCaptor[Map[TopicIdPartition, Errors] => 
Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, Errors] => Unit])
 
     verify(groupCoordinator).handleCommitOffsets(
       ArgumentMatchers.eq(data.groupId),
@@ -671,7 +671,7 @@ class GroupCoordinatorAdapterTest {
       ArgumentMatchers.eq(None),
       ArgumentMatchers.eq(data.generationId),
       ArgumentMatchers.eq(Map(
-        new TopicPartition("foo", 0) -> new OffsetAndMetadata(
+        new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new 
OffsetAndMetadata(
           offset = 100,
           leaderEpoch = Optional.of[Integer](1),
           metadata = "",
@@ -684,7 +684,7 @@ class GroupCoordinatorAdapterTest {
     )
 
     capturedCallback.getValue.apply(Map(
-      new TopicPartition("foo", 0) -> Errors.NONE
+      new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> Errors.NONE
     ))
 
     val expectedResponseData = new OffsetCommitResponseData()
@@ -732,8 +732,8 @@ class GroupCoordinatorAdapterTest {
     val future = adapter.commitTransactionalOffsets(ctx, data, bufferSupplier)
     assertFalse(future.isDone)
 
-    val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] =
-      ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit])
+    val capturedCallback: ArgumentCaptor[Map[TopicIdPartition, Errors] => 
Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, Errors] => Unit])
 
     verify(groupCoordinator).handleTxnCommitOffsets(
       ArgumentMatchers.eq(data.groupId),
@@ -743,7 +743,7 @@ class GroupCoordinatorAdapterTest {
       ArgumentMatchers.eq(None),
       ArgumentMatchers.eq(data.generationId),
       ArgumentMatchers.eq(Map(
-        new TopicPartition("foo", 0) -> new OffsetAndMetadata(
+        new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new 
OffsetAndMetadata(
           offset = 100,
           leaderEpoch = Optional.of[Integer](1),
           metadata = "",
@@ -756,7 +756,7 @@ class GroupCoordinatorAdapterTest {
     )
 
     capturedCallback.getValue.apply(Map(
-      new TopicPartition("foo", 0) -> Errors.NONE
+      new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> Errors.NONE
     ))
 
     val expectedData = new TxnOffsetCommitResponseData()
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index bd39b3190e7..40de17749b3 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -26,7 +26,7 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
@@ -283,8 +283,8 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
       callback
     }
     override def runWithCallback(member: GroupMember, responseCallback: 
CommitOffsetCallback): Unit = {
-      val tp = new TopicPartition("topic", 0)
-      val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", 
Time.SYSTEM.milliseconds()))
+      val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+      val offsets = immutable.Map(tip -> OffsetAndMetadata(1, "", 
Time.SYSTEM.milliseconds()))
       groupCoordinator.handleCommitOffsets(member.groupId, member.memberId,
         member.groupInstanceId, member.generationId, offsets, responseCallback)
       replicaManager.tryCompleteActions()
@@ -297,14 +297,13 @@ class GroupCoordinatorConcurrencyTest extends 
AbstractCoordinatorConcurrencyTest
 
   class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends 
CommitOffsetsOperation {
     override def runWithCallback(member: GroupMember, responseCallback: 
CommitOffsetCallback): Unit = {
-      val tp = new TopicPartition("topic", 0)
-      val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", 
Time.SYSTEM.milliseconds()))
+      val offsets = immutable.Map(new TopicIdPartition(Uuid.randomUuid(), 0, 
"topic") -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
       val producerId = 1000L
       val producerEpoch : Short = 2
       // When transaction offsets are appended to the log, transactions may be 
scheduled for
       // completion. Since group metadata locks are acquired for transaction 
completion, include
       // this in the callback to test that there are no deadlocks.
-      def callbackWithTxnCompletion(errors: Map[TopicPartition, Errors]): Unit 
= {
+      def callbackWithTxnCompletion(errors: Map[TopicIdPartition, Errors]): 
Unit = {
         val offsetsPartitions = (0 to numPartitions).map(new 
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
         groupCoordinator.groupManager.scheduleHandleTxnCompletion(producerId,
           offsetsPartitions.map(_.partition).toSet, isCommit = 
random.nextBoolean)
@@ -373,8 +372,8 @@ object GroupCoordinatorConcurrencyTest {
   type HeartbeatCallback = Errors => Unit
   type OffsetFetchCallbackParams = (Errors, Map[TopicPartition, 
OffsetFetchResponse.PartitionData])
   type OffsetFetchCallback = (Errors, Map[TopicPartition, 
OffsetFetchResponse.PartitionData]) => Unit
-  type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
-  type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
+  type CommitOffsetCallbackParams = Map[TopicIdPartition, Errors]
+  type CommitOffsetCallback = Map[TopicIdPartition, Errors] => Unit
   type LeaveGroupCallbackParams = LeaveGroupResult
   type LeaveGroupCallback = LeaveGroupResult => Unit
   type CompleteTxnCallbackParams = Errors
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 5a9ff6ae6ea..cbb1e5a7d60 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -22,7 +22,7 @@ import kafka.common.OffsetAndMetadata
 import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, 
ReplicaManager, RequestLocal}
 import kafka.utils._
 import kafka.utils.timer.MockTimer
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -60,8 +60,8 @@ class GroupCoordinatorTest {
   type SyncGroupCallback = SyncGroupResult => Unit
   type HeartbeatCallbackParams = Errors
   type HeartbeatCallback = Errors => Unit
-  type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
-  type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
+  type CommitOffsetCallbackParams = Map[TopicIdPartition, Errors]
+  type CommitOffsetCallback = Map[TopicIdPartition, Errors] => Unit
   type LeaveGroupCallback = LeaveGroupResult => Unit
 
   val ClientId = "consumer-test"
@@ -160,11 +160,11 @@ class GroupCoordinatorTest {
     assertEquals(Some(Errors.REBALANCE_IN_PROGRESS), syncGroupResponse)
 
     // OffsetCommit
-    val topicPartition = new TopicPartition("foo", 0)
-    var offsetCommitErrors = Map.empty[TopicPartition, Errors]
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0 , "foo")
+    var offsetCommitErrors = Map.empty[TopicIdPartition, Errors]
     groupCoordinator.handleCommitOffsets(otherGroupId, memberId, None, 1,
-      Map(topicPartition -> offsetAndMetadata(15L)), result => { 
offsetCommitErrors = result })
-    assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), 
offsetCommitErrors.get(topicPartition))
+      Map(topicIdPartition -> offsetAndMetadata(15L)), result => { 
offsetCommitErrors = result })
+    assertEquals(Map(topicIdPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS), 
offsetCommitErrors)
 
     // Heartbeat
     var heartbeatError: Option[Errors] = None
@@ -1268,12 +1268,12 @@ class GroupCoordinatorTest {
     val memberId = "memberId"
 
     val deadGroupId = "deadGroupId"
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
 
     groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId, 
Dead, new MockTime()))
-    val offsetCommitResult = commitOffsets(deadGroupId, memberId, 1, Map(tp -> 
offset))
-    assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, offsetCommitResult(tp))
+    val offsetCommitResult = commitOffsets(deadGroupId, memberId, 1, Map(tip 
-> offset))
+    assertEquals(Map(tip -> Errors.COORDINATOR_NOT_AVAILABLE), 
offsetCommitResult)
   }
 
   @Test
@@ -1283,14 +1283,14 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation, 
rebalanceResult.leaderId, Map.empty)
     assertEquals(Errors.NONE, syncGroupResult.error)
 
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
-    val validOffsetCommitResult = commitOffsets(groupId, 
rebalanceResult.leaderId, rebalanceResult.generation, Map(tp -> offset))
-    assertEquals(Errors.NONE, validOffsetCommitResult(tp))
+    val validOffsetCommitResult = commitOffsets(groupId, 
rebalanceResult.leaderId, rebalanceResult.generation, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), validOffsetCommitResult)
 
     val invalidOffsetCommitResult = commitOffsets(groupId, invalidMemberId, 
rebalanceResult.generation,
-      Map(tp -> offset), Some(leaderInstanceId))
-    assertEquals(Errors.FENCED_INSTANCE_ID, invalidOffsetCommitResult(tp))
+      Map(tip -> offset), Some(leaderInstanceId))
+    assertEquals(Map(tip -> Errors.FENCED_INSTANCE_ID), 
invalidOffsetCommitResult)
   }
 
   @Test
@@ -1785,7 +1785,7 @@ class GroupCoordinatorTest {
   def testCommitMaintainsSession(): Unit = {
     val sessionTimeout = 1000
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
 
     val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, 
protocols,
@@ -1800,8 +1800,8 @@ class GroupCoordinatorTest {
 
     timer.advanceClock(sessionTimeout / 2)
 
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, 
generationId, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, 
generationId, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
     timer.advanceClock(sessionTimeout / 2 + 100)
 
@@ -2524,21 +2524,21 @@ class GroupCoordinatorTest {
   @Test
   def testCommitOffsetFromUnknownGroup(): Unit = {
     val generationId = 1
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
 
-    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, 
Map(tp -> offset))
-    assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
+    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, 
Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.ILLEGAL_GENERATION), commitOffsetResult)
   }
 
   @Test
   def testCommitOffsetWithDefaultGeneration(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, 
OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
   }
 
   @Test
@@ -2558,33 +2558,33 @@ class GroupCoordinatorTest {
     verifyLeaveGroupResult(leaveGroupResults)
 
     // The simple offset commit should now fail
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val commitOffsetResult = commitOffsets(groupId, 
OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
-    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tp)))
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, error)
-    assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+    assertEquals(Some(0), partitionData.get(tip.topicPartition).map(_.offset))
   }
 
   @Test
   def testFetchOffsets(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = 97L
     val metadata = "some metadata"
     val leaderEpoch = Optional.of[Integer](15)
     val offsetAndMetadata = OffsetAndMetadata(offset, leaderEpoch, metadata, 
timer.time.milliseconds(), None)
 
     val commitOffsetResult = commitOffsets(groupId, 
OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offsetAndMetadata))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offsetAndMetadata))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
-    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tp)))
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, error)
 
-    val maybePartitionData = partitionData.get(tp)
+    val maybePartitionData = partitionData.get(tip.topicPartition)
     assertTrue(maybePartitionData.isDefined)
     assertEquals(offset, maybePartitionData.get.offset)
     assertEquals(metadata, maybePartitionData.get.metadata)
@@ -2596,17 +2596,17 @@ class GroupCoordinatorTest {
     // For backwards compatibility, the coordinator supports 
committing/fetching offsets with an empty groupId.
     // To allow inspection and removal of the empty group, we must also 
support DescribeGroups and DeleteGroups
 
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val groupId = ""
 
     val commitOffsetResult = commitOffsets(groupId, 
OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
-    val (fetchError, partitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+    val (fetchError, partitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, fetchError)
-    assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+    assertEquals(Some(0), partitionData.get(tip.topicPartition).map(_.offset))
 
     val (describeError, summary) = 
groupCoordinator.handleDescribeGroup(groupId)
     assertEquals(Errors.NONE, describeError)
@@ -2622,26 +2622,26 @@ class GroupCoordinatorTest {
     val deleteErrors = groupCoordinator.handleDeleteGroups(Set(groupId))
     assertEquals(Errors.NONE, deleteErrors(groupId))
 
-    val (err, data) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tp)))
+    val (err, data) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, err)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
data.get(tp).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
data.get(tip.topicPartition).map(_.offset))
   }
 
   @Test
   def testBasicFetchTxnOffsets(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
-    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tp)))
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tip.topicPartition)))
 
     // Validate that the offset isn't materialjzed yet.
     assertEquals(Errors.NONE, error)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tp).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tip.topicPartition).map(_.offset))
 
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupPartitionId)
 
@@ -2649,50 +2649,50 @@ class GroupCoordinatorTest {
     handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.COMMIT)
 
     // Validate that committed offset is materialized.
-    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, secondReqError)
-    assertEquals(Some(0), secondReqPartitionData.get(tp).map(_.offset))
+    assertEquals(Some(0), 
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
   }
 
   @Test
   def testFetchTxnOffsetsWithAbort(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
-    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tp)))
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, error)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tp).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tip.topicPartition).map(_.offset))
 
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupPartitionId)
 
     // Validate that the pending commit is discarded.
     handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.ABORT)
 
-    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, secondReqError)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
secondReqPartitionData.get(tp).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
   }
 
   @Test
   def testFetchPendingTxnOffsetsWithAbort(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
     val nonExistTp = new TopicPartition("non-exist-topic", 0)
-    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tp, nonExistTp)))
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tip.topicPartition, nonExistTp)))
     assertEquals(Errors.NONE, error)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tp).map(_.offset))
-    assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT), 
partitionData.get(tp).map(_.error))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tip.topicPartition).map(_.offset))
+    assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT), 
partitionData.get(tip.topicPartition).map(_.error))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(nonExistTp).map(_.offset))
     assertEquals(Some(Errors.NONE), partitionData.get(nonExistTp).map(_.error))
 
@@ -2701,65 +2701,65 @@ class GroupCoordinatorTest {
     // Validate that the pending commit is discarded.
     handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.ABORT)
 
-    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, secondReqError)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
secondReqPartitionData.get(tp).map(_.offset))
-    assertEquals(Some(Errors.NONE), 
secondReqPartitionData.get(tp).map(_.error))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
+    assertEquals(Some(Errors.NONE), 
secondReqPartitionData.get(tip.topicPartition).map(_.error))
   }
 
   @Test
   def testFetchPendingTxnOffsetsWithCommit(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "offset")
     val offset = offsetAndMetadata(25)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
-    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tp)))
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, error)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tp).map(_.offset))
-    assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT), 
partitionData.get(tp).map(_.error))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tip.topicPartition).map(_.offset))
+    assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT), 
partitionData.get(tip.topicPartition).map(_.error))
 
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupPartitionId)
 
     // Validate that the pending commit is committed
     handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.COMMIT)
 
-    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, secondReqError)
-    assertEquals(Some(25), secondReqPartitionData.get(tp).map(_.offset))
-    assertEquals(Some(Errors.NONE), 
secondReqPartitionData.get(tp).map(_.error))
+    assertEquals(Some(25), 
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
+    assertEquals(Some(Errors.NONE), 
secondReqPartitionData.get(tip.topicPartition).map(_.error))
   }
 
   @Test
   def testFetchTxnOffsetsIgnoreSpuriousCommit(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
-    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+    val commitOffsetResult = commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
-    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tp)))
+    val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable, Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, error)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tp).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData.get(tip.topicPartition).map(_.offset))
 
     val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
groupPartitionId)
     handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.ABORT)
 
-    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+    val (secondReqError, secondReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, secondReqError)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
secondReqPartitionData.get(tp).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
 
     // Ignore spurious commit.
     handleTxnCompletion(producerId, List(offsetsTopic), 
TransactionResult.COMMIT)
 
-    val (thirdReqError, thirdReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+    val (thirdReqError, thirdReqPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(Seq(tip.topicPartition)))
     assertEquals(Errors.NONE, thirdReqError)
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
thirdReqPartitionData.get(tp).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
thirdReqPartitionData.get(tip.topicPartition).map(_.offset))
   }
 
   @Test
@@ -2768,7 +2768,10 @@ class GroupCoordinatorTest {
     // Both group have pending offset commits.
     // Marker for only one partition is received. That commit should be 
materialized while the other should not.
 
-    val partitions = List(new TopicPartition("topic1", 0), new 
TopicPartition("topic2", 0))
+    val topicIdPartitions = List(
+      new TopicIdPartition(Uuid.randomUuid(), 0, "topic1"),
+      new TopicIdPartition(Uuid.randomUuid(), 0, "topic2")
+    )
     val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
     val producerId = 1000L
     val producerEpoch: Short = 3
@@ -2786,21 +2789,22 @@ class GroupCoordinatorTest {
     // Ensure that the two groups map to different partitions.
     assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1))
 
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(partitions(0) -> offsets(0))))
-    assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
-    commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, 
producerId, producerEpoch, Map(partitions(1) -> offsets(1))))
-    assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId, 
producerEpoch, Map(topicIdPartitions(0) -> offsets(0))))
+    assertEquals(Errors.NONE, commitOffsetResults(0)(topicIdPartitions(0)))
+    commitOffsetResults.append(commitTransactionalOffsets(otherGroupId, 
producerId, producerEpoch, Map(topicIdPartitions(1) -> offsets(1))))
+    assertEquals(Errors.NONE, commitOffsetResults(1)(topicIdPartitions(1)))
 
     // We got a commit for only one __consumer_offsets partition. We should 
only materialize it's group offsets.
+    val topicPartitions = topicIdPartitions.map(_.topicPartition)
     handleTxnCompletion(producerId, List(offsetTopicPartitions(0)), 
TransactionResult.COMMIT)
-    groupCoordinator.handleFetchOffsets(groupIds(0), requireStable, 
Some(partitions)) match {
+    groupCoordinator.handleFetchOffsets(groupIds(0), requireStable, 
Some(topicPartitions)) match {
       case (error, partData) =>
         errors.append(error)
         partitionData.append(partData)
       case _ =>
     }
 
-    groupCoordinator.handleFetchOffsets(groupIds(1), requireStable, 
Some(partitions)) match {
+    groupCoordinator.handleFetchOffsets(groupIds(1), requireStable, 
Some(topicPartitions)) match {
       case (error, partData) =>
         errors.append(error)
         partitionData.append(partData)
@@ -2812,33 +2816,33 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, errors(1))
 
     // Exactly one offset commit should have been materialized.
-    assertEquals(Some(offsets(0).offset), 
partitionData(0).get(partitions(0)).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(0).get(partitions(1)).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(1).get(partitions(0)).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(1).get(partitions(1)).map(_.offset))
+    assertEquals(Some(offsets(0).offset), 
partitionData(0).get(topicPartitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(0).get(topicPartitions(1)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(1).get(topicPartitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(1).get(topicPartitions(1)).map(_.offset))
 
     // Now we receive the other marker.
     handleTxnCompletion(producerId, List(offsetTopicPartitions(1)), 
TransactionResult.COMMIT)
     errors.clear()
     partitionData.clear()
-    groupCoordinator.handleFetchOffsets(groupIds(0), requireStable, 
Some(partitions)) match {
+    groupCoordinator.handleFetchOffsets(groupIds(0), requireStable, 
Some(topicPartitions)) match {
       case (error, partData) =>
         errors.append(error)
         partitionData.append(partData)
       case _ =>
     }
 
-     groupCoordinator.handleFetchOffsets(groupIds(1), requireStable, 
Some(partitions)) match {
+     groupCoordinator.handleFetchOffsets(groupIds(1), requireStable, 
Some(topicPartitions)) match {
       case (error, partData) =>
         errors.append(error)
         partitionData.append(partData)
       case _ =>
     }
     // Two offsets should have been materialized
-    assertEquals(Some(offsets(0).offset), 
partitionData(0).get(partitions(0)).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(0).get(partitions(1)).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(1).get(partitions(0)).map(_.offset))
-    assertEquals(Some(offsets(1).offset), 
partitionData(1).get(partitions(1)).map(_.offset))
+    assertEquals(Some(offsets(0).offset), 
partitionData(0).get(topicPartitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(0).get(topicPartitions(1)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(1).get(topicPartitions(0)).map(_.offset))
+    assertEquals(Some(offsets(1).offset), 
partitionData(1).get(topicPartitions(1)).map(_.offset))
   }
 
   @Test
@@ -2847,7 +2851,10 @@ class GroupCoordinatorTest {
     // Different producers will commit offsets for different partitions.
     // Each partition's offsets should be materialized when the corresponding 
producer's marker is received.
 
-    val partitions = List(new TopicPartition("topic1", 0), new 
TopicPartition("topic2", 0))
+    val topicIdPartitions = List(
+      new TopicIdPartition(Uuid.randomUuid(), 0, "topic1"),
+      new TopicIdPartition(Uuid.randomUuid(), 0, "topic2")
+    )
     val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
     val producerIds = List(1000L, 1005L)
     val producerEpochs: Seq[Short] = List(3, 4)
@@ -2860,16 +2867,17 @@ class GroupCoordinatorTest {
     val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
 
     // producer0 commits the offsets for partition0
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, 
producerIds(0), producerEpochs(0), Map(partitions(0) -> offsets(0))))
-    assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, 
producerIds(0), producerEpochs(0), Map(topicIdPartitions(0) -> offsets(0))))
+    assertEquals(Errors.NONE, commitOffsetResults(0)(topicIdPartitions(0)))
 
     // producer1 commits the offsets for partition1
-    commitOffsetResults.append(commitTransactionalOffsets(groupId, 
producerIds(1), producerEpochs(1), Map(partitions(1) -> offsets(1))))
-    assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
+    commitOffsetResults.append(commitTransactionalOffsets(groupId, 
producerIds(1), producerEpochs(1), Map(topicIdPartitions(1) -> offsets(1))))
+    assertEquals(Errors.NONE, commitOffsetResults(1)(topicIdPartitions(1)))
 
     // producer0 commits its transaction.
+    val topicPartitions = topicIdPartitions.map(_.topicPartition)
     handleTxnCompletion(producerIds(0), List(offsetTopicPartition), 
TransactionResult.COMMIT)
-    groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(partitions)) match {
+    groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(topicPartitions)) match {
       case (error, partData) =>
         errors.append(error)
         partitionData.append(partData)
@@ -2879,13 +2887,13 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, errors(0))
 
     // We should only see the offset commit for producer0
-    assertEquals(Some(offsets(0).offset), 
partitionData(0).get(partitions(0)).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(0).get(partitions(1)).map(_.offset))
+    assertEquals(Some(offsets(0).offset), 
partitionData(0).get(topicPartitions(0)).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
partitionData(0).get(topicPartitions(1)).map(_.offset))
 
     // producer1 now commits its transaction.
     handleTxnCompletion(producerIds(1), List(offsetTopicPartition), 
TransactionResult.COMMIT)
 
-    groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(partitions)) match {
+    groupCoordinator.handleFetchOffsets(groupId, requireStable, 
Some(topicPartitions)) match {
       case (error, partData) =>
         errors.append(error)
         partitionData.append(partData)
@@ -2895,8 +2903,8 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, errors(1))
 
     // We should now see the offset commits for both producers.
-    assertEquals(Some(offsets(0).offset), 
partitionData(1).get(partitions(0)).map(_.offset))
-    assertEquals(Some(offsets(1).offset), 
partitionData(1).get(partitions(1)).map(_.offset))
+    assertEquals(Some(offsets(0).offset), 
partitionData(1).get(topicPartitions(0)).map(_.offset))
+    assertEquals(Some(offsets(1).offset), 
partitionData(1).get(topicPartitions(1)).map(_.offset))
   }
 
   @Test
@@ -2917,9 +2925,9 @@ class GroupCoordinatorTest {
 
   @Test
   def testFetchAllOffsets(): Unit = {
-    val tp1 = new TopicPartition("topic", 0)
-    val tp2 = new TopicPartition("topic", 1)
-    val tp3 = new TopicPartition("other-topic", 0)
+    val tip1 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+    val tip2 = new TopicIdPartition(tip1.topicId, 1, "topic")
+    val tip3 = new TopicIdPartition(Uuid.randomUuid(), 0, "other-topic")
     val offset1 = offsetAndMetadata(15)
     val offset2 = offsetAndMetadata(16)
     val offset3 = offsetAndMetadata(17)
@@ -2927,24 +2935,22 @@ class GroupCoordinatorTest {
     assertEquals((Errors.NONE, Map.empty), 
groupCoordinator.handleFetchOffsets(groupId, requireStable))
 
     val commitOffsetResult = commitOffsets(groupId, 
OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp1 -> offset1, tp2 -> 
offset2, tp3 -> offset3))
-    assertEquals(Errors.NONE, commitOffsetResult(tp1))
-    assertEquals(Errors.NONE, commitOffsetResult(tp2))
-    assertEquals(Errors.NONE, commitOffsetResult(tp3))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip1 -> offset1, tip2 -> 
offset2, tip3 -> offset3))
+    assertEquals(Map(tip1 -> Errors.NONE, tip2 -> Errors.NONE, tip3 -> 
Errors.NONE), commitOffsetResult)
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, 
requireStable)
     assertEquals(Errors.NONE, error)
     assertEquals(3, partitionData.size)
     assertTrue(partitionData.forall(_._2.error == Errors.NONE))
-    assertEquals(Some(offset1.offset), partitionData.get(tp1).map(_.offset))
-    assertEquals(Some(offset2.offset), partitionData.get(tp2).map(_.offset))
-    assertEquals(Some(offset3.offset), partitionData.get(tp3).map(_.offset))
+    assertEquals(Some(offset1.offset), 
partitionData.get(tip1.topicPartition).map(_.offset))
+    assertEquals(Some(offset2.offset), 
partitionData.get(tip2.topicPartition).map(_.offset))
+    assertEquals(Some(offset3.offset), 
partitionData.get(tip3.topicPartition).map(_.offset))
   }
 
   @Test
   def testCommitOffsetInCompletingRebalance(): Unit = {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
 
     val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, 
protocols)
@@ -2953,14 +2959,14 @@ class GroupCoordinatorTest {
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
 
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, 
generationId, Map(tp -> offset))
-    assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, 
generationId, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.REBALANCE_IN_PROGRESS), commitOffsetResult)
   }
 
   @Test
   def testCommitOffsetInCompletingRebalanceFromUnknownMemberId(): Unit = {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0 , "topic")
     val offset = offsetAndMetadata(0)
 
     val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, 
protocols)
@@ -2968,14 +2974,14 @@ class GroupCoordinatorTest {
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
 
-    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, 
Map(tp -> offset))
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, commitOffsetResult(tp))
+    val commitOffsetResult = commitOffsets(groupId, memberId, generationId, 
Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.UNKNOWN_MEMBER_ID), commitOffsetResult)
   }
 
   @Test
   def testCommitOffsetInCompletingRebalanceFromIllegalGeneration(): Unit = {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
 
     val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType, 
protocols)
@@ -2984,36 +2990,36 @@ class GroupCoordinatorTest {
     val joinGroupError = joinGroupResult.error
     assertEquals(Errors.NONE, joinGroupError)
 
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, 
generationId + 1, Map(tp -> offset))
-    assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, 
generationId + 1, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.ILLEGAL_GENERATION), commitOffsetResult)
   }
 
   @Test
   def testManualCommitOffsetShouldNotValidateMemberIdAndInstanceId(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
 
     var commitOffsetResult = commitOffsets(
       groupId,
       JoinGroupRequest.UNKNOWN_MEMBER_ID,
       -1,
-      Map(tp -> offsetAndMetadata(0)),
+      Map(tip -> offsetAndMetadata(0)),
       Some("instance-id")
     )
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
     commitOffsetResult = commitOffsets(
       groupId,
       "unknown",
       -1,
-      Map(tp -> offsetAndMetadata(0)),
+      Map(tip -> offsetAndMetadata(0)),
       None
     )
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
   }
 
   @Test
   def testTxnCommitOffsetWithFencedInstanceId(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
@@ -3021,21 +3027,21 @@ class GroupCoordinatorTest {
     val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId, 
followerInstanceId)
 
     val leaderNoMemberIdCommitOffsetResult = 
commitTransactionalOffsets(groupId, producerId, producerEpoch,
-      Map(tp -> offset), memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupInstanceId = Some(leaderInstanceId))
-    assertEquals(Errors.FENCED_INSTANCE_ID, 
leaderNoMemberIdCommitOffsetResult(tp))
+      Map(tip -> offset), memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID, 
groupInstanceId = Some(leaderInstanceId))
+    assertEquals(Map(tip -> Errors.FENCED_INSTANCE_ID), 
leaderNoMemberIdCommitOffsetResult)
 
     val leaderInvalidMemberIdCommitOffsetResult = 
commitTransactionalOffsets(groupId, producerId, producerEpoch,
-      Map(tp -> offset), memberId = "invalid-member", groupInstanceId = 
Some(leaderInstanceId))
-    assertEquals(Errors.FENCED_INSTANCE_ID, 
leaderInvalidMemberIdCommitOffsetResult (tp))
+      Map(tip -> offset), memberId = "invalid-member", groupInstanceId = 
Some(leaderInstanceId))
+    assertEquals(Map(tip -> Errors.FENCED_INSTANCE_ID), 
leaderInvalidMemberIdCommitOffsetResult)
 
     val leaderCommitOffsetResult = commitTransactionalOffsets(groupId, 
producerId, producerEpoch,
-      Map(tp -> offset), rebalanceResult.leaderId, Some(leaderInstanceId), 
rebalanceResult.generation)
-    assertEquals(Errors.NONE, leaderCommitOffsetResult (tp))
+      Map(tip -> offset), rebalanceResult.leaderId, Some(leaderInstanceId), 
rebalanceResult.generation)
+    assertEquals(Map(tip -> Errors.NONE), leaderCommitOffsetResult)
   }
 
   @Test
   def testTxnCommitOffsetWithInvalidMemberId(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
@@ -3045,13 +3051,13 @@ class GroupCoordinatorTest {
     assertEquals(Errors.NONE, joinGroupError)
 
     val invalidIdCommitOffsetResult = commitTransactionalOffsets(groupId, 
producerId, producerEpoch,
-      Map(tp -> offset), "invalid-member")
-    assertEquals(Errors.UNKNOWN_MEMBER_ID, invalidIdCommitOffsetResult (tp))
+      Map(tip -> offset), "invalid-member")
+    assertEquals(Map(tip -> Errors.UNKNOWN_MEMBER_ID), 
invalidIdCommitOffsetResult)
   }
 
   @Test
   def testTxnCommitOffsetWithKnownMemberId(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
@@ -3063,13 +3069,13 @@ class GroupCoordinatorTest {
 
     val assignedConsumerId = joinGroupResult.memberId
     val leaderCommitOffsetResult = commitTransactionalOffsets(groupId, 
producerId, producerEpoch,
-      Map(tp -> offset), assignedConsumerId, generationId = 
joinGroupResult.generationId)
-    assertEquals(Errors.NONE, leaderCommitOffsetResult (tp))
+      Map(tip -> offset), assignedConsumerId, generationId = 
joinGroupResult.generationId)
+    assertEquals(Map(tip -> Errors.NONE), leaderCommitOffsetResult)
   }
 
   @Test
   def testTxnCommitOffsetWithIllegalGeneration(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
@@ -3082,13 +3088,13 @@ class GroupCoordinatorTest {
     val assignedConsumerId = joinGroupResult.memberId
     val initialGenerationId = joinGroupResult.generationId
     val illegalGenerationCommitOffsetResult = 
commitTransactionalOffsets(groupId, producerId, producerEpoch,
-      Map(tp -> offset), memberId = assignedConsumerId, generationId = 
initialGenerationId + 5)
-    assertEquals(Errors.ILLEGAL_GENERATION, 
illegalGenerationCommitOffsetResult(tp))
+      Map(tip -> offset), memberId = assignedConsumerId, generationId = 
initialGenerationId + 5)
+    assertEquals(Map(tip -> Errors.ILLEGAL_GENERATION), 
illegalGenerationCommitOffsetResult)
   }
 
   @Test
   def testTxnCommitOffsetWithLegalGeneration(): Unit = {
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
@@ -3101,8 +3107,8 @@ class GroupCoordinatorTest {
     val assignedConsumerId = joinGroupResult.memberId
     val initialGenerationId = joinGroupResult.generationId
     val leaderCommitOffsetResult = commitTransactionalOffsets(groupId, 
producerId, producerEpoch,
-      Map(tp -> offset), memberId = assignedConsumerId, generationId = 
initialGenerationId)
-    assertEquals(Errors.NONE, leaderCommitOffsetResult (tp))
+      Map(tip -> offset), memberId = assignedConsumerId, generationId = 
initialGenerationId)
+    assertEquals(Map(tip -> Errors.NONE), leaderCommitOffsetResult)
   }
 
   @Test
@@ -3476,10 +3482,10 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, 
joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId -> 
Array[Byte]()))
     assertEquals(Errors.NONE, syncGroupResult.error)
 
-    val tp = new TopicPartition("topic", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
     val offset = offsetAndMetadata(0)
-    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, 
joinGroupResult.generationId, Map(tp -> offset))
-    assertEquals(Errors.NONE, commitOffsetResult(tp))
+    val commitOffsetResult = commitOffsets(groupId, assignedMemberId, 
joinGroupResult.generationId, Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
 
     val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
     assertEquals(Stable.toString, describeGroupResult._2.state)
@@ -3534,14 +3540,13 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, 
joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
     assertEquals(Errors.NONE, syncGroupResult.error)
 
-    val t1p0 = new TopicPartition("foo", 0)
-    val t2p0 = new TopicPartition("bar", 0)
+    val ti1p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val ti2p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
     val offset = offsetAndMetadata(37)
 
     val validOffsetCommitResult = commitOffsets(groupId, 
joinGroupResult.memberId, joinGroupResult.generationId,
-      Map(t1p0 -> offset, t2p0 -> offset))
-    assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
-    assertEquals(Errors.NONE, validOffsetCommitResult(t2p0))
+      Map(ti1p0 -> offset, ti2p0 -> offset))
+    assertEquals(Map(ti1p0 -> Errors.NONE, ti2p0 -> Errors.NONE), 
validOffsetCommitResult)
 
     // and leaves.
     val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
@@ -3556,17 +3561,17 @@ class GroupCoordinatorTest {
     
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
     
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, 
Seq(t1p0),
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, 
Seq(ti1p0.topicPartition),
       RequestLocal.NoCaching)
 
     assertEquals(Errors.NONE, groupError)
     assertEquals(1, topics.size)
-    assertEquals(Some(Errors.NONE), topics.get(t1p0))
+    assertEquals(Some(Errors.NONE), topics.get(ti1p0.topicPartition))
 
-    val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, 
requireStable, Some(Seq(t1p0, t2p0)))
+    val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, 
requireStable, Some(Seq(ti1p0.topicPartition, ti2p0.topicPartition)))
 
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(t1p0).map(_.offset))
-    assertEquals(Some(offset.offset), cachedOffsets.get(t2p0).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(ti1p0.topicPartition).map(_.offset))
+    assertEquals(Some(offset.offset), 
cachedOffsets.get(ti2p0.topicPartition).map(_.offset))
   }
 
   @Test
@@ -3577,19 +3582,19 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, 
joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
     assertEquals(Errors.NONE, syncGroupResult.error)
 
-    val tp = new TopicPartition("foo", 0)
+    val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = offsetAndMetadata(37)
 
     val validOffsetCommitResult = commitOffsets(groupId, 
joinGroupResult.memberId, joinGroupResult.generationId,
-      Map(tp -> offset))
-    assertEquals(Errors.NONE, validOffsetCommitResult(tp))
+      Map(tip -> offset))
+    assertEquals(Map(tip -> Errors.NONE), validOffsetCommitResult)
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, 
Seq(tp),
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, 
Seq(tip.topicPartition),
       RequestLocal.NoCaching)
 
     assertEquals(Errors.NONE, groupError)
     assertEquals(1, topics.size)
-    assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(tp))
+    assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), 
topics.get(tip.topicPartition))
   }
 
   @Test
@@ -3616,14 +3621,13 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, 
joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
     assertEquals(Errors.NONE, syncGroupResult.error)
 
-    val t1p0 = new TopicPartition("foo", 0)
-    val t2p0 = new TopicPartition("bar", 0)
+    val ti1p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val ti2p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
     val offset = offsetAndMetadata(37)
 
     val validOffsetCommitResult = commitOffsets(groupId, 
joinGroupResult.memberId, joinGroupResult.generationId,
-      Map(t1p0 -> offset, t2p0 -> offset))
-    assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
-    assertEquals(Errors.NONE, validOffsetCommitResult(t2p0))
+      Map(ti1p0 -> offset, ti2p0 -> offset))
+    assertEquals(Map(ti1p0 -> Errors.NONE, ti2p0 -> Errors.NONE), 
validOffsetCommitResult)
 
     // and leaves.
     val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
@@ -3638,17 +3642,17 @@ class GroupCoordinatorTest {
     
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
     
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, 
Seq(t1p0),
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, 
Seq(ti1p0.topicPartition),
       RequestLocal.NoCaching)
 
     assertEquals(Errors.NONE, groupError)
     assertEquals(1, topics.size)
-    assertEquals(Some(Errors.NONE), topics.get(t1p0))
+    assertEquals(Some(Errors.NONE), topics.get(ti1p0.topicPartition))
 
-    val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, 
requireStable, Some(Seq(t1p0, t2p0)))
+    val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, 
requireStable, Some(Seq(ti1p0.topicPartition, ti2p0.topicPartition)))
 
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(t1p0).map(_.offset))
-    assertEquals(Some(offset.offset), cachedOffsets.get(t2p0).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(ti1p0.topicPartition).map(_.offset))
+    assertEquals(Some(offset.offset), 
cachedOffsets.get(ti2p0.topicPartition).map(_.offset))
   }
 
   @Test
@@ -3664,14 +3668,13 @@ class GroupCoordinatorTest {
     val syncGroupResult = syncGroupLeader(groupId, 
joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
     assertEquals(Errors.NONE, syncGroupResult.error)
 
-    val t1p0 = new TopicPartition("foo", 0)
-    val t2p0 = new TopicPartition("bar", 0)
+    val ti1p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val ti2p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
     val offset = offsetAndMetadata(37)
 
     val validOffsetCommitResult = commitOffsets(groupId, 
joinGroupResult.memberId, joinGroupResult.generationId,
-      Map(t1p0 -> offset, t2p0 -> offset))
-    assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
-    assertEquals(Errors.NONE, validOffsetCommitResult(t2p0))
+      Map(ti1p0 -> offset, ti2p0 -> offset))
+    assertEquals(Map(ti1p0 -> Errors.NONE, ti2p0 -> Errors.NONE), 
validOffsetCommitResult)
 
     
assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Stable)))
 
@@ -3682,18 +3685,18 @@ class GroupCoordinatorTest {
     
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
     
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
 
-    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, 
Seq(t1p0, t2p0),
+    val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId, 
Seq(ti1p0.topicPartition, ti2p0.topicPartition),
       RequestLocal.NoCaching)
 
     assertEquals(Errors.NONE, groupError)
     assertEquals(2, topics.size)
-    assertEquals(Some(Errors.NONE), topics.get(t1p0))
-    assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(t2p0))
+    assertEquals(Some(Errors.NONE), topics.get(ti1p0.topicPartition))
+    assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), 
topics.get(ti2p0.topicPartition))
 
-    val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, 
requireStable, Some(Seq(t1p0, t2p0)))
+    val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId, 
requireStable, Some(Seq(ti1p0.topicPartition, ti2p0.topicPartition)))
 
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(t1p0).map(_.offset))
-    assertEquals(Some(offset.offset), cachedOffsets.get(t2p0).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(ti1p0.topicPartition).map(_.offset))
+    assertEquals(Some(offset.offset), 
cachedOffsets.get(ti2p0.topicPartition).map(_.offset))
   }
 
   @Test
@@ -4024,7 +4027,7 @@ class GroupCoordinatorTest {
   private def commitOffsets(groupId: String,
                             memberId: String,
                             generationId: Int,
-                            offsets: Map[TopicPartition, OffsetAndMetadata],
+                            offsets: Map[TopicIdPartition, OffsetAndMetadata],
                             groupInstanceId: Option[String] = None): 
CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
@@ -4055,10 +4058,10 @@ class GroupCoordinatorTest {
   private def commitTransactionalOffsets(groupId: String,
                                          producerId: Long,
                                          producerEpoch: Short,
-                                         offsets: Map[TopicPartition, 
OffsetAndMetadata],
+                                         offsets: Map[TopicIdPartition, 
OffsetAndMetadata],
                                          memberId: String = 
JoinGroupRequest.UNKNOWN_MEMBER_ID,
                                          groupInstanceId: Option[String] = 
Option.empty,
-                                         generationId: Int = 
JoinGroupRequest.UNKNOWN_GENERATION_ID) = {
+                                         generationId: Int = 
JoinGroupRequest.UNKNOWN_GENERATION_ID) : CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
     val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition, 
PartitionResponse] => Unit])
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index e24530918b5..333aeac5af7 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -32,7 +32,7 @@ import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, 
Metrics => kMetrics}
 import org.apache.kafka.common.protocol.Errors
@@ -1248,7 +1248,7 @@ class GroupMetadataManagerTest {
   @Test
   def testCommitOffset(): Unit = {
     val memberId = ""
-    val topicPartition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = 37
 
     groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1256,11 +1256,11 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
+    val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
 
     expectAppendMessage(Errors.NONE)
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1269,12 +1269,12 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
 
     assertFalse(commitErrors.isEmpty)
-    val maybeError = commitErrors.get.get(topicPartition)
+    val maybeError = commitErrors.get.get(topicIdPartition)
     assertEquals(Some(Errors.NONE), maybeError)
     assertTrue(group.hasOffsets)
 
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition)))
-    val maybePartitionResponse = cachedOffsets.get(topicPartition)
+    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicIdPartition.topicPartition)))
+    val maybePartitionResponse = 
cachedOffsets.get(topicIdPartition.topicPartition)
     assertFalse(maybePartitionResponse.isEmpty)
 
     val partitionResponse = maybePartitionResponse.get
@@ -1297,7 +1297,7 @@ class GroupMetadataManagerTest {
   @Test
   def testTransactionalCommitOffsetCommitted(): Unit = {
     val memberId = ""
-    val topicPartition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = 37
     val producerId = 232L
     val producerEpoch = 0.toShort
@@ -1308,12 +1308,12 @@ class GroupMetadataManagerTest {
     groupMetadataManager.addGroup(group)
 
     val offsetAndMetadata = OffsetAndMetadata(offset, "", time.milliseconds())
-    val offsets = immutable.Map(topicPartition -> offsetAndMetadata)
+    val offsets = immutable.Map(topicIdPartition -> offsetAndMetadata)
 
     val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, 
PartitionResponse] => Unit] = 
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
     
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1340,13 +1340,13 @@ class GroupMetadataManagerTest {
     group.completePendingTxnOffsetCommit(producerId, isCommit = true)
     assertTrue(group.hasOffsets)
     assertFalse(group.allOffsets.isEmpty)
-    assertEquals(Some(offsetAndMetadata), group.offset(topicPartition))
+    assertEquals(Some(offsetAndMetadata), 
group.offset(topicIdPartition.topicPartition))
   }
 
   @Test
   def testTransactionalCommitOffsetAppendFailure(): Unit = {
     val memberId = ""
-    val topicPartition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = 37
     val producerId = 232L
     val producerEpoch = 0.toShort
@@ -1356,12 +1356,12 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
+    val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
 
 
     
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1394,7 +1394,7 @@ class GroupMetadataManagerTest {
   @Test
   def testTransactionalCommitOffsetAborted(): Unit = {
     val memberId = ""
-    val topicPartition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = 37
     val producerId = 232L
     val producerEpoch = 0.toShort
@@ -1404,12 +1404,12 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
+    val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
 
     
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1443,7 +1443,7 @@ class GroupMetadataManagerTest {
   def testCommitOffsetWhenCoordinatorHasMoved(): Unit = {
     when(replicaManager.getMagic(any())).thenReturn(None)
     val memberId = ""
-    val topicPartition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = 37
 
     groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1451,17 +1451,17 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
+    val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
     groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
 
     assertFalse(commitErrors.isEmpty)
-    val maybeError = commitErrors.get.get(topicPartition)
+    val maybeError = commitErrors.get.get(topicIdPartition)
     assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
 
     verify(replicaManager).getMagic(any())
@@ -1482,7 +1482,7 @@ class GroupMetadataManagerTest {
   private def assertCommitOffsetErrorMapping(appendError: Errors, 
expectedError: Errors): Unit = {
     reset(replicaManager)
     val memberId = ""
-    val topicPartition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = 37
 
     groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1490,12 +1490,12 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
+    val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset, 
"", time.milliseconds()))
 
     
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1507,12 +1507,19 @@ class GroupMetadataManagerTest {
       new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertFalse(commitErrors.isEmpty)
-    val maybeError = commitErrors.get.get(topicPartition)
+    val maybeError = commitErrors.get.get(topicIdPartition)
     assertEquals(Some(expectedError), maybeError)
     assertFalse(group.hasOffsets)
 
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition).map(_.offset))
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition.topicPartition))
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+    )
 
     verify(replicaManager).getMagic(any())
     // Will not update sensor if failed
@@ -1522,8 +1529,8 @@ class GroupMetadataManagerTest {
   @Test
   def testCommitOffsetPartialFailure(): Unit = {
     val memberId = ""
-    val topicPartition = new TopicPartition("foo", 0)
-    val topicPartitionFailed = new TopicPartition("foo", 1)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val topicIdPartitionFailed = new TopicIdPartition(Uuid.randomUuid(), 1, 
"foo")
     val offset = 37
 
     groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1532,15 +1539,15 @@ class GroupMetadataManagerTest {
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(
-      topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
+      topicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
       // This will failed
-      topicPartitionFailed -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
+      topicIdPartitionFailed -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
     )
 
     
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1552,13 +1559,23 @@ class GroupMetadataManagerTest {
       new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
 
     assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition))
-    assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), 
commitErrors.get.get(topicPartitionFailed))
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition))
+    assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), 
commitErrors.get.get(topicIdPartitionFailed))
     assertTrue(group.hasOffsets)
 
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition, topicPartitionFailed)))
-    assertEquals(Some(offset), cachedOffsets.get(topicPartition).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartitionFailed).map(_.offset))
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition.topicPartition, 
topicIdPartitionFailed.topicPartition))
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartitionFailed.topicPartition).map(_.offset)
+    )
 
     verify(replicaManager).appendRecords(anyLong(),
       anyShort(),
@@ -1576,7 +1593,7 @@ class GroupMetadataManagerTest {
   @Test
   def testOffsetMetadataTooLarge(): Unit = {
     val memberId = ""
-    val topicPartition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = 37
 
     groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1584,11 +1601,11 @@ class GroupMetadataManagerTest {
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(
-      topicPartition -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
+      topicIdPartition -> OffsetAndMetadata(offset, "s" * 
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
     )
 
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1597,12 +1614,19 @@ class GroupMetadataManagerTest {
     assertFalse(group.hasOffsets)
 
     assertFalse(commitErrors.isEmpty)
-    val maybeError = commitErrors.get.get(topicPartition)
+    val maybeError = commitErrors.get.get(topicIdPartition)
     assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), maybeError)
     assertFalse(group.hasOffsets)
 
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition).map(_.offset))
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition.topicPartition))
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+    )
 
     assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
   }
@@ -1610,8 +1634,8 @@ class GroupMetadataManagerTest {
   @Test
   def testExpireOffset(): Unit = {
     val memberId = ""
-    val topicPartition1 = new TopicPartition("foo", 0)
-    val topicPartition2 = new TopicPartition("foo", 1)
+    val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val topicIdPartition2 = new TopicIdPartition(topicIdPartition1.topicId, 1, 
"foo")
     val offset = 37
 
     groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1622,13 +1646,13 @@ class GroupMetadataManagerTest {
     // expire the offset after 1 millisecond
     val startMs = time.milliseconds
     val offsets = immutable.Map(
-      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
-      topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
+      topicIdPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
+      topicIdPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
     mockGetPartition()
     expectAppendMessage(Errors.NONE)
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1636,7 +1660,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
 
     assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))
 
     // expire only one of the offsets
     time.sleep(2)
@@ -1647,12 +1671,16 @@ class GroupMetadataManagerTest {
     groupMetadataManager.cleanupGroupMetadata()
 
     assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
-    assertEquals(None, group.offset(topicPartition1))
-    assertEquals(Some(offset), group.offset(topicPartition2).map(_.offset))
+    assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+    assertEquals(Some(offset), 
group.offset(topicIdPartition2.topicPartition).map(_.offset))
 
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topicPartition2).map(_.offset))
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition1.topicPartition, 
topicIdPartition2.topicPartition))
+    )
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
+    assertEquals(Some(offset), 
cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
 
     verify(replicaManager).appendRecords(anyLong(),
       anyShort(),
@@ -1759,8 +1787,8 @@ class GroupMetadataManagerTest {
     // this is a group which is only using kafka for offset storage
 
     val memberId = ""
-    val topicPartition1 = new TopicPartition("foo", 0)
-    val topicPartition2 = new TopicPartition("foo", 1)
+    val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+    val topicIdPartition2 = new TopicIdPartition(topicIdPartition1.topicId, 1, 
"foo")
     val offset = 37
 
     groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1771,13 +1799,13 @@ class GroupMetadataManagerTest {
     // expire the offset after 1 millisecond
     val startMs = time.milliseconds
     val offsets = immutable.Map(
-      topicPartition1 -> OffsetAndMetadata(offset, Optional.empty(), "", 
startMs, Some(startMs + 1)),
-      topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
+      topicIdPartition1 -> OffsetAndMetadata(offset, Optional.empty(), "", 
startMs, Some(startMs + 1)),
+      topicIdPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
     mockGetPartition()
     expectAppendMessage(Errors.NONE)
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1785,7 +1813,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
 
     assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))
 
     // expire all of the offsets
     time.sleep(4)
@@ -1811,9 +1839,19 @@ class GroupMetadataManagerTest {
 
     // the full group should be gone since all offsets were removed
     assertEquals(None, groupMetadataManager.getGroup(groupId))
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition2).map(_.offset))
+    val cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition1.topicPartition, 
topicIdPartition2.topicPartition))
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)
+    )
 
     verify(replicaManager).onlinePartition(groupTopicPartition)
   }
@@ -1824,9 +1862,9 @@ class GroupMetadataManagerTest {
     val clientId = "clientId"
     val clientHost = "localhost"
     val topic = "foo"
-    val topicPartition1 = new TopicPartition(topic, 0)
-    val topicPartition2 = new TopicPartition(topic, 1)
-    val topicPartition3 = new TopicPartition(topic, 2)
+    val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, topic)
+    val topicIdPartition2 = new TopicIdPartition(topicIdPartition1.topicId, 1, 
topic)
+    val topicIdPartition3 = new TopicIdPartition(topicIdPartition1.topicId, 2, 
topic)
     val offset = 37
 
     groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1848,14 +1886,14 @@ class GroupMetadataManagerTest {
     // new clients, no per-partition expiry timestamp, offsets of group expire 
together
     val tp3OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
     val offsets = immutable.Map(
-      topicPartition1 -> tp1OffsetAndMetadata,
-      topicPartition2 -> tp2OffsetAndMetadata,
-      topicPartition3 -> tp3OffsetAndMetadata)
+      topicIdPartition1 -> tp1OffsetAndMetadata,
+      topicIdPartition2 -> tp2OffsetAndMetadata,
+      topicIdPartition3 -> tp3OffsetAndMetadata)
 
     mockGetPartition()
     expectAppendMessage(Errors.NONE)
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -1863,7 +1901,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
 
     assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))
 
     // do not expire any offset even though expiration timestamp is reached 
for one (due to group still being active)
     time.sleep(2)
@@ -1872,14 +1910,18 @@ class GroupMetadataManagerTest {
 
     // group and offsets should still be there
     assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
-    assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
-    assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
-    assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
-
-    var cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2, 
topicPartition3)))
-    assertEquals(Some(offset), 
cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topicPartition2).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topicPartition3).map(_.offset))
+    assertEquals(Some(tp1OffsetAndMetadata), 
group.offset(topicIdPartition1.topicPartition))
+    assertEquals(Some(tp2OffsetAndMetadata), 
group.offset(topicIdPartition2.topicPartition))
+    assertEquals(Some(tp3OffsetAndMetadata), 
group.offset(topicIdPartition3.topicPartition))
+
+    var cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition1.topicPartition, 
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+    )
+    assertEquals(Some(offset), 
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
+    assertEquals(Some(offset), 
cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
+    assertEquals(Some(offset), 
cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset))
 
     verify(replicaManager).onlinePartition(groupTopicPartition)
 
@@ -1894,14 +1936,18 @@ class GroupMetadataManagerTest {
 
     // group is empty now, only one offset should expire
     assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
-    assertEquals(None, group.offset(topicPartition1))
-    assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
-    assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
-
-    cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2, 
topicPartition3)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topicPartition2).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topicPartition3).map(_.offset))
+    assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+    assertEquals(Some(tp2OffsetAndMetadata), 
group.offset(topicIdPartition2.topicPartition))
+    assertEquals(Some(tp3OffsetAndMetadata), 
group.offset(topicIdPartition3.topicPartition))
+
+    cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition1.topicPartition, 
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+    )
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
+    assertEquals(Some(offset), 
cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
+    assertEquals(Some(offset), 
cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset))
 
     verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
 
@@ -1915,14 +1961,18 @@ class GroupMetadataManagerTest {
 
     // one more offset should expire
     assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
-    assertEquals(None, group.offset(topicPartition1))
-    assertEquals(None, group.offset(topicPartition2))
-    assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
-
-    cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2, 
topicPartition3)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition2).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topicPartition3).map(_.offset))
+    assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+    assertEquals(None, group.offset(topicIdPartition2.topicPartition))
+    assertEquals(Some(tp3OffsetAndMetadata), 
group.offset(topicIdPartition3.topicPartition))
+
+    cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition1.topicPartition, 
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+    )
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
+    assertEquals(Some(offset), 
cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset))
 
     verify(replicaManager, times(3)).onlinePartition(groupTopicPartition)
 
@@ -1933,14 +1983,27 @@ class GroupMetadataManagerTest {
 
     // one more offset should expire
     assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
-    assertEquals(None, group.offset(topicPartition1))
-    assertEquals(None, group.offset(topicPartition2))
-    assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
-
-    cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2, 
topicPartition3)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition2).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topicPartition3).map(_.offset))
+    assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+    assertEquals(None, group.offset(topicIdPartition2.topicPartition))
+    assertEquals(Some(tp3OffsetAndMetadata), 
group.offset(topicIdPartition3.topicPartition))
+
+    cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition1.topicPartition, 
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset)
+    )
 
     verify(replicaManager, times(4)).onlinePartition(groupTopicPartition)
 
@@ -1955,14 +2018,27 @@ class GroupMetadataManagerTest {
 
     // group and all its offsets should be gone now
     assertEquals(None, groupMetadataManager.getGroup(groupId))
-    assertEquals(None, group.offset(topicPartition1))
-    assertEquals(None, group.offset(topicPartition2))
-    assertEquals(None, group.offset(topicPartition3))
-
-    cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2, 
topicPartition3)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition1).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition2).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition3).map(_.offset))
+    assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+    assertEquals(None, group.offset(topicIdPartition2.topicPartition))
+    assertEquals(None, group.offset(topicIdPartition3.topicPartition))
+
+    cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition1.topicPartition, 
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset)
+    )
 
     verify(replicaManager, times(5)).onlinePartition(groupTopicPartition)
 
@@ -1973,7 +2049,7 @@ class GroupMetadataManagerTest {
   def testOffsetExpirationOfSimpleConsumer(): Unit = {
     val memberId = "memberId"
     val topic = "foo"
-    val topicPartition1 = new TopicPartition(topic, 0)
+    val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, topic)
     val offset = 37
 
     groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1987,12 +2063,12 @@ class GroupMetadataManagerTest {
     val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
     // new clients, no per-partition expiry timestamp, offsets of group expire 
together
     val offsets = immutable.Map(
-      topicPartition1 -> tp1OffsetAndMetadata)
+      topicIdPartition1 -> tp1OffsetAndMetadata)
 
     mockGetPartition()
     expectAppendMessage(Errors.NONE)
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -2000,20 +2076,24 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
 
     assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))
 
     // do not expire offsets while within retention period since commit 
timestamp
-    val expiryTimestamp = offsets(topicPartition1).commitTimestamp + 
defaultOffsetRetentionMs
+    val expiryTimestamp = offsets(topicIdPartition1).commitTimestamp + 
defaultOffsetRetentionMs
     time.sleep(expiryTimestamp - time.milliseconds() - 1)
 
     groupMetadataManager.cleanupGroupMetadata()
 
     // group and offsets should still be there
     assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
-    assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
+    assertEquals(Some(tp1OffsetAndMetadata), 
group.offset(topicIdPartition1.topicPartition))
 
-    var cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition1)))
-    assertEquals(Some(offset), 
cachedOffsets.get(topicPartition1).map(_.offset))
+    var cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition1.topicPartition))
+    )
+    assertEquals(Some(offset), 
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
 
     verify(replicaManager).onlinePartition(groupTopicPartition)
 
@@ -2028,10 +2108,17 @@ class GroupMetadataManagerTest {
 
     // group and all its offsets should be gone now
     assertEquals(None, groupMetadataManager.getGroup(groupId))
-    assertEquals(None, group.offset(topicPartition1))
+    assertEquals(None, group.offset(topicIdPartition1.topicPartition))
 
-    cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topicPartition1)))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topicPartition1).map(_.offset))
+    cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(topicIdPartition1.topicPartition))
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)
+    )
 
     verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
 
@@ -2045,12 +2132,12 @@ class GroupMetadataManagerTest {
     val clientHost = "localhost"
 
     val topic1 = "foo"
-    val topic1Partition0 = new TopicPartition(topic1, 0)
-    val topic1Partition1 = new TopicPartition(topic1, 1)
+    val topic1IdPartition0 = new TopicIdPartition(Uuid.randomUuid(), 0, topic1)
+    val topic1IdPartition1 = new TopicIdPartition(topic1IdPartition0.topicId, 
1, topic1)
 
     val topic2 = "bar"
-    val topic2Partition0 = new TopicPartition(topic2, 0)
-    val topic2Partition1 = new TopicPartition(topic2, 1)
+    val topic2IdPartition0 = new TopicIdPartition(Uuid.randomUuid(), 0, topic2)
+    val topic2IdPartition1 = new TopicIdPartition(topic2IdPartition0.topicId, 
1, topic2)
 
     val offset = 37
 
@@ -2087,15 +2174,15 @@ class GroupMetadataManagerTest {
     val t2p1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
 
     val offsets = immutable.Map(
-      topic1Partition0 -> t1p0OffsetAndMetadata,
-      topic1Partition1 -> t1p1OffsetAndMetadata,
-      topic2Partition0 -> t2p0OffsetAndMetadata,
-      topic2Partition1 -> t2p1OffsetAndMetadata)
+      topic1IdPartition0 -> t1p0OffsetAndMetadata,
+      topic1IdPartition1 -> t1p1OffsetAndMetadata,
+      topic2IdPartition0 -> t2p0OffsetAndMetadata,
+      topic2IdPartition1 -> t2p1OffsetAndMetadata)
 
     mockGetPartition()
     expectAppendMessage(Errors.NONE)
-    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
-    def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+    var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
       commitErrors = Some(errors)
     }
 
@@ -2103,7 +2190,7 @@ class GroupMetadataManagerTest {
     assertTrue(group.hasOffsets)
 
     assertFalse(commitErrors.isEmpty)
-    assertEquals(Some(Errors.NONE), commitErrors.get.get(topic1Partition0))
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topic1IdPartition0))
 
     // advance time to just after the offset of last partition is to be expired
     time.sleep(defaultOffsetRetentionMs + 2)
@@ -2114,17 +2201,38 @@ class GroupMetadataManagerTest {
     assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
     assert(group.is(Stable))
 
-    assertEquals(Some(t1p0OffsetAndMetadata), group.offset(topic1Partition0))
-    assertEquals(Some(t1p1OffsetAndMetadata), group.offset(topic1Partition1))
-    assertEquals(Some(t2p0OffsetAndMetadata), group.offset(topic2Partition0))
-    assertEquals(Some(t2p1OffsetAndMetadata), group.offset(topic2Partition1))
-
-    var cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topic1Partition0, topic1Partition1, 
topic2Partition0, topic2Partition1)))
+    assertEquals(Some(t1p0OffsetAndMetadata), 
group.offset(topic1IdPartition0.topicPartition))
+    assertEquals(Some(t1p1OffsetAndMetadata), 
group.offset(topic1IdPartition1.topicPartition))
+    assertEquals(Some(t2p0OffsetAndMetadata), 
group.offset(topic2IdPartition0.topicPartition))
+    assertEquals(Some(t2p1OffsetAndMetadata), 
group.offset(topic2IdPartition1.topicPartition))
+
+    var cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(
+        topic1IdPartition0.topicPartition,
+        topic1IdPartition1.topicPartition,
+        topic2IdPartition0.topicPartition,
+        topic2IdPartition1.topicPartition)
+      )
+    )
 
-    assertEquals(Some(offset), 
cachedOffsets.get(topic1Partition0).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topic1Partition1).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topic2Partition0).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topic2Partition1).map(_.offset))
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(topic1IdPartition0.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(topic1IdPartition1.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(topic2IdPartition0.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(offset),
+      cachedOffsets.get(topic2IdPartition1.topicPartition).map(_.offset)
+    )
 
     verify(replicaManager).onlinePartition(groupTopicPartition)
 
@@ -2159,17 +2267,32 @@ class GroupMetadataManagerTest {
     assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
     assert(group.is(Stable))
 
-    assertEquals(Some(t1p0OffsetAndMetadata), group.offset(topic1Partition0))
-    assertEquals(Some(t1p1OffsetAndMetadata), group.offset(topic1Partition1))
-    assertEquals(None, group.offset(topic2Partition0))
-    assertEquals(None, group.offset(topic2Partition1))
-
-    cachedOffsets = groupMetadataManager.getOffsets(groupId, 
defaultRequireStable, Some(Seq(topic1Partition0, topic1Partition1, 
topic2Partition0, topic2Partition1)))
+    assertEquals(Some(t1p0OffsetAndMetadata), 
group.offset(topic1IdPartition0.topicPartition))
+    assertEquals(Some(t1p1OffsetAndMetadata), 
group.offset(topic1IdPartition1.topicPartition))
+    assertEquals(None, group.offset(topic2IdPartition0.topicPartition))
+    assertEquals(None, group.offset(topic2IdPartition1.topicPartition))
+
+    cachedOffsets = groupMetadataManager.getOffsets(
+      groupId,
+      defaultRequireStable,
+      Some(Seq(
+        topic1IdPartition0.topicPartition,
+        topic1IdPartition1.topicPartition,
+        topic2IdPartition0.topicPartition,
+        topic2IdPartition1.topicPartition)
+      )
+    )
 
-    assertEquals(Some(offset), 
cachedOffsets.get(topic1Partition0).map(_.offset))
-    assertEquals(Some(offset), 
cachedOffsets.get(topic1Partition1).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topic2Partition0).map(_.offset))
-    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), 
cachedOffsets.get(topic2Partition1).map(_.offset))
+    assertEquals(Some(offset), 
cachedOffsets.get(topic1IdPartition0.topicPartition).map(_.offset))
+    assertEquals(Some(offset), 
cachedOffsets.get(topic1IdPartition1.topicPartition).map(_.offset))
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topic2IdPartition0.topicPartition).map(_.offset)
+    )
+    assertEquals(
+      Some(OffsetFetchResponse.INVALID_OFFSET),
+      cachedOffsets.get(topic2IdPartition1.topicPartition).map(_.offset)
+    )
   }
 
   @Test
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala 
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index cafc73c4efb..c01d11bc618 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
 import kafka.common.OffsetAndMetadata
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.utils.{MockTime, Time}
 import org.junit.jupiter.api.Assertions._
@@ -262,7 +262,7 @@ class GroupMetadataTest {
   @Test
   def testOffsetRemovalDuringTransitionFromEmptyToNonEmpty(): Unit = {
     val topic = "foo"
-    val partition = new TopicPartition(topic, 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, topic)
     val time = new MockTime()
     group = new GroupMetadata("groupId", Empty, time)
 
@@ -277,10 +277,10 @@ class GroupMetadataTest {
     val offset = offsetAndMetadata(offset = 37, timestamp = 
time.milliseconds())
     val commitRecordOffset = 3
 
-    group.prepareOffsetCommit(Map(partition -> offset))
+    group.prepareOffsetCommit(Map(topicIdPartition -> offset))
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
-    group.onOffsetCommitAppend(partition, 
CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
+    group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset))
 
     val offsetRetentionMs = 50000L
     time.sleep(offsetRetentionMs + 1)
@@ -386,170 +386,203 @@ class GroupMetadataTest {
 
   @Test
   def testOffsetCommit(): Unit = {
-    val partition = new TopicPartition("foo", 0)
+    val partition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = offsetAndMetadata(37)
     val commitRecordOffset = 3
 
     group.prepareOffsetCommit(Map(partition -> offset))
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
+    assertEquals(None, group.offset(partition.topicPartition))
 
     group.onOffsetCommitAppend(partition, 
CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset))
     assertTrue(group.hasOffsets)
-    assertEquals(Some(offset), group.offset(partition))
+    assertEquals(Some(offset), group.offset(partition.topicPartition))
   }
 
   @Test
   def testOffsetCommitFailure(): Unit = {
-    val partition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = offsetAndMetadata(37)
 
-    group.prepareOffsetCommit(Map(partition -> offset))
+    group.prepareOffsetCommit(Map(topicIdPartition -> offset))
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
+    assertEquals(Some(offset), group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
 
-    group.failPendingOffsetWrite(partition, offset)
+    group.failPendingOffsetWrite(topicIdPartition, offset)
     assertFalse(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
+    assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
   }
 
   @Test
   def testOffsetCommitFailureWithAnotherPending(): Unit = {
-    val partition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val firstOffset = offsetAndMetadata(37)
     val secondOffset = offsetAndMetadata(57)
 
-    group.prepareOffsetCommit(Map(partition -> firstOffset))
+    group.prepareOffsetCommit(Map(topicIdPartition -> firstOffset))
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
+    assertEquals(Some(firstOffset), 
group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
 
-    group.prepareOffsetCommit(Map(partition -> secondOffset))
+    group.prepareOffsetCommit(Map(topicIdPartition -> secondOffset))
     assertTrue(group.hasOffsets)
+    assertEquals(Some(secondOffset), 
group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
 
-    group.failPendingOffsetWrite(partition, firstOffset)
+    group.failPendingOffsetWrite(topicIdPartition, firstOffset)
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
+    assertEquals(Some(secondOffset), 
group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
 
-    group.onOffsetCommitAppend(partition, 
CommitRecordMetadataAndOffset(Some(3L), secondOffset))
+    group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(3L), secondOffset))
     assertTrue(group.hasOffsets)
-    assertEquals(Some(secondOffset), group.offset(partition))
+    assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(Some(secondOffset), 
group.offset(topicIdPartition.topicPartition))
   }
 
   @Test
   def testOffsetCommitWithAnotherPending(): Unit = {
-    val partition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val firstOffset = offsetAndMetadata(37)
     val secondOffset = offsetAndMetadata(57)
 
-    group.prepareOffsetCommit(Map(partition -> firstOffset))
+    group.prepareOffsetCommit(Map(topicIdPartition -> firstOffset))
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
+    assertEquals(Some(firstOffset), 
group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
 
-    group.prepareOffsetCommit(Map(partition -> secondOffset))
+    group.prepareOffsetCommit(Map(topicIdPartition -> secondOffset))
     assertTrue(group.hasOffsets)
+    assertEquals(Some(secondOffset), 
group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
 
-    group.onOffsetCommitAppend(partition, 
CommitRecordMetadataAndOffset(Some(4L), firstOffset))
+    group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(4L), firstOffset))
     assertTrue(group.hasOffsets)
-    assertEquals(Some(firstOffset), group.offset(partition))
+    assertEquals(Some(secondOffset), 
group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(Some(firstOffset), 
group.offset(topicIdPartition.topicPartition))
 
-    group.onOffsetCommitAppend(partition, 
CommitRecordMetadataAndOffset(Some(5L), secondOffset))
+    group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(5L), secondOffset))
     assertTrue(group.hasOffsets)
-    assertEquals(Some(secondOffset), group.offset(partition))
+    assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(Some(secondOffset), 
group.offset(topicIdPartition.topicPartition))
   }
 
   @Test
   def testConsumerBeatsTransactionalOffsetCommit(): Unit = {
-    val partition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val producerId = 13232L
     val txnOffsetCommit = offsetAndMetadata(37)
     val consumerOffsetCommit = offsetAndMetadata(57)
 
-    group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+    group.prepareTxnOffsetCommit(producerId, Map(topicIdPartition -> 
txnOffsetCommit))
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
+    assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)), 
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
 
-    group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit))
+    group.prepareOffsetCommit(Map(topicIdPartition -> consumerOffsetCommit))
     assertTrue(group.hasOffsets)
+    assertEquals(Some(consumerOffsetCommit), 
group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
 
-    group.onTxnOffsetCommitAppend(producerId, partition, 
CommitRecordMetadataAndOffset(Some(3L), txnOffsetCommit))
-    group.onOffsetCommitAppend(partition, 
CommitRecordMetadataAndOffset(Some(4L), consumerOffsetCommit))
+    group.onTxnOffsetCommitAppend(producerId, topicIdPartition, 
CommitRecordMetadataAndOffset(Some(3L), txnOffsetCommit))
+    group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(4L), consumerOffsetCommit))
     assertTrue(group.hasOffsets)
-    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+    assertEquals(Some(CommitRecordMetadataAndOffset(Some(3), 
txnOffsetCommit)), group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+    assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(Some(consumerOffsetCommit), 
group.offset(topicIdPartition.topicPartition))
 
     group.completePendingTxnOffsetCommit(producerId, isCommit = true)
     assertTrue(group.hasOffsets)
+    assertEquals(None, group.pendingTxnOffsetCommit(producerId, 
topicIdPartition))
+    assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
     // This is the crucial assertion which validates that we materialize 
offsets in offset order, not transactional order.
-    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+    assertEquals(Some(consumerOffsetCommit), 
group.offset(topicIdPartition.topicPartition))
   }
 
   @Test
   def testTransactionBeatsConsumerOffsetCommit(): Unit = {
-    val partition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val producerId = 13232L
     val txnOffsetCommit = offsetAndMetadata(37)
     val consumerOffsetCommit = offsetAndMetadata(57)
 
-    group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+    group.prepareTxnOffsetCommit(producerId, Map(topicIdPartition -> 
txnOffsetCommit))
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
+    assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)), 
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
 
-    group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit))
+    group.prepareOffsetCommit(Map(topicIdPartition -> consumerOffsetCommit))
     assertTrue(group.hasOffsets)
+    assertEquals(Some(consumerOffsetCommit), 
group.pendingOffsetCommit(topicIdPartition))
 
-    group.onOffsetCommitAppend(partition, 
CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
-    group.onTxnOffsetCommitAppend(producerId, partition, 
CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
+    group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
+    group.onTxnOffsetCommitAppend(producerId, topicIdPartition, 
CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
     assertTrue(group.hasOffsets)
+    assertEquals(Some(CommitRecordMetadataAndOffset(Some(4), 
txnOffsetCommit)), group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+    assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
     // The transactional offset commit hasn't been committed yet, so we should 
materialize the consumer offset commit.
-    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+    assertEquals(Some(consumerOffsetCommit), 
group.offset(topicIdPartition.topicPartition))
 
     group.completePendingTxnOffsetCommit(producerId, isCommit = true)
     assertTrue(group.hasOffsets)
     // The transactional offset commit has been materialized and the 
transactional commit record is later in the log,
     // so it should be materialized.
-    assertEquals(Some(txnOffsetCommit), group.offset(partition))
+    assertEquals(None, group.pendingTxnOffsetCommit(producerId, 
topicIdPartition))
+    assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+    assertEquals(Some(txnOffsetCommit), 
group.offset(topicIdPartition.topicPartition))
   }
 
   @Test
   def testTransactionalCommitIsAbortedAndConsumerCommitWins(): Unit = {
-    val partition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val producerId = 13232L
     val txnOffsetCommit = offsetAndMetadata(37)
     val consumerOffsetCommit = offsetAndMetadata(57)
 
-    group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+    group.prepareTxnOffsetCommit(producerId, Map(topicIdPartition -> 
txnOffsetCommit))
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
+    assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)), 
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
 
-    group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit))
+    group.prepareOffsetCommit(Map(topicIdPartition -> consumerOffsetCommit))
     assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
+    assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)), 
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+    assertEquals(Some(consumerOffsetCommit), 
group.pendingOffsetCommit(topicIdPartition))
 
-    group.onOffsetCommitAppend(partition, 
CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
-    group.onTxnOffsetCommitAppend(producerId, partition, 
CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
+    group.onOffsetCommitAppend(topicIdPartition, 
CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
+    group.onTxnOffsetCommitAppend(producerId, topicIdPartition, 
CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
     assertTrue(group.hasOffsets)
+    assertEquals(Some(CommitRecordMetadataAndOffset(Some(4L), 
txnOffsetCommit)), group.pendingTxnOffsetCommit(producerId, topicIdPartition))
     // The transactional offset commit hasn't been committed yet, so we should 
materialize the consumer offset commit.
-    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+    assertEquals(Some(consumerOffsetCommit), 
group.offset(topicIdPartition.topicPartition))
 
     group.completePendingTxnOffsetCommit(producerId, isCommit = false)
     assertTrue(group.hasOffsets)
     // The transactional offset commit should be discarded and the consumer 
offset commit should continue to be
     // materialized.
     assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
-    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+    assertEquals(None, group.pendingTxnOffsetCommit(producerId, 
topicIdPartition))
+    assertEquals(Some(consumerOffsetCommit), 
group.offset(topicIdPartition.topicPartition))
   }
 
   @Test
   def testFailedTxnOffsetCommitLeavesNoPendingState(): Unit = {
-    val partition = new TopicPartition("foo", 0)
+    val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val producerId = 13232L
     val txnOffsetCommit = offsetAndMetadata(37)
 
-    group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+    group.prepareTxnOffsetCommit(producerId, Map(topicIdPartition -> 
txnOffsetCommit))
     assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId))
     assertTrue(group.hasOffsets)
-    assertEquals(None, group.offset(partition))
-    group.failPendingTxnOffsetCommit(producerId, partition)
+    assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)), 
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+    assertEquals(None, group.offset(topicIdPartition.topicPartition))
+    group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
     assertFalse(group.hasOffsets)
     assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
+    assertEquals(None, group.pendingTxnOffsetCommit(producerId, 
topicIdPartition))
 
     // The commit marker should now have no effect.
     group.completePendingTxnOffsetCommit(producerId, isCommit = true)
@@ -706,21 +739,21 @@ class GroupMetadataTest {
 
   @Test
   def testHasPendingNonTxnOffsets(): Unit = {
-    val partition = new TopicPartition("foo", 0)
+    val partition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = offsetAndMetadata(37)
 
     group.prepareOffsetCommit(Map(partition -> offset))
-    assertTrue(group.hasPendingOffsetCommitsForTopicPartition(partition))
+    
assertTrue(group.hasPendingOffsetCommitsForTopicPartition(partition.topicPartition))
   }
 
   @Test
   def testHasPendingTxnOffsets(): Unit = {
-    val txnPartition = new TopicPartition("foo", 1)
+    val txnPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
     val offset = offsetAndMetadata(37)
     val producerId = 5
 
     group.prepareTxnOffsetCommit(producerId, Map(txnPartition -> offset))
-    assertTrue(group.hasPendingOffsetCommitsForTopicPartition(txnPartition))
+    
assertTrue(group.hasPendingOffsetCommitsForTopicPartition(txnPartition.topicPartition))
 
     assertFalse(group.hasPendingOffsetCommitsForTopicPartition(new 
TopicPartition("non-exist", 0)))
   }


Reply via email to