This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 38f2faf83f7 KAFKA-15681: Add support of client-metrics in
kafka-configs.sh (KIP-714) (#14632)
38f2faf83f7 is described below
commit 38f2faf83f7e83823767aa71b8fd0850dae93b6e
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Nov 28 22:54:25 2023 +0530
KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714)
(#14632)
This PR adds support for altering and describing client-metrics configs
via kafka-configs.sh, as defined in KIP-714.
Reviewers: Andrew Schofield <[email protected]>, Jun Rao
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/clients/admin/MockAdminClient.java | 37 +++++++
.../src/main/scala/kafka/admin/ConfigCommand.scala | 38 +++++--
.../main/scala/kafka/server/DynamicConfig.scala | 7 ++
.../main/scala/kafka/server/ZkConfigManager.scala | 3 +-
.../kafka/server/metadata/ZkConfigRepository.scala | 2 +
.../scala/unit/kafka/admin/ConfigCommandTest.scala | 117 +++++++++++++++++++++
.../server/metadata/ZkConfigRepositoryTest.scala | 5 +-
8 files changed, 200 insertions(+), 11 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cd6ac4a13dc..bcdfede03f3 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -89,7 +89,7 @@
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
<suppress checks="CyclomaticComplexity"
-
files="(AbstractFetch|ConsumerCoordinator|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
+
files="(AbstractFetch|ConsumerCoordinator|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>
<suppress checks="JavaNCSS"
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 515b02e6965..52188c68df4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -90,6 +90,7 @@ public class MockAdminClient extends AdminClient {
private final String clusterId;
private final List<List<String>> brokerLogDirs;
private final List<Map<String, String>> brokerConfigs;
+ private final Map<String, Map<String, String>> clientMetricsConfigs;
private Node controller;
private int timeoutNextRequests = 0;
@@ -238,6 +239,7 @@ public class MockAdminClient extends AdminClient {
this.defaultReplicationFactor = defaultReplicationFactor;
this.brokerLogDirs = brokerLogDirs;
this.brokerConfigs = new ArrayList<>();
+ this.clientMetricsConfigs = new HashMap<>();
for (int i = 0; i < brokers.size(); i++) {
final Map<String, String> config = new HashMap<>();
config.put("default.replication.factor",
String.valueOf(defaultReplicationFactor));
@@ -823,6 +825,13 @@ public class MockAdminClient extends AdminClient {
}
throw new UnknownTopicOrPartitionException("Resource " +
resource + " not found.");
}
+ case CLIENT_METRICS: {
+ String resourceName = resource.name();
+ if (resourceName.isEmpty()) {
+ throw new InvalidRequestException("Empty resource name");
+ }
+ return toConfigObject(clientMetricsConfigs.get(resourceName));
+ }
default:
throw new UnsupportedOperationException("Not implemented yet");
}
@@ -916,6 +925,34 @@ public class MockAdminClient extends AdminClient {
topicMetadata.configs = newMap;
return null;
}
+ case CLIENT_METRICS: {
+ String resourceName = resource.name();
+
+ if (resourceName.isEmpty()) {
+ return new InvalidRequestException("Empty resource name");
+ }
+
+ if (!clientMetricsConfigs.containsKey(resourceName)) {
+ clientMetricsConfigs.put(resourceName, new HashMap<>());
+ }
+
+ HashMap<String, String> newMap = new
HashMap<>(clientMetricsConfigs.get(resourceName));
+ for (AlterConfigOp op : ops) {
+ switch (op.opType()) {
+ case SET:
+ newMap.put(op.configEntry().name(),
op.configEntry().value());
+ break;
+ case DELETE:
+ newMap.remove(op.configEntry().name());
+ break;
+ default:
+ return new InvalidRequestException(
+ "Unsupported op type " + op.opType());
+ }
+ }
+ clientMetricsConfigs.put(resourceName, newMap);
+ return null;
+ }
default:
return new UnsupportedOperationException();
}
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 25d400918f6..37d41458c39 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -29,7 +29,7 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions,
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions,
DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo,
UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig,
ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.common.errors.{InvalidConfigurationException,
InvalidRequestException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.quota.{ClientQuotaAlteration,
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
import org.apache.kafka.common.security.JaasUtils
@@ -44,7 +44,7 @@ import scala.jdk.CollectionConverters._
import scala.collection._
/**
- * This script can be used to change configs for
topics/clients/users/brokers/ips dynamically
+ * This script can be used to change configs for
topics/clients/users/brokers/ips/client-metrics dynamically
* An entity described or altered by the command may be one of:
* <ul>
* <li> topic: --topic <topic> OR --entity-type topics --entity-name
<topic>
@@ -55,6 +55,7 @@ import scala.collection._
* <li> broker: --broker <broker-id> OR --entity-type brokers
--entity-name <broker-id>
* <li> broker-logger: --broker-logger <broker-id> OR --entity-type
broker-loggers --entity-name <broker-id>
* <li> ip: --ip <ip> OR --entity-type ips --entity-name <ip>
+ * <li> client-metrics: --client-metrics <name> OR --entity-type
client-metrics --entity-name <name>
* </ul>
* --entity-type <users|clients|brokers|ips> --entity-default may be specified
in place of --entity-type <users|clients|brokers|ips> --entity-name <entityName>
* when describing or altering default configuration for users, clients,
brokers, or ips, respectively.
@@ -76,7 +77,7 @@ object ConfigCommand extends Logging {
val BrokerDefaultEntityName = ""
val BrokerLoggerConfigType = "broker-loggers"
- val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType
+ val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType :+
ConfigType.ClientMetrics
val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker)
val DefaultScramIterations = 4096
@@ -84,7 +85,7 @@ object ConfigCommand extends Logging {
try {
val opts = new ConfigCommandOptions(args)
- CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to
manipulate and describe entity config for a topic, client, user, broker or ip")
+ CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to
manipulate and describe entity config for a topic, client, user, broker, ip or
client-metrics")
opts.checkArgs()
@@ -445,6 +446,21 @@ object ConfigCommand extends Logging {
if (unknownConfigs.nonEmpty)
throw new IllegalArgumentException(s"Only connection quota configs
can be added for '${ConfigType.Ip}' using --bootstrap-server. Unexpected config
names: ${unknownConfigs.mkString(",")}")
alterQuotaConfigs(adminClient, entityTypes, entityNames,
configsToBeAddedMap, configsToBeDeleted)
+ case ConfigType.ClientMetrics =>
+ val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
+ .map { entry => (entry.name, entry) }.toMap
+
+ // fail the command if any of the configs to be deleted does not exist
+ val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+ if (invalidConfigs.nonEmpty)
+ throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
+
+ val configResource = new
ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
+ val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+ val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_,
AlterConfigOp.OpType.SET))
+ ++ configsToBeDeleted.map { k => new AlterConfigOp(new
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+ ).asJavaCollection
+ adminClient.incrementalAlterConfigs(Map(configResource ->
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
case _ => throw new IllegalArgumentException(s"Unsupported entity type:
$entityTypeHead")
}
@@ -518,7 +534,7 @@ object ConfigCommand extends Logging {
val describeAll = opts.options.has(opts.allOpt)
entityTypes.head match {
- case ConfigType.Topic | ConfigType.Broker | BrokerLoggerConfigType =>
+ case ConfigType.Topic | ConfigType.Broker | BrokerLoggerConfigType |
ConfigType.ClientMetrics =>
describeResourceConfig(adminClient, entityTypes.head,
entityNames.headOption, describeAll)
case ConfigType.User | ConfigType.Client =>
describeClientQuotaAndUserScramCredentialConfigs(adminClient,
entityTypes, entityNames)
@@ -536,6 +552,8 @@ object ConfigCommand extends Logging {
adminClient.listTopics(new
ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
case ConfigType.Broker | BrokerLoggerConfigType =>
adminClient.describeCluster(new
DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+
BrokerDefaultEntityName
+ case ConfigType.ClientMetrics =>
+ throw new InvalidRequestException("Client metrics entity-name is
required")
case entityType => throw new IllegalArgumentException(s"Invalid entity
type: $entityType")
})
@@ -576,6 +594,8 @@ object ConfigCommand extends Logging {
if (entityName.nonEmpty)
validateBrokerId()
(ConfigResource.Type.BROKER_LOGGER, None)
+ case ConfigType.ClientMetrics =>
+ (ConfigResource.Type.CLIENT_METRICS,
Some(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG))
case entityType => throw new IllegalArgumentException(s"Invalid entity
type: $entityType")
}
@@ -791,10 +811,10 @@ object ConfigCommand extends Logging {
val describeOpt = parser.accepts("describe", "List configs for the given
entity.")
val allOpt = parser.accepts("all", "List all configs for the given topic,
broker, or broker-logger entity (includes static configuration when the entity
type is brokers)")
- val entityType = parser.accepts("entity-type", "Type of entity
(topics/clients/users/brokers/broker-loggers/ips)")
+ val entityType = parser.accepts("entity-type", "Type of entity
(topics/clients/users/brokers/broker-loggers/ips/client-metrics)")
.withRequiredArg
.ofType(classOf[String])
- val entityName = parser.accepts("entity-name", "Name of entity (topic
name/client id/user principal name/broker id/ip)")
+ val entityName = parser.accepts("entity-name", "Name of entity (topic
name/client id/user principal name/broker id/ip/client metrics)")
.withRequiredArg
.ofType(classOf[String])
val entityDefault = parser.accepts("entity-default", "Default entity name
for clients/users/brokers/ips (applies to corresponding entity type in command
line)")
@@ -806,6 +826,7 @@ object ConfigCommand extends Logging {
"For entity-type '" + ConfigType.User + "': " +
DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl,
nl) +
"For entity-type '" + ConfigType.Client + "': " +
DynamicConfig.Client.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl,
nl) +
"For entity-type '" + ConfigType.Ip + "': " +
DynamicConfig.Ip.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
+ "For entity-type '" + ConfigType.ClientMetrics + "': " +
DynamicConfig.ClientMetrics.names.asScala.toSeq.sorted.map("\t" +
_).mkString(nl, nl, nl) +
s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be
specified together to update config for clients of a specific user.")
.withRequiredArg
.ofType(classOf[String])
@@ -939,7 +960,8 @@ object ConfigCommand extends Logging {
}
}
- if (options.has(describeOpt) &&
entityTypeVals.contains(BrokerLoggerConfigType) && !hasEntityName)
+ if (options.has(describeOpt) &&
(entityTypeVals.contains(BrokerLoggerConfigType) ||
+ entityTypeVals.contains(ConfigType.ClientMetrics)) && !hasEntityName)
throw new IllegalArgumentException(s"an entity name must be specified
with --describe of ${entityTypeVals.mkString(",")}")
if (options.has(alterOpt)) {
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala
b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 8af2dece042..d91a06d8bd3 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Range._
import org.apache.kafka.common.config.ConfigDef.Type._
import org.apache.kafka.storage.internals.log.LogConfig
+import java.util
import scala.jdk.CollectionConverters._
/**
@@ -111,6 +112,12 @@ object DynamicConfig {
}
}
+ object ClientMetrics {
+ private val clientConfigs =
org.apache.kafka.server.metrics.ClientMetricsConfigs.configDef()
+
+ def names: util.Set[String] = clientConfigs.names
+ }
+
private def validate(configDef: ConfigDef, props: Properties,
customPropsAllowed: Boolean) = {
// Validate Names
val names = configDef.names()
diff --git a/core/src/main/scala/kafka/server/ZkConfigManager.scala
b/core/src/main/scala/kafka/server/ZkConfigManager.scala
index c7820859807..abead88b07c 100644
--- a/core/src/main/scala/kafka/server/ZkConfigManager.scala
+++ b/core/src/main/scala/kafka/server/ZkConfigManager.scala
@@ -39,7 +39,8 @@ object ConfigType {
val Broker = "brokers"
val Ip = "ips"
val ClientMetrics = "client-metrics"
- val all = Seq(Topic, Client, User, Broker, Ip, ClientMetrics)
+ // Do not include ClientMetrics in `all` as ClientMetrics is not supported
on ZK.
+ val all = Seq(Topic, Client, User, Broker, Ip)
}
object ConfigEntityName {
diff --git a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
index 16842bcd11f..a2aabe656c5 100644
--- a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
@@ -23,6 +23,7 @@ import kafka.server.{ConfigEntityName, ConfigType}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
+import org.apache.kafka.common.errors.InvalidRequestException
object ZkConfigRepository {
@@ -35,6 +36,7 @@ class ZkConfigRepository(adminZkClient: AdminZkClient)
extends ConfigRepository
val configTypeForZk = configResource.`type` match {
case Type.TOPIC => ConfigType.Topic
case Type.BROKER => ConfigType.Broker
+ case Type.CLIENT_METRICS => throw new InvalidRequestException("Config
type client-metrics is only supported on KRaft clusters")
case tpe => throw new IllegalArgumentException(s"Unsupported config
type: $tpe")
}
// ZK stores cluster configs under "<default>".
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 12e460a0bd3..069b79a5e0d 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -1628,6 +1628,123 @@ class ConfigCommandTest extends Logging {
Seq("<default>/clients/client-3", sanitizedPrincipal +
"/clients/client-2"))
}
+ @Test
+ def shouldAlterClientMetricsConfig(): Unit = {
+ val node = new Node(1, "localhost", 9092)
+ verifyAlterClientMetricsConfig(node, "1", List("--entity-name", "1"))
+ }
+
+ private def verifyAlterClientMetricsConfig(node: Node, resourceName: String,
resourceOpts: List[String]): Unit = {
+ val optsList = List("--bootstrap-server", "localhost:9092",
+ "--entity-type", "client-metrics",
+ "--alter",
+ "--delete-config", "interval.ms",
+ "--add-config", "metrics=org.apache.kafka.consumer.," +
+
"match=[client_software_name=kafka.python,client_software_version=1\\.2\\..*]")
++ resourceOpts
+ val alterOpts = new ConfigCommandOptions(optsList.toArray)
+
+ val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS,
resourceName)
+ val configEntries = util.Collections.singletonList(new
ConfigEntry("interval.ms", "1000",
+ ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, false, false,
util.Collections.emptyList[ConfigEntry.ConfigSynonym],
+ ConfigEntry.ConfigType.UNKNOWN, null))
+ val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+ future.complete(util.Collections.singletonMap(resource, new
Config(configEntries)))
+ val describeResult: DescribeConfigsResult =
mock(classOf[DescribeConfigsResult])
+ when(describeResult.all()).thenReturn(future)
+
+ val alterFuture = new KafkaFutureImpl[Void]
+ alterFuture.complete(null)
+ val alterResult: AlterConfigsResult = mock(classOf[AlterConfigsResult])
+ when(alterResult.all()).thenReturn(alterFuture)
+
+ val mockAdminClient = new
MockAdminClient(util.Collections.singletonList(node), node) {
+ override def describeConfigs(resources: util.Collection[ConfigResource],
options: DescribeConfigsOptions): DescribeConfigsResult = {
+ assertFalse(options.includeSynonyms(), "Config synonyms requested
unnecessarily")
+ assertEquals(1, resources.size)
+ val resource = resources.iterator.next
+ assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`)
+ assertEquals(resourceName, resource.name)
+ describeResult
+ }
+
+ override def incrementalAlterConfigs(configs: util.Map[ConfigResource,
util.Collection[AlterConfigOp]], options: AlterConfigsOptions):
AlterConfigsResult = {
+ assertEquals(1, configs.size)
+ val entry = configs.entrySet.iterator.next
+ val resource = entry.getKey
+ val alterConfigOps = entry.getValue
+ assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`)
+ assertEquals(3, alterConfigOps.size)
+
+ val expectedConfigOps = List(
+ new AlterConfigOp(new ConfigEntry("match",
"client_software_name=kafka.python,client_software_version=1\\.2\\..*"),
AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("metrics",
"org.apache.kafka.consumer."), AlterConfigOp.OpType.SET),
+ new AlterConfigOp(new ConfigEntry("interval.ms", ""),
AlterConfigOp.OpType.DELETE)
+ )
+ assertEquals(expectedConfigOps, alterConfigOps.asScala.toList)
+ alterResult
+ }
+ }
+ ConfigCommand.alterConfig(mockAdminClient, alterOpts)
+ verify(describeResult).all()
+ verify(alterResult).all()
+ }
+
+ @Test
+ def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = {
+ val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server",
"localhost:9092",
+ "--entity-type", "client-metrics",
+ "--describe"))
+
+ val exception = assertThrows(classOf[IllegalArgumentException], () =>
describeOpts.checkArgs())
+ assertEquals("an entity name must be specified with --describe of
client-metrics", exception.getMessage)
+ }
+
+ @Test
+ def shouldNotAlterClientMetricsConfigWithoutEntityName(): Unit = {
+ val alterOpts = new ConfigCommandOptions(Array("--bootstrap-server",
"localhost:9092",
+ "--entity-type", "client-metrics",
+ "--alter",
+ "--add-config", "interval.ms=1000"))
+
+ val exception = assertThrows(classOf[IllegalArgumentException], () =>
alterOpts.checkArgs())
+ assertEquals("an entity name must be specified with --alter of
client-metrics", exception.getMessage)
+ }
+
+ @Test
+ def shouldNotSupportAlterClientMetricsWithZookeeperArg(): Unit = {
+ val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+ "--entity-name", "sub",
+ "--entity-type", "client-metrics",
+ "--alter",
+ "--add-config", "interval.ms=1000"))
+
+ val exception = assertThrows(classOf[IllegalArgumentException], () =>
alterOpts.checkArgs())
+ assertEquals("Invalid entity type client-metrics, the entity type must be
one of users, brokers with a --zookeeper argument", exception.getMessage)
+ }
+
+ @Test
+ def shouldNotSupportDescribeClientMetricsWithZookeeperArg(): Unit = {
+ val describeOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+ "--entity-name", "sub",
+ "--entity-type", "client-metrics",
+ "--describe"))
+
+ val exception = assertThrows(classOf[IllegalArgumentException], () =>
describeOpts.checkArgs())
+ assertEquals("Invalid entity type client-metrics, the entity type must be
one of users, brokers with a --zookeeper argument", exception.getMessage)
+ }
+
+ @Test
+ def shouldNotSupportAlterClientMetricsWithZookeeper(): Unit = {
+ val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+ "--entity-name", "sub",
+ "--entity-type", "client-metrics",
+ "--alter",
+ "--add-config", "interval.ms=1000"))
+
+ val exception = assertThrows(classOf[IllegalArgumentException], () =>
ConfigCommand.alterConfigWithZk(null, alterOpts, dummyAdminZkClient))
+ assertEquals("client-metrics is not a known entityType. Should be one of
List(topics, clients, users, brokers, ips)", exception.getMessage)
+ }
+
class DummyAdminZkClient(zkClient: KafkaZkClient) extends
AdminZkClient(zkClient) {
override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties):
Unit = {}
override def fetchEntityConfig(entityType: String, entityName: String):
Properties = {new Properties}
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/ZkConfigRepositoryTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/ZkConfigRepositoryTest.scala
index f8737751fa5..fbb2c9e8cf9 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/ZkConfigRepositoryTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/ZkConfigRepositoryTest.scala
@@ -22,6 +22,7 @@ import kafka.server.metadata.ZkConfigRepository
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type
+import org.apache.kafka.common.errors.InvalidRequestException
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
import org.junit.jupiter.api.Test
import org.mockito.Mockito.{mock, when}
@@ -48,7 +49,9 @@ class ZkConfigRepositoryTest {
def testUnsupportedTypes(): Unit = {
val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
val zkConfigRepository = ZkConfigRepository(zkClient)
- Type.values().foreach(value => if (value != Type.BROKER && value !=
Type.TOPIC)
+ Type.values().foreach(value => if (value != Type.BROKER && value !=
Type.TOPIC && value != Type.CLIENT_METRICS)
assertThrows(classOf[IllegalArgumentException], () =>
zkConfigRepository.config(new ConfigResource(value, value.toString))))
+ // Validate exception for CLIENT_METRICS.
+ assertThrows(classOf[InvalidRequestException], () =>
zkConfigRepository.config(new ConfigResource(Type.CLIENT_METRICS,
Type.CLIENT_METRICS.toString)))
}
}