Repository: kafka Updated Branches: refs/heads/trunk a208478f3 -> 5d6408f6c
KAFKA-4200; Fix throttle argument in kafka-reassign-partitions.sh Simple jira which alters two things: 1. kafka-reassign-partitions --verify prints Throttle was removed regardless of whether a throttle was applied. It should only print this if the value was actually changed. 2. --verify should exception if the âthrottle argument. (check generate too) To test this I extracted all validation logic into a separate method and added a test which covers the majority of combinations. The validation logic was retained as is, other than implementing (2) and adding validation to the --broker-list option which you can currently apply to any of hte main actions (where it is ignored). Requirement 1 was tested manually (as it's just println). Testing: - Build passes locally. - System test reassign_partitions_test.py also passes. Author: Ben Stopford <[email protected]> Reviewers: Jun Rao <[email protected]> Closes #1896 from benstopford/KAFKA-4200 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d6408f6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d6408f6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d6408f6 Branch: refs/heads/trunk Commit: 5d6408f6cfda3f8ab366195f69e90de048cde25d Parents: a208478 Author: Ben Stopford <[email protected]> Authored: Tue Sep 27 14:00:44 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue Sep 27 14:00:44 2016 +0100 ---------------------------------------------------------------------- .../kafka/admin/ReassignPartitionsCommand.scala | 71 +++--- .../scala/kafka/utils/CommandLineUtils.scala | 18 +- .../ReassignPartitionsCommandArgsTest.scala | 232 +++++++++++++++++++ 3 files changed, 292 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5d6408f6/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index ad050b4..5059463 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -16,7 +16,6 @@ */ package kafka.admin -import java.text.NumberFormat._ import java.util.Properties import joptsimple.OptionParser import kafka.log.LogConfig @@ -30,19 +29,9 @@ import org.apache.kafka.common.security.JaasUtils object ReassignPartitionsCommand extends Logging { - //TODO Note to reviewer - this class needs a little more work (which I'll complete on Monday, or we could just revert this, but including here as an outline of what is intended) - def main(args: Array[String]): Unit = { - val opts = new ReassignPartitionsCommandOptions(args) - - // should have exactly one action - val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _) - if(actions != 1) - CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --generate, --execute or --verify") - - CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) - + val opts = validateAndParseArgs(args) val zkConnect = opts.options.valueOf(opts.zkConnectOpt) val zkUtils = ZkUtils(zkConnect, 30000, @@ -63,15 +52,13 @@ object ReassignPartitionsCommand extends Logging { } def verifyAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) { - if(!opts.options.has(opts.reassignmentJsonFileOpt)) - CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option") val jsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) val jsonString = Utils.readFileAsString(jsonFile) verifyAssignment(zkUtils, jsonString) } def verifyAssignment(zkUtils: ZkUtils, jsonString: String): Unit = { - println("Status of partition reassignment:") + println("Status of partition reassignment: ") val partitionsToBeReassigned = ZkUtils.parsePartitionReassignmentData(jsonString) val reassignedPartitionsStatus = checkIfReassignmentSucceeded(zkUtils, partitionsToBeReassigned) reassignedPartitionsStatus.foreach { case (topicPartition, status) => @@ -87,30 +74,35 @@ object ReassignPartitionsCommand extends Logging { removeThrottle(zkUtils, partitionsToBeReassigned, reassignedPartitionsStatus) } - def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, scala.Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus]): Unit = { + private def removeThrottle(zkUtils: ZkUtils, partitionsToBeReassigned: Map[TopicAndPartition, scala.Seq[Int]], reassignedPartitionsStatus: Map[TopicAndPartition, ReassignmentStatus]): Unit = { + var changed = false + //If all partitions have completed remove the throttle if (reassignedPartitionsStatus.forall { case (topicPartition, status) => status == ReassignmentCompleted }) { //Remove the throttle limit from all brokers in the cluster for (brokerId <- zkUtils.getAllBrokersInCluster().map(_.id)) { val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Broker, brokerId.toString) - if (configs.remove(KafkaConfig.ThrottledReplicationRateLimitProp) != null) + if (configs.remove(KafkaConfig.ThrottledReplicationRateLimitProp) != null) { AdminUtils.changeBrokerConfig(zkUtils, Seq(brokerId), configs) + changed = true + } } //Remove the list of throttled replicas from all topics with partitions being moved val topics = partitionsToBeReassigned.keySet.map(tp => tp.topic).toSeq.distinct for (topic <- topics) { val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic) - if (configs.remove(LogConfig.ThrottledReplicasListProp) != null) + if (configs.remove(LogConfig.ThrottledReplicasListProp) != null) { AdminUtils.changeTopicConfig(zkUtils, topic, configs) + changed = true + } } - println("Throttle was removed.") + if (changed) + println("Throttle was removed.") } } def generateAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) { - if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt))) - CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") val topicsToMoveJsonFile = opts.options.valueOf(opts.topicsToMoveJsonFileOpt) val brokerListToReassign = opts.options.valueOf(opts.brokerListOpt).split(',').map(_.toInt) val duplicateReassignments = CoreUtils.duplicates(brokerListToReassign) @@ -146,8 +138,6 @@ object ReassignPartitionsCommand extends Logging { } def executeAssignment(zkUtils: ZkUtils, opts: ReassignPartitionsCommandOptions) { - if(!opts.options.has(opts.reassignmentJsonFileOpt)) - CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option") val reassignmentJsonFile = opts.options.valueOf(opts.reassignmentJsonFileOpt) val reassignmentJsonString = Utils.readFileAsString(reassignmentJsonFile) val throttle = if (opts.options.has(opts.throttleOpt)) opts.options.valueOf(opts.throttleOpt) else -1 @@ -228,6 +218,38 @@ object ReassignPartitionsCommand extends Logging { } } + def validateAndParseArgs(args: Array[String]): ReassignPartitionsCommandOptions = { + val opts = new ReassignPartitionsCommandOptions(args) + + if(args.length == 0) + CommandLineUtils.printUsageAndDie(opts.parser, "This command moves topic partitions between replicas.") + + // Should have exactly one action + val actions = Seq(opts.generateOpt, opts.executeOpt, opts.verifyOpt).count(opts.options.has _) + if(actions != 1) + CommandLineUtils.printUsageAndDie(opts.parser, "Command must include exactly one action: --generate, --execute or --verify") + + CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.zkConnectOpt) + + //Validate arguments for each action + if(opts.options.has(opts.verifyOpt)) { + if(!opts.options.has(opts.reassignmentJsonFileOpt)) + CommandLineUtils.printUsageAndDie(opts.parser, "If --verify option is used, command must include --reassignment-json-file that was used during the --execute option") + CommandLineUtils.checkInvalidArgs(opts.parser, opts.options, opts.verifyOpt, Set(opts.throttleOpt, opts.topicsToMoveJsonFileOpt, opts.disableRackAware, opts.brokerListOpt)) + } + else if(opts.options.has(opts.generateOpt)) { + if(!(opts.options.has(opts.topicsToMoveJsonFileOpt) && opts.options.has(opts.brokerListOpt))) + CommandLineUtils.printUsageAndDie(opts.parser, "If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options") + CommandLineUtils.checkInvalidArgs(opts.parser, opts.options, opts.generateOpt, Set(opts.throttleOpt, opts.reassignmentJsonFileOpt)) + } + else if (opts.options.has(opts.executeOpt)){ + if(!opts.options.has(opts.reassignmentJsonFileOpt)) + CommandLineUtils.printUsageAndDie(opts.parser, "If --execute option is used, command must include --reassignment-json-file that was output " + "during the --generate option") + CommandLineUtils.checkInvalidArgs(opts.parser, opts.options, opts.executeOpt, Set(opts.topicsToMoveJsonFileOpt, opts.disableRackAware, opts.brokerListOpt)) + } + opts + } + class ReassignPartitionsCommandOptions(args: Array[String]) { val parser = new OptionParser @@ -263,9 +285,6 @@ object ReassignPartitionsCommand extends Logging { .describedAs("throttle") .defaultsTo("-1") .ofType(classOf[Long]) - if(args.length == 0) - CommandLineUtils.printUsageAndDie(parser, "This command moves topic partitions between replicas.") - val options = parser.parse(args : _*) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d6408f6/core/src/main/scala/kafka/utils/CommandLineUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/CommandLineUtils.scala b/core/src/main/scala/kafka/utils/CommandLineUtils.scala index ba5a789..edc3621 100644 --- a/core/src/main/scala/kafka/utils/CommandLineUtils.scala +++ b/core/src/main/scala/kafka/utils/CommandLineUtils.scala @@ -25,6 +25,16 @@ import java.util.Properties */ object CommandLineUtils extends Logging { + trait ExitPolicy { + def exit(msg: String): Nothing + } + + val DEFAULT_EXIT_POLICY = new ExitPolicy { + override def exit(msg: String): Nothing = sys.exit(1) + } + + private var exitPolicy = DEFAULT_EXIT_POLICY + /** * Check that all the listed options are present */ @@ -34,7 +44,7 @@ object CommandLineUtils extends Logging { printUsageAndDie(parser, "Missing required argument \"" + arg + "\"") } } - + /** * Check that none of the listed options are present */ @@ -46,16 +56,18 @@ object CommandLineUtils extends Logging { } } } - + /** * Print usage and exit */ def printUsageAndDie(parser: OptionParser, message: String): Nothing = { System.err.println(message) parser.printHelpOn(System.err) - sys.exit(1) + exitPolicy.exit(message) } + def exitPolicy(policy: ExitPolicy): Unit = this.exitPolicy = policy + /** * Parse key-value pairs in the form key=value */ http://git-wip-us.apache.org/repos/asf/kafka/blob/5d6408f6/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala new file mode 100644 index 0000000..1685130 --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandArgsTest.scala @@ -0,0 +1,232 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.admin + +import kafka.utils.CommandLineUtils +import kafka.utils.CommandLineUtils.ExitPolicy +import org.junit.Assert.assertTrue +import org.junit.{After, Before, Test} +import org.scalatest.junit.JUnitSuite + +class ReassignPartitionsCommandArgsTest extends JUnitSuite { + + @Before + def setUp() { + CommandLineUtils.exitPolicy(new ExitPolicy { + override def exit(msg: String): Nothing = throw new IllegalArgumentException(msg) + }) + } + + @After + def tearDown() { + CommandLineUtils.exitPolicy(CommandLineUtils.DEFAULT_EXIT_POLICY) + } + + /** + * HAPPY PATH + */ + + @Test + def shouldCorrectlyParseValidMinimumGenerateOptions(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--generate", + "--broker-list", "101,102", + "--topics-to-move-json-file", "myfile.json") + ReassignPartitionsCommand.validateAndParseArgs(args) + } + + @Test + def shouldCorrectlyParseValidMinimumExecuteOptions(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--execute", + "--reassignment-json-file", "myfile.json") + ReassignPartitionsCommand.validateAndParseArgs(args) + } + + @Test + def shouldCorrectlyParseValidMinimumVerifyOptions(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--verify", + "--reassignment-json-file", "myfile.json") + ReassignPartitionsCommand.validateAndParseArgs(args) + } + + @Test + def shouldAllowThrottleOptionOnExecute(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--execute", + "--throttle", "100", + "--reassignment-json-file", "myfile.json") + ReassignPartitionsCommand.validateAndParseArgs(args) + } + + /** + * NO ARGS + */ + + @Test + def shouldFailIfNoArgs(): Unit = { + val args: Array[String]= Array() + shouldFailWith("This command moves topic partitions between replicas.", args) + } + + @Test + def shouldFailIfBlankArg(): Unit = { + val args = Array(" ") + shouldFailWith("Command must include exactly one action: --generate, --execute or --verify", args) + } + + /** + * UNHAPPY PATH: EXECUTE ACTION + */ + + @Test + def shouldNotAllowExecuteWithTopicsOption(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--execute", + "--reassignment-json-file", "myfile.json", + "--topics-to-move-json-file", "myfile.json") + shouldFailWith("Option \"[execute]\" can't be used with option\"[topics-to-move-json-file]\"", args) + } + + @Test + def shouldNotAllowExecuteWithBrokers(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--execute", + "--reassignment-json-file", "myfile.json", + "--broker-list", "101,102" + ) + shouldFailWith("Option \"[execute]\" can't be used with option\"[broker-list]\"", args) + } + + @Test + def shouldNotAllowExecuteWithoutReassignmentOption(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--execute") + shouldFailWith("If --execute option is used, command must include --reassignment-json-file that was output during the --generate option", args) + } + + /** + * UNHAPPY PATH: GENERATE ACTION + */ + + @Test + def shouldNotAllowGenerateWithoutBrokersAndTopicsOptions(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--generate") + shouldFailWith("If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options", args) + } + + @Test + def shouldNotAllowGenerateWithoutBrokersOption(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--topics-to-move-json-file", "myfile.json", + "--generate") + shouldFailWith("If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options", args) + } + + @Test + def shouldNotAllowGenerateWithoutTopicsOption(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--broker-list", "101,102", + "--generate") + shouldFailWith("If --generate option is used, command must include both --topics-to-move-json-file and --broker-list options", args) + } + + @Test + def shouldNotAllowGenerateWithThrottleOption(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--generate", + "--broker-list", "101,102", + "--throttle", "100", + "--topics-to-move-json-file", "myfile.json") + shouldFailWith("Option \"[generate]\" can't be used with option\"[throttle]\"", args) + } + + @Test + def shouldNotAllowGenerateWithReassignmentOption(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--generate", + "--broker-list", "101,102", + "--topics-to-move-json-file", "myfile.json", + "--reassignment-json-file", "myfile.json") + shouldFailWith("Option \"[generate]\" can't be used with option\"[reassignment-json-file]\"", args) + } + + /** + * UNHAPPY PATH: VERIFY ACTION + */ + + @Test + def shouldNotAllowVerifyWithoutReassignmentOption(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--verify") + shouldFailWith("If --verify option is used, command must include --reassignment-json-file that was used during the --execute option", args) + } + + @Test + def shouldNotAllowBrokersListWithVerifyOption(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--verify", + "--broker-list", "100,101", + "--reassignment-json-file", "myfile.json") + shouldFailWith("Option \"[verify]\" can't be used with option\"[broker-list]\"", args) + } + + @Test + def shouldNotAllowThrottleWithVerifyOption(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--verify", + "--throttle", "100", + "--reassignment-json-file", "myfile.json") + shouldFailWith("Option \"[verify]\" can't be used with option\"[throttle]\"", args) + } + + @Test + def shouldNotAllowTopicsOptionWithVerify(): Unit = { + val args = Array( + "--zookeeper", "localhost:1234", + "--verify", + "--reassignment-json-file", "myfile.json", + "--topics-to-move-json-file", "myfile.json") + shouldFailWith("Option \"[verify]\" can't be used with option\"[topics-to-move-json-file]\"", args) + } + + def shouldFailWith(msg: String, args: Array[String]): Unit = { + try { + ReassignPartitionsCommand.validateAndParseArgs(args) + fail(s"Should have failed with [$msg] but no failure occurred.") + } catch { + case e: Exception => assertTrue(s"Expected exception with message:\n[$msg]\nbut was\n[${e.getMessage}]", e.getMessage.startsWith(msg)) + } + } +}
