KAFKA-990 Fix ReassignPartitionsCommand and improve usability; reviewed by Neha, Jun, Joel and Guozhang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/39fc578d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/39fc578d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/39fc578d Branch: refs/heads/trunk Commit: 39fc578d5da3f871fbcd5205de010f1f574507d1 Parents: 7640bee Author: Sriram Subramanian <[email protected]> Authored: Tue Aug 27 22:12:25 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Tue Aug 27 22:12:35 2013 -0700 ---------------------------------------------------------------------- .../kafka/admin/ReassignPartitionsCommand.scala | 94 +++++++++++--- .../kafka/controller/KafkaController.scala | 130 +++++++++---------- .../kafka/controller/ReplicaStateMachine.scala | 7 +- core/src/main/scala/kafka/utils/ZkUtils.scala | 18 +++ 4 files changed, 162 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 8d287f4..aa61fa1 100644 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -18,6 +18,7 @@ package kafka.admin import joptsimple.OptionParser import kafka.utils._ +import collection._ import org.I0Itec.zkclient.ZkClient import org.I0Itec.zkclient.exception.ZkNodeExistsException import kafka.common.{TopicAndPartition, AdminCommandFailedException} @@ -26,21 +27,40 @@ object ReassignPartitionsCommand extends Logging { def main(args: Array[String]): Unit = { val parser = new OptionParser - val jsonFileOpt = parser.accepts("path-to-json-file", "REQUIRED: The JSON file with the list of partitions and 
the " + - "new replicas they should be reassigned to in the following format - \n" + - "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }]\n}") + val topicsToMoveJsonFileOpt = parser.accepts("topics-to-move-json-file", "The JSON file with the list of topics to reassign." + + "This option or manual-assignment-json-file needs to be specified. The format to use is - \n" + + "{\"topics\":\n\t[{\"topic\": \"foo\"},{\"topic\": \"foo1\"}],\n\"version\":1\n}") .withRequiredArg - .describedAs("partition reassignment json file path") + .describedAs("topics to reassign json file path") .ofType(classOf[String]) + + val manualAssignmentJsonFileOpt = parser.accepts("manual-assignment-json-file", "The JSON file with the list of manual reassignments" + + "This option or topics-to-move-json-file needs to be specified. The format to use is - \n" + + "{\"partitions\":\n\t[{\"topic\": \"foo\",\n\t \"partition\": 1,\n\t \"replicas\": [1,2,3] }],\n\"version\":1\n}") + .withRequiredArg + .describedAs("manual assignment json file path") + .ofType(classOf[String]) + + val brokerListOpt = parser.accepts("broker-list", "The list of brokers to which the partitions need to be reassigned" + + " in the form \"0,1,2\". This is required for automatic topic reassignment.") + .withRequiredArg + .describedAs("brokerlist") + .ofType(classOf[String]) + val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + "form host:port. Multiple URLS can be given to allow fail-over.") .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val executeOpt = parser.accepts("execute", "This option does the actual reassignment. 
By default, the tool does a dry run") + .withOptionalArg() + .describedAs("execute") + .ofType(classOf[String]) + val options = parser.parse(args : _*) - for(arg <- List(jsonFileOpt, zkConnectOpt)) { + for(arg <- List(zkConnectOpt)) { if(!options.has(arg)) { System.err.println("Missing required argument \"" + arg + "\"") parser.printHelpOn(System.err) @@ -48,24 +68,56 @@ object ReassignPartitionsCommand extends Logging { } } - val jsonFile = options.valueOf(jsonFileOpt) - val zkConnect = options.valueOf(zkConnectOpt) - val jsonString = Utils.readFileAsString(jsonFile) - var zkClient: ZkClient = null + if (options.has(topicsToMoveJsonFileOpt) && options.has(manualAssignmentJsonFileOpt)) { + System.err.println("Only one of the json files should be specified") + parser.printHelpOn(System.err) + System.exit(1) + } + val zkConnect = options.valueOf(zkConnectOpt) + var zkClient: ZkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) try { - // read the json file into a string - val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) - if (partitionsToBeReassigned.isEmpty) - throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(jsonFile)) - - zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer) - val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) - - if(reassignPartitionsCommand.reassignPartitions()) - println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) - else - println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + + var partitionsToBeReassigned : Map[TopicAndPartition, Seq[Int]] = new mutable.HashMap[TopicAndPartition, List[Int]]() + + if(options.has(topicsToMoveJsonFileOpt)) { + val topicsToMoveJsonFile = options.valueOf(topicsToMoveJsonFileOpt) + val brokerList = options.valueOf(brokerListOpt) + val topicsToMoveJsonString = 
Utils.readFileAsString(topicsToMoveJsonFile) + val brokerListToReassign = brokerList.split(',') map (_.toInt) + val topicsToReassign = ZkUtils.parseTopicsData(topicsToMoveJsonString) + val topicPartitionsToReassign = ZkUtils.getReplicaAssignmentForTopics(zkClient, topicsToReassign) + + val groupedByTopic = topicPartitionsToReassign.groupBy(tp => tp._1.topic) + groupedByTopic.foreach { topicInfo => + val assignedReplicas = AdminUtils.assignReplicasToBrokers(brokerListToReassign, topicInfo._2.size, + topicInfo._2.head._2.size) + partitionsToBeReassigned ++= assignedReplicas.map(replicaInfo => (TopicAndPartition(topicInfo._1, replicaInfo._1) -> replicaInfo._2)) + } + + } else if (options.has(manualAssignmentJsonFileOpt)) { + val manualAssignmentJsonFile = options.valueOf(manualAssignmentJsonFileOpt) + val manualAssignmentJsonString = Utils.readFileAsString(manualAssignmentJsonFile) + partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(manualAssignmentJsonString) + if (partitionsToBeReassigned.isEmpty) + throw new AdminCommandFailedException("Partition reassignment data file %s is empty".format(manualAssignmentJsonFileOpt)) + } else { + System.err.println("Missing json file. One of the file needs to be specified") + parser.printHelpOn(System.err) + System.exit(1) + } + + if (options.has(executeOpt)) { + val reassignPartitionsCommand = new ReassignPartitionsCommand(zkClient, partitionsToBeReassigned) + + if(reassignPartitionsCommand.reassignPartitions()) + println("Successfully started reassignment of partitions %s".format(partitionsToBeReassigned)) + else + println("Failed to reassign partitions %s".format(partitionsToBeReassigned)) + } else { + System.out.println("This is a dry run (Use --execute to do the actual reassignment. 
" + + "The replica assignment is \n" + partitionsToBeReassigned.toString()) + } } catch { case e => println("Partitions reassignment failed due to " + e.getMessage) http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index bde405a..ab18b7a 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -356,13 +356,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg * Reassigning replicas for a partition goes through a few stages - * RAR = Reassigned replicas * AR = Original list of replicas for partition - * 1. Register listener for ISR changes to detect when the RAR is a subset of the ISR - * 2. Start new replicas RAR - AR. - * 3. Wait until new replicas are in sync with the leader - * 4. If the leader is not in RAR, elect a new leader from RAR - * 5. Stop old replicas AR - RAR - * 6. Write new AR - * 7. Remove partition from the /admin/reassign_partitions path + * 1. Start new replicas RAR - AR. + * 2. Wait until new replicas are in sync with the leader + * 3. If the leader is not in RAR, elect a new leader from RAR + * 4. Stop old replicas AR - RAR + * 5. Write new AR + * 6. 
Remove partition from the /admin/reassign_partitions path */ def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas @@ -395,6 +394,54 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg } } + private def watchIsrChangesForReassignedPartition(topic: String, + partition: Int, + reassignedPartitionContext: ReassignedPartitionsContext) { + val reassignedReplicas = reassignedPartitionContext.newReplicas + val isrChangeListener = new ReassignedPartitionsIsrChangeListener(this, topic, partition, + reassignedReplicas.toSet) + reassignedPartitionContext.isrChangeListener = isrChangeListener + // register listener on the leader and isr path to wait until they catch up with the current leader + zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) + } + + def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, + reassignedPartitionContext: ReassignedPartitionsContext) { + val newReplicas = reassignedPartitionContext.newReplicas + val topic = topicAndPartition.topic + val partition = topicAndPartition.partition + val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) + try { + val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition) + assignedReplicasOpt match { + case Some(assignedReplicas) => + if(assignedReplicas == newReplicas) { + throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) + + " %s. 
Ignoring request for partition reassignment".format(newReplicas.mkString(","))) + } else { + if(aliveNewReplicas == newReplicas) { + info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(","))) + // first register ISR change listener + watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext) + controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext) + onPartitionReassignment(topicAndPartition, reassignedPartitionContext) + } else { + // some replica in RAR is not alive. Fail partition reassignment + throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) + + " %s for partition %s to be reassigned are alive. ".format(newReplicas.mkString(","), topicAndPartition) + + "Failing partition reassignment") + } + } + case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist" + .format(topicAndPartition)) + } + } catch { + case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e) + // remove the partition from the admin path to unblock the admin client + removePartitionFromReassignedPartitions(topicAndPartition) + } + } + def onPreferredReplicaElection(partitions: Set[TopicAndPartition]) { info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(","))) try { @@ -501,12 +548,17 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg val reassignedPartitions = partitionsBeingReassigned.filter(partition => controllerContext.partitionReplicaAssignment(partition._1) == partition._2.newReplicas).map(_._1) reassignedPartitions.foreach(p => removePartitionFromReassignedPartitions(p)) - controllerContext.partitionsBeingReassigned ++= partitionsBeingReassigned - controllerContext.partitionsBeingReassigned --= reassignedPartitions + var partitionsToReassign: 
mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap + partitionsToReassign ++= partitionsBeingReassigned + partitionsToReassign --= reassignedPartitions + info("Partitions being reassigned: %s".format(partitionsBeingReassigned.toString())) info("Partitions already reassigned: %s".format(reassignedPartitions.toString())) - info("Resuming reassignment of partitions: %s".format(controllerContext.partitionsBeingReassigned.toString())) - controllerContext.partitionsBeingReassigned.foreach(partition => onPartitionReassignment(partition._1, partition._2)) + info("Resuming reassignment of partitions: %s".format(partitionsToReassign.toString())) + + partitionsToReassign.foreach { topicPartitionToReassign => + initiateReassignReplicasForTopicPartition(topicPartitionToReassign._1, topicPartitionToReassign._2) + } } private def initializeAndMaybeTriggerPreferredReplicaElection() { @@ -595,8 +647,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // stop watching the ISR changes for this partition zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition), controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener) - // update the assigned replica list - controllerContext.partitionReplicaAssignment.put(topicAndPartition, reassignedReplicas) } private def startNewReplicasForReassignedPartition(topicAndPartition: TopicAndPartition, @@ -795,39 +845,8 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL val newPartitions = partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1)) newPartitions.foreach { partitionToBeReassigned => controllerContext.controllerLock synchronized { - val topic = partitionToBeReassigned._1.topic - val partition = partitionToBeReassigned._1.partition - val newReplicas = partitionToBeReassigned._2 - val topicAndPartition = 
partitionToBeReassigned._1 - val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r)) - try { - val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition) - assignedReplicasOpt match { - case Some(assignedReplicas) => - if(assignedReplicas == newReplicas) { - throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) + - " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(","))) - } else { - if(aliveNewReplicas == newReplicas) { - info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(","))) - val context = createReassignmentContextForPartition(topic, partition, newReplicas) - controllerContext.partitionsBeingReassigned.put(topicAndPartition, context) - controller.onPartitionReassignment(topicAndPartition, context) - } else { - // some replica in RAR is not alive. Fail partition reassignment - throw new KafkaException("Only %s replicas out of the new set of replicas".format(aliveNewReplicas.mkString(",")) + - " %s for partition %s to be reassigned are alive. 
".format(newReplicas.mkString(","), topicAndPartition) + - "Failing partition reassignment") - } - } - case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist" - .format(topicAndPartition)) - } - } catch { - case e => error("Error completing reassignment of partition %s".format(topicAndPartition), e) - // remove the partition from the admin path to unblock the admin client - controller.removePartitionFromReassignedPartitions(topicAndPartition) - } + val context = new ReassignedPartitionsContext(partitionToBeReassigned._2) + controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context) } } } @@ -840,25 +859,6 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL @throws(classOf[Exception]) def handleDataDeleted(dataPath: String) { } - - private def createReassignmentContextForPartition(topic: String, - partition: Int, - newReplicas: Seq[Int]): ReassignedPartitionsContext = { - val context = new ReassignedPartitionsContext(newReplicas) - // first register ISR change listener - watchIsrChangesForReassignedPartition(topic, partition, context) - context - } - - private def watchIsrChangesForReassignedPartition(topic: String, partition: Int, - reassignedPartitionContext: ReassignedPartitionsContext) { - val reassignedReplicas = reassignedPartitionContext.newReplicas - val isrChangeListener = new ReassignedPartitionsIsrChangeListener(controller, topic, partition, - reassignedReplicas.toSet) - reassignedPartitionContext.isrChangeListener = isrChangeListener - // register listener on the leader and isr path to wait until they catch up with the current leader - zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition), isrChangeListener) - } } class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic: String, partition: Int, 
http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 9f752f4..c964857 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -182,7 +182,12 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { case None => true } - else false + else { + replicaState.put((topic, partition, replicaId), OfflineReplica) + stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OfflineReplica" + .format(controllerId, controller.epoch, replicaId, topicAndPartition)) + false + } case None => true } http://git-wip-us.apache.org/repos/asf/kafka/blob/39fc578d/core/src/main/scala/kafka/utils/ZkUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala index ba5eacc..ca1ce12 100644 --- a/core/src/main/scala/kafka/utils/ZkUtils.scala +++ b/core/src/main/scala/kafka/utils/ZkUtils.scala @@ -629,6 +629,24 @@ object ZkUtils extends Logging { reassignedPartitions } + def parseTopicsData(jsonData: String): Seq[String] = { + var topics = List.empty[String] + Json.parseFull(jsonData) match { + case Some(m) => + m.asInstanceOf[Map[String, Any]].get("topics") match { + case Some(partitionsSeq) => + val mapPartitionSeq = partitionsSeq.asInstanceOf[Seq[Map[String, Any]]] + mapPartitionSeq.foreach(p => { + val topic = p.get("topic").get.asInstanceOf[String] + topics ++= List(topic) + }) + case None => + } + case None => + } + topics + } + def getPartitionReassignmentZkData(partitionsToBeReassigned: Map[TopicAndPartition, Seq[Int]]): String = { 
var jsonPartitionsData: mutable.ListBuffer[String] = ListBuffer[String]() for (p <- partitionsToBeReassigned) {
