mumrah commented on code in PR #13735: URL: https://github.com/apache/kafka/pull/13735#discussion_r1212119774
########## core/src/test/scala/unit/kafka/zk/migration/ZkMigrationClientTest.scala: ########## @@ -259,4 +259,60 @@ class ZkMigrationClientTest extends ZkMigrationTestHarness { assertEquals(TopicConfig.RETENTION_MS_CONFIG, configs.last.name()) assertEquals("300000", configs.last.value()) } + + @Test + def testUpdateExistingTopicWithNewAndChangedPartitions(): Unit = { + assertEquals(0, migrationState.migrationZkVersion()) + + val topicId = Uuid.randomUuid() + val partitions = Map( + 0 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1), + 1 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1) + ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava + migrationState = migrationClient.topicClient().createTopic("test", topicId, partitions, migrationState) + assertEquals(1, migrationState.migrationZkVersion()) + + // Change assignment in partitions and update the topic assignment. See the change is + // reflected. + val changedPartitions = Map( + 0 -> new PartitionRegistration(Array(1, 2, 3), Array(1, 2, 3), Array(), Array(), 0, LeaderRecoveryState.RECOVERED, 0, -1), + 1 -> new PartitionRegistration(Array(0, 1, 2), Array(0, 1, 2), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1) + ).map { case (k, v) => Integer.valueOf(k) -> v }.asJava + migrationState = migrationClient.topicClient().updateTopic("test", topicId, changedPartitions, migrationState) + assertEquals(2, migrationState.migrationZkVersion()) + + // Read the changed partition with zkClient. 
+ val topicReplicaAssignmentFromZk = zkClient.getReplicaAssignmentAndTopicIdForTopics(Set("test")) + assertEquals(1, topicReplicaAssignmentFromZk.size) + assertEquals(Some(topicId), topicReplicaAssignmentFromZk.head.topicId); + topicReplicaAssignmentFromZk.head.assignment.foreach { case (tp, assignment) => + tp.partition() match { + case p if p <=1 => + assertEquals(changedPartitions.get(p).replicas.toSeq, assignment.replicas) + assertEquals(changedPartitions.get(p).addingReplicas.toSeq, assignment.addingReplicas) + assertEquals(changedPartitions.get(p).removingReplicas.toSeq, assignment.removingReplicas) + case p => fail(s"Found unknown partition $p") + } + } + + // Add a new Partition. + val newPartition = Map( + 2 -> new PartitionRegistration(Array(2, 3, 4), Array(2, 3, 4), Array(), Array(), 1, LeaderRecoveryState.RECOVERED, 0, -1) + ).map { case (k, v) => int2Integer(k) -> v }.asJava + migrationState = migrationClient.topicClient().createTopicPartitions(Map("test" -> newPartition).asJava, migrationState) + assertEquals(3, migrationState.migrationZkVersion()) + + // Read new partition from Zk. + val newPartitionFromZk = zkClient.getTopicPartitionState(new TopicPartition("test", 2)) + assertTrue(newPartitionFromZk.isDefined) + newPartitionFromZk.foreach { part => + val expectedPartition = newPartition.get(2) + assertEquals(expectedPartition.leader, part.leaderAndIsr.leader) + // Since KRaft increments partition epoch on change. + assertEquals(expectedPartition.partitionEpoch + 1, part.leaderAndIsr.partitionEpoch) + assertEquals(expectedPartition.leaderEpoch, part.leaderAndIsr.leaderEpoch) + assertEquals(expectedPartition.leaderRecoveryState, part.leaderAndIsr.leaderRecoveryState) + assertEquals(expectedPartition.isr.toList, part.leaderAndIsr.isr) + } Review Comment: Never mind, I see you have that test case in ZkMigrationIntegrationTest. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org