This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 505adef KAFKA-6494; ConfigCommand update to use AdminClient for broker configs (#4503) 505adef is described below commit 505adefcecc6e7282b73f934aa8c7a59adcb675e Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Fri Feb 2 16:31:58 2018 -0800 KAFKA-6494; ConfigCommand update to use AdminClient for broker configs (#4503) Use new AdminClient for describing and altering broker configs using ConfigCommand. Broker quota configs as well as other configs will continue to be processed directly using ZooKeeper until KIP-248 is implemented. Reviewers: Manikumar Reddy O <manikumar.re...@gmail.com>, Jason Gustafson <ja...@confluent.io> --- .../common/requests/DescribeConfigsResponse.java | 2 +- .../kafka/common/requests/RequestResponseTest.java | 33 ++++ .../src/main/scala/kafka/admin/ConfigCommand.scala | 173 ++++++++++++++++++--- .../src/main/scala/kafka/server/AdminManager.scala | 33 ++-- .../server/DynamicBrokerReconfigurationTest.scala | 95 ++++++----- .../scala/unit/kafka/admin/ConfigCommandTest.scala | 88 ++++++++++- 6 files changed, 341 insertions(+), 83 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java index 62012f4..e463618 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java @@ -74,7 +74,7 @@ public class DescribeConfigsResponse extends AbstractResponse { new Field(CONFIG_NAME_KEY_NAME, STRING), new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING), new Field(READ_ONLY_KEY_NAME, BOOLEAN), - new Field(IS_DEFAULT_KEY_NAME, BOOLEAN), + new Field(CONFIG_SOURCE_KEY_NAME, INT8), new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN), new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1))); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index b5420b5..0f7429e 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -65,12 +65,14 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; + import static java.util.Arrays.asList; import static java.util.Collections.singletonList; import static org.apache.kafka.test.TestUtils.toBuffer; @@ -253,6 +255,7 @@ public class RequestResponseTest { checkRequest(createDescribeConfigsRequestWithConfigEntries(1)); checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException()); checkResponse(createDescribeConfigsResponse(), 1); + checkDescribeConfigsResponseVersions(); checkRequest(createCreatePartitionsRequest()); checkRequest(createCreatePartitionsRequestWithAssignments()); checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException()); @@ -288,6 +291,36 @@ public class RequestResponseTest { } } + private void verifyDescribeConfigsResponse(DescribeConfigsResponse expected, DescribeConfigsResponse actual, int version) throws Exception { + for (org.apache.kafka.common.requests.Resource resource : expected.configs().keySet()) { + Collection<DescribeConfigsResponse.ConfigEntry> deserializedEntries1 = actual.config(resource).entries(); + Iterator<DescribeConfigsResponse.ConfigEntry> expectedEntries = expected.config(resource).entries().iterator(); + for (DescribeConfigsResponse.ConfigEntry entry : deserializedEntries1) { + DescribeConfigsResponse.ConfigEntry expectedEntry = expectedEntries.next(); + assertEquals(expectedEntry.name(), entry.name()); + assertEquals(expectedEntry.value(), entry.value()); + assertEquals(expectedEntry.isReadOnly(), entry.isReadOnly()); + assertEquals(expectedEntry.isSensitive(), entry.isSensitive()); + if (version == 1 || (expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG && + expectedEntry.source() != DescribeConfigsResponse.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)) + assertEquals(expectedEntry.source(), entry.source()); + else + assertEquals(DescribeConfigsResponse.ConfigSource.STATIC_BROKER_CONFIG, entry.source()); + } + } + } + + private void checkDescribeConfigsResponseVersions() throws Exception { + DescribeConfigsResponse response = createDescribeConfigsResponse(); + DescribeConfigsResponse deserialized0 = (DescribeConfigsResponse) deserialize(response, + response.toStruct((short) 0), (short) 0); + verifyDescribeConfigsResponse(response, deserialized0, 0); + + DescribeConfigsResponse deserialized1 = (DescribeConfigsResponse) deserialize(response, + response.toStruct((short) 1), (short) 1); + verifyDescribeConfigsResponse(response, deserialized1, 1); + } + private void checkErrorResponse(AbstractRequest req, Throwable e) throws Exception { checkResponse(req.getErrorResponse(e), req.version()); } diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index cf01a5f..9034dba 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -17,7 +17,8 @@ package kafka.admin -import java.util.Properties +import java.util.concurrent.TimeUnit +import java.util.{Collections, Properties} import joptsimple._ import kafka.common.Config @@ -27,6 +28,9 @@ import kafka.server.{ConfigEntityName, ConfigType, DynamicConfig} import kafka.utils.CommandLineUtils import kafka.utils.Implicits._ import kafka.zk.{AdminZkClient, KafkaZkClient} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{AlterConfigsOptions, Config => JConfig, ConfigEntry, DescribeConfigsOptions, AdminClient => JAdminClient} +import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram._ import org.apache.kafka.common.utils.{Sanitizer, Time, Utils} @@ -52,6 +56,13 @@ import scala.collection.JavaConverters._ object ConfigCommand extends Config { val DefaultScramIterations = 4096 + // Dynamic broker configs can only be updated using the new AdminClient since they may require + // password encryption currently implemented only in the broker. For consistency with older versions, + // quota-related broker configs can still be updated using ZooKeeper. ConfigCommand will be migrated + // fully to the new AdminClient later (KIP-248). + val BrokerConfigsUpdatableUsingZooKeeper = Set(DynamicConfig.Broker.LeaderReplicationThrottledRateProp, + DynamicConfig.Broker.FollowerReplicationThrottledRateProp, + DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp) def main(args: Array[String]): Unit = { @@ -63,21 +74,26 @@ object ConfigCommand extends Config { opts.checkArgs() val time = Time.SYSTEM - val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000, - Int.MaxValue, time) - val adminZkClient = new AdminZkClient(zkClient) - try { - if (opts.options.has(opts.alterOpt)) - alterConfig(zkClient, opts, adminZkClient) - else if (opts.options.has(opts.describeOpt)) - describeConfig(zkClient, opts, adminZkClient) - } catch { - case e: Throwable => - println("Error while executing config command " + e.getMessage) - println(Utils.stackTrace(e)) - } finally { - zkClient.close() + if (opts.options.has(opts.zkConnectOpt)) { + val zkClient = KafkaZkClient(opts.options.valueOf(opts.zkConnectOpt), JaasUtils.isZkSecurityEnabled, 30000, 30000, + Int.MaxValue, time) + val adminZkClient = new AdminZkClient(zkClient) + + try { + if (opts.options.has(opts.alterOpt)) + alterConfig(zkClient, opts, adminZkClient) + else if (opts.options.has(opts.describeOpt)) + describeConfig(zkClient, opts, adminZkClient) + } catch { + case e: Throwable => + println("Error while executing config command " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + zkClient.close() + } + } else { + processBrokerConfig(opts) } } @@ -90,6 +106,10 @@ object ConfigCommand extends Config { if (entityType == ConfigType.User) preProcessScramCredentials(configsToBeAdded) + if (entityType == ConfigType.Broker) { + require(configsToBeAdded.asScala.keySet.forall(BrokerConfigsUpdatableUsingZooKeeper.contains), + s"--bootstrap-server option must be specified to update broker configs $configsToBeAdded") + } // compile the final set of configs val configs = adminZkClient.fetchEntityConfig(entityType, entityName) @@ -172,6 +192,95 @@ object ConfigCommand extends Config { Seq.empty } + private def processBrokerConfig(opts: ConfigCommandOptions): Unit = { + val props = if (opts.options.has(opts.commandConfigOpt)) + Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) + else + new Properties() + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) + val adminClient = JAdminClient.create(props) + val entityName = if (opts.options.has(opts.entityName)) + opts.options.valueOf(opts.entityName) + else if (opts.options.has(opts.entityDefault)) + "" + else + throw new IllegalArgumentException("At least one of --entity-name or --entity-default must be specified with --bootstrap-server") + + val entityTypes = opts.options.valuesOf(opts.entityType).asScala + if (entityTypes.size != 1) + throw new IllegalArgumentException("Exactly one --entity-type must be specified with --bootstrap-server") + if (entityTypes.head != ConfigType.Broker) + throw new IllegalArgumentException(s"--zookeeper option must be specified for entity-type $entityTypes") + + try { + if (opts.options.has(opts.alterOpt)) + alterBrokerConfig(adminClient, opts, entityName) + else if (opts.options.has(opts.describeOpt)) + describeBrokerConfig(adminClient, opts, entityName) + } catch { + case e: Throwable => + println("Error while executing config command " + e.getMessage) + println(Utils.stackTrace(e)) + } finally { + adminClient.close() + } + + } + + private[admin] def alterBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) { + val configsToBeAdded = parseConfigsToBeAdded(opts).asScala.map { case (k, v) => (k, new ConfigEntry(k, v)) } + val configsToBeDeleted = parseConfigsToBeDeleted(opts) + + // compile the final set of configs + val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName) + val oldConfig = brokerConfig(adminClient, entityName, includeSynonyms = false) + .map { entry => (entry.name, entry) }.toMap + + // fail the command if any of the configs to be deleted does not exist + val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains) + if (invalidConfigs.nonEmpty) + throw new InvalidConfigException(s"Invalid config(s): ${invalidConfigs.mkString(",")}") + + val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted + val sensitiveEntries = newEntries.filter(_._2.value == null) + if (sensitiveEntries.nonEmpty) + throw new InvalidConfigException(s"All sensitive broker config entries must be specified for --alter, missing entries: ${sensitiveEntries.keySet}") + val newConfig = new JConfig(newEntries.asJava.values) + + val alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false) + adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) + + if (entityName.nonEmpty) + println(s"Completed updating config for broker: $entityName.") + else + println(s"Completed updating default config for brokers in the cluster,") + } + + private def describeBrokerConfig(adminClient: JAdminClient, opts: ConfigCommandOptions, entityName: String) { + val configs = brokerConfig(adminClient, entityName, includeSynonyms = true) + if (entityName.nonEmpty) + println(s"Configs for broker $entityName are:") + else + println(s"Default config for brokers in the cluster are:") + configs.foreach { config => + val synonyms = config.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ") + println(s" ${config.name}=${config.value} sensitive=${config.isSensitive} synonyms={$synonyms}") + } + } + + private def brokerConfig(adminClient: JAdminClient, entityName: String, includeSynonyms: Boolean): Seq[ConfigEntry] = { + val configResource = new ConfigResource(ConfigResource.Type.BROKER, entityName) + val configSource = if (!entityName.isEmpty) + ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG + else + ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG + val describeOpts = new DescribeConfigsOptions().includeSynonyms(includeSynonyms) + val configs = adminClient.describeConfigs(Collections.singleton(configResource), describeOpts).all.get(30, TimeUnit.SECONDS) + configs.get(configResource).entries.asScala + .filter(entry => entry.source == configSource) + .toSeq + } + case class Entity(entityType: String, sanitizedName: Option[String]) { val entityPath = sanitizedName match { case Some(n) => entityType + "/" + n @@ -249,12 +358,16 @@ object ConfigCommand extends Config { } } + private def entityNames(opts: ConfigCommandOptions): Seq[String] = { + val namesIterator = opts.options.valuesOf(opts.entityName).iterator + opts.options.specs.asScala + .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default")) + .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "") + } + private def parseQuotaEntity(opts: ConfigCommandOptions): ConfigEntity = { val types = opts.options.valuesOf(opts.entityType).asScala - val namesIterator = opts.options.valuesOf(opts.entityName).iterator - val names = opts.options.specs.asScala - .filter(spec => spec.options.contains("entity-name") || spec.options.contains("entity-default")) - .map(spec => if (spec.options.contains("entity-name")) namesIterator.next else "") + val names = entityNames(opts) if (opts.options.has(opts.alterOpt) && names.size != types.size) throw new IllegalArgumentException("--entity-name or --entity-default must be specified with each --entity-type for --alter") @@ -285,6 +398,16 @@ object ConfigCommand extends Config { .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val bootstrapServerOpt = parser.accepts("bootstrap-server", "The Kafka server to connect to. " + + "This is required for describing and altering broker configs.") + .withRequiredArg + .describedAs("server to connect to") + .ofType(classOf[String]) + val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client. " + + "This is used only with --bootstrap-server option for describing and altering broker configs.") + .withRequiredArg + .describedAs("command config property file") + .ofType(classOf[String]) val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") val describeOpt = parser.accepts("describe", "List configs for the given entity.") val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers)") @@ -293,7 +416,7 @@ object ConfigCommand extends Config { val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name/broker id)") .withRequiredArg .ofType(classOf[String]) - val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users (applies to corresponding entity type in command line)") + val entityDefault = parser.accepts("entity-default", "Default entity name for clients/users/brokers (applies to corresponding entity type in command line)") val nl = System.getProperty("line.separator") val addConfig = parser.accepts("add-config", "Key Value pairs of configs to add. Square brackets can be used to group values which contain commas: 'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " + @@ -321,14 +444,18 @@ object ConfigCommand extends Config { CommandLineUtils.printUsageAndDie(parser, "Command must include exactly one action: --describe, --alter") // check required args - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType) CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, Set(describeOpt)) CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, Set(alterOpt, addConfig, deleteConfig)) val entityTypeVals = options.valuesOf(entityType).asScala + + if (options.has(bootstrapServerOpt) == options.has(zkConnectOpt)) + throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified") + if (entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Topic) || entityTypeVals.contains(ConfigType.User)) + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, entityType) if(options.has(alterOpt)) { - if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client)) { + if (entityTypeVals.contains(ConfigType.User) || entityTypeVals.contains(ConfigType.Client) || entityTypeVals.contains(ConfigType.Broker)) { if (!options.has(entityName) && !options.has(entityDefault)) - throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users/clients") + throw new IllegalArgumentException("--entity-name or --entity-default must be specified with --alter of users, clients or brokers") } else if (!options.has(entityName)) throw new IllegalArgumentException(s"--entity-name must be specified with --alter of ${entityTypeVals}") diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 596dde0..8264f7c 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -284,9 +284,12 @@ class AdminManager(val config: KafkaConfig, def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]], includeSynonyms: Boolean): Map[Resource, DescribeConfigsResponse.Config] = { resourceToConfigNames.map { case (resource, configNames) => - def createResponseConfig(config: AbstractConfig, createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = { - val allConfigs = config.originals.asScala.filter(_._2 != null) ++ config.values.asScala - val filteredConfigPairs = allConfigs.filter { case (configName, _) => + def allConfigs(config: AbstractConfig) = { + config.originals.asScala.filter(_._2 != null) ++ config.values.asScala + } + def createResponseConfig(configs: Map[String, Any], + createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = { + val filteredConfigPairs = configs.filter { case (configName, _) => /* Always returns true if configNames is None */ configNames.forall(_.contains(configName)) }.toIndexedSeq @@ -304,14 +307,17 @@ class AdminManager(val config: KafkaConfig, // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic) val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps) - createResponseConfig(logConfig, createTopicConfigEntry(logConfig, topicProps, includeSynonyms)) + createResponseConfig(allConfigs(logConfig), createTopicConfigEntry(logConfig, topicProps, includeSynonyms)) case ResourceType.BROKER => - val brokerId = resourceNameToBrokerId(resource.name) - if (brokerId == config.brokerId) - createResponseConfig(config, createBrokerConfigEntry(includeSynonyms)) + if (resource.name == null || resource.name.isEmpty) + createResponseConfig(config.dynamicConfig.currentDynamicDefaultConfigs, + createBrokerConfigEntry(perBrokerConfig = false, includeSynonyms)) + else if (resourceNameToBrokerId(resource.name) == config.brokerId) + createResponseConfig(allConfigs(config), + createBrokerConfigEntry(perBrokerConfig = true, includeSynonyms)) else - throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId}, but received $brokerId") + throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId} or empty string, but received $resource.name") case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType") } @@ -361,8 +367,12 @@ class AdminManager(val config: KafkaConfig, case ResourceType.BROKER => val brokerId = if (resource.name == null || resource.name.isEmpty) None - else - Some(resourceNameToBrokerId(resource.name)) + else { + val id = resourceNameToBrokerId(resource.name) + if (id != this.config.brokerId) + throw new InvalidRequestException(s"Unexpected broker id, expected ${this.config.brokerId}, but received $resource.name") + Some(id) + } val configProps = new Properties config.entries.asScala.foreach { configEntry => configProps.setProperty(configEntry.name, configEntry.value) @@ -459,13 +469,14 @@ class AdminManager(val config: KafkaConfig, new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava) } - private def createBrokerConfigEntry(includeSynonyms: Boolean) + private def createBrokerConfigEntry(perBrokerConfig: Boolean, includeSynonyms: Boolean) (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = { val allNames = brokerSynonyms(name) val configEntryType = configType(name, allNames) val isSensitive = configEntryType == ConfigDef.Type.PASSWORD val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType) val allSynonyms = configSynonyms(name, allNames, isSensitive) + .filter(perBrokerConfig || _.source == ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG) val synonyms = if (!includeSynonyms) List.empty else allSynonyms val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source val readOnly = !allNames.exists(DynamicBrokerConfig.AllDynamicConfigs.contains) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 49d9953..1224274 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -18,8 +18,7 @@ package kafka.server -import java.io.Closeable -import java.io.File +import java.io.{Closeable, File, FileOutputStream, FileWriter} import java.nio.file.{Files, StandardCopyOption} import java.lang.management.ManagementFactory import java.util @@ -27,6 +26,7 @@ import java.util.{Collections, Properties} import java.util.concurrent.{ConcurrentLinkedQueue, ExecutionException, TimeUnit} import javax.management.ObjectName +import kafka.admin.ConfigCommand import kafka.api.SaslSetup import kafka.log.LogConfig import kafka.coordinator.group.OffsetConfig @@ -136,32 +136,6 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } @Test - def testKeystoreUpdate(): Unit = { - val producer = createProducer(trustStoreFile1, retries = 0) - val consumer = createConsumer("group1", trustStoreFile1) - verifyProduceConsume(producer, consumer, 10) - - // Producer with new truststore should fail to connect before keystore update - val producer2 = createProducer(trustStoreFile2, retries = 0) - verifyAuthenticationFailure(producer2) - - // Update broker keystore - configureDynamicKeystoreInZooKeeper(servers.head.config, servers.map(_.config.brokerId), sslProperties2) - waitForKeystore(sslProperties2) - - // New producer with old truststore should fail to connect - val producer1 = createProducer(trustStoreFile1, retries = 0) - verifyAuthenticationFailure(producer1) - - // New producer with new truststore should work - val producer3 = createProducer(trustStoreFile2, retries = 0) - verifyProduceConsume(producer3, consumer, 10) - - // Old producer with old truststore should continue to work (with their old connections) - verifyProduceConsume(producer, consumer, 10) - } - - @Test def testKeyStoreDescribeUsingAdminClient(): Unit = { def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, expectedProps: Properties): Unit = { @@ -220,7 +194,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet } @Test - def testKeyStoreAlterUsingAdminClient(): Unit = { + def testKeyStoreAlter(): Unit = { val topic2 = "testtopic2" TestUtils.createTopic(zkClient, topic2, numPartitions = 10, replicationFactor = numServers, servers) @@ -229,20 +203,28 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet val (producerThread, consumerThread) = startProduceConsume(retries = 0) TestUtils.waitUntilTrue(() => consumerThread.received >= 10, "Messages not received") + // Producer with new truststore should fail to connect before keystore update + val producer1 = createProducer(trustStoreFile2, retries = 0) + verifyAuthenticationFailure(producer1) + // Update broker keystore for external listener - val adminClient = adminClients.head - alterSslKeystore(adminClient, sslProperties2, SecureExternal) + alterSslKeystoreUsingConfigCommand(sslProperties2, SecureExternal) - // Produce/consume should work with new truststore + // New producer with old truststore should fail to connect + val producer2 = createProducer(trustStoreFile1, retries = 0) + verifyAuthenticationFailure(producer2) + + // Produce/consume should work with new truststore with new producer/consumer val producer = createProducer(trustStoreFile2, retries = 0) val consumer = createConsumer("group1", trustStoreFile2, topic2) verifyProduceConsume(producer, consumer, 10, topic2) // Broker keystore update for internal listener with incompatible keystore should fail without update + val adminClient = adminClients.head alterSslKeystore(adminClient, sslProperties2, SecureInternal, expectFailure = true) verifyProduceConsume(producer, consumer, 10, topic2) - // Broker keystore update for internal listener with incompatible keystore should succeed + // Broker keystore update for internal listener with compatible keystore should succeed val sslPropertiesCopy = sslProperties1.clone().asInstanceOf[Properties] val oldFile = new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)) val newFile = File.createTempFile("keystore", ".jks") @@ -613,15 +595,48 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet configDescription } + private def sslProperties(props: Properties, configPrefix: String): Properties = { + val sslProps = new Properties + sslProps.setProperty(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)) + sslProps.setProperty(s"$configPrefix$SSL_KEYSTORE_TYPE_CONFIG", props.getProperty(SSL_KEYSTORE_TYPE_CONFIG)) + sslProps.setProperty(s"$configPrefix$SSL_KEYSTORE_PASSWORD_CONFIG", props.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value) + sslProps.setProperty(s"$configPrefix$SSL_KEY_PASSWORD_CONFIG", props.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value) + sslProps + } + private def alterSslKeystore(adminClient: AdminClient, props: Properties, listener: String, expectFailure: Boolean = false): Unit = { - val newProps = new Properties val configPrefix = new ListenerName(listener).configPrefix - val keystoreLocation = props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG) - newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation) - newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_TYPE_CONFIG", props.getProperty(SSL_KEYSTORE_TYPE_CONFIG)) - newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_PASSWORD_CONFIG", props.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value) - newProps.setProperty(s"$configPrefix$SSL_KEY_PASSWORD_CONFIG", props.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value) - reconfigureServers(newProps, perBrokerConfig = true, (s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation), expectFailure) + val newProps = sslProperties(props, configPrefix) + reconfigureServers(newProps, perBrokerConfig = true, + (s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)), expectFailure) + } + + private def alterSslKeystoreUsingConfigCommand(props: Properties, listener: String): Unit = { + val configPrefix = new ListenerName(listener).configPrefix + val newProps = sslProperties(props, configPrefix) + + val securityProps: util.Map[Object, Object] = TestUtils.adminClientSecurityConfigs(SecurityProtocol.SSL, Some(trustStoreFile1), None) + val propsFile = TestUtils.tempFile() + val propsWriter = new FileWriter(propsFile) + try { + securityProps.asScala.foreach { + case (k, v: Password) => propsWriter.write(s"$k=${v.value}\n") + case (k, v: util.List[_]) => propsWriter.write(s"""$k=${v.asScala.mkString(",")}\n""") + case (k, v) => propsWriter.write(s"$k=$v\n") + } + } finally { + propsWriter.close() + } + + servers.foreach { server => + val args = Array("--bootstrap-server", TestUtils.bootstrapServers(servers, new ListenerName(SecureInternal)), + "--command-config", propsFile.getAbsolutePath, + "--alter", "--add-config", newProps.asScala.map { case (k, v) => s"$k=$v" }.mkString(","), + "--entity-type", "brokers", + "--entity-name", server.config.brokerId.toString) + ConfigCommand.main(args) + } + waitForConfig(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)) } private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = { diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index acac907..6e78423 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -16,6 +16,7 @@ */ package kafka.admin +import java.util import java.util.Properties import kafka.admin.ConfigCommand.ConfigCommandOptions @@ -23,9 +24,13 @@ import kafka.common.InvalidConfigException import kafka.server.ConfigEntityName import kafka.utils.Logging import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness} +import org.apache.kafka.clients.admin._ +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.internals.KafkaFutureImpl +import org.apache.kafka.common.{KafkaFuture, Node} import org.apache.kafka.common.security.scram.ScramCredentialUtils import org.apache.kafka.common.utils.Sanitizer -import org.easymock.EasyMock +import org.easymock.{EasyMock, IAnswer} import org.junit.Assert._ import org.junit.Test @@ -137,22 +142,79 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { } @Test - def shouldAddBrokerConfig(): Unit = { - val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + def shouldAddBrokerQuotaConfig(): Unit = { + val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, "--entity-name", "1", "--entity-type", "brokers", "--alter", - "--add-config", "a=b,c=d")) + "--add-config", "leader.replication.throttled.rate=10,follower.replication.throttled.rate=20")) case class TestAdminZkClient(val zkClient: KafkaZkClient) extends AdminZkClient(zkClient) { override def changeBrokerConfig(brokerIds: Seq[Int], configChange: Properties): Unit = { assertEquals(Seq(1), brokerIds) - assertEquals("b", configChange.get("a")) - assertEquals("d", configChange.get("c")) + assertEquals("10", configChange.get("leader.replication.throttled.rate")) + assertEquals("20", configChange.get("follower.replication.throttled.rate")) } } - ConfigCommand.alterConfig(null, createOpts, new TestAdminZkClient(zkClient)) + ConfigCommand.alterConfig(null, alterOpts, new TestAdminZkClient(zkClient)) + } + + @Test + def shouldAddBrokerDynamicConfig(): Unit = { + val node = new Node(1, "localhost", 9092) + verifyAlterBrokerConfig(node, "1", List("--entity-name", "1")) + } + + @Test + def shouldAddDefaultBrokerDynamicConfig(): Unit = { + val node = new Node(1, "localhost", 9092) + verifyAlterBrokerConfig(node, "", List("--entity-default")) + } + + def verifyAlterBrokerConfig(node: Node, resourceName: String, resourceOpts: List[String]): Unit = { + val optsList = List("--bootstrap-server", "localhost:9092", + "--entity-type", "brokers", + "--alter", + "--add-config", "message.max.bytes=10") ++ resourceOpts + val alterOpts = new ConfigCommandOptions(optsList.toArray) + val brokerConfigs = mutable.Map[String, String]("num.io.threads" -> "5") + + val resource = new ConfigResource(ConfigResource.Type.BROKER, resourceName) + val configEntries = util.Collections.singletonList(new ConfigEntry("num.io.threads", "5")) + val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]] + future.complete(util.Collections.singletonMap(resource, new Config(configEntries))) + val describeResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult]) + EasyMock.expect(describeResult.all()).andReturn(future).once() + + val alterFuture = new KafkaFutureImpl[Void] + alterFuture.complete(null) + val alterResult = EasyMock.createNiceMock(classOf[AlterConfigsResult]) + EasyMock.expect(alterResult.all()).andReturn(alterFuture) + + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) { + override def describeConfigs(resources: util.Collection[ConfigResource], options: DescribeConfigsOptions): DescribeConfigsResult = { + assertEquals(1, resources.size) + val resource = resources.iterator.next + assertEquals(ConfigResource.Type.BROKER, resource.`type`) + assertEquals(resourceName, resource.name) + describeResult + } + + override def alterConfigs(configs: util.Map[ConfigResource, Config], options: AlterConfigsOptions): AlterConfigsResult = { + assertEquals(1, configs.size) + val entry = configs.entrySet.iterator.next + val resource = entry.getKey + val config = entry.getValue + assertEquals(ConfigResource.Type.BROKER, resource.`type`) + config.entries.asScala.foreach { e => brokerConfigs.put(e.name, e.value) } + alterResult + } + } + EasyMock.replay(alterResult, describeResult) + ConfigCommand.alterBrokerConfig(mockAdminClient, alterOpts, resourceName) + assertEquals(Map("message.max.bytes" -> "10", "num.io.threads" -> "5"), brokerConfigs.toMap) + EasyMock.reset(alterResult, describeResult) } @Test @@ -183,7 +245,17 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { "--entity-name", "1,2,3", //Don't support multiple brokers currently "--entity-type", "brokers", "--alter", - "--add-config", "a=b")) + "--add-config", "leader.replication.throttled.rate=10")) + ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient)) + } + + @Test (expected = classOf[IllegalArgumentException]) + def shouldNotUpdateDynamicBrokerConfigUsingZooKeeper(): Unit = { + val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, + "--entity-name", "1", + "--entity-type", "brokers", + "--alter", + "--add-config", "message.max.size=100000")) ConfigCommand.alterConfig(null, createOpts, new DummyAdminZkClient(zkClient)) } -- To stop receiving notification emails like this one, please contact j...@apache.org.