This is an automated email from the ASF dual-hosted git repository.
davidarthur pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 3f7af52 KAFKA-9661: Propagate includeSynonyms option to AdminClient in ConfigCommand (#8229)
3f7af52 is described below
commit 3f7af5209705e73c3c6f030030288c21dcb011a9
Author: David Arthur <[email protected]>
AuthorDate: Thu Mar 5 11:46:37 2020 -0500
KAFKA-9661: Propagate includeSynonyms option to AdminClient in ConfigCommand (#8229)
---
.../src/main/scala/kafka/admin/ConfigCommand.scala | 8 +++--
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 34 ++++++++++++++++++++--
2 files changed, 37 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index d8fc59d..562a91a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -28,7 +28,7 @@ import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncod
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp,
AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, ListTopicsOptions,
Config => JConfig}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp,
AlterConfigsOptions, ConfigEntry, DescribeClusterOptions,
DescribeConfigsOptions, ListTopicsOptions, Config => JConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.errors.InvalidConfigurationException
@@ -367,7 +367,7 @@ object ConfigCommand extends Config {
println(s"Completed updating default config for $entityType in the
cluster.")
}
- private def describeConfig(adminClient: Admin, opts: ConfigCommandOptions):
Unit = {
+ private[admin] def describeConfig(adminClient: Admin, opts:
ConfigCommandOptions): Unit = {
val entityType = opts.entityTypes.head
val entityName = opts.entityNames.headOption
val describeAll = opts.options.has(opts.allOpt)
@@ -426,7 +426,9 @@ object ConfigCommand extends Config {
dynamicConfigSource
val configResource = new ConfigResource(configResourceType, entityName)
- val configs =
adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30,
TimeUnit.SECONDS)
+ val describeOptions = new
DescribeConfigsOptions().includeSynonyms(includeSynonyms)
+ val configs =
adminClient.describeConfigs(Collections.singleton(configResource),
describeOptions)
+ .all.get(30, TimeUnit.SECONDS)
configs.get(configResource).entries.asScala
.filter(entry => configSourceFilter match {
case Some(configSource) => entry.source == configSource
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index a1f5f39..e938a6d 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -359,6 +359,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val node = new Node(1, "localhost", 9092)
val mockAdminClient = new
MockAdminClient(util.Collections.singletonList(node), node) {
override def describeConfigs(resources: util.Collection[ConfigResource],
options: DescribeConfigsOptions): DescribeConfigsResult = {
+ assertFalse("Config synonyms requested unnecessarily",
options.includeSynonyms())
assertEquals(1, resources.size)
val resource = resources.iterator.next
assertEquals(resource.`type`, ConfigResource.Type.TOPIC)
@@ -391,6 +392,34 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
@Test
+ def shouldDescribeConfigSynonyms(): Unit = {
+ val resourceName = "my-topic"
+ val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server",
"localhost:9092",
+ "--entity-name", resourceName,
+ "--entity-type", "topics",
+ "--describe",
+ "--all"))
+
+ val resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName)
+ val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+ future.complete(util.Collections.singletonMap(resource, new
Config(util.Collections.emptyList[ConfigEntry])))
+ val describeResult: DescribeConfigsResult =
EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+ EasyMock.expect(describeResult.all()).andReturn(future).once()
+
+ val node = new Node(1, "localhost", 9092)
+ val mockAdminClient = new
MockAdminClient(util.Collections.singletonList(node), node) {
+ override def describeConfigs(resources: util.Collection[ConfigResource],
options: DescribeConfigsOptions): DescribeConfigsResult = {
+ assertTrue("Synonyms not requested", options.includeSynonyms())
+ assertEquals(Set(resource), resources.asScala.toSet)
+ describeResult
+ }
+ }
+ EasyMock.replay(describeResult)
+ ConfigCommand.describeConfig(mockAdminClient, describeOpts)
+ EasyMock.reset(describeResult)
+ }
+
+ @Test
def shouldAddBrokerQuotaConfig(): Unit = {
val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
"--entity-name", "1",
@@ -539,6 +568,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
val mockAdminClient = new
MockAdminClient(util.Collections.singletonList(node), node) {
override def describeConfigs(resources: util.Collection[ConfigResource],
options: DescribeConfigsOptions): DescribeConfigsResult = {
+ assertFalse("Config synonyms requested unnecessarily",
options.includeSynonyms())
assertEquals(1, resources.size)
val resource = resources.iterator.next
assertEquals(ConfigResource.Type.BROKER, resource.`type`)
@@ -585,7 +615,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
EasyMock.expect(alterResult.all()).andReturn(alterFuture)
val mockAdminClient = new
MockAdminClient(util.Collections.singletonList(node), node) {
- override def describeConfigs(resources:
util.Collection[ConfigResource]): DescribeConfigsResult = {
+ override def describeConfigs(resources: util.Collection[ConfigResource],
options: DescribeConfigsOptions): DescribeConfigsResult = {
assertEquals(1, resources.size)
val resource = resources.iterator.next
assertEquals(ConfigResource.Type.BROKER_LOGGER, resource.`type`)
@@ -1098,7 +1128,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
}
class DummyAdminClient(node: Node) extends
MockAdminClient(util.Collections.singletonList(node), node) {
- override def describeConfigs(resources: util.Collection[ConfigResource]):
DescribeConfigsResult =
+ override def describeConfigs(resources: util.Collection[ConfigResource],
options: DescribeConfigsOptions): DescribeConfigsResult =
EasyMock.createNiceMock(classOf[DescribeConfigsResult])
override def incrementalAlterConfigs(configs: util.Map[ConfigResource,
util.Collection[AlterConfigOp]],
options: AlterConfigsOptions): AlterConfigsResult =
EasyMock.createNiceMock(classOf[AlterConfigsResult])