This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d01662fb0f7 KAFKA-14793: Propagate Topic Ids to the Group Coordinator
during Offsets Commit (#13378)
d01662fb0f7 is described below
commit d01662fb0f7bbb43e6a2d22f5ea88b5bf136057c
Author: Alexandre Dupriez <[email protected]>
AuthorDate: Wed Mar 15 11:20:26 2023 +0000
KAFKA-14793: Propagate Topic Ids to the Group Coordinator during Offsets
Commit (#13378)
This patch refactors the GroupCoordinator to use TopicIdPartition on the
offsets commit paths. This change is required by
https://github.com/apache/kafka/pull/13240 in order to correctly handle topic
ids.
Reviewers: David Jacot <[email protected]>
---
.../kafka/coordinator/group/GroupCoordinator.scala | 18 +-
.../group/GroupCoordinatorAdapter.scala | 14 +-
.../kafka/coordinator/group/GroupMetadata.scala | 36 +-
.../coordinator/group/GroupMetadataManager.scala | 32 +-
.../group/GroupCoordinatorAdapterTest.scala | 18 +-
.../group/GroupCoordinatorConcurrencyTest.scala | 15 +-
.../coordinator/group/GroupCoordinatorTest.scala | 393 +++++++++---------
.../group/GroupMetadataManagerTest.scala | 453 +++++++++++++--------
.../coordinator/group/GroupMetadataTest.scala | 155 ++++---
9 files changed, 654 insertions(+), 480 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index c52d920b583..d706d35d1d6 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.OffsetAndMetadata
import kafka.server._
import kafka.utils.Logging
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.internals.Topic
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
@@ -899,8 +899,8 @@ private[group] class GroupCoordinator(
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
- offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition,
Errors] => Unit,
+ offsetMetadata: immutable.Map[TopicIdPartition,
OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicIdPartition,
Errors] => Unit,
requestLocal: RequestLocal =
RequestLocal.NoCaching): Unit = {
validateGroupStatus(groupId, ApiKeys.TXN_OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.map { case (k, _) =>
k -> error })
@@ -917,8 +917,8 @@ private[group] class GroupCoordinator(
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
- offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition,
Errors] => Unit,
+ offsetMetadata: immutable.Map[TopicIdPartition,
OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicIdPartition,
Errors] => Unit,
requestLocal: RequestLocal =
RequestLocal.NoCaching): Unit = {
validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
case Some(error) => responseCallback(offsetMetadata.map { case (k, _) =>
k -> error })
@@ -956,9 +956,9 @@ private[group] class GroupCoordinator(
generationId: Int,
producerId: Long,
producerEpoch: Short,
- offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
+ offsetMetadata:
immutable.Map[TopicIdPartition, OffsetAndMetadata],
requestLocal: RequestLocal,
- responseCallback:
immutable.Map[TopicPartition, Errors] => Unit): Unit = {
+ responseCallback:
immutable.Map[TopicIdPartition, Errors] => Unit): Unit = {
group.inLock {
val validationErrorOpt = validateOffsetCommit(
group,
@@ -1019,8 +1019,8 @@ private[group] class GroupCoordinator(
memberId: String,
groupInstanceId: Option[String],
generationId: Int,
- offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition,
Errors] => Unit,
+ offsetMetadata: immutable.Map[TopicIdPartition,
OffsetAndMetadata],
+ responseCallback:
immutable.Map[TopicIdPartition, Errors] => Unit,
requestLocal: RequestLocal): Unit = {
group.inLock {
val validationErrorOpt = validateOffsetCommit(
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index 3211c160a07..87dba8b1019 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.group
import kafka.common.OffsetAndMetadata
import kafka.server.{KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils.Implicits.MapExtensionMethods
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData,
ConsumerGroupHeartbeatResponseData, DeleteGroupsResponseData,
DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData,
JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData,
LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData,
OffsetCommitRequestData, OffsetCommitResponseData, OffsetDeleteRequestData,
OffsetDeleteResponseData, OffsetFetchRequestData, OffsetFetchRespon [...]
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -357,7 +357,7 @@ private[group] class GroupCoordinatorAdapter(
val currentTimeMs = time.milliseconds
val future = new CompletableFuture[OffsetCommitResponseData]()
- def callback(commitStatus: Map[TopicPartition, Errors]): Unit = {
+ def callback(commitStatus: Map[TopicIdPartition, Errors]): Unit = {
val response = new OffsetCommitResponseData()
val byTopics = new mutable.HashMap[String,
OffsetCommitResponseData.OffsetCommitResponseTopic]()
@@ -388,10 +388,10 @@ private[group] class GroupCoordinatorAdapter(
case retentionTimeMs => Some(currentTimeMs + retentionTimeMs)
}
- val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+ val partitions = new mutable.HashMap[TopicIdPartition, OffsetAndMetadata]()
request.topics.forEach { topic =>
topic.partitions.forEach { partition =>
- val tp = new TopicPartition(topic.name, partition.partitionIndex)
+ val tp = new TopicIdPartition(Uuid.ZERO_UUID,
partition.partitionIndex, topic.name)
partitions += tp -> createOffsetAndMetadata(
currentTimeMs,
partition.committedOffset,
@@ -424,7 +424,7 @@ private[group] class GroupCoordinatorAdapter(
val currentTimeMs = time.milliseconds
val future = new CompletableFuture[TxnOffsetCommitResponseData]()
- def callback(results: Map[TopicPartition, Errors]): Unit = {
+ def callback(results: Map[TopicIdPartition, Errors]): Unit = {
val response = new TxnOffsetCommitResponseData()
val byTopics = new mutable.HashMap[String,
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]()
@@ -447,10 +447,10 @@ private[group] class GroupCoordinatorAdapter(
future.complete(response)
}
- val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]()
+ val partitions = new mutable.HashMap[TopicIdPartition, OffsetAndMetadata]()
request.topics.forEach { topic =>
topic.partitions.forEach { partition =>
- val tp = new TopicPartition(topic.name, partition.partitionIndex)
+ val tp = new TopicIdPartition(Uuid.ZERO_UUID,
partition.partitionIndex, topic.name)
partitions += tp -> createOffsetAndMetadata(
currentTimeMs,
partition.committedOffset,
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 543d7ef8ac6..33ea53d6d25 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -24,7 +24,7 @@ import kafka.common.OffsetAndMetadata
import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
import kafka.utils.Implicits._
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition}
import
org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.types.SchemaException
@@ -631,7 +631,8 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
}
- def onOffsetCommitAppend(topicPartition: TopicPartition,
offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset): Unit = {
+ def onOffsetCommitAppend(topicIdPartition: TopicIdPartition,
offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset): Unit = {
+ val topicPartition = topicIdPartition.topicPartition
if (pendingOffsetCommits.contains(topicPartition)) {
if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)
throw new IllegalStateException("Cannot complete offset commit write
without providing the metadata of the record " +
@@ -649,26 +650,29 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
}
}
- def failPendingOffsetWrite(topicPartition: TopicPartition, offset:
OffsetAndMetadata): Unit = {
+ def failPendingOffsetWrite(topicIdPartition: TopicIdPartition, offset:
OffsetAndMetadata): Unit = {
+ val topicPartition = topicIdPartition.topicPartition
pendingOffsetCommits.get(topicPartition) match {
case Some(pendingOffset) if offset == pendingOffset =>
pendingOffsetCommits.remove(topicPartition)
case _ =>
}
}
- def prepareOffsetCommit(offsets: Map[TopicPartition, OffsetAndMetadata]):
Unit = {
+ def prepareOffsetCommit(offsets: Map[TopicIdPartition, OffsetAndMetadata]):
Unit = {
receivedConsumerOffsetCommits = true
- pendingOffsetCommits ++= offsets
+ offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+ pendingOffsetCommits += topicIdPartition.topicPartition ->
offsetAndMetadata
+ }
}
- def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition,
OffsetAndMetadata]): Unit = {
+ def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicIdPartition,
OffsetAndMetadata]): Unit = {
trace(s"TxnOffsetCommit for producer $producerId and group $groupId with
offsets $offsets is pending")
receivedTransactionalOffsetCommits = true
val producerOffsets =
pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
- offsets.forKeyValue { (topicPartition, offsetAndMetadata) =>
- producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None,
offsetAndMetadata))
+ offsets.forKeyValue { (topicIdPartition, offsetAndMetadata) =>
+ producerOffsets.put(topicIdPartition.topicPartition,
CommitRecordMetadataAndOffset(None, offsetAndMetadata))
}
}
@@ -679,7 +683,8 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
/* Remove a pending transactional offset commit if the actual offset commit
record was not written to the log.
* We will return an error and the client will retry the request,
potentially to a different coordinator.
*/
- def failPendingTxnOffsetCommit(producerId: Long, topicPartition:
TopicPartition): Unit = {
+ def failPendingTxnOffsetCommit(producerId: Long, topicIdPartition:
TopicIdPartition): Unit = {
+ val topicPartition = topicIdPartition.topicPartition
pendingTransactionalOffsetCommits.get(producerId) match {
case Some(pendingOffsets) =>
val pendingOffsetCommit = pendingOffsets.remove(topicPartition)
@@ -692,8 +697,9 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
}
}
- def onTxnOffsetCommitAppend(producerId: Long, topicPartition: TopicPartition,
+ def onTxnOffsetCommitAppend(producerId: Long, topicIdPartition:
TopicIdPartition,
commitRecordMetadataAndOffset:
CommitRecordMetadataAndOffset): Unit = {
+ val topicPartition = topicIdPartition.topicPartition
pendingTransactionalOffsetCommits.get(producerId) match {
case Some(pendingOffset) =>
if (pendingOffset.contains(topicPartition)
@@ -827,6 +833,16 @@ private[group] class GroupMetadata(val groupId: String,
initialState: GroupState
// visible for testing
private[group] def offsetWithRecordMetadata(topicPartition: TopicPartition):
Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)
+ // Used for testing
+ private[group] def pendingOffsetCommit(topicIdPartition: TopicIdPartition):
Option[OffsetAndMetadata] = {
+ pendingOffsetCommits.get(topicIdPartition.topicPartition)
+ }
+
+ // Used for testing
+ private[group] def pendingTxnOffsetCommit(producerId: Long,
topicIdPartition: TopicIdPartition): Option[CommitRecordMetadataAndOffset] = {
+
pendingTransactionalOffsetCommits.get(producerId).flatMap(_.get(topicIdPartition.topicPartition))
+ }
+
def numOffsets: Int = offsets.size
def hasOffsets: Boolean = offsets.nonEmpty || pendingOffsetCommits.nonEmpty
|| pendingTransactionalOffsetCommits.nonEmpty
diff --git
a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index b45ee5f4736..2ea0d79662e 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -41,7 +41,7 @@ import
org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.{OffsetCommitRequest,
OffsetFetchResponse}
import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.common.{KafkaException, MessageFormatter,
TopicPartition}
+import org.apache.kafka.common.{KafkaException, MessageFormatter,
TopicIdPartition, TopicPartition}
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue,
OffsetCommitKey, OffsetCommitValue, GroupMetadataKey => GroupMetadataKeyData}
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.common.MetadataVersion.{IBP_0_10_1_IV0,
IBP_2_1_IV0, IBP_2_1_IV1, IBP_2_3_IV0}
@@ -345,8 +345,8 @@ class GroupMetadataManager(brokerId: Int,
*/
def storeOffsets(group: GroupMetadata,
consumerId: String,
- offsetMetadata: immutable.Map[TopicPartition,
OffsetAndMetadata],
- responseCallback: immutable.Map[TopicPartition, Errors] =>
Unit,
+ offsetMetadata: immutable.Map[TopicIdPartition,
OffsetAndMetadata],
+ responseCallback: immutable.Map[TopicIdPartition, Errors]
=> Unit,
producerId: Long = RecordBatch.NO_PRODUCER_ID,
producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
requestLocal: RequestLocal = RequestLocal.NoCaching): Unit
= {
@@ -375,8 +375,8 @@ class GroupMetadataManager(brokerId: Int,
val timestampType = TimestampType.CREATE_TIME
val timestamp = time.milliseconds()
- val records = filteredOffsetMetadata.map { case (topicPartition,
offsetAndMetadata) =>
- val key = GroupMetadataManager.offsetCommitKey(group.groupId,
topicPartition)
+ val records = filteredOffsetMetadata.map { case (topicIdPartition,
offsetAndMetadata) =>
+ val key = GroupMetadataManager.offsetCommitKey(group.groupId,
topicIdPartition.topicPartition)
val value =
GroupMetadataManager.offsetCommitValue(offsetAndMetadata,
interBrokerProtocolVersion)
new SimpleRecord(timestamp, key, value)
}
@@ -406,11 +406,11 @@ class GroupMetadataManager(brokerId: Int,
val responseError = group.inLock {
if (status.error == Errors.NONE) {
if (!group.is(Dead)) {
- filteredOffsetMetadata.forKeyValue { (topicPartition,
offsetAndMetadata) =>
+ filteredOffsetMetadata.forKeyValue { (topicIdPartition,
offsetAndMetadata) =>
if (isTxnOffsetCommit)
- group.onTxnOffsetCommitAppend(producerId,
topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset),
offsetAndMetadata))
+ group.onTxnOffsetCommitAppend(producerId,
topicIdPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset),
offsetAndMetadata))
else
- group.onOffsetCommitAppend(topicPartition,
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+ group.onOffsetCommitAppend(topicIdPartition,
CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
}
}
@@ -422,11 +422,11 @@ class GroupMetadataManager(brokerId: Int,
if (!group.is(Dead)) {
if (!group.hasPendingOffsetCommitsFromProducer(producerId))
removeProducerGroup(producerId, group.groupId)
- filteredOffsetMetadata.forKeyValue { (topicPartition,
offsetAndMetadata) =>
+ filteredOffsetMetadata.forKeyValue { (topicIdPartition,
offsetAndMetadata) =>
if (isTxnOffsetCommit)
- group.failPendingTxnOffsetCommit(producerId,
topicPartition)
+ group.failPendingTxnOffsetCommit(producerId,
topicIdPartition)
else
- group.failPendingOffsetWrite(topicPartition,
offsetAndMetadata)
+ group.failPendingOffsetWrite(topicIdPartition,
offsetAndMetadata)
}
}
@@ -455,11 +455,11 @@ class GroupMetadataManager(brokerId: Int,
}
// compute the final error codes for the commit response
- val commitStatus = offsetMetadata.map { case (topicPartition,
offsetAndMetadata) =>
+ val commitStatus = offsetMetadata.map { case (topicIdPartition,
offsetAndMetadata) =>
if (validateOffsetMetadataLength(offsetAndMetadata.metadata))
- (topicPartition, responseError)
+ (topicIdPartition, responseError)
else
- (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE)
+ (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE)
}
// finally trigger the callback logic passed from the API layer
@@ -480,8 +480,8 @@ class GroupMetadataManager(brokerId: Int,
appendForGroup(group, entries, requestLocal, putCacheCallback)
case None =>
- val commitStatus = offsetMetadata.map { case (topicPartition, _) =>
- (topicPartition, Errors.NOT_COORDINATOR)
+ val commitStatus = offsetMetadata.map { case (topicIdPartition, _) =>
+ (topicIdPartition, Errors.NOT_COORDINATOR)
}
responseCallback(commitStatus)
}
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 6704e2317e6..bcf24db2ecc 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata
import
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback,
SyncGroupCallback}
import kafka.server.RequestLocal
import kafka.utils.MockTime
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.errors.{InvalidGroupIdException,
UnsupportedVersionException}
import org.apache.kafka.common.message.{ConsumerGroupHeartbeatRequestData,
DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData,
HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData,
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData,
ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData,
OffsetDeleteRequestData, OffsetDeleteResponseData, OffsetFetchRequestData,
OffsetFetchResponseData, SyncGroupRequestData, SyncGr [...]
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol
@@ -662,8 +662,8 @@ class GroupCoordinatorAdapterTest {
val future = adapter.commitOffsets(ctx, data, bufferSupplier)
assertFalse(future.isDone)
- val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] =
- ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit])
+ val capturedCallback: ArgumentCaptor[Map[TopicIdPartition, Errors] =>
Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, Errors] => Unit])
verify(groupCoordinator).handleCommitOffsets(
ArgumentMatchers.eq(data.groupId),
@@ -671,7 +671,7 @@ class GroupCoordinatorAdapterTest {
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(data.generationId),
ArgumentMatchers.eq(Map(
- new TopicPartition("foo", 0) -> new OffsetAndMetadata(
+ new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new
OffsetAndMetadata(
offset = 100,
leaderEpoch = Optional.of[Integer](1),
metadata = "",
@@ -684,7 +684,7 @@ class GroupCoordinatorAdapterTest {
)
capturedCallback.getValue.apply(Map(
- new TopicPartition("foo", 0) -> Errors.NONE
+ new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> Errors.NONE
))
val expectedResponseData = new OffsetCommitResponseData()
@@ -732,8 +732,8 @@ class GroupCoordinatorAdapterTest {
val future = adapter.commitTransactionalOffsets(ctx, data, bufferSupplier)
assertFalse(future.isDone)
- val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] =
- ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit])
+ val capturedCallback: ArgumentCaptor[Map[TopicIdPartition, Errors] =>
Unit] =
+ ArgumentCaptor.forClass(classOf[Map[TopicIdPartition, Errors] => Unit])
verify(groupCoordinator).handleTxnCommitOffsets(
ArgumentMatchers.eq(data.groupId),
@@ -743,7 +743,7 @@ class GroupCoordinatorAdapterTest {
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(data.generationId),
ArgumentMatchers.eq(Map(
- new TopicPartition("foo", 0) -> new OffsetAndMetadata(
+ new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new
OffsetAndMetadata(
offset = 100,
leaderEpoch = Optional.of[Integer](1),
metadata = "",
@@ -756,7 +756,7 @@ class GroupCoordinatorAdapterTest {
)
capturedCallback.getValue.apply(Map(
- new TopicPartition("foo", 0) -> Errors.NONE
+ new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> Errors.NONE
))
val expectedData = new TxnOffsetCommitResponseData()
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index bd39b3190e7..40de17749b3 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -26,7 +26,7 @@ import kafka.coordinator.AbstractCoordinatorConcurrencyTest
import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
@@ -283,8 +283,8 @@ class GroupCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest
callback
}
override def runWithCallback(member: GroupMember, responseCallback:
CommitOffsetCallback): Unit = {
- val tp = new TopicPartition("topic", 0)
- val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "",
Time.SYSTEM.milliseconds()))
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+ val offsets = immutable.Map(tip -> OffsetAndMetadata(1, "",
Time.SYSTEM.milliseconds()))
groupCoordinator.handleCommitOffsets(member.groupId, member.memberId,
member.groupInstanceId, member.generationId, offsets, responseCallback)
replicaManager.tryCompleteActions()
@@ -297,14 +297,13 @@ class GroupCoordinatorConcurrencyTest extends
AbstractCoordinatorConcurrencyTest
class CommitTxnOffsetsOperation(lock: Option[Lock] = None) extends
CommitOffsetsOperation {
override def runWithCallback(member: GroupMember, responseCallback:
CommitOffsetCallback): Unit = {
- val tp = new TopicPartition("topic", 0)
- val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "",
Time.SYSTEM.milliseconds()))
+ val offsets = immutable.Map(new TopicIdPartition(Uuid.randomUuid(), 0,
"topic") -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
val producerId = 1000L
val producerEpoch : Short = 2
// When transaction offsets are appended to the log, transactions may be
scheduled for
// completion. Since group metadata locks are acquired for transaction
completion, include
// this in the callback to test that there are no deadlocks.
- def callbackWithTxnCompletion(errors: Map[TopicPartition, Errors]): Unit
= {
+ def callbackWithTxnCompletion(errors: Map[TopicIdPartition, Errors]):
Unit = {
val offsetsPartitions = (0 to numPartitions).map(new
TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, _))
groupCoordinator.groupManager.scheduleHandleTxnCompletion(producerId,
offsetsPartitions.map(_.partition).toSet, isCommit =
random.nextBoolean)
@@ -373,8 +372,8 @@ object GroupCoordinatorConcurrencyTest {
type HeartbeatCallback = Errors => Unit
type OffsetFetchCallbackParams = (Errors, Map[TopicPartition,
OffsetFetchResponse.PartitionData])
type OffsetFetchCallback = (Errors, Map[TopicPartition,
OffsetFetchResponse.PartitionData]) => Unit
- type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
- type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
+ type CommitOffsetCallbackParams = Map[TopicIdPartition, Errors]
+ type CommitOffsetCallback = Map[TopicIdPartition, Errors] => Unit
type LeaveGroupCallbackParams = LeaveGroupResult
type LeaveGroupCallback = LeaveGroupResult => Unit
type CompleteTxnCallbackParams = Errors
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 5a9ff6ae6ea..cbb1e5a7d60 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -22,7 +22,7 @@ import kafka.common.OffsetAndMetadata
import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig,
ReplicaManager, RequestLocal}
import kafka.utils._
import kafka.utils.timer.MockTimer
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, RecordBatch}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -60,8 +60,8 @@ class GroupCoordinatorTest {
type SyncGroupCallback = SyncGroupResult => Unit
type HeartbeatCallbackParams = Errors
type HeartbeatCallback = Errors => Unit
- type CommitOffsetCallbackParams = Map[TopicPartition, Errors]
- type CommitOffsetCallback = Map[TopicPartition, Errors] => Unit
+ type CommitOffsetCallbackParams = Map[TopicIdPartition, Errors]
+ type CommitOffsetCallback = Map[TopicIdPartition, Errors] => Unit
type LeaveGroupCallback = LeaveGroupResult => Unit
val ClientId = "consumer-test"
@@ -160,11 +160,11 @@ class GroupCoordinatorTest {
assertEquals(Some(Errors.REBALANCE_IN_PROGRESS), syncGroupResponse)
// OffsetCommit
- val topicPartition = new TopicPartition("foo", 0)
- var offsetCommitErrors = Map.empty[TopicPartition, Errors]
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0 , "foo")
+ var offsetCommitErrors = Map.empty[TopicIdPartition, Errors]
groupCoordinator.handleCommitOffsets(otherGroupId, memberId, None, 1,
- Map(topicPartition -> offsetAndMetadata(15L)), result => {
offsetCommitErrors = result })
- assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS),
offsetCommitErrors.get(topicPartition))
+ Map(topicIdPartition -> offsetAndMetadata(15L)), result => {
offsetCommitErrors = result })
+ assertEquals(Map(topicIdPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS),
offsetCommitErrors)
// Heartbeat
var heartbeatError: Option[Errors] = None
@@ -1268,12 +1268,12 @@ class GroupCoordinatorTest {
val memberId = "memberId"
val deadGroupId = "deadGroupId"
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
groupCoordinator.groupManager.addGroup(new GroupMetadata(deadGroupId,
Dead, new MockTime()))
- val offsetCommitResult = commitOffsets(deadGroupId, memberId, 1, Map(tp ->
offset))
- assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, offsetCommitResult(tp))
+ val offsetCommitResult = commitOffsets(deadGroupId, memberId, 1, Map(tip
-> offset))
+ assertEquals(Map(tip -> Errors.COORDINATOR_NOT_AVAILABLE),
offsetCommitResult)
}
@Test
@@ -1283,14 +1283,14 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId, rebalanceResult.generation,
rebalanceResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult.error)
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
- val validOffsetCommitResult = commitOffsets(groupId,
rebalanceResult.leaderId, rebalanceResult.generation, Map(tp -> offset))
- assertEquals(Errors.NONE, validOffsetCommitResult(tp))
+ val validOffsetCommitResult = commitOffsets(groupId,
rebalanceResult.leaderId, rebalanceResult.generation, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), validOffsetCommitResult)
val invalidOffsetCommitResult = commitOffsets(groupId, invalidMemberId,
rebalanceResult.generation,
- Map(tp -> offset), Some(leaderInstanceId))
- assertEquals(Errors.FENCED_INSTANCE_ID, invalidOffsetCommitResult(tp))
+ Map(tip -> offset), Some(leaderInstanceId))
+ assertEquals(Map(tip -> Errors.FENCED_INSTANCE_ID),
invalidOffsetCommitResult)
}
@Test
@@ -1785,7 +1785,7 @@ class GroupCoordinatorTest {
def testCommitMaintainsSession(): Unit = {
val sessionTimeout = 1000
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType,
protocols,
@@ -1800,8 +1800,8 @@ class GroupCoordinatorTest {
timer.advanceClock(sessionTimeout / 2)
- val commitOffsetResult = commitOffsets(groupId, assignedMemberId,
generationId, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ val commitOffsetResult = commitOffsets(groupId, assignedMemberId,
generationId, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
timer.advanceClock(sessionTimeout / 2 + 100)
@@ -2524,21 +2524,21 @@ class GroupCoordinatorTest {
@Test
def testCommitOffsetFromUnknownGroup(): Unit = {
val generationId = 1
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
- val commitOffsetResult = commitOffsets(groupId, memberId, generationId,
Map(tp -> offset))
- assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
+ val commitOffsetResult = commitOffsets(groupId, memberId, generationId,
Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.ILLEGAL_GENERATION), commitOffsetResult)
}
@Test
def testCommitOffsetWithDefaultGeneration(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId,
OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
}
@Test
@@ -2558,33 +2558,33 @@ class GroupCoordinatorTest {
verifyLeaveGroupResult(leaveGroupResults)
// The simple offset commit should now fail
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val commitOffsetResult = commitOffsets(groupId,
OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
- val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tp)))
+ val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, error)
- assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+ assertEquals(Some(0), partitionData.get(tip.topicPartition).map(_.offset))
}
@Test
def testFetchOffsets(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = 97L
val metadata = "some metadata"
val leaderEpoch = Optional.of[Integer](15)
val offsetAndMetadata = OffsetAndMetadata(offset, leaderEpoch, metadata,
timer.time.milliseconds(), None)
val commitOffsetResult = commitOffsets(groupId,
OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offsetAndMetadata))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offsetAndMetadata))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
- val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tp)))
+ val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, error)
- val maybePartitionData = partitionData.get(tp)
+ val maybePartitionData = partitionData.get(tip.topicPartition)
assertTrue(maybePartitionData.isDefined)
assertEquals(offset, maybePartitionData.get.offset)
assertEquals(metadata, maybePartitionData.get.metadata)
@@ -2596,17 +2596,17 @@ class GroupCoordinatorTest {
// For backwards compatibility, the coordinator supports
committing/fetching offsets with an empty groupId.
// To allow inspection and removal of the empty group, we must also
support DescribeGroups and DeleteGroups
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val groupId = ""
val commitOffsetResult = commitOffsets(groupId,
OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
- val (fetchError, partitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+ val (fetchError, partitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, fetchError)
- assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+ assertEquals(Some(0), partitionData.get(tip.topicPartition).map(_.offset))
val (describeError, summary) =
groupCoordinator.handleDescribeGroup(groupId)
assertEquals(Errors.NONE, describeError)
@@ -2622,26 +2622,26 @@ class GroupCoordinatorTest {
val deleteErrors = groupCoordinator.handleDeleteGroups(Set(groupId))
assertEquals(Errors.NONE, deleteErrors(groupId))
- val (err, data) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tp)))
+ val (err, data) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, err)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
data.get(tp).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
data.get(tip.topicPartition).map(_.offset))
}
@Test
def testBasicFetchTxnOffsets(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
- val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
- val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tp)))
+ val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tip.topicPartition)))
// Validate that the offset isn't materialjzed yet.
assertEquals(Errors.NONE, error)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tp).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tip.topicPartition).map(_.offset))
val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
groupPartitionId)
@@ -2649,50 +2649,50 @@ class GroupCoordinatorTest {
handleTxnCompletion(producerId, List(offsetsTopic),
TransactionResult.COMMIT)
// Validate that committed offset is materialized.
- val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+ val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, secondReqError)
- assertEquals(Some(0), secondReqPartitionData.get(tp).map(_.offset))
+ assertEquals(Some(0),
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
}
@Test
def testFetchTxnOffsetsWithAbort(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
- val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
- val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tp)))
+ val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, error)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tp).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tip.topicPartition).map(_.offset))
val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
groupPartitionId)
// Validate that the pending commit is discarded.
handleTxnCompletion(producerId, List(offsetsTopic),
TransactionResult.ABORT)
- val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+ val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, secondReqError)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
secondReqPartitionData.get(tp).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
}
@Test
def testFetchPendingTxnOffsetsWithAbort(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
- val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
val nonExistTp = new TopicPartition("non-exist-topic", 0)
- val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tp, nonExistTp)))
+ val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tip.topicPartition, nonExistTp)))
assertEquals(Errors.NONE, error)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tp).map(_.offset))
- assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT),
partitionData.get(tp).map(_.error))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tip.topicPartition).map(_.offset))
+ assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT),
partitionData.get(tip.topicPartition).map(_.error))
assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(nonExistTp).map(_.offset))
assertEquals(Some(Errors.NONE), partitionData.get(nonExistTp).map(_.error))
@@ -2701,65 +2701,65 @@ class GroupCoordinatorTest {
// Validate that the pending commit is discarded.
handleTxnCompletion(producerId, List(offsetsTopic),
TransactionResult.ABORT)
- val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+ val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, secondReqError)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
secondReqPartitionData.get(tp).map(_.offset))
- assertEquals(Some(Errors.NONE),
secondReqPartitionData.get(tp).map(_.error))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
+ assertEquals(Some(Errors.NONE),
secondReqPartitionData.get(tip.topicPartition).map(_.error))
}
@Test
def testFetchPendingTxnOffsetsWithCommit(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "offset")
val offset = offsetAndMetadata(25)
val producerId = 1000L
val producerEpoch : Short = 2
- val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
- val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tp)))
+ val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, error)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tp).map(_.offset))
- assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT),
partitionData.get(tp).map(_.error))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tip.topicPartition).map(_.offset))
+ assertEquals(Some(Errors.UNSTABLE_OFFSET_COMMIT),
partitionData.get(tip.topicPartition).map(_.error))
val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
groupPartitionId)
// Validate that the pending commit is committed
handleTxnCompletion(producerId, List(offsetsTopic),
TransactionResult.COMMIT)
- val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+ val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, secondReqError)
- assertEquals(Some(25), secondReqPartitionData.get(tp).map(_.offset))
- assertEquals(Some(Errors.NONE),
secondReqPartitionData.get(tp).map(_.error))
+ assertEquals(Some(25),
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
+ assertEquals(Some(Errors.NONE),
secondReqPartitionData.get(tip.topicPartition).map(_.error))
}
@Test
def testFetchTxnOffsetsIgnoreSpuriousCommit(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
- val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ val commitOffsetResult = commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
- val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tp)))
+ val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable, Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, error)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tp).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData.get(tip.topicPartition).map(_.offset))
val offsetsTopic = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME,
groupPartitionId)
handleTxnCompletion(producerId, List(offsetsTopic),
TransactionResult.ABORT)
- val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+ val (secondReqError, secondReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, secondReqError)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
secondReqPartitionData.get(tp).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
secondReqPartitionData.get(tip.topicPartition).map(_.offset))
// Ignore spurious commit.
handleTxnCompletion(producerId, List(offsetsTopic),
TransactionResult.COMMIT)
- val (thirdReqError, thirdReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable, Some(Seq(tp)))
+ val (thirdReqError, thirdReqPartitionData) =
groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(Seq(tip.topicPartition)))
assertEquals(Errors.NONE, thirdReqError)
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
thirdReqPartitionData.get(tp).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
thirdReqPartitionData.get(tip.topicPartition).map(_.offset))
}
@Test
@@ -2768,7 +2768,10 @@ class GroupCoordinatorTest {
// Both group have pending offset commits.
// Marker for only one partition is received. That commit should be
materialized while the other should not.
- val partitions = List(new TopicPartition("topic1", 0), new
TopicPartition("topic2", 0))
+ val topicIdPartitions = List(
+ new TopicIdPartition(Uuid.randomUuid(), 0, "topic1"),
+ new TopicIdPartition(Uuid.randomUuid(), 0, "topic2")
+ )
val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
val producerId = 1000L
val producerEpoch: Short = 3
@@ -2786,21 +2789,22 @@ class GroupCoordinatorTest {
// Ensure that the two groups map to different partitions.
assertNotEquals(offsetTopicPartitions(0), offsetTopicPartitions(1))
- commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(partitions(0) -> offsets(0))))
- assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
- commitOffsetResults.append(commitTransactionalOffsets(otherGroupId,
producerId, producerEpoch, Map(partitions(1) -> offsets(1))))
- assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
+ commitOffsetResults.append(commitTransactionalOffsets(groupId, producerId,
producerEpoch, Map(topicIdPartitions(0) -> offsets(0))))
+ assertEquals(Errors.NONE, commitOffsetResults(0)(topicIdPartitions(0)))
+ commitOffsetResults.append(commitTransactionalOffsets(otherGroupId,
producerId, producerEpoch, Map(topicIdPartitions(1) -> offsets(1))))
+ assertEquals(Errors.NONE, commitOffsetResults(1)(topicIdPartitions(1)))
// We got a commit for only one __consumer_offsets partition. We should
only materialize it's group offsets.
+ val topicPartitions = topicIdPartitions.map(_.topicPartition)
handleTxnCompletion(producerId, List(offsetTopicPartitions(0)),
TransactionResult.COMMIT)
- groupCoordinator.handleFetchOffsets(groupIds(0), requireStable,
Some(partitions)) match {
+ groupCoordinator.handleFetchOffsets(groupIds(0), requireStable,
Some(topicPartitions)) match {
case (error, partData) =>
errors.append(error)
partitionData.append(partData)
case _ =>
}
- groupCoordinator.handleFetchOffsets(groupIds(1), requireStable,
Some(partitions)) match {
+ groupCoordinator.handleFetchOffsets(groupIds(1), requireStable,
Some(topicPartitions)) match {
case (error, partData) =>
errors.append(error)
partitionData.append(partData)
@@ -2812,33 +2816,33 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, errors(1))
// Exactly one offset commit should have been materialized.
- assertEquals(Some(offsets(0).offset),
partitionData(0).get(partitions(0)).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(0).get(partitions(1)).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(1).get(partitions(0)).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(1).get(partitions(1)).map(_.offset))
+ assertEquals(Some(offsets(0).offset),
partitionData(0).get(topicPartitions(0)).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(0).get(topicPartitions(1)).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(1).get(topicPartitions(0)).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(1).get(topicPartitions(1)).map(_.offset))
// Now we receive the other marker.
handleTxnCompletion(producerId, List(offsetTopicPartitions(1)),
TransactionResult.COMMIT)
errors.clear()
partitionData.clear()
- groupCoordinator.handleFetchOffsets(groupIds(0), requireStable,
Some(partitions)) match {
+ groupCoordinator.handleFetchOffsets(groupIds(0), requireStable,
Some(topicPartitions)) match {
case (error, partData) =>
errors.append(error)
partitionData.append(partData)
case _ =>
}
- groupCoordinator.handleFetchOffsets(groupIds(1), requireStable,
Some(partitions)) match {
+ groupCoordinator.handleFetchOffsets(groupIds(1), requireStable,
Some(topicPartitions)) match {
case (error, partData) =>
errors.append(error)
partitionData.append(partData)
case _ =>
}
// Two offsets should have been materialized
- assertEquals(Some(offsets(0).offset),
partitionData(0).get(partitions(0)).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(0).get(partitions(1)).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(1).get(partitions(0)).map(_.offset))
- assertEquals(Some(offsets(1).offset),
partitionData(1).get(partitions(1)).map(_.offset))
+ assertEquals(Some(offsets(0).offset),
partitionData(0).get(topicPartitions(0)).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(0).get(topicPartitions(1)).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(1).get(topicPartitions(0)).map(_.offset))
+ assertEquals(Some(offsets(1).offset),
partitionData(1).get(topicPartitions(1)).map(_.offset))
}
@Test
@@ -2847,7 +2851,10 @@ class GroupCoordinatorTest {
// Different producers will commit offsets for different partitions.
// Each partition's offsets should be materialized when the corresponding
producer's marker is received.
- val partitions = List(new TopicPartition("topic1", 0), new
TopicPartition("topic2", 0))
+ val topicIdPartitions = List(
+ new TopicIdPartition(Uuid.randomUuid(), 0, "topic1"),
+ new TopicIdPartition(Uuid.randomUuid(), 0, "topic2")
+ )
val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
val producerIds = List(1000L, 1005L)
val producerEpochs: Seq[Short] = List(3, 4)
@@ -2860,16 +2867,17 @@ class GroupCoordinatorTest {
val commitOffsetResults = mutable.ArrayBuffer[CommitOffsetCallbackParams]()
// producer0 commits the offsets for partition0
- commitOffsetResults.append(commitTransactionalOffsets(groupId,
producerIds(0), producerEpochs(0), Map(partitions(0) -> offsets(0))))
- assertEquals(Errors.NONE, commitOffsetResults(0)(partitions(0)))
+ commitOffsetResults.append(commitTransactionalOffsets(groupId,
producerIds(0), producerEpochs(0), Map(topicIdPartitions(0) -> offsets(0))))
+ assertEquals(Errors.NONE, commitOffsetResults(0)(topicIdPartitions(0)))
// producer1 commits the offsets for partition1
- commitOffsetResults.append(commitTransactionalOffsets(groupId,
producerIds(1), producerEpochs(1), Map(partitions(1) -> offsets(1))))
- assertEquals(Errors.NONE, commitOffsetResults(1)(partitions(1)))
+ commitOffsetResults.append(commitTransactionalOffsets(groupId,
producerIds(1), producerEpochs(1), Map(topicIdPartitions(1) -> offsets(1))))
+ assertEquals(Errors.NONE, commitOffsetResults(1)(topicIdPartitions(1)))
// producer0 commits its transaction.
+ val topicPartitions = topicIdPartitions.map(_.topicPartition)
handleTxnCompletion(producerIds(0), List(offsetTopicPartition),
TransactionResult.COMMIT)
- groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(partitions)) match {
+ groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(topicPartitions)) match {
case (error, partData) =>
errors.append(error)
partitionData.append(partData)
@@ -2879,13 +2887,13 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, errors(0))
// We should only see the offset commit for producer0
- assertEquals(Some(offsets(0).offset),
partitionData(0).get(partitions(0)).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(0).get(partitions(1)).map(_.offset))
+ assertEquals(Some(offsets(0).offset),
partitionData(0).get(topicPartitions(0)).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
partitionData(0).get(topicPartitions(1)).map(_.offset))
// producer1 now commits its transaction.
handleTxnCompletion(producerIds(1), List(offsetTopicPartition),
TransactionResult.COMMIT)
- groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(partitions)) match {
+ groupCoordinator.handleFetchOffsets(groupId, requireStable,
Some(topicPartitions)) match {
case (error, partData) =>
errors.append(error)
partitionData.append(partData)
@@ -2895,8 +2903,8 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, errors(1))
// We should now see the offset commits for both producers.
- assertEquals(Some(offsets(0).offset),
partitionData(1).get(partitions(0)).map(_.offset))
- assertEquals(Some(offsets(1).offset),
partitionData(1).get(partitions(1)).map(_.offset))
+ assertEquals(Some(offsets(0).offset),
partitionData(1).get(topicPartitions(0)).map(_.offset))
+ assertEquals(Some(offsets(1).offset),
partitionData(1).get(topicPartitions(1)).map(_.offset))
}
@Test
@@ -2917,9 +2925,9 @@ class GroupCoordinatorTest {
@Test
def testFetchAllOffsets(): Unit = {
- val tp1 = new TopicPartition("topic", 0)
- val tp2 = new TopicPartition("topic", 1)
- val tp3 = new TopicPartition("other-topic", 0)
+ val tip1 = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+ val tip2 = new TopicIdPartition(tip1.topicId, 1, "topic")
+ val tip3 = new TopicIdPartition(Uuid.randomUuid(), 0, "other-topic")
val offset1 = offsetAndMetadata(15)
val offset2 = offsetAndMetadata(16)
val offset3 = offsetAndMetadata(17)
@@ -2927,24 +2935,22 @@ class GroupCoordinatorTest {
assertEquals((Errors.NONE, Map.empty),
groupCoordinator.handleFetchOffsets(groupId, requireStable))
val commitOffsetResult = commitOffsets(groupId,
OffsetCommitRequest.DEFAULT_MEMBER_ID,
- OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp1 -> offset1, tp2 ->
offset2, tp3 -> offset3))
- assertEquals(Errors.NONE, commitOffsetResult(tp1))
- assertEquals(Errors.NONE, commitOffsetResult(tp2))
- assertEquals(Errors.NONE, commitOffsetResult(tp3))
+ OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tip1 -> offset1, tip2 ->
offset2, tip3 -> offset3))
+ assertEquals(Map(tip1 -> Errors.NONE, tip2 -> Errors.NONE, tip3 ->
Errors.NONE), commitOffsetResult)
val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId,
requireStable)
assertEquals(Errors.NONE, error)
assertEquals(3, partitionData.size)
assertTrue(partitionData.forall(_._2.error == Errors.NONE))
- assertEquals(Some(offset1.offset), partitionData.get(tp1).map(_.offset))
- assertEquals(Some(offset2.offset), partitionData.get(tp2).map(_.offset))
- assertEquals(Some(offset3.offset), partitionData.get(tp3).map(_.offset))
+ assertEquals(Some(offset1.offset),
partitionData.get(tip1.topicPartition).map(_.offset))
+ assertEquals(Some(offset2.offset),
partitionData.get(tip2.topicPartition).map(_.offset))
+ assertEquals(Some(offset3.offset),
partitionData.get(tip3.topicPartition).map(_.offset))
}
@Test
def testCommitOffsetInCompletingRebalance(): Unit = {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType,
protocols)
@@ -2953,14 +2959,14 @@ class GroupCoordinatorTest {
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
- val commitOffsetResult = commitOffsets(groupId, assignedMemberId,
generationId, Map(tp -> offset))
- assertEquals(Errors.REBALANCE_IN_PROGRESS, commitOffsetResult(tp))
+ val commitOffsetResult = commitOffsets(groupId, assignedMemberId,
generationId, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.REBALANCE_IN_PROGRESS), commitOffsetResult)
}
@Test
def testCommitOffsetInCompletingRebalanceFromUnknownMemberId(): Unit = {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0 , "topic")
val offset = offsetAndMetadata(0)
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType,
protocols)
@@ -2968,14 +2974,14 @@ class GroupCoordinatorTest {
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
- val commitOffsetResult = commitOffsets(groupId, memberId, generationId,
Map(tp -> offset))
- assertEquals(Errors.UNKNOWN_MEMBER_ID, commitOffsetResult(tp))
+ val commitOffsetResult = commitOffsets(groupId, memberId, generationId,
Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.UNKNOWN_MEMBER_ID), commitOffsetResult)
}
@Test
def testCommitOffsetInCompletingRebalanceFromIllegalGeneration(): Unit = {
val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val joinGroupResult = dynamicJoinGroup(groupId, memberId, protocolType,
protocols)
@@ -2984,36 +2990,36 @@ class GroupCoordinatorTest {
val joinGroupError = joinGroupResult.error
assertEquals(Errors.NONE, joinGroupError)
- val commitOffsetResult = commitOffsets(groupId, assignedMemberId,
generationId + 1, Map(tp -> offset))
- assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
+ val commitOffsetResult = commitOffsets(groupId, assignedMemberId,
generationId + 1, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.ILLEGAL_GENERATION), commitOffsetResult)
}
@Test
def testManualCommitOffsetShouldNotValidateMemberIdAndInstanceId(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
var commitOffsetResult = commitOffsets(
groupId,
JoinGroupRequest.UNKNOWN_MEMBER_ID,
-1,
- Map(tp -> offsetAndMetadata(0)),
+ Map(tip -> offsetAndMetadata(0)),
Some("instance-id")
)
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
commitOffsetResult = commitOffsets(
groupId,
"unknown",
-1,
- Map(tp -> offsetAndMetadata(0)),
+ Map(tip -> offsetAndMetadata(0)),
None
)
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
}
@Test
def testTxnCommitOffsetWithFencedInstanceId(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@@ -3021,21 +3027,21 @@ class GroupCoordinatorTest {
val rebalanceResult = staticMembersJoinAndRebalance(leaderInstanceId,
followerInstanceId)
val leaderNoMemberIdCommitOffsetResult =
commitTransactionalOffsets(groupId, producerId, producerEpoch,
- Map(tp -> offset), memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
groupInstanceId = Some(leaderInstanceId))
- assertEquals(Errors.FENCED_INSTANCE_ID,
leaderNoMemberIdCommitOffsetResult(tp))
+ Map(tip -> offset), memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID,
groupInstanceId = Some(leaderInstanceId))
+ assertEquals(Map(tip -> Errors.FENCED_INSTANCE_ID),
leaderNoMemberIdCommitOffsetResult)
val leaderInvalidMemberIdCommitOffsetResult =
commitTransactionalOffsets(groupId, producerId, producerEpoch,
- Map(tp -> offset), memberId = "invalid-member", groupInstanceId =
Some(leaderInstanceId))
- assertEquals(Errors.FENCED_INSTANCE_ID,
leaderInvalidMemberIdCommitOffsetResult (tp))
+ Map(tip -> offset), memberId = "invalid-member", groupInstanceId =
Some(leaderInstanceId))
+ assertEquals(Map(tip -> Errors.FENCED_INSTANCE_ID),
leaderInvalidMemberIdCommitOffsetResult)
val leaderCommitOffsetResult = commitTransactionalOffsets(groupId,
producerId, producerEpoch,
- Map(tp -> offset), rebalanceResult.leaderId, Some(leaderInstanceId),
rebalanceResult.generation)
- assertEquals(Errors.NONE, leaderCommitOffsetResult (tp))
+ Map(tip -> offset), rebalanceResult.leaderId, Some(leaderInstanceId),
rebalanceResult.generation)
+ assertEquals(Map(tip -> Errors.NONE), leaderCommitOffsetResult)
}
@Test
def testTxnCommitOffsetWithInvalidMemberId(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@@ -3045,13 +3051,13 @@ class GroupCoordinatorTest {
assertEquals(Errors.NONE, joinGroupError)
val invalidIdCommitOffsetResult = commitTransactionalOffsets(groupId,
producerId, producerEpoch,
- Map(tp -> offset), "invalid-member")
- assertEquals(Errors.UNKNOWN_MEMBER_ID, invalidIdCommitOffsetResult (tp))
+ Map(tip -> offset), "invalid-member")
+ assertEquals(Map(tip -> Errors.UNKNOWN_MEMBER_ID),
invalidIdCommitOffsetResult)
}
@Test
def testTxnCommitOffsetWithKnownMemberId(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@@ -3063,13 +3069,13 @@ class GroupCoordinatorTest {
val assignedConsumerId = joinGroupResult.memberId
val leaderCommitOffsetResult = commitTransactionalOffsets(groupId,
producerId, producerEpoch,
- Map(tp -> offset), assignedConsumerId, generationId =
joinGroupResult.generationId)
- assertEquals(Errors.NONE, leaderCommitOffsetResult (tp))
+ Map(tip -> offset), assignedConsumerId, generationId =
joinGroupResult.generationId)
+ assertEquals(Map(tip -> Errors.NONE), leaderCommitOffsetResult)
}
@Test
def testTxnCommitOffsetWithIllegalGeneration(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@@ -3082,13 +3088,13 @@ class GroupCoordinatorTest {
val assignedConsumerId = joinGroupResult.memberId
val initialGenerationId = joinGroupResult.generationId
val illegalGenerationCommitOffsetResult =
commitTransactionalOffsets(groupId, producerId, producerEpoch,
- Map(tp -> offset), memberId = assignedConsumerId, generationId =
initialGenerationId + 5)
- assertEquals(Errors.ILLEGAL_GENERATION,
illegalGenerationCommitOffsetResult(tp))
+ Map(tip -> offset), memberId = assignedConsumerId, generationId =
initialGenerationId + 5)
+ assertEquals(Map(tip -> Errors.ILLEGAL_GENERATION),
illegalGenerationCommitOffsetResult)
}
@Test
def testTxnCommitOffsetWithLegalGeneration(): Unit = {
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
val producerId = 1000L
val producerEpoch : Short = 2
@@ -3101,8 +3107,8 @@ class GroupCoordinatorTest {
val assignedConsumerId = joinGroupResult.memberId
val initialGenerationId = joinGroupResult.generationId
val leaderCommitOffsetResult = commitTransactionalOffsets(groupId,
producerId, producerEpoch,
- Map(tp -> offset), memberId = assignedConsumerId, generationId =
initialGenerationId)
- assertEquals(Errors.NONE, leaderCommitOffsetResult (tp))
+ Map(tip -> offset), memberId = assignedConsumerId, generationId =
initialGenerationId)
+ assertEquals(Map(tip -> Errors.NONE), leaderCommitOffsetResult)
}
@Test
@@ -3476,10 +3482,10 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId,
joinGroupResult.generationId, assignedMemberId, Map(assignedMemberId ->
Array[Byte]()))
assertEquals(Errors.NONE, syncGroupResult.error)
- val tp = new TopicPartition("topic", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
val offset = offsetAndMetadata(0)
- val commitOffsetResult = commitOffsets(groupId, assignedMemberId,
joinGroupResult.generationId, Map(tp -> offset))
- assertEquals(Errors.NONE, commitOffsetResult(tp))
+ val commitOffsetResult = commitOffsets(groupId, assignedMemberId,
joinGroupResult.generationId, Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), commitOffsetResult)
val describeGroupResult = groupCoordinator.handleDescribeGroup(groupId)
assertEquals(Stable.toString, describeGroupResult._2.state)
@@ -3534,14 +3540,13 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId,
joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult.error)
- val t1p0 = new TopicPartition("foo", 0)
- val t2p0 = new TopicPartition("bar", 0)
+ val ti1p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+ val ti2p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
val offset = offsetAndMetadata(37)
val validOffsetCommitResult = commitOffsets(groupId,
joinGroupResult.memberId, joinGroupResult.generationId,
- Map(t1p0 -> offset, t2p0 -> offset))
- assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
- assertEquals(Errors.NONE, validOffsetCommitResult(t2p0))
+ Map(ti1p0 -> offset, ti2p0 -> offset))
+ assertEquals(Map(ti1p0 -> Errors.NONE, ti2p0 -> Errors.NONE),
validOffsetCommitResult)
// and leaves.
val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
@@ -3556,17 +3561,17 @@ class GroupCoordinatorTest {
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(t1p0),
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(ti1p0.topicPartition),
RequestLocal.NoCaching)
assertEquals(Errors.NONE, groupError)
assertEquals(1, topics.size)
- assertEquals(Some(Errors.NONE), topics.get(t1p0))
+ assertEquals(Some(Errors.NONE), topics.get(ti1p0.topicPartition))
- val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId,
requireStable, Some(Seq(t1p0, t2p0)))
+ val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId,
requireStable, Some(Seq(ti1p0.topicPartition, ti2p0.topicPartition)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(t1p0).map(_.offset))
- assertEquals(Some(offset.offset), cachedOffsets.get(t2p0).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(ti1p0.topicPartition).map(_.offset))
+ assertEquals(Some(offset.offset),
cachedOffsets.get(ti2p0.topicPartition).map(_.offset))
}
@Test
@@ -3577,19 +3582,19 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId,
joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult.error)
- val tp = new TopicPartition("foo", 0)
+ val tip = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = offsetAndMetadata(37)
val validOffsetCommitResult = commitOffsets(groupId,
joinGroupResult.memberId, joinGroupResult.generationId,
- Map(tp -> offset))
- assertEquals(Errors.NONE, validOffsetCommitResult(tp))
+ Map(tip -> offset))
+ assertEquals(Map(tip -> Errors.NONE), validOffsetCommitResult)
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(tp),
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(tip.topicPartition),
RequestLocal.NoCaching)
assertEquals(Errors.NONE, groupError)
assertEquals(1, topics.size)
- assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(tp))
+ assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC),
topics.get(tip.topicPartition))
}
@Test
@@ -3616,14 +3621,13 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId,
joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult.error)
- val t1p0 = new TopicPartition("foo", 0)
- val t2p0 = new TopicPartition("bar", 0)
+ val ti1p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+ val ti2p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
val offset = offsetAndMetadata(37)
val validOffsetCommitResult = commitOffsets(groupId,
joinGroupResult.memberId, joinGroupResult.generationId,
- Map(t1p0 -> offset, t2p0 -> offset))
- assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
- assertEquals(Errors.NONE, validOffsetCommitResult(t2p0))
+ Map(ti1p0 -> offset, ti2p0 -> offset))
+ assertEquals(Map(ti1p0 -> Errors.NONE, ti2p0 -> Errors.NONE),
validOffsetCommitResult)
// and leaves.
val leaveGroupResults = singleLeaveGroup(groupId, joinGroupResult.memberId)
@@ -3638,17 +3642,17 @@ class GroupCoordinatorTest {
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(t1p0),
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(ti1p0.topicPartition),
RequestLocal.NoCaching)
assertEquals(Errors.NONE, groupError)
assertEquals(1, topics.size)
- assertEquals(Some(Errors.NONE), topics.get(t1p0))
+ assertEquals(Some(Errors.NONE), topics.get(ti1p0.topicPartition))
- val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId,
requireStable, Some(Seq(t1p0, t2p0)))
+ val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId,
requireStable, Some(Seq(ti1p0.topicPartition, ti2p0.topicPartition)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(t1p0).map(_.offset))
- assertEquals(Some(offset.offset), cachedOffsets.get(t2p0).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(ti1p0.topicPartition).map(_.offset))
+ assertEquals(Some(offset.offset),
cachedOffsets.get(ti2p0.topicPartition).map(_.offset))
}
@Test
@@ -3664,14 +3668,13 @@ class GroupCoordinatorTest {
val syncGroupResult = syncGroupLeader(groupId,
joinGroupResult.generationId, joinGroupResult.leaderId, Map.empty)
assertEquals(Errors.NONE, syncGroupResult.error)
- val t1p0 = new TopicPartition("foo", 0)
- val t2p0 = new TopicPartition("bar", 0)
+ val ti1p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+ val ti2p0 = new TopicIdPartition(Uuid.randomUuid(), 0, "bar")
val offset = offsetAndMetadata(37)
val validOffsetCommitResult = commitOffsets(groupId,
joinGroupResult.memberId, joinGroupResult.generationId,
- Map(t1p0 -> offset, t2p0 -> offset))
- assertEquals(Errors.NONE, validOffsetCommitResult(t1p0))
- assertEquals(Errors.NONE, validOffsetCommitResult(t2p0))
+ Map(ti1p0 -> offset, ti2p0 -> offset))
+ assertEquals(Map(ti1p0 -> Errors.NONE, ti2p0 -> Errors.NONE),
validOffsetCommitResult)
assertTrue(groupCoordinator.groupManager.getGroup(groupId).exists(_.is(Stable)))
@@ -3682,18 +3685,18 @@ class GroupCoordinatorTest {
when(replicaManager.getPartition(groupTopicPartition)).thenReturn(HostedPartition.Online(partition))
when(replicaManager.onlinePartition(groupTopicPartition)).thenReturn(Some(partition))
- val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(t1p0, t2p0),
+ val (groupError, topics) = groupCoordinator.handleDeleteOffsets(groupId,
Seq(ti1p0.topicPartition, ti2p0.topicPartition),
RequestLocal.NoCaching)
assertEquals(Errors.NONE, groupError)
assertEquals(2, topics.size)
- assertEquals(Some(Errors.NONE), topics.get(t1p0))
- assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC), topics.get(t2p0))
+ assertEquals(Some(Errors.NONE), topics.get(ti1p0.topicPartition))
+ assertEquals(Some(Errors.GROUP_SUBSCRIBED_TO_TOPIC),
topics.get(ti2p0.topicPartition))
- val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId,
requireStable, Some(Seq(t1p0, t2p0)))
+ val cachedOffsets = groupCoordinator.groupManager.getOffsets(groupId,
requireStable, Some(Seq(ti1p0.topicPartition, ti2p0.topicPartition)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(t1p0).map(_.offset))
- assertEquals(Some(offset.offset), cachedOffsets.get(t2p0).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(ti1p0.topicPartition).map(_.offset))
+ assertEquals(Some(offset.offset),
cachedOffsets.get(ti2p0.topicPartition).map(_.offset))
}
@Test
@@ -4024,7 +4027,7 @@ class GroupCoordinatorTest {
private def commitOffsets(groupId: String,
memberId: String,
generationId: Int,
- offsets: Map[TopicPartition, OffsetAndMetadata],
+ offsets: Map[TopicIdPartition, OffsetAndMetadata],
groupInstanceId: Option[String] = None):
CommitOffsetCallbackParams = {
val (responseFuture, responseCallback) = setupCommitOffsetsCallback
@@ -4055,10 +4058,10 @@ class GroupCoordinatorTest {
private def commitTransactionalOffsets(groupId: String,
producerId: Long,
producerEpoch: Short,
- offsets: Map[TopicPartition,
OffsetAndMetadata],
+ offsets: Map[TopicIdPartition,
OffsetAndMetadata],
memberId: String =
JoinGroupRequest.UNKNOWN_MEMBER_ID,
groupInstanceId: Option[String] =
Option.empty,
- generationId: Int =
JoinGroupRequest.UNKNOWN_GENERATION_ID) = {
+ generationId: Int =
JoinGroupRequest.UNKNOWN_GENERATION_ID) : CommitOffsetCallbackParams = {
val (responseFuture, responseCallback) = setupCommitOffsetsCallback
val capturedArgument: ArgumentCaptor[scala.collection.Map[TopicPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[scala.collection.Map[TopicPartition,
PartitionResponse] => Unit])
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index e24530918b5..333aeac5af7 100644
---
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -32,7 +32,7 @@ import kafka.utils.{MockTime, TestUtils}
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext,
Metrics => kMetrics}
import org.apache.kafka.common.protocol.Errors
@@ -1248,7 +1248,7 @@ class GroupMetadataManagerTest {
@Test
def testCommitOffset(): Unit = {
val memberId = ""
- val topicPartition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1256,11 +1256,11 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
+ val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
expectAppendMessage(Errors.NONE)
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1269,12 +1269,12 @@ class GroupMetadataManagerTest {
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
- val maybeError = commitErrors.get.get(topicPartition)
+ val maybeError = commitErrors.get.get(topicIdPartition)
assertEquals(Some(Errors.NONE), maybeError)
assertTrue(group.hasOffsets)
- val cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition)))
- val maybePartitionResponse = cachedOffsets.get(topicPartition)
+ val cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicIdPartition.topicPartition)))
+ val maybePartitionResponse =
cachedOffsets.get(topicIdPartition.topicPartition)
assertFalse(maybePartitionResponse.isEmpty)
val partitionResponse = maybePartitionResponse.get
@@ -1297,7 +1297,7 @@ class GroupMetadataManagerTest {
@Test
def testTransactionalCommitOffsetCommitted(): Unit = {
val memberId = ""
- val topicPartition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
val producerId = 232L
val producerEpoch = 0.toShort
@@ -1308,12 +1308,12 @@ class GroupMetadataManagerTest {
groupMetadataManager.addGroup(group)
val offsetAndMetadata = OffsetAndMetadata(offset, "", time.milliseconds())
- val offsets = immutable.Map(topicPartition -> offsetAndMetadata)
+ val offsets = immutable.Map(topicIdPartition -> offsetAndMetadata)
val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition,
PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1340,13 +1340,13 @@ class GroupMetadataManagerTest {
group.completePendingTxnOffsetCommit(producerId, isCommit = true)
assertTrue(group.hasOffsets)
assertFalse(group.allOffsets.isEmpty)
- assertEquals(Some(offsetAndMetadata), group.offset(topicPartition))
+ assertEquals(Some(offsetAndMetadata),
group.offset(topicIdPartition.topicPartition))
}
@Test
def testTransactionalCommitOffsetAppendFailure(): Unit = {
val memberId = ""
- val topicPartition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
val producerId = 232L
val producerEpoch = 0.toShort
@@ -1356,12 +1356,12 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
+ val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1394,7 +1394,7 @@ class GroupMetadataManagerTest {
@Test
def testTransactionalCommitOffsetAborted(): Unit = {
val memberId = ""
- val topicPartition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
val producerId = 232L
val producerEpoch = 0.toShort
@@ -1404,12 +1404,12 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
+ val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1443,7 +1443,7 @@ class GroupMetadataManagerTest {
def testCommitOffsetWhenCoordinatorHasMoved(): Unit = {
when(replicaManager.getMagic(any())).thenReturn(None)
val memberId = ""
- val topicPartition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1451,17 +1451,17 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
+ val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
assertFalse(commitErrors.isEmpty)
- val maybeError = commitErrors.get.get(topicPartition)
+ val maybeError = commitErrors.get.get(topicIdPartition)
assertEquals(Some(Errors.NOT_COORDINATOR), maybeError)
verify(replicaManager).getMagic(any())
@@ -1482,7 +1482,7 @@ class GroupMetadataManagerTest {
private def assertCommitOffsetErrorMapping(appendError: Errors,
expectedError: Errors): Unit = {
reset(replicaManager)
val memberId = ""
- val topicPartition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1490,12 +1490,12 @@ class GroupMetadataManagerTest {
val group = new GroupMetadata(groupId, Empty, time)
groupMetadataManager.addGroup(group)
- val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
+ val offsets = immutable.Map(topicIdPartition -> OffsetAndMetadata(offset,
"", time.milliseconds()))
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1507,12 +1507,19 @@ class GroupMetadataManagerTest {
new PartitionResponse(appendError, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
assertFalse(commitErrors.isEmpty)
- val maybeError = commitErrors.get.get(topicPartition)
+ val maybeError = commitErrors.get.get(topicIdPartition)
assertEquals(Some(expectedError), maybeError)
assertFalse(group.hasOffsets)
- val cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition).map(_.offset))
+ val cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition.topicPartition))
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+ )
verify(replicaManager).getMagic(any())
// Will not update sensor if failed
@@ -1522,8 +1529,8 @@ class GroupMetadataManagerTest {
@Test
def testCommitOffsetPartialFailure(): Unit = {
val memberId = ""
- val topicPartition = new TopicPartition("foo", 0)
- val topicPartitionFailed = new TopicPartition("foo", 1)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+ val topicIdPartitionFailed = new TopicIdPartition(Uuid.randomUuid(), 1,
"foo")
val offset = 37
groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1532,15 +1539,15 @@ class GroupMetadataManagerTest {
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(
- topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
+ topicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds()),
// This will failed
- topicPartitionFailed -> OffsetAndMetadata(offset, "s" *
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
+ topicIdPartitionFailed -> OffsetAndMetadata(offset, "s" *
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
)
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1552,13 +1559,23 @@ class GroupMetadataManagerTest {
new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
assertFalse(commitErrors.isEmpty)
- assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition))
- assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE),
commitErrors.get.get(topicPartitionFailed))
+ assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition))
+ assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE),
commitErrors.get.get(topicIdPartitionFailed))
assertTrue(group.hasOffsets)
- val cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition, topicPartitionFailed)))
- assertEquals(Some(offset), cachedOffsets.get(topicPartition).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartitionFailed).map(_.offset))
+ val cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition.topicPartition,
topicIdPartitionFailed.topicPartition))
+ )
+ assertEquals(
+ Some(offset),
+ cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartitionFailed.topicPartition).map(_.offset)
+ )
verify(replicaManager).appendRecords(anyLong(),
anyShort(),
@@ -1576,7 +1593,7 @@ class GroupMetadataManagerTest {
@Test
def testOffsetMetadataTooLarge(): Unit = {
val memberId = ""
- val topicPartition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = 37
groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1584,11 +1601,11 @@ class GroupMetadataManagerTest {
groupMetadataManager.addGroup(group)
val offsets = immutable.Map(
- topicPartition -> OffsetAndMetadata(offset, "s" *
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
+ topicIdPartition -> OffsetAndMetadata(offset, "s" *
(offsetConfig.maxMetadataSize + 1) , time.milliseconds())
)
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1597,12 +1614,19 @@ class GroupMetadataManagerTest {
assertFalse(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
- val maybeError = commitErrors.get.get(topicPartition)
+ val maybeError = commitErrors.get.get(topicIdPartition)
assertEquals(Some(Errors.OFFSET_METADATA_TOO_LARGE), maybeError)
assertFalse(group.hasOffsets)
- val cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition).map(_.offset))
+ val cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition.topicPartition))
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
+ )
assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
}
@@ -1610,8 +1634,8 @@ class GroupMetadataManagerTest {
@Test
def testExpireOffset(): Unit = {
val memberId = ""
- val topicPartition1 = new TopicPartition("foo", 0)
- val topicPartition2 = new TopicPartition("foo", 1)
+ val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+ val topicIdPartition2 = new TopicIdPartition(topicIdPartition1.topicId, 1,
"foo")
val offset = 37
groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1622,13 +1646,13 @@ class GroupMetadataManagerTest {
// expire the offset after 1 millisecond
val startMs = time.milliseconds
val offsets = immutable.Map(
- topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
- topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
+ topicIdPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
+ topicIdPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
mockGetPartition()
expectAppendMessage(Errors.NONE)
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1636,7 +1660,7 @@ class GroupMetadataManagerTest {
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
- assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+ assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))
// expire only one of the offsets
time.sleep(2)
@@ -1647,12 +1671,16 @@ class GroupMetadataManagerTest {
groupMetadataManager.cleanupGroupMetadata()
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
- assertEquals(None, group.offset(topicPartition1))
- assertEquals(Some(offset), group.offset(topicPartition2).map(_.offset))
+ assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+ assertEquals(Some(offset),
group.offset(topicIdPartition2.topicPartition).map(_.offset))
- val cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition1).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topicPartition2).map(_.offset))
+ val cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition1.topicPartition,
topicIdPartition2.topicPartition))
+ )
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
+ assertEquals(Some(offset),
cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
verify(replicaManager).appendRecords(anyLong(),
anyShort(),
@@ -1759,8 +1787,8 @@ class GroupMetadataManagerTest {
// this is a group which is only using kafka for offset storage
val memberId = ""
- val topicPartition1 = new TopicPartition("foo", 0)
- val topicPartition2 = new TopicPartition("foo", 1)
+ val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
+ val topicIdPartition2 = new TopicIdPartition(topicIdPartition1.topicId, 1,
"foo")
val offset = 37
groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1771,13 +1799,13 @@ class GroupMetadataManagerTest {
// expire the offset after 1 millisecond
val startMs = time.milliseconds
val offsets = immutable.Map(
- topicPartition1 -> OffsetAndMetadata(offset, Optional.empty(), "",
startMs, Some(startMs + 1)),
- topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
+ topicIdPartition1 -> OffsetAndMetadata(offset, Optional.empty(), "",
startMs, Some(startMs + 1)),
+ topicIdPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
mockGetPartition()
expectAppendMessage(Errors.NONE)
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1785,7 +1813,7 @@ class GroupMetadataManagerTest {
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
- assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+ assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))
// expire all of the offsets
time.sleep(4)
@@ -1811,9 +1839,19 @@ class GroupMetadataManagerTest {
// the full group should be gone since all offsets were removed
assertEquals(None, groupMetadataManager.getGroup(groupId))
- val cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition1).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition2).map(_.offset))
+ val cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition1.topicPartition,
topicIdPartition2.topicPartition))
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)
+ )
verify(replicaManager).onlinePartition(groupTopicPartition)
}
@@ -1824,9 +1862,9 @@ class GroupMetadataManagerTest {
val clientId = "clientId"
val clientHost = "localhost"
val topic = "foo"
- val topicPartition1 = new TopicPartition(topic, 0)
- val topicPartition2 = new TopicPartition(topic, 1)
- val topicPartition3 = new TopicPartition(topic, 2)
+ val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, topic)
+ val topicIdPartition2 = new TopicIdPartition(topicIdPartition1.topicId, 1,
topic)
+ val topicIdPartition3 = new TopicIdPartition(topicIdPartition1.topicId, 2,
topic)
val offset = 37
groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1848,14 +1886,14 @@ class GroupMetadataManagerTest {
// new clients, no per-partition expiry timestamp, offsets of group expire
together
val tp3OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
val offsets = immutable.Map(
- topicPartition1 -> tp1OffsetAndMetadata,
- topicPartition2 -> tp2OffsetAndMetadata,
- topicPartition3 -> tp3OffsetAndMetadata)
+ topicIdPartition1 -> tp1OffsetAndMetadata,
+ topicIdPartition2 -> tp2OffsetAndMetadata,
+ topicIdPartition3 -> tp3OffsetAndMetadata)
mockGetPartition()
expectAppendMessage(Errors.NONE)
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -1863,7 +1901,7 @@ class GroupMetadataManagerTest {
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
- assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+ assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))
// do not expire any offset even though expiration timestamp is reached
for one (due to group still being active)
time.sleep(2)
@@ -1872,14 +1910,18 @@ class GroupMetadataManagerTest {
// group and offsets should still be there
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
- assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
- assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
- assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
-
- var cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2,
topicPartition3)))
- assertEquals(Some(offset),
cachedOffsets.get(topicPartition1).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topicPartition2).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topicPartition3).map(_.offset))
+ assertEquals(Some(tp1OffsetAndMetadata),
group.offset(topicIdPartition1.topicPartition))
+ assertEquals(Some(tp2OffsetAndMetadata),
group.offset(topicIdPartition2.topicPartition))
+ assertEquals(Some(tp3OffsetAndMetadata),
group.offset(topicIdPartition3.topicPartition))
+
+ var cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition1.topicPartition,
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+ )
+ assertEquals(Some(offset),
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
+ assertEquals(Some(offset),
cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
+ assertEquals(Some(offset),
cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset))
verify(replicaManager).onlinePartition(groupTopicPartition)
@@ -1894,14 +1936,18 @@ class GroupMetadataManagerTest {
// group is empty now, only one offset should expire
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
- assertEquals(None, group.offset(topicPartition1))
- assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
- assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
-
- cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2,
topicPartition3)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition1).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topicPartition2).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topicPartition3).map(_.offset))
+ assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+ assertEquals(Some(tp2OffsetAndMetadata),
group.offset(topicIdPartition2.topicPartition))
+ assertEquals(Some(tp3OffsetAndMetadata),
group.offset(topicIdPartition3.topicPartition))
+
+ cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition1.topicPartition,
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+ )
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
+ assertEquals(Some(offset),
cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
+ assertEquals(Some(offset),
cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset))
verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
@@ -1915,14 +1961,18 @@ class GroupMetadataManagerTest {
// one more offset should expire
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
- assertEquals(None, group.offset(topicPartition1))
- assertEquals(None, group.offset(topicPartition2))
- assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
-
- cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2,
topicPartition3)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition1).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition2).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topicPartition3).map(_.offset))
+ assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+ assertEquals(None, group.offset(topicIdPartition2.topicPartition))
+ assertEquals(Some(tp3OffsetAndMetadata),
group.offset(topicIdPartition3.topicPartition))
+
+ cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition1.topicPartition,
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+ )
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
+ assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset))
+ assertEquals(Some(offset),
cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset))
verify(replicaManager, times(3)).onlinePartition(groupTopicPartition)
@@ -1933,14 +1983,27 @@ class GroupMetadataManagerTest {
// one more offset should expire
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
- assertEquals(None, group.offset(topicPartition1))
- assertEquals(None, group.offset(topicPartition2))
- assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
-
- cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2,
topicPartition3)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition1).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition2).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topicPartition3).map(_.offset))
+ assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+ assertEquals(None, group.offset(topicIdPartition2.topicPartition))
+ assertEquals(Some(tp3OffsetAndMetadata),
group.offset(topicIdPartition3.topicPartition))
+
+ cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition1.topicPartition,
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(offset),
+ cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset)
+ )
verify(replicaManager, times(4)).onlinePartition(groupTopicPartition)
@@ -1955,14 +2018,27 @@ class GroupMetadataManagerTest {
// group and all its offsets should be gone now
assertEquals(None, groupMetadataManager.getGroup(groupId))
- assertEquals(None, group.offset(topicPartition1))
- assertEquals(None, group.offset(topicPartition2))
- assertEquals(None, group.offset(topicPartition3))
-
- cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition1, topicPartition2,
topicPartition3)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition1).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition2).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition3).map(_.offset))
+ assertEquals(None, group.offset(topicIdPartition1.topicPartition))
+ assertEquals(None, group.offset(topicIdPartition2.topicPartition))
+ assertEquals(None, group.offset(topicIdPartition3.topicPartition))
+
+ cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition1.topicPartition,
topicIdPartition2.topicPartition, topicIdPartition3.topicPartition))
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition2.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition3.topicPartition).map(_.offset)
+ )
verify(replicaManager, times(5)).onlinePartition(groupTopicPartition)
@@ -1973,7 +2049,7 @@ class GroupMetadataManagerTest {
def testOffsetExpirationOfSimpleConsumer(): Unit = {
val memberId = "memberId"
val topic = "foo"
- val topicPartition1 = new TopicPartition(topic, 0)
+ val topicIdPartition1 = new TopicIdPartition(Uuid.randomUuid(), 0, topic)
val offset = 37
groupMetadataManager.addOwnedPartition(groupPartitionId)
@@ -1987,12 +2063,12 @@ class GroupMetadataManagerTest {
val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
// new clients, no per-partition expiry timestamp, offsets of group expire
together
val offsets = immutable.Map(
- topicPartition1 -> tp1OffsetAndMetadata)
+ topicIdPartition1 -> tp1OffsetAndMetadata)
mockGetPartition()
expectAppendMessage(Errors.NONE)
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -2000,20 +2076,24 @@ class GroupMetadataManagerTest {
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
- assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+ assertEquals(Some(Errors.NONE), commitErrors.get.get(topicIdPartition1))
// do not expire offsets while within retention period since commit
timestamp
- val expiryTimestamp = offsets(topicPartition1).commitTimestamp +
defaultOffsetRetentionMs
+ val expiryTimestamp = offsets(topicIdPartition1).commitTimestamp +
defaultOffsetRetentionMs
time.sleep(expiryTimestamp - time.milliseconds() - 1)
groupMetadataManager.cleanupGroupMetadata()
// group and offsets should still be there
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
- assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
+ assertEquals(Some(tp1OffsetAndMetadata),
group.offset(topicIdPartition1.topicPartition))
- var cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition1)))
- assertEquals(Some(offset),
cachedOffsets.get(topicPartition1).map(_.offset))
+ var cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition1.topicPartition))
+ )
+ assertEquals(Some(offset),
cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset))
verify(replicaManager).onlinePartition(groupTopicPartition)
@@ -2028,10 +2108,17 @@ class GroupMetadataManagerTest {
// group and all its offsets should be gone now
assertEquals(None, groupMetadataManager.getGroup(groupId))
- assertEquals(None, group.offset(topicPartition1))
+ assertEquals(None, group.offset(topicIdPartition1.topicPartition))
- cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topicPartition1)))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topicPartition1).map(_.offset))
+ cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(topicIdPartition1.topicPartition))
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topicIdPartition1.topicPartition).map(_.offset)
+ )
verify(replicaManager, times(2)).onlinePartition(groupTopicPartition)
@@ -2045,12 +2132,12 @@ class GroupMetadataManagerTest {
val clientHost = "localhost"
val topic1 = "foo"
- val topic1Partition0 = new TopicPartition(topic1, 0)
- val topic1Partition1 = new TopicPartition(topic1, 1)
+ val topic1IdPartition0 = new TopicIdPartition(Uuid.randomUuid(), 0, topic1)
+ val topic1IdPartition1 = new TopicIdPartition(topic1IdPartition0.topicId,
1, topic1)
val topic2 = "bar"
- val topic2Partition0 = new TopicPartition(topic2, 0)
- val topic2Partition1 = new TopicPartition(topic2, 1)
+ val topic2IdPartition0 = new TopicIdPartition(Uuid.randomUuid(), 0, topic2)
+ val topic2IdPartition1 = new TopicIdPartition(topic2IdPartition0.topicId,
1, topic2)
val offset = 37
@@ -2087,15 +2174,15 @@ class GroupMetadataManagerTest {
val t2p1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
val offsets = immutable.Map(
- topic1Partition0 -> t1p0OffsetAndMetadata,
- topic1Partition1 -> t1p1OffsetAndMetadata,
- topic2Partition0 -> t2p0OffsetAndMetadata,
- topic2Partition1 -> t2p1OffsetAndMetadata)
+ topic1IdPartition0 -> t1p0OffsetAndMetadata,
+ topic1IdPartition1 -> t1p1OffsetAndMetadata,
+ topic2IdPartition0 -> t2p0OffsetAndMetadata,
+ topic2IdPartition1 -> t2p1OffsetAndMetadata)
mockGetPartition()
expectAppendMessage(Errors.NONE)
- var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
- def callback(errors: immutable.Map[TopicPartition, Errors]): Unit = {
+ var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
+ def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
commitErrors = Some(errors)
}
@@ -2103,7 +2190,7 @@ class GroupMetadataManagerTest {
assertTrue(group.hasOffsets)
assertFalse(commitErrors.isEmpty)
- assertEquals(Some(Errors.NONE), commitErrors.get.get(topic1Partition0))
+ assertEquals(Some(Errors.NONE), commitErrors.get.get(topic1IdPartition0))
// advance time to just after the offset of last partition is to be expired
time.sleep(defaultOffsetRetentionMs + 2)
@@ -2114,17 +2201,38 @@ class GroupMetadataManagerTest {
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
assert(group.is(Stable))
- assertEquals(Some(t1p0OffsetAndMetadata), group.offset(topic1Partition0))
- assertEquals(Some(t1p1OffsetAndMetadata), group.offset(topic1Partition1))
- assertEquals(Some(t2p0OffsetAndMetadata), group.offset(topic2Partition0))
- assertEquals(Some(t2p1OffsetAndMetadata), group.offset(topic2Partition1))
-
- var cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topic1Partition0, topic1Partition1,
topic2Partition0, topic2Partition1)))
+ assertEquals(Some(t1p0OffsetAndMetadata),
group.offset(topic1IdPartition0.topicPartition))
+ assertEquals(Some(t1p1OffsetAndMetadata),
group.offset(topic1IdPartition1.topicPartition))
+ assertEquals(Some(t2p0OffsetAndMetadata),
group.offset(topic2IdPartition0.topicPartition))
+ assertEquals(Some(t2p1OffsetAndMetadata),
group.offset(topic2IdPartition1.topicPartition))
+
+ var cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(
+ topic1IdPartition0.topicPartition,
+ topic1IdPartition1.topicPartition,
+ topic2IdPartition0.topicPartition,
+ topic2IdPartition1.topicPartition)
+ )
+ )
- assertEquals(Some(offset),
cachedOffsets.get(topic1Partition0).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topic1Partition1).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topic2Partition0).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topic2Partition1).map(_.offset))
+ assertEquals(
+ Some(offset),
+ cachedOffsets.get(topic1IdPartition0.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(offset),
+ cachedOffsets.get(topic1IdPartition1.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(offset),
+ cachedOffsets.get(topic2IdPartition0.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(offset),
+ cachedOffsets.get(topic2IdPartition1.topicPartition).map(_.offset)
+ )
verify(replicaManager).onlinePartition(groupTopicPartition)
@@ -2159,17 +2267,32 @@ class GroupMetadataManagerTest {
assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
assert(group.is(Stable))
- assertEquals(Some(t1p0OffsetAndMetadata), group.offset(topic1Partition0))
- assertEquals(Some(t1p1OffsetAndMetadata), group.offset(topic1Partition1))
- assertEquals(None, group.offset(topic2Partition0))
- assertEquals(None, group.offset(topic2Partition1))
-
- cachedOffsets = groupMetadataManager.getOffsets(groupId,
defaultRequireStable, Some(Seq(topic1Partition0, topic1Partition1,
topic2Partition0, topic2Partition1)))
+ assertEquals(Some(t1p0OffsetAndMetadata),
group.offset(topic1IdPartition0.topicPartition))
+ assertEquals(Some(t1p1OffsetAndMetadata),
group.offset(topic1IdPartition1.topicPartition))
+ assertEquals(None, group.offset(topic2IdPartition0.topicPartition))
+ assertEquals(None, group.offset(topic2IdPartition1.topicPartition))
+
+ cachedOffsets = groupMetadataManager.getOffsets(
+ groupId,
+ defaultRequireStable,
+ Some(Seq(
+ topic1IdPartition0.topicPartition,
+ topic1IdPartition1.topicPartition,
+ topic2IdPartition0.topicPartition,
+ topic2IdPartition1.topicPartition)
+ )
+ )
- assertEquals(Some(offset),
cachedOffsets.get(topic1Partition0).map(_.offset))
- assertEquals(Some(offset),
cachedOffsets.get(topic1Partition1).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topic2Partition0).map(_.offset))
- assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET),
cachedOffsets.get(topic2Partition1).map(_.offset))
+ assertEquals(Some(offset),
cachedOffsets.get(topic1IdPartition0.topicPartition).map(_.offset))
+ assertEquals(Some(offset),
cachedOffsets.get(topic1IdPartition1.topicPartition).map(_.offset))
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topic2IdPartition0.topicPartition).map(_.offset)
+ )
+ assertEquals(
+ Some(OffsetFetchResponse.INVALID_OFFSET),
+ cachedOffsets.get(topic2IdPartition1.topicPartition).map(_.offset)
+ )
}
@Test
diff --git
a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index cafc73c4efb..c01d11bc618 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -20,7 +20,7 @@ package kafka.coordinator.group
import kafka.common.OffsetAndMetadata
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.utils.{MockTime, Time}
import org.junit.jupiter.api.Assertions._
@@ -262,7 +262,7 @@ class GroupMetadataTest {
@Test
def testOffsetRemovalDuringTransitionFromEmptyToNonEmpty(): Unit = {
val topic = "foo"
- val partition = new TopicPartition(topic, 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, topic)
val time = new MockTime()
group = new GroupMetadata("groupId", Empty, time)
@@ -277,10 +277,10 @@ class GroupMetadataTest {
val offset = offsetAndMetadata(offset = 37, timestamp =
time.milliseconds())
val commitRecordOffset = 3
- group.prepareOffsetCommit(Map(partition -> offset))
+ group.prepareOffsetCommit(Map(topicIdPartition -> offset))
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
- group.onOffsetCommitAppend(partition,
CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
+ group.onOffsetCommitAppend(topicIdPartition,
CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset))
val offsetRetentionMs = 50000L
time.sleep(offsetRetentionMs + 1)
@@ -386,170 +386,203 @@ class GroupMetadataTest {
@Test
def testOffsetCommit(): Unit = {
- val partition = new TopicPartition("foo", 0)
+ val partition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = offsetAndMetadata(37)
val commitRecordOffset = 3
group.prepareOffsetCommit(Map(partition -> offset))
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
+ assertEquals(None, group.offset(partition.topicPartition))
group.onOffsetCommitAppend(partition,
CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset))
assertTrue(group.hasOffsets)
- assertEquals(Some(offset), group.offset(partition))
+ assertEquals(Some(offset), group.offset(partition.topicPartition))
}
@Test
def testOffsetCommitFailure(): Unit = {
- val partition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = offsetAndMetadata(37)
- group.prepareOffsetCommit(Map(partition -> offset))
+ group.prepareOffsetCommit(Map(topicIdPartition -> offset))
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
+ assertEquals(Some(offset), group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
- group.failPendingOffsetWrite(partition, offset)
+ group.failPendingOffsetWrite(topicIdPartition, offset)
assertFalse(group.hasOffsets)
- assertEquals(None, group.offset(partition))
+ assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
}
@Test
def testOffsetCommitFailureWithAnotherPending(): Unit = {
- val partition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val firstOffset = offsetAndMetadata(37)
val secondOffset = offsetAndMetadata(57)
- group.prepareOffsetCommit(Map(partition -> firstOffset))
+ group.prepareOffsetCommit(Map(topicIdPartition -> firstOffset))
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
+ assertEquals(Some(firstOffset),
group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
- group.prepareOffsetCommit(Map(partition -> secondOffset))
+ group.prepareOffsetCommit(Map(topicIdPartition -> secondOffset))
assertTrue(group.hasOffsets)
+ assertEquals(Some(secondOffset),
group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
- group.failPendingOffsetWrite(partition, firstOffset)
+ group.failPendingOffsetWrite(topicIdPartition, firstOffset)
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
+ assertEquals(Some(secondOffset),
group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
- group.onOffsetCommitAppend(partition,
CommitRecordMetadataAndOffset(Some(3L), secondOffset))
+ group.onOffsetCommitAppend(topicIdPartition,
CommitRecordMetadataAndOffset(Some(3L), secondOffset))
assertTrue(group.hasOffsets)
- assertEquals(Some(secondOffset), group.offset(partition))
+ assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(Some(secondOffset),
group.offset(topicIdPartition.topicPartition))
}
@Test
def testOffsetCommitWithAnotherPending(): Unit = {
- val partition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val firstOffset = offsetAndMetadata(37)
val secondOffset = offsetAndMetadata(57)
- group.prepareOffsetCommit(Map(partition -> firstOffset))
+ group.prepareOffsetCommit(Map(topicIdPartition -> firstOffset))
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
+ assertEquals(Some(firstOffset),
group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
- group.prepareOffsetCommit(Map(partition -> secondOffset))
+ group.prepareOffsetCommit(Map(topicIdPartition -> secondOffset))
assertTrue(group.hasOffsets)
+ assertEquals(Some(secondOffset),
group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
- group.onOffsetCommitAppend(partition,
CommitRecordMetadataAndOffset(Some(4L), firstOffset))
+ group.onOffsetCommitAppend(topicIdPartition,
CommitRecordMetadataAndOffset(Some(4L), firstOffset))
assertTrue(group.hasOffsets)
- assertEquals(Some(firstOffset), group.offset(partition))
+ assertEquals(Some(secondOffset),
group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(Some(firstOffset),
group.offset(topicIdPartition.topicPartition))
- group.onOffsetCommitAppend(partition,
CommitRecordMetadataAndOffset(Some(5L), secondOffset))
+ group.onOffsetCommitAppend(topicIdPartition,
CommitRecordMetadataAndOffset(Some(5L), secondOffset))
assertTrue(group.hasOffsets)
- assertEquals(Some(secondOffset), group.offset(partition))
+ assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(Some(secondOffset),
group.offset(topicIdPartition.topicPartition))
}
@Test
def testConsumerBeatsTransactionalOffsetCommit(): Unit = {
- val partition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val producerId = 13232L
val txnOffsetCommit = offsetAndMetadata(37)
val consumerOffsetCommit = offsetAndMetadata(57)
- group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+ group.prepareTxnOffsetCommit(producerId, Map(topicIdPartition ->
txnOffsetCommit))
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
+ assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)),
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
- group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit))
+ group.prepareOffsetCommit(Map(topicIdPartition -> consumerOffsetCommit))
assertTrue(group.hasOffsets)
+ assertEquals(Some(consumerOffsetCommit),
group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
- group.onTxnOffsetCommitAppend(producerId, partition,
CommitRecordMetadataAndOffset(Some(3L), txnOffsetCommit))
- group.onOffsetCommitAppend(partition,
CommitRecordMetadataAndOffset(Some(4L), consumerOffsetCommit))
+ group.onTxnOffsetCommitAppend(producerId, topicIdPartition,
CommitRecordMetadataAndOffset(Some(3L), txnOffsetCommit))
+ group.onOffsetCommitAppend(topicIdPartition,
CommitRecordMetadataAndOffset(Some(4L), consumerOffsetCommit))
assertTrue(group.hasOffsets)
- assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+ assertEquals(Some(CommitRecordMetadataAndOffset(Some(3),
txnOffsetCommit)), group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+ assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(Some(consumerOffsetCommit),
group.offset(topicIdPartition.topicPartition))
group.completePendingTxnOffsetCommit(producerId, isCommit = true)
assertTrue(group.hasOffsets)
+ assertEquals(None, group.pendingTxnOffsetCommit(producerId,
topicIdPartition))
+ assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
// This is the crucial assertion which validates that we materialize
offsets in offset order, not transactional order.
- assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+ assertEquals(Some(consumerOffsetCommit),
group.offset(topicIdPartition.topicPartition))
}
@Test
def testTransactionBeatsConsumerOffsetCommit(): Unit = {
- val partition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val producerId = 13232L
val txnOffsetCommit = offsetAndMetadata(37)
val consumerOffsetCommit = offsetAndMetadata(57)
- group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+ group.prepareTxnOffsetCommit(producerId, Map(topicIdPartition ->
txnOffsetCommit))
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
+ assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)),
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
- group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit))
+ group.prepareOffsetCommit(Map(topicIdPartition -> consumerOffsetCommit))
assertTrue(group.hasOffsets)
+ assertEquals(Some(consumerOffsetCommit),
group.pendingOffsetCommit(topicIdPartition))
- group.onOffsetCommitAppend(partition,
CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
- group.onTxnOffsetCommitAppend(producerId, partition,
CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
+ group.onOffsetCommitAppend(topicIdPartition,
CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
+ group.onTxnOffsetCommitAppend(producerId, topicIdPartition,
CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
assertTrue(group.hasOffsets)
+ assertEquals(Some(CommitRecordMetadataAndOffset(Some(4),
txnOffsetCommit)), group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+ assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
// The transactional offset commit hasn't been committed yet, so we should
materialize the consumer offset commit.
- assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+ assertEquals(Some(consumerOffsetCommit),
group.offset(topicIdPartition.topicPartition))
group.completePendingTxnOffsetCommit(producerId, isCommit = true)
assertTrue(group.hasOffsets)
// The transactional offset commit has been materialized and the
transactional commit record is later in the log,
// so it should be materialized.
- assertEquals(Some(txnOffsetCommit), group.offset(partition))
+ assertEquals(None, group.pendingTxnOffsetCommit(producerId,
topicIdPartition))
+ assertEquals(None, group.pendingOffsetCommit(topicIdPartition))
+ assertEquals(Some(txnOffsetCommit),
group.offset(topicIdPartition.topicPartition))
}
@Test
def testTransactionalCommitIsAbortedAndConsumerCommitWins(): Unit = {
- val partition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val producerId = 13232L
val txnOffsetCommit = offsetAndMetadata(37)
val consumerOffsetCommit = offsetAndMetadata(57)
- group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+ group.prepareTxnOffsetCommit(producerId, Map(topicIdPartition ->
txnOffsetCommit))
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
+ assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)),
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
- group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit))
+ group.prepareOffsetCommit(Map(topicIdPartition -> consumerOffsetCommit))
assertTrue(group.hasOffsets)
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
+ assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)),
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+ assertEquals(Some(consumerOffsetCommit),
group.pendingOffsetCommit(topicIdPartition))
- group.onOffsetCommitAppend(partition,
CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
- group.onTxnOffsetCommitAppend(producerId, partition,
CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
+ group.onOffsetCommitAppend(topicIdPartition,
CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
+ group.onTxnOffsetCommitAppend(producerId, topicIdPartition,
CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
assertTrue(group.hasOffsets)
+ assertEquals(Some(CommitRecordMetadataAndOffset(Some(4L),
txnOffsetCommit)), group.pendingTxnOffsetCommit(producerId, topicIdPartition))
// The transactional offset commit hasn't been committed yet, so we should
materialize the consumer offset commit.
- assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+ assertEquals(Some(consumerOffsetCommit),
group.offset(topicIdPartition.topicPartition))
group.completePendingTxnOffsetCommit(producerId, isCommit = false)
assertTrue(group.hasOffsets)
// The transactional offset commit should be discarded and the consumer
offset commit should continue to be
// materialized.
assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
- assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+ assertEquals(None, group.pendingTxnOffsetCommit(producerId,
topicIdPartition))
+ assertEquals(Some(consumerOffsetCommit),
group.offset(topicIdPartition.topicPartition))
}
@Test
def testFailedTxnOffsetCommitLeavesNoPendingState(): Unit = {
- val partition = new TopicPartition("foo", 0)
+ val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val producerId = 13232L
val txnOffsetCommit = offsetAndMetadata(37)
- group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+ group.prepareTxnOffsetCommit(producerId, Map(topicIdPartition ->
txnOffsetCommit))
assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId))
assertTrue(group.hasOffsets)
- assertEquals(None, group.offset(partition))
- group.failPendingTxnOffsetCommit(producerId, partition)
+ assertEquals(Some(CommitRecordMetadataAndOffset(None, txnOffsetCommit)),
group.pendingTxnOffsetCommit(producerId, topicIdPartition))
+ assertEquals(None, group.offset(topicIdPartition.topicPartition))
+ group.failPendingTxnOffsetCommit(producerId, topicIdPartition)
assertFalse(group.hasOffsets)
assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
+ assertEquals(None, group.pendingTxnOffsetCommit(producerId,
topicIdPartition))
// The commit marker should now have no effect.
group.completePendingTxnOffsetCommit(producerId, isCommit = true)
@@ -706,21 +739,21 @@ class GroupMetadataTest {
@Test
def testHasPendingNonTxnOffsets(): Unit = {
- val partition = new TopicPartition("foo", 0)
+ val partition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = offsetAndMetadata(37)
group.prepareOffsetCommit(Map(partition -> offset))
- assertTrue(group.hasPendingOffsetCommitsForTopicPartition(partition))
+
assertTrue(group.hasPendingOffsetCommitsForTopicPartition(partition.topicPartition))
}
@Test
def testHasPendingTxnOffsets(): Unit = {
- val txnPartition = new TopicPartition("foo", 1)
+ val txnPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
val offset = offsetAndMetadata(37)
val producerId = 5
group.prepareTxnOffsetCommit(producerId, Map(txnPartition -> offset))
- assertTrue(group.hasPendingOffsetCommitsForTopicPartition(txnPartition))
+
assertTrue(group.hasPendingOffsetCommitsForTopicPartition(txnPartition.topicPartition))
assertFalse(group.hasPendingOffsetCommitsForTopicPartition(new
TopicPartition("non-exist", 0)))
}