mumrah commented on code in PR #13735:
URL: https://github.com/apache/kafka/pull/13735#discussion_r1212119774


##########
core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala:
##########
@@ -259,4 +259,60 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness 
{
     assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name())
     assertEquals("300000", configs.last.value())
   }
+
+  @Test
+  def testUpdateExistingTopicWithNewAndChangedPartitions(): Unit = {
+    assertEquals(0, migrationState.migrationZkVersion())
+
+    val topicId = Uuid.randomUuid()
+    val partitions = Map(
+      0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), 
Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
+      1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), 
Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+    migrationState = migrationClient.topicClient().createTopic("test", 
topicId, partitions, migrationState)
+    assertEquals(1, migrationState.migrationZkVersion())
+
+    // Change assignment in partitions and update the topic assignment. See 
the change is
+    // reflected.
+    val changedPartitions = Map(
+      0 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), 
Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1),
+      1 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), 
Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+    ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava
+    migrationState = migrationClient.topicClient().updateTopic("test", 
topicId, changedPartitions, migrationState)
+    assertEquals(2, migrationState.migrationZkVersion())
+
+    // Read the changed partition with zkClient.
+    val topicReplicaAssignmentFromZk = 
zkClient.getReplicaAssignmentAndTopicIdForTopics(Set("test"))
+    assertEquals(1, topicReplicaAssignmentFromZk.size)
+    assertEquals(Some(topicId), topicReplicaAssignmentFromZk.head.topicId);
+    topicReplicaAssignmentFromZk.head.assignment.foreach { case (tp, 
assignment) =>
+      tp.partition() match {
+        case p if p <=1 =>
+          assertEquals(changedPartitions.get(p).replicas.toSeq, 
assignment.replicas)
+          assertEquals(changedPartitions.get(p).addingReplicas.toSeq, 
assignment.addingReplicas)
+          assertEquals(changedPartitions.get(p).removingReplicas.toSeq, 
assignment.removingReplicas)
+        case p => fail(s"Found unknown partition $p")
+      }
+    }
+
+    // Add a new Partition.
+    val newPartition = Map(
+      2 -> new PartitionRegistration(Array(2, 3, 4), Array(2, 3, 4), Array(), 
Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1)
+    ).map { case (k, v) => int2Integer(k) -> v }.asJava
+    migrationState = 
migrationClient.topicClient().createTopicPartitions(Map("test" -> 
newPartition).asJava, migrationState)
+    assertEquals(3, migrationState.migrationZkVersion())
+
+    // Read new partition from Zk.
+    val newPartitionFromZk = zkClient.getTopicPartitionState(new 
TopicPartition("test", 2))
+    assertTrue(newPartitionFromZk.isDefined)
+    newPartitionFromZk.foreach { part =>
+      val expectedPartition = newPartition.get(2)
+      assertEquals(expectedPartition.leader, part.leaderAndIsr.leader)
+      // Since KRaft increments partition epoch on change.
+      assertEquals(expectedPartition.partitionEpoch + 1, 
part.leaderAndIsr.partitionEpoch)
+      assertEquals(expectedPartition.leaderEpoch, 
part.leaderAndIsr.leaderEpoch)
+      assertEquals(expectedPartition.leaderRecoveryState, 
part.leaderAndIsr.leaderRecoveryState)
+      assertEquals(expectedPartition.isr.toList, part.leaderAndIsr.isr)
+    }

Review Comment:
   Nevermind, I see you have that test case in ZkMigrationIntegrationTest



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to