Repository: kafka Updated Branches: refs/heads/trunk 996e29cfe -> b3847f76b
KAFKA-3270; Added some Happy Path Tests for the Reassign Partitions Command with help from enothereska :) Author: Ben Stopford <[email protected]> Reviewers: Jun Rao <[email protected]>, Eno Thereska <[email protected]>, Ismael Juma <[email protected]> Closes #956 from benstopford/KAFKA-3270-ReassignPartitionsCommand-Tests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3847f76 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3847f76 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3847f76 Branch: refs/heads/trunk Commit: b3847f76b571deac5c7da7287a642c8b354a2a8c Parents: 996e29c Author: Ben Stopford <[email protected]> Authored: Tue Apr 26 06:31:00 2016 -0700 Committer: Ismael Juma <[email protected]> Committed: Tue Apr 26 06:31:00 2016 -0700 ---------------------------------------------------------------------- .../kafka/admin/ReassignPartitionsCommand.scala | 19 ++-- core/src/main/scala/kafka/utils/ZkUtils.scala | 4 +- .../admin/ReassignPartitionsClusterTest.scala | 112 +++++++++++++++++++ 3 files changed, 126 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b3847f76/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 446ab9f..1bf351a 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -39,7 +39,7 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) val zkConnect = opts.options.valueOf(opts.zkConnectOpt) - val zkUtils = ZkUtils(zkConnect, + val zkUtils = ZkUtils(zkConnect, 30000, 30000, JaasUtils.isZkSecurityEnabled()) @@ -93,8 +93,8 @@ object ReassignPartitionsCommand extends Logging { val topicsToMoveJsonString = Utils.readFileAsString(topicsToMoveJsonFile) val disableRackAware = opts.options.has(opts.disableRackAware) val (proposedAssignments, currentAssignments) = generateAssignment(zkUtils, brokerListToReassign, topicsToMoveJsonString, disableRackAware) - println("Current partition replica assignment\n\n%s".format(zkUtils.getPartitionReassignmentZkData(currentAssignments))) - println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.getPartitionReassignmentZkData(proposedAssignments))) + println("Current partition replica assignment\n\n%s".format(zkUtils.formatAsReassignmentJson(currentAssignments))) + println("Proposed partition reassignment configuration\n\n%s".format(zkUtils.formatAsReassignmentJson(proposedAssignments))) } def generateAssignment(zkUtils: ZkUtils, brokerListToReassign: Seq[Int], topicsToMoveJsonString: String, disableRackAware: Boolean): (Map[TopicAndPartition, Seq[Int]], Map[TopicAndPartition, Seq[Int]]) = { @@ -125,9 +125,14 @@ object ReassignPartitionsCommand extends Logging { CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option") val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile) + executeAssignment(zkUtils, reassignmentJsonString) + } + + def executeAssignment(zkUtils: ZkUtils,reassignmentJsonString: String){ + val partitionsToBeReassigned = zkUtils.parsePartitionReassignmentDataWithoutDedup(reassignmentJsonString) if (partitionsToBeReassigned.isEmpty) - throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(reassignmentJsonFile)) + throw new AdminCommandFailedException("Partition reassignment data file is empty") val duplicateReassignedPartitions = CoreUtils.duplicates(partitionsToBeReassigned.map{ case(tp,replicas) => tp}) if (duplicateReassignedPartitions.nonEmpty) throw new AdminCommandFailedException("Partition reassignment contains duplicate topic partitions: %s".format(duplicateReassignedPartitions.mkString(","))) @@ -144,10 +149,10 @@ object ReassignPartitionsCommand extends Logging { // before starting assignment, output the current replica assignment to facilitate rollback val currentPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(partitionsToBeReassigned.map(_._1.topic)) println("Current partition replica assignment\n\n%s\n\nSave this to use as the --reassignment-json-file option during rollback" - .format(zkUtils.getPartitionReassignmentZkData(currentPartitionReplicaAssignment))) + .format(zkUtils.formatAsReassignmentJson(currentPartitionReplicaAssignment))) // start the reassignment if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(zkUtils.getPartitionReassignmentZkData(partitionsToBeReassigned.toMap))) + println("Successfully started reassignment of partitions %s".format(zkUtils.formatAsReassignmentJson(partitionsToBeReassigned.toMap))) else println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) } @@ -228,7 +233,7 @@ class ReassignPartitionsCommand(zkUtils: ZkUtils, partitions: collection.Map[Top false } else { - val jsonReassignmentData = zkUtils.getPartitionReassignmentZkData(validPartitions) + val jsonReassignmentData = zkUtils.formatAsReassignmentJson(validPartitions) zkUtils.createPersistentPath(ZkUtils.ReassignPartitionsPath, jsonReassignmentData) true } http://git-wip-us.apache.org/repos/asf/kafka/blob/b3847f76/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index 155b3fd..83ff517 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -710,7 +710,7 @@ class ZkUtils(val zkClient: ZkClient, topics } - def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { + def formatAsReassignmentJson(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { Json.encode(Map("version" -> 1, "partitions" -> partitionsToBeReassigned.map(e => Map("topic" -> e._1.topic, "partition" -> e._1.partition, "replicas" -> e._2)))) } @@ -722,7 +722,7 @@ class ZkUtils(val zkClient: ZkClient, deletePath(zkPath) info("No more partitions need to be reassigned. Deleting zk path %s".format(zkPath)) case _ => - val jsonData = getPartitionReassignmentZkData(partitionsToBeReassigned) + val jsonData = formatAsReassignmentJson(partitionsToBeReassigned) try { updatePersistentPath(zkPath, jsonData) debug("Updated partition reassignment path with %s".format(jsonData)) http://git-wip-us.apache.org/repos/asf/kafka/blob/b3847f76/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala new file mode 100644 index 0000000..ac2c1ae --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsClusterTest.scala @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package unit.kafka.admin + +import kafka.admin.ReassignPartitionsCommand +import kafka.server.{KafkaConfig, KafkaServer} +import kafka.utils.TestUtils._ +import kafka.utils.ZkUtils._ +import kafka.utils.{CoreUtils, Logging} +import kafka.zk.ZooKeeperTestHarness +import org.junit.{After, Before, Test} +import org.junit.Assert.assertEquals +import scala.collection.Seq + + +class ReassignPartitionsClusterTest extends ZooKeeperTestHarness with Logging { + val partitionId = 0 + var servers: Seq[KafkaServer] = null + val topicName = "my-topic" + + @Before + override def setUp() { + super.setUp() + } + + def startBrokers(brokerIds: Seq[Int]) { + servers = brokerIds.map(i => createBrokerConfig(i, zkConnect)) + .map(c => createServer(KafkaConfig.fromProps(c))) + } + + @After + override def tearDown() { + servers.foreach(_.shutdown()) + servers.foreach(server => CoreUtils.delete(server.config.logDirs)) + super.tearDown() + } + + @Test + def shouldMoveSinglePartition { + //Given a single replica on server 100 + startBrokers(Seq(100, 101)) + val partition = 0 + createTopic(zkUtils, topicName, Map(partition -> Seq(100)), servers = servers) + + //When we move the replica on 100 to broker 101 + ReassignPartitionsCommand.executeAssignment(zkUtils, s"""{"version":1,"partitions":[{"topic":"$topicName","partition":0,"replicas":[101]}]}""") + waitForReasignmentToComplete() + + //Then the replica should be on 101 + assertEquals(zkUtils.getPartitionAssignmentForTopics(Seq(topicName)).get(topicName).get(partition), Seq(101)) + } + + @Test + def shouldExpandCluster() { + //Given partitions on 2 of 3 brokers + val brokers = Array(100, 101, 102) + startBrokers(brokers) + createTopic(zkUtils, topicName, Map( + 0 -> Seq(100, 101), + 1 -> Seq(100, 101), + 2 -> Seq(100, 101) + ), servers = servers) + + //When rebalancing + val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, brokers, json(topicName), true)._1 + ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment)) + waitForReasignmentToComplete() + + //Then the replicas should span all three brokers + val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName) + assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(100, 101, 102)) + } + + @Test + def shouldShrinkCluster() { + //Given partitions on 3 of 3 brokers + val brokers = Array(100, 101, 102) + startBrokers(brokers) + createTopic(zkUtils, topicName, Map( + 0 -> Seq(100, 101), + 1 -> Seq(101, 102), + 2 -> Seq(102, 100) + ), servers = servers) + + //When rebalancing + val newAssignment = ReassignPartitionsCommand.generateAssignment(zkUtils, Array(100, 101), json(topicName), true)._1 + ReassignPartitionsCommand.executeAssignment(zkUtils, zkUtils.formatAsReassignmentJson(newAssignment)) + waitForReasignmentToComplete() + + //Then replicas should only span the first two brokers + val actual = zkUtils.getPartitionAssignmentForTopics(Seq(topicName))(topicName) + assertEquals(actual.values.flatten.toSeq.distinct.sorted, Seq(100, 101)) + } + + def waitForReasignmentToComplete() { + waitUntilTrue(() => !zkUtils.pathExists(ReassignPartitionsPath), s"Znode $zkUtils.ReassignPartitionsPath wasn't deleted") + } + + def json(topic: String): String = { + s"""{"topics": [{"topic": "$topic"}],"version":1}""" + } +}
