This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c5e06f6e7ab KAFKA-18486 Update
testExceptionWhenUnverifiedTransactionHasMultipleProducerIds (#19883)
c5e06f6e7ab is described below
commit c5e06f6e7ab2643f8a74b56dfa8bf111e27c0eb5
Author: Jing-Jia Hung <[email protected]>
AuthorDate: Sun Jun 8 00:55:20 2025 +0800
KAFKA-18486 Update
testExceptionWhenUnverifiedTransactionHasMultipleProducerIds (#19883)
- Replace the deprecated `becomeLeaderOrFollower` with the
metadata-based `applyDelta` method.
- Extend `topicsCreateDelta` to accept a list of partitions and, via new
default parameters, a custom topic name and topicId.
Reviewers: Ken Huang <[email protected]>, TengYao Chi
<[email protected]>, Nick Guo <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../unit/kafka/server/ReplicaManagerTest.scala | 42 +++++++++++-----------
1 file changed, 21 insertions(+), 21 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9077f664eff..3611af26488 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2519,6 +2519,7 @@ class ReplicaManagerTest {
@Test
def testExceptionWhenUnverifiedTransactionHasMultipleProducerIds(): Unit = {
+ val localId = 1
val tp0 = new TopicPartition(topic, 0)
val tp1 = new TopicPartition(topic, 1)
val transactionalId = "txn1"
@@ -2531,13 +2532,9 @@ class ReplicaManagerTest {
val replicaManager =
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
List(tp0, tp1))
try {
- replicaManager.becomeLeaderOrFollower(1,
- makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new
LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
- (_, _) => ())
-
- replicaManager.becomeLeaderOrFollower(1,
- makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), new
LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
- (_, _) => ())
+ val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true,
partitions = List(0, 1), List.empty, topic, topicIds(topic))
+ val leaderImage = imageFromTopics(leaderDelta.apply())
+ replicaManager.applyDelta(leaderDelta, leaderImage)
// Append some transactional records with different producer IDs
val transactionalRecords = mutable.Map[TopicPartition, MemoryRecords]()
@@ -4651,7 +4648,7 @@ class ReplicaManagerTest {
try {
val directoryIds = replicaManager.logManager.directoryIdsSet.toList
assertEquals(directoryIds.size, 2)
- val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true,
partition = 0, directoryIds = directoryIds)
+ val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true,
partitions = List(0), directoryIds = directoryIds)
val (partition: Partition, isNewWhenCreatedForFirstTime: Boolean) =
replicaManager.getOrCreatePartition(topicPartition0.topicPartition(),
leaderTopicsDelta, FOO_UUID).get
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(),
1, localId, Seq(1, 2)),
new
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava),
@@ -4698,7 +4695,7 @@ class ReplicaManagerTest {
// Test applying delta as leader
val directoryIds = replicaManager.logManager.directoryIdsSet.toList
// Make the local replica the leader
- val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0,
directoryIds = directoryIds)
+ val leaderTopicsDelta = topicsCreateDelta(localId, true, partitions =
List(0), directoryIds = directoryIds)
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
@@ -4708,7 +4705,7 @@ class ReplicaManagerTest {
assertEquals(directoryIds.head, logDirIdHostingPartition0)
// Test applying delta as follower
- val followerTopicsDelta = topicsCreateDelta(localId, false, partition =
1, directoryIds = directoryIds)
+ val followerTopicsDelta = topicsCreateDelta(localId, false, partitions =
List(1), directoryIds = directoryIds)
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
@@ -4736,7 +4733,7 @@ class ReplicaManagerTest {
try {
// Make the local replica the leader
- val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0,
directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
+ val leaderTopicsDelta = topicsCreateDelta(localId, true, partitions =
List(0), directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id
val topicIdPartition0 = new TopicIdPartition(topicId, topicPartition0)
@@ -4744,7 +4741,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Make the local replica the as follower
- val followerTopicsDelta = topicsCreateDelta(localId, false, partition =
1, directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
+ val followerTopicsDelta = topicsCreateDelta(localId, false, partitions =
List(1), directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
@@ -4790,7 +4787,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
// Make the local replica the as follower
- val followerTopicsDelta = topicsCreateDelta(localId, false, partition =
1, directoryIds = List(DirectoryId.LOST, DirectoryId.LOST))
+ val followerTopicsDelta = topicsCreateDelta(localId, false, partitions =
List(1), directoryIds = List(DirectoryId.LOST, DirectoryId.LOST))
val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
@@ -5787,23 +5784,26 @@ class ReplicaManagerTest {
}
}
- private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean,
partition:Int = 0, directoryIds: List[Uuid] = List.empty): TopicsDelta = {
+ private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean,
partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty,
topicName: String = "foo", topicId: Uuid = FOO_UUID): TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)
- delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
- val record = partitionRecord(startId, leader, partition)
- if (directoryIds.nonEmpty) {
- record.setDirectories(directoryIds.asJava)
+ delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId))
+
+ partitions.foreach { partition =>
+ val record = partitionRecord(startId, leader, partition, topicId)
+ if (directoryIds.nonEmpty) {
+ record.setDirectories(directoryIds.asJava)
+ }
+ delta.replay(record)
}
- delta.replay(record)
delta
}
- private def partitionRecord(startId: Int, leader: Int, partition: Int = 0) =
{
+ private def partitionRecord(startId: Int, leader: Int, partition: Int = 0,
topicId: Uuid = FOO_UUID) = {
new PartitionRecord()
.setPartitionId(partition)
- .setTopicId(FOO_UUID)
+ .setTopicId(topicId)
.setReplicas(util.Arrays.asList(startId, startId + 1))
.setIsr(util.Arrays.asList(startId, startId + 1))
.setRemovingReplicas(Collections.emptyList())