This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 38f2faf83f7 KAFKA-15681: Add support of client-metrics in 
kafka-configs.sh (KIP-714) (#14632)
38f2faf83f7 is described below

commit 38f2faf83f7e83823767aa71b8fd0850dae93b6e
Author: Apoorv Mittal <[email protected]>
AuthorDate: Tue Nov 28 22:54:25 2023 +0530

    KAFKA-15681: Add support of client-metrics in kafka-configs.sh (KIP-714) 
(#14632)
    
    The PR adds support for alter/describe configs for client-metrics as defined 
in KIP-714
    
    Reviewers: Andrew Schofield <[email protected]>, Jun Rao 
<[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../kafka/clients/admin/MockAdminClient.java       |  37 +++++++
 .../src/main/scala/kafka/admin/ConfigCommand.scala |  38 +++++--
 .../main/scala/kafka/server/DynamicConfig.scala    |   7 ++
 .../main/scala/kafka/server/ZkConfigManager.scala  |   3 +-
 .../kafka/server/metadata/ZkConfigRepository.scala |   2 +
 .../scala/unit/kafka/admin/ConfigCommandTest.scala | 117 +++++++++++++++++++++
 .../server/metadata/ZkConfigRepositoryTest.scala   |   5 +-
 8 files changed, 200 insertions(+), 11 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index cd6ac4a13dc..bcdfede03f3 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -89,7 +89,7 @@
               
files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData|JoinGroupRequest).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              
files="(AbstractFetch|ConsumerCoordinator|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler).java"/>
+              
files="(AbstractFetch|ConsumerCoordinator|FetchCollector|OffsetFetcherUtils|KafkaProducer|Sender|ConfigDef|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|SaslServerAuthenticator|AbstractCoordinator|TransactionManager|AbstractStickyAssignor|DefaultSslEngineFactory|Authorizer|RecordAccumulator|MemoryRecords|FetchSessionHandler|MockAdminClient).java"/>
 
     <suppress checks="JavaNCSS"
               
files="(AbstractRequest|AbstractResponse|KerberosLogin|WorkerSinkTaskTest|TransactionManagerTest|SenderTest|KafkaAdminClient|ConsumerCoordinatorTest|KafkaAdminClientTest|KafkaRaftClientTest).java"/>
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 515b02e6965..52188c68df4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -90,6 +90,7 @@ public class MockAdminClient extends AdminClient {
     private final String clusterId;
     private final List<List<String>> brokerLogDirs;
     private final List<Map<String, String>> brokerConfigs;
+    private final Map<String, Map<String, String>> clientMetricsConfigs;
 
     private Node controller;
     private int timeoutNextRequests = 0;
@@ -238,6 +239,7 @@ public class MockAdminClient extends AdminClient {
         this.defaultReplicationFactor = defaultReplicationFactor;
         this.brokerLogDirs = brokerLogDirs;
         this.brokerConfigs = new ArrayList<>();
+        this.clientMetricsConfigs = new HashMap<>();
         for (int i = 0; i < brokers.size(); i++) {
             final Map<String, String> config = new HashMap<>();
             config.put("default.replication.factor", 
String.valueOf(defaultReplicationFactor));
@@ -823,6 +825,13 @@ public class MockAdminClient extends AdminClient {
                 }
                 throw new UnknownTopicOrPartitionException("Resource " + 
resource + " not found.");
             }
+            case CLIENT_METRICS: {
+                String resourceName = resource.name();
+                if (resourceName.isEmpty()) {
+                    throw new InvalidRequestException("Empty resource name");
+                }
+                return toConfigObject(clientMetricsConfigs.get(resourceName));
+            }
             default:
                 throw new UnsupportedOperationException("Not implemented yet");
         }
@@ -916,6 +925,34 @@ public class MockAdminClient extends AdminClient {
                 topicMetadata.configs = newMap;
                 return null;
             }
+            case CLIENT_METRICS: {
+                String resourceName = resource.name();
+
+                if (resourceName.isEmpty()) {
+                    return new InvalidRequestException("Empty resource name");
+                }
+
+                if (!clientMetricsConfigs.containsKey(resourceName)) {
+                    clientMetricsConfigs.put(resourceName, new HashMap<>());
+                }
+
+                HashMap<String, String> newMap = new 
HashMap<>(clientMetricsConfigs.get(resourceName));
+                for (AlterConfigOp op : ops) {
+                    switch (op.opType()) {
+                        case SET:
+                            newMap.put(op.configEntry().name(), 
op.configEntry().value());
+                            break;
+                        case DELETE:
+                            newMap.remove(op.configEntry().name());
+                            break;
+                        default:
+                            return new InvalidRequestException(
+                                "Unsupported op type " + op.opType());
+                    }
+                }
+                clientMetricsConfigs.put(resourceName, newMap);
+                return null;
+            }
             default:
                 return new UnsupportedOperationException();
         }
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 25d400918f6..37d41458c39 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -29,7 +29,7 @@ import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, 
AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, 
DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, 
UserScramCredentialDeletion, UserScramCredentialUpsertion, Config => JConfig, 
ScramMechanism => PublicScramMechanism}
 import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
 import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.common.errors.InvalidConfigurationException
+import org.apache.kafka.common.errors.{InvalidConfigurationException, 
InvalidRequestException}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.quota.{ClientQuotaAlteration, 
ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
 import org.apache.kafka.common.security.JaasUtils
@@ -44,7 +44,7 @@ import scala.jdk.CollectionConverters._
 import scala.collection._
 
 /**
- * This script can be used to change configs for 
topics/clients/users/brokers/ips dynamically
+ * This script can be used to change configs for 
topics/clients/users/brokers/ips/client-metrics dynamically
  * An entity described or altered by the command may be one of:
  * <ul>
  *     <li> topic: --topic <topic> OR --entity-type topics --entity-name 
<topic>
@@ -55,6 +55,7 @@ import scala.collection._
  *     <li> broker: --broker <broker-id> OR --entity-type brokers 
--entity-name <broker-id>
  *     <li> broker-logger: --broker-logger <broker-id> OR --entity-type 
broker-loggers --entity-name <broker-id>
  *     <li> ip: --ip <ip> OR --entity-type ips --entity-name <ip>
+ *     <li> client-metrics: --client-metrics <name> OR --entity-type 
client-metrics --entity-name <name>
  * </ul>
  * --entity-type <users|clients|brokers|ips> --entity-default may be specified 
in place of --entity-type <users|clients|brokers|ips> --entity-name <entityName>
  * when describing or altering default configuration for users, clients, 
brokers, or ips, respectively.
@@ -76,7 +77,7 @@ object ConfigCommand extends Logging {
 
   val BrokerDefaultEntityName = ""
   val BrokerLoggerConfigType = "broker-loggers"
-  val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType
+  val BrokerSupportedConfigTypes = ConfigType.all :+ BrokerLoggerConfigType :+ 
ConfigType.ClientMetrics
   val ZkSupportedConfigTypes = Seq(ConfigType.User, ConfigType.Broker)
   val DefaultScramIterations = 4096
 
@@ -84,7 +85,7 @@ object ConfigCommand extends Logging {
     try {
       val opts = new ConfigCommandOptions(args)
 
-      CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to 
manipulate and describe entity config for a topic, client, user, broker or ip")
+      CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to 
manipulate and describe entity config for a topic, client, user, broker, ip or 
client-metrics")
 
       opts.checkArgs()
 
@@ -445,6 +446,21 @@ object ConfigCommand extends Logging {
         if (unknownConfigs.nonEmpty)
           throw new IllegalArgumentException(s"Only connection quota configs 
can be added for '${ConfigType.Ip}' using --bootstrap-server. Unexpected config 
names: ${unknownConfigs.mkString(",")}")
         alterQuotaConfigs(adminClient, entityTypes, entityNames, 
configsToBeAddedMap, configsToBeDeleted)
+      case ConfigType.ClientMetrics =>
+        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
+          .map { entry => (entry.name, entry) }.toMap
+
+        // fail the command if any of the configs to be deleted does not exist
+        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
+        if (invalidConfigs.nonEmpty)
+          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
+
+        val configResource = new 
ConfigResource(ConfigResource.Type.CLIENT_METRICS, entityNameHead)
+        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
+        val alterEntries = (configsToBeAdded.values.map(new AlterConfigOp(_, 
AlterConfigOp.OpType.SET))
+          ++ configsToBeDeleted.map { k => new AlterConfigOp(new 
ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE) }
+          ).asJavaCollection
+        adminClient.incrementalAlterConfigs(Map(configResource -> 
alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)
       case _ => throw new IllegalArgumentException(s"Unsupported entity type: 
$entityTypeHead")
     }
 
@@ -518,7 +534,7 @@ object ConfigCommand extends Logging {
     val describeAll = opts.options.has(opts.allOpt)
 
     entityTypes.head match {
-      case ConfigType.Topic | ConfigType.Broker | BrokerLoggerConfigType =>
+      case ConfigType.Topic | ConfigType.Broker | BrokerLoggerConfigType | 
ConfigType.ClientMetrics =>
         describeResourceConfig(adminClient, entityTypes.head, 
entityNames.headOption, describeAll)
       case ConfigType.User | ConfigType.Client =>
         describeClientQuotaAndUserScramCredentialConfigs(adminClient, 
entityTypes, entityNames)
@@ -536,6 +552,8 @@ object ConfigCommand extends Logging {
           adminClient.listTopics(new 
ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
         case ConfigType.Broker | BrokerLoggerConfigType =>
           adminClient.describeCluster(new 
DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ 
BrokerDefaultEntityName
+        case ConfigType.ClientMetrics =>
+          throw new InvalidRequestException("Client metrics entity-name is 
required")
         case entityType => throw new IllegalArgumentException(s"Invalid entity 
type: $entityType")
       })
 
@@ -576,6 +594,8 @@ object ConfigCommand extends Logging {
         if (entityName.nonEmpty)
           validateBrokerId()
         (ConfigResource.Type.BROKER_LOGGER, None)
+      case ConfigType.ClientMetrics =>
+        (ConfigResource.Type.CLIENT_METRICS, 
Some(ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG))
       case entityType => throw new IllegalArgumentException(s"Invalid entity 
type: $entityType")
     }
 
@@ -791,10 +811,10 @@ object ConfigCommand extends Logging {
     val describeOpt = parser.accepts("describe", "List configs for the given 
entity.")
     val allOpt = parser.accepts("all", "List all configs for the given topic, 
broker, or broker-logger entity (includes static configuration when the entity 
type is brokers)")
 
-    val entityType = parser.accepts("entity-type", "Type of entity 
(topics/clients/users/brokers/broker-loggers/ips)")
+    val entityType = parser.accepts("entity-type", "Type of entity 
(topics/clients/users/brokers/broker-loggers/ips/client-metrics)")
       .withRequiredArg
       .ofType(classOf[String])
-    val entityName = parser.accepts("entity-name", "Name of entity (topic 
name/client id/user principal name/broker id/ip)")
+    val entityName = parser.accepts("entity-name", "Name of entity (topic 
name/client id/user principal name/broker id/ip/client metrics)")
       .withRequiredArg
       .ofType(classOf[String])
     val entityDefault = parser.accepts("entity-default", "Default entity name 
for clients/users/brokers/ips (applies to corresponding entity type in command 
line)")
@@ -806,6 +826,7 @@ object ConfigCommand extends Logging {
       "For entity-type '" + ConfigType.User + "': " + 
DynamicConfig.User.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, 
nl) +
       "For entity-type '" + ConfigType.Client + "': " + 
DynamicConfig.Client.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, 
nl) +
       "For entity-type '" + ConfigType.Ip + "': " + 
DynamicConfig.Ip.names.asScala.toSeq.sorted.map("\t" + _).mkString(nl, nl, nl) +
+      "For entity-type '" + ConfigType.ClientMetrics + "': " + 
DynamicConfig.ClientMetrics.names.asScala.toSeq.sorted.map("\t" + 
_).mkString(nl, nl, nl) +
       s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may be 
specified together to update config for clients of a specific user.")
       .withRequiredArg
       .ofType(classOf[String])
@@ -939,7 +960,8 @@ object ConfigCommand extends Logging {
         }
       }
 
-      if (options.has(describeOpt) && 
entityTypeVals.contains(BrokerLoggerConfigType) && !hasEntityName)
+      if (options.has(describeOpt) && 
(entityTypeVals.contains(BrokerLoggerConfigType) ||
+        entityTypeVals.contains(ConfigType.ClientMetrics)) && !hasEntityName)
         throw new IllegalArgumentException(s"an entity name must be specified 
with --describe of ${entityTypeVals.mkString(",")}")
 
       if (options.has(alterOpt)) {
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala 
b/core/src/main/scala/kafka/server/DynamicConfig.scala
index 8af2dece042..d91a06d8bd3 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -25,6 +25,7 @@ import org.apache.kafka.common.config.ConfigDef.Range._
 import org.apache.kafka.common.config.ConfigDef.Type._
 import org.apache.kafka.storage.internals.log.LogConfig
 
+import java.util
 import scala.jdk.CollectionConverters._
 
 /**
@@ -111,6 +112,12 @@ object DynamicConfig {
     }
   }
 
+  object ClientMetrics {
+    private val clientConfigs = 
org.apache.kafka.server.metrics.ClientMetricsConfigs.configDef()
+
+    def names: util.Set[String] = clientConfigs.names
+  }
+
   private def validate(configDef: ConfigDef, props: Properties, 
customPropsAllowed: Boolean) = {
     // Validate Names
     val names = configDef.names()
diff --git a/core/src/main/scala/kafka/server/ZkConfigManager.scala 
b/core/src/main/scala/kafka/server/ZkConfigManager.scala
index c7820859807..abead88b07c 100644
--- a/core/src/main/scala/kafka/server/ZkConfigManager.scala
+++ b/core/src/main/scala/kafka/server/ZkConfigManager.scala
@@ -39,7 +39,8 @@ object ConfigType {
   val Broker = "brokers"
   val Ip = "ips"
   val ClientMetrics = "client-metrics"
-  val all = Seq(Topic, Client, User, Broker, Ip, ClientMetrics)
+  // Do not include ClientMetrics in `all` as ClientMetrics is not supported 
on ZK.
+  val all = Seq(Topic, Client, User, Broker, Ip)
 }
 
 object ConfigEntityName {
diff --git a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala 
b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
index 16842bcd11f..a2aabe656c5 100644
--- a/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkConfigRepository.scala
@@ -23,6 +23,7 @@ import kafka.server.{ConfigEntityName, ConfigType}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type
+import org.apache.kafka.common.errors.InvalidRequestException
 
 
 object ZkConfigRepository {
@@ -35,6 +36,7 @@ class ZkConfigRepository(adminZkClient: AdminZkClient) 
extends ConfigRepository
     val configTypeForZk = configResource.`type` match {
       case Type.TOPIC => ConfigType.Topic
       case Type.BROKER => ConfigType.Broker
+      case Type.CLIENT_METRICS => throw new InvalidRequestException("Config 
type client-metrics is only supported on KRaft clusters")
       case tpe => throw new IllegalArgumentException(s"Unsupported config 
type: $tpe")
     }
     // ZK stores cluster configs under "<default>".
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index 12e460a0bd3..069b79a5e0d 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -1628,6 +1628,123 @@ class ConfigCommandTest extends Logging {
         Seq("<default>/clients/client-3", sanitizedPrincipal + 
"/clients/client-2"))
   }
 
+  @Test
+  def shouldAlterClientMetricsConfig(): Unit = {
+    val node = new Node(1, "localhost", 9092)
+    verifyAlterClientMetricsConfig(node, "1", List("--entity-name", "1"))
+  }
+
+  private def verifyAlterClientMetricsConfig(node: Node, resourceName: String, 
resourceOpts: List[String]): Unit = {
+    val optsList = List("--bootstrap-server", "localhost:9092",
+      "--entity-type", "client-metrics",
+      "--alter",
+      "--delete-config", "interval.ms",
+      "--add-config", "metrics=org.apache.kafka.consumer.," +
+        
"match=[client_software_name=kafka.python,client_software_version=1\\.2\\..*]") 
++ resourceOpts
+    val alterOpts = new ConfigCommandOptions(optsList.toArray)
+
+    val resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, 
resourceName)
+    val configEntries = util.Collections.singletonList(new 
ConfigEntry("interval.ms", "1000",
+      ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, false, false, 
util.Collections.emptyList[ConfigEntry.ConfigSynonym],
+      ConfigEntry.ConfigType.UNKNOWN, null))
+    val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
+    future.complete(util.Collections.singletonMap(resource, new 
Config(configEntries)))
+    val describeResult: DescribeConfigsResult = 
mock(classOf[DescribeConfigsResult])
+    when(describeResult.all()).thenReturn(future)
+
+    val alterFuture = new KafkaFutureImpl[Void]
+    alterFuture.complete(null)
+    val alterResult: AlterConfigsResult = mock(classOf[AlterConfigsResult])
+    when(alterResult.all()).thenReturn(alterFuture)
+
+    val mockAdminClient = new 
MockAdminClient(util.Collections.singletonList(node), node) {
+      override def describeConfigs(resources: util.Collection[ConfigResource], 
options: DescribeConfigsOptions): DescribeConfigsResult = {
+        assertFalse(options.includeSynonyms(), "Config synonyms requested 
unnecessarily")
+        assertEquals(1, resources.size)
+        val resource = resources.iterator.next
+        assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`)
+        assertEquals(resourceName, resource.name)
+        describeResult
+      }
+
+      override def incrementalAlterConfigs(configs: util.Map[ConfigResource, 
util.Collection[AlterConfigOp]], options: AlterConfigsOptions): 
AlterConfigsResult = {
+        assertEquals(1, configs.size)
+        val entry = configs.entrySet.iterator.next
+        val resource = entry.getKey
+        val alterConfigOps = entry.getValue
+        assertEquals(ConfigResource.Type.CLIENT_METRICS, resource.`type`)
+        assertEquals(3, alterConfigOps.size)
+
+        val expectedConfigOps = List(
+          new AlterConfigOp(new ConfigEntry("match", 
"client_software_name=kafka.python,client_software_version=1\\.2\\..*"), 
AlterConfigOp.OpType.SET),
+          new AlterConfigOp(new ConfigEntry("metrics", 
"org.apache.kafka.consumer."), AlterConfigOp.OpType.SET),
+          new AlterConfigOp(new ConfigEntry("interval.ms", ""), 
AlterConfigOp.OpType.DELETE)
+        )
+        assertEquals(expectedConfigOps, alterConfigOps.asScala.toList)
+        alterResult
+      }
+    }
+    ConfigCommand.alterConfig(mockAdminClient, alterOpts)
+    verify(describeResult).all()
+    verify(alterResult).all()
+  }
+
+  @Test
+  def shouldNotDescribeClientMetricsConfigWithoutEntityName(): Unit = {
+    val describeOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+      "--entity-type", "client-metrics",
+      "--describe"))
+
+    val exception = assertThrows(classOf[IllegalArgumentException], () => 
describeOpts.checkArgs())
+    assertEquals("an entity name must be specified with --describe of 
client-metrics", exception.getMessage)
+  }
+
+  @Test
+  def shouldNotAlterClientMetricsConfigWithoutEntityName(): Unit = {
+    val alterOpts = new ConfigCommandOptions(Array("--bootstrap-server", 
"localhost:9092",
+      "--entity-type", "client-metrics",
+      "--alter",
+      "--add-config", "interval.ms=1000"))
+
+    val exception = assertThrows(classOf[IllegalArgumentException], () => 
alterOpts.checkArgs())
+    assertEquals("an entity name must be specified with --alter of 
client-metrics", exception.getMessage)
+  }
+
+  @Test
+  def shouldNotSupportAlterClientMetricsWithZookeeperArg(): Unit = {
+    val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "sub",
+      "--entity-type", "client-metrics",
+      "--alter",
+      "--add-config", "interval.ms=1000"))
+
+    val exception = assertThrows(classOf[IllegalArgumentException], () => 
alterOpts.checkArgs())
+    assertEquals("Invalid entity type client-metrics, the entity type must be 
one of users, brokers with a --zookeeper argument", exception.getMessage)
+  }
+
+  @Test
+  def shouldNotSupportDescribeClientMetricsWithZookeeperArg(): Unit = {
+    val describeOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "sub",
+      "--entity-type", "client-metrics",
+      "--describe"))
+
+    val exception = assertThrows(classOf[IllegalArgumentException], () => 
describeOpts.checkArgs())
+    assertEquals("Invalid entity type client-metrics, the entity type must be 
one of users, brokers with a --zookeeper argument", exception.getMessage)
+  }
+
+  @Test
+  def shouldNotSupportAlterClientMetricsWithZookeeper(): Unit = {
+    val alterOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect,
+      "--entity-name", "sub",
+      "--entity-type", "client-metrics",
+      "--alter",
+      "--add-config", "interval.ms=1000"))
+
+    val exception = assertThrows(classOf[IllegalArgumentException], () => 
ConfigCommand.alterConfigWithZk(null, alterOpts, dummyAdminZkClient))
+    assertEquals("client-metrics is not a known entityType. Should be one of 
List(topics, clients, users, brokers, ips)", exception.getMessage)
+  }
+
   class DummyAdminZkClient(zkClient: KafkaZkClient) extends 
AdminZkClient(zkClient) {
     override def changeBrokerConfig(brokerIds: Seq[Int], configs: Properties): 
Unit = {}
     override def fetchEntityConfig(entityType: String, entityName: String): 
Properties = {new Properties}
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/ZkConfigRepositoryTest.scala 
b/core/src/test/scala/unit/kafka/server/metadata/ZkConfigRepositoryTest.scala
index f8737751fa5..fbb2c9e8cf9 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/ZkConfigRepositoryTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/ZkConfigRepositoryTest.scala
@@ -22,6 +22,7 @@ import kafka.server.metadata.ZkConfigRepository
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type
+import org.apache.kafka.common.errors.InvalidRequestException
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito.{mock, when}
@@ -48,7 +49,9 @@ class ZkConfigRepositoryTest {
   def testUnsupportedTypes(): Unit = {
     val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
     val zkConfigRepository = ZkConfigRepository(zkClient)
-    Type.values().foreach(value => if (value != Type.BROKER && value != 
Type.TOPIC)
+    Type.values().foreach(value => if (value != Type.BROKER && value != 
Type.TOPIC && value != Type.CLIENT_METRICS)
       assertThrows(classOf[IllegalArgumentException], () => 
zkConfigRepository.config(new ConfigResource(value, value.toString))))
+    // Validate exception for CLIENT_METRICS.
+    assertThrows(classOf[InvalidRequestException], () => 
zkConfigRepository.config(new ConfigResource(Type.CLIENT_METRICS, 
Type.CLIENT_METRICS.toString)))
   }
 }

Reply via email to