This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c5e06f6e7ab KAFKA-18486 Update 
testExceptionWhenUnverifiedTransactionHasMultipleProducerIds  (#19883)
c5e06f6e7ab is described below

commit c5e06f6e7ab2643f8a74b56dfa8bf111e27c0eb5
Author: Jing-Jia Hung <[email protected]>
AuthorDate: Sun Jun 8 00:55:20 2025 +0800

    KAFKA-18486 Update 
testExceptionWhenUnverifiedTransactionHasMultipleProducerIds  (#19883)
    
    - Replace the deprecated `becomeLeaderOrFollower` with the
    metadata-based `applyDelta` method.
    - Add overloaded `topicsCreateDelta` to support custom topic name and
    topicId.
    
    Reviewers: Ken Huang <[email protected]>, TengYao Chi
     <[email protected]>, Nick Guo <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 42 +++++++++++-----------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9077f664eff..3611af26488 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -2519,6 +2519,7 @@ class ReplicaManagerTest {
 
   @Test
   def testExceptionWhenUnverifiedTransactionHasMultipleProducerIds(): Unit = {
+    val localId = 1
     val tp0 = new TopicPartition(topic, 0)
     val tp1 = new TopicPartition(topic, 1)
     val transactionalId = "txn1"
@@ -2531,13 +2532,9 @@ class ReplicaManagerTest {
     val replicaManager = 
setUpReplicaManagerWithMockedAddPartitionsToTxnManager(addPartitionsToTxnManager,
 List(tp0, tp1))
 
     try {
-      replicaManager.becomeLeaderOrFollower(1,
-        makeLeaderAndIsrRequest(topicIds(tp0.topic), tp0, Seq(0, 1), new 
LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
-        (_, _) => ())
-
-      replicaManager.becomeLeaderOrFollower(1,
-        makeLeaderAndIsrRequest(topicIds(tp1.topic), tp1, Seq(0, 1), new 
LeaderAndIsr(1, List(0, 1).map(Int.box).asJava)),
-        (_, _) => ())
+      val leaderDelta = topicsCreateDelta(localId, isStartIdLeader = true, 
partitions = List(0, 1), List.empty, topic, topicIds(topic))
+      val leaderImage = imageFromTopics(leaderDelta.apply())
+      replicaManager.applyDelta(leaderDelta, leaderImage)
 
       // Append some transactional records with different producer IDs
       val transactionalRecords = mutable.Map[TopicPartition, MemoryRecords]()
@@ -4651,7 +4648,7 @@ class ReplicaManagerTest {
     try {
       val directoryIds = replicaManager.logManager.directoryIdsSet.toList
       assertEquals(directoryIds.size, 2)
-      val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, 
partition = 0, directoryIds = directoryIds)
+      val leaderTopicsDelta: TopicsDelta = topicsCreateDelta(localId, true, 
partitions = List(0), directoryIds = directoryIds)
       val (partition: Partition, isNewWhenCreatedForFirstTime: Boolean) = 
replicaManager.getOrCreatePartition(topicPartition0.topicPartition(), 
leaderTopicsDelta, FOO_UUID).get
       
partition.makeLeader(leaderAndIsrPartitionState(topicPartition0.topicPartition(),
 1, localId, Seq(1, 2)),
         new 
LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava),
@@ -4698,7 +4695,7 @@ class ReplicaManagerTest {
       // Test applying delta as leader
       val directoryIds = replicaManager.logManager.directoryIdsSet.toList
       // Make the local replica the leader
-      val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, 
directoryIds = directoryIds)
+      val leaderTopicsDelta = topicsCreateDelta(localId, true, partitions = 
List(0), directoryIds = directoryIds)
       val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
@@ -4708,7 +4705,7 @@ class ReplicaManagerTest {
       assertEquals(directoryIds.head, logDirIdHostingPartition0)
 
       // Test applying delta as follower
-      val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 
1, directoryIds = directoryIds)
+      val followerTopicsDelta = topicsCreateDelta(localId, false, partitions = 
List(1), directoryIds = directoryIds)
       val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
 
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
@@ -4736,7 +4733,7 @@ class ReplicaManagerTest {
 
     try {
       // Make the local replica the leader
-      val leaderTopicsDelta = topicsCreateDelta(localId, true, partition = 0, 
directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
+      val leaderTopicsDelta = topicsCreateDelta(localId, true, partitions = 
List(0), directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
       val leaderMetadataImage = imageFromTopics(leaderTopicsDelta.apply())
       val topicId = leaderMetadataImage.topics().topicsByName.get("foo").id
       val topicIdPartition0 = new TopicIdPartition(topicId, topicPartition0)
@@ -4744,7 +4741,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
       // Make the local replica the as follower
-      val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 
1, directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
+      val followerTopicsDelta = topicsCreateDelta(localId, false, partitions = 
List(1), directoryIds = List(DirectoryId.UNASSIGNED, DirectoryId.UNASSIGNED))
       val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
@@ -4790,7 +4787,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(leaderTopicsDelta, leaderMetadataImage)
 
       // Make the local replica the as follower
-      val followerTopicsDelta = topicsCreateDelta(localId, false, partition = 
1, directoryIds = List(DirectoryId.LOST, DirectoryId.LOST))
+      val followerTopicsDelta = topicsCreateDelta(localId, false, partitions = 
List(1), directoryIds = List(DirectoryId.LOST, DirectoryId.LOST))
       val followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
       replicaManager.applyDelta(followerTopicsDelta, followerMetadataImage)
 
@@ -5787,23 +5784,26 @@ class ReplicaManagerTest {
     }
   }
 
-  private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, 
partition:Int = 0, directoryIds: List[Uuid] = List.empty): TopicsDelta = {
+  private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean, 
partitions:List[Int] = List(0), directoryIds: List[Uuid] = List.empty, 
topicName: String = "foo", topicId: Uuid = FOO_UUID): TopicsDelta = {
     val leader = if (isStartIdLeader) startId else startId + 1
     val delta = new TopicsDelta(TopicsImage.EMPTY)
-    delta.replay(new TopicRecord().setName("foo").setTopicId(FOO_UUID))
-    val record = partitionRecord(startId, leader, partition)
-    if (directoryIds.nonEmpty) {
-      record.setDirectories(directoryIds.asJava)
+    delta.replay(new TopicRecord().setName(topicName).setTopicId(topicId))
+
+    partitions.foreach { partition =>
+      val record = partitionRecord(startId, leader, partition, topicId)
+      if (directoryIds.nonEmpty) {
+        record.setDirectories(directoryIds.asJava)
+      }
+      delta.replay(record)
     }
-    delta.replay(record)
 
     delta
   }
 
-  private def partitionRecord(startId: Int, leader: Int, partition: Int = 0) = 
{
+  private def partitionRecord(startId: Int, leader: Int, partition: Int = 0, 
topicId: Uuid = FOO_UUID) = {
     new PartitionRecord()
       .setPartitionId(partition)
-      .setTopicId(FOO_UUID)
+      .setTopicId(topicId)
       .setReplicas(util.Arrays.asList(startId, startId + 1))
       .setIsr(util.Arrays.asList(startId, startId + 1))
       .setRemovingReplicas(Collections.emptyList())

Reply via email to