AndrewJSchofield commented on code in PR #14621:
URL: https://github.com/apache/kafka/pull/14621#discussion_r1369865768


##########
core/src/main/scala/kafka/metrics/clientmetrics/ClientMetricsConfig.scala:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics.clientmetrics
+
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
+import org.apache.kafka.common.config.ConfigDef.Type.{INT, LIST}
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import java.util.Properties
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation and update methods
+ * are defined in this class.
+ * <p>
+ * SubscriptionInfo: Contains the client metric subscription information. 
Supported operations from the CLI are
+ * add/delete/update operations. Every subscription object contains the 
following parameters that are populated
+ * during the creation of the subscription.
+ * <p>
+ * {
+ * <ul>
+ *   <li> subscriptionId: Name/ID supplied by CLI during the creation of the 
client metric subscription.
+ *   <li> subscribedMetrics: List of metric prefixes
+ *   <li> pushIntervalMs: A positive integer value >=0  tells the client that 
how often a client can push the metrics
+ *   <li> matchingPatternsList: List of client matching patterns, that are 
used by broker to match the client instance
+ *   with the subscription.
+ * </ul>
+ * }
+ * <p>
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * <ul>
+ *  <li> "metrics" value should be comma separated metrics list. An empty list 
means no metrics subscribed.
+ *      A list containing just an empty string means all metrics subscribed.
+ *      Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *  <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is the 
interval at which the client
+ *      should push the metrics to the broker.
+ *  <li> "match" is a comma separated list of client match patterns, in case 
if there is no matching
+ *      pattern specified then broker considers that as all match which means 
the associated metrics
+ *      applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ * </ul>
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+object ClientMetricsConfig {
+
+  class SubscriptionInfo(subscriptionId: String,
+                         subscribedMetrics: List[String],
+                         pushIntervalMs: Int,
+                         var matchingPatternsList: List[String]) {
+    def getId: String = subscriptionId
+    def getPushIntervalMs: Int = pushIntervalMs
+    def getClientMatchingPatterns: Map[String, String] = 
ClientMetricsMetadata.parseMatchingPatterns(matchingPatternsList)
+    def getSubscribedMetrics: List[String] = subscribedMetrics
+  }
+
+  object ClientMatchingParams {
+    val CLIENT_ID = "client_id"
+    val CLIENT_INSTANCE_ID = "client_instance_id"
+    val CLIENT_SOFTWARE_NAME = "client_software_name"
+    val CLIENT_SOFTWARE_VERSION = "client_software_version"
+    val CLIENT_SOURCE_ADDRESS = "client_source_address"
+    val CLIENT_SOURCE_PORT = "client_source_port"
+
+    val matchersList = List(CLIENT_ID, CLIENT_INSTANCE_ID, 
CLIENT_SOFTWARE_NAME,
+                            CLIENT_SOFTWARE_VERSION, CLIENT_SOURCE_ADDRESS, 
CLIENT_SOURCE_PORT)
+
+    def isValidParam(param: String) = matchersList.contains(param)
+  }
+
+  object ClientMetrics {
+    // Properties that are used to create the subscription for client_metrics.
+    val SubscriptionMetrics = "metrics"
+    val PushIntervalMs = "interval.ms"
+    val ClientMatchPattern = "match"
+
+    val DEFAULT_PUSH_INTERVAL = 30 * 1000 // 5 minutes
+
+    // Definitions of accepted values
+    val configDef = new ConfigDef()
+      .define(SubscriptionMetrics, LIST, "", MEDIUM, "List of the subscribed 
metrics")
+      .define(ClientMatchPattern, LIST, "", MEDIUM, "Pattern used to find the 
matching clients")
+      .define(PushIntervalMs, INT, DEFAULT_PUSH_INTERVAL, MEDIUM, "Interval 
that a client can push the metrics")
+
+    def names = configDef.names
+
+    def validate(subscriptionId :String, properties :Properties): Unit = {
+      if (subscriptionId.isEmpty) {
+        throw new InvalidRequestException("subscriptionId can't be empty")
+      }
+      validateProperties(properties)
+    }
+
+    def validateProperties(properties: Properties) = {
+      // Make sure that all the properties are valid.
+      properties.keySet().forEach(x =>
+        if (!names.contains(x))
+          throw new InvalidRequestException(s"Unknown client metric 
configuration: $x")
+      )
+
+      // Make sure that push interval is between 100ms and 1 hour.
+      if (properties.containsKey(PushIntervalMs)) {
+        val pushIntervalMs = 
Integer.parseInt(properties.getProperty(PushIntervalMs))
+        if (pushIntervalMs < 100 || pushIntervalMs > 3600000)

Review Comment:
   There have to be constants for these min and max values that you can use.



##########
core/src/main/scala/kafka/metrics/clientmetrics/ClientMetricsConfig.scala:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics.clientmetrics
+
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
+import org.apache.kafka.common.config.ConfigDef.Type.{INT, LIST}
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import java.util.Properties
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation and update methods
+ * are defined in this class.
+ * <p>
+ * SubscriptionInfo: Contains the client metric subscription information. 
Supported operations from the CLI are
+ * add/delete/update operations. Every subscription object contains the 
following parameters that are populated
+ * during the creation of the subscription.
+ * <p>
+ * {
+ * <ul>
+ *   <li> subscriptionId: Name/ID supplied by CLI during the creation of the 
client metric subscription.
+ *   <li> subscribedMetrics: List of metric prefixes
+ *   <li> pushIntervalMs: A positive integer value >=0  tells the client that 
how often a client can push the metrics
+ *   <li> matchingPatternsList: List of client matching patterns, that are 
used by broker to match the client instance
+ *   with the subscription.
+ * </ul>
+ * }
+ * <p>
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * <ul>
+ *  <li> "metrics" value should be comma separated metrics list. An empty list 
means no metrics subscribed.
+ *      A list containing just an empty string means all metrics subscribed.
+ *      Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *  <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is the 
interval at which the client
+ *      should push the metrics to the broker.
+ *  <li> "match" is a comma separated list of client match patterns, in case 
if there is no matching
+ *      pattern specified then broker considers that as all match which means 
the associated metrics
+ *      applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ * </ul>
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+object ClientMetricsConfig {
+
+  class SubscriptionInfo(subscriptionId: String,
+                         subscribedMetrics: List[String],
+                         pushIntervalMs: Int,
+                         var matchingPatternsList: List[String]) {
+    def getId: String = subscriptionId
+    def getPushIntervalMs: Int = pushIntervalMs
+    def getClientMatchingPatterns: Map[String, String] = 
ClientMetricsMetadata.parseMatchingPatterns(matchingPatternsList)
+    def getSubscribedMetrics: List[String] = subscribedMetrics
+  }
+
+  object ClientMatchingParams {
+    val CLIENT_ID = "client_id"
+    val CLIENT_INSTANCE_ID = "client_instance_id"
+    val CLIENT_SOFTWARE_NAME = "client_software_name"
+    val CLIENT_SOFTWARE_VERSION = "client_software_version"
+    val CLIENT_SOURCE_ADDRESS = "client_source_address"
+    val CLIENT_SOURCE_PORT = "client_source_port"
+
+    val matchersList = List(CLIENT_ID, CLIENT_INSTANCE_ID, 
CLIENT_SOFTWARE_NAME,
+                            CLIENT_SOFTWARE_VERSION, CLIENT_SOURCE_ADDRESS, 
CLIENT_SOURCE_PORT)
+
+    def isValidParam(param: String) = matchersList.contains(param)
+  }
+
+  object ClientMetrics {
+    // Properties that are used to create the subscription for client_metrics.
+    val SubscriptionMetrics = "metrics"
+    val PushIntervalMs = "interval.ms"
+    val ClientMatchPattern = "match"
+
+    val DEFAULT_PUSH_INTERVAL = 30 * 1000 // 5 minutes

Review Comment:
   I would call it `DEFAULT_INTERVAL_MS` so it's clear it's milliseconds. Also, 
the config is `interval.ms`.



##########
core/src/main/scala/kafka/server/ConfigHelper.scala:
##########
@@ -57,13 +58,15 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
           authHelper.authorize(request.context, DESCRIBE_CONFIGS, CLUSTER, 
CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
           authHelper.authorize(request.context, DESCRIBE_CONFIGS, TOPIC, 
resource.resourceName)
+        case ConfigResource.Type.CLIENT_METRICS =>
+          authHelper.authorize(request.context, DESCRIBE_CONFIGS, 
CLIENT_METRICS, resource.resourceName)

Review Comment:
   The KIP says that the ACL checked is operation `DESCRIBE_CONFIGS` and 
resource `CLUSTER`.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2754,6 +2754,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           authHelper.authorize(originalRequest.context, ALTER_CONFIGS, 
CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
           authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, 
resource.name)
+        case ConfigResource.Type.CLIENT_METRICS =>
+          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, 
CLIENT_METRICS, resource.name)

Review Comment:
   See earlier comment. There are no ACLs for `CLIENT_METRICS` resource type 
and checking is performed against the `CLUSTER` resource.



##########
clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java:
##########
@@ -68,7 +68,12 @@ public enum ResourceType {
     /**
      * A user principal
      */
-    USER((byte) 7);
+    USER((byte) 7),
+
+    /**
+     *  Client metrics
+     */
+    CLIENT_METRICS((byte) 8);

Review Comment:
   The KIP does not define this constant. Checking is against the `CLUSTER` 
resource type.



##########
core/src/main/scala/kafka/metrics/clientmetrics/ClientMetricsMetadata.scala:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics.clientmetrics
+
+import kafka.Kafka.info
+import 
kafka.metrics.clientmetrics.ClientMetricsConfig.ClientMatchingParams.{CLIENT_ID,
 CLIENT_INSTANCE_ID, CLIENT_SOFTWARE_NAME, CLIENT_SOFTWARE_VERSION, 
CLIENT_SOURCE_ADDRESS, CLIENT_SOURCE_PORT, isValidParam}
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+    val instance = new ClientMetricsMetadata
+    val ctx = request.context
+    val softwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+    val softwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+    instance.init(clientInstanceId, ctx.clientId(), softwareName, 
softwareVersion,
+                  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress)
+    instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, softwareName: String,
+            softwareVersion: String, clientHostAddress: String, clientPort: 
String): ClientMetricsMetadata = {
+    val instance = new ClientMetricsMetadata
+    instance.init(clientInstanceId, clientId, softwareName, softwareVersion, 
clientHostAddress, clientPort)
+    instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   *     occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return

Review Comment:
   `@return` what?



##########
core/src/main/scala/kafka/metrics/clientmetrics/ClientMetricsConfig.scala:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics.clientmetrics
+
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
+import org.apache.kafka.common.config.ConfigDef.Type.{INT, LIST}
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import java.util.Properties
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation and update methods
+ * are defined in this class.
+ * <p>
+ * SubscriptionInfo: Contains the client metric subscription information. 
Supported operations from the CLI are
+ * add/delete/update operations. Every subscription object contains the 
following parameters that are populated
+ * during the creation of the subscription.
+ * <p>
+ * {
+ * <ul>
+ *   <li> subscriptionId: Name/ID supplied by CLI during the creation of the 
client metric subscription.
+ *   <li> subscribedMetrics: List of metric prefixes
+ *   <li> pushIntervalMs: A positive integer value >=0  tells the client that 
how often a client can push the metrics
+ *   <li> matchingPatternsList: List of client matching patterns, that are 
used by broker to match the client instance
+ *   with the subscription.
+ * </ul>
+ * }
+ * <p>
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * <ul>
+ *  <li> "metrics" value should be comma separated metrics list. An empty list 
means no metrics subscribed.
+ *      A list containing just an empty string means all metrics subscribed.
+ *      Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *  <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is the 
interval at which the client
+ *      should push the metrics to the broker.
+ *  <li> "match" is a comma separated list of client match patterns, in case 
if there is no matching
+ *      pattern specified then broker considers that as all match which means 
the associated metrics
+ *      applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ * </ul>
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+object ClientMetricsConfig {
+
+  class SubscriptionInfo(subscriptionId: String,
+                         subscribedMetrics: List[String],
+                         pushIntervalMs: Int,
+                         var matchingPatternsList: List[String]) {
+    def getId: String = subscriptionId
+    def getPushIntervalMs: Int = pushIntervalMs
+    def getClientMatchingPatterns: Map[String, String] = 
ClientMetricsMetadata.parseMatchingPatterns(matchingPatternsList)
+    def getSubscribedMetrics: List[String] = subscribedMetrics
+  }
+
+  object ClientMatchingParams {
+    val CLIENT_ID = "client_id"
+    val CLIENT_INSTANCE_ID = "client_instance_id"
+    val CLIENT_SOFTWARE_NAME = "client_software_name"
+    val CLIENT_SOFTWARE_VERSION = "client_software_version"
+    val CLIENT_SOURCE_ADDRESS = "client_source_address"
+    val CLIENT_SOURCE_PORT = "client_source_port"
+
+    val matchersList = List(CLIENT_ID, CLIENT_INSTANCE_ID, 
CLIENT_SOFTWARE_NAME,
+                            CLIENT_SOFTWARE_VERSION, CLIENT_SOURCE_ADDRESS, 
CLIENT_SOURCE_PORT)
+
+    def isValidParam(param: String) = matchersList.contains(param)
+  }
+
+  object ClientMetrics {
+    // Properties that are used to create the subscription for client_metrics.
+    val SubscriptionMetrics = "metrics"
+    val PushIntervalMs = "interval.ms"
+    val ClientMatchPattern = "match"
+
+    val DEFAULT_PUSH_INTERVAL = 30 * 1000 // 5 minutes
+
+    // Definitions of accepted values
+    val configDef = new ConfigDef()
+      .define(SubscriptionMetrics, LIST, "", MEDIUM, "List of the subscribed 
metrics")
+      .define(ClientMatchPattern, LIST, "", MEDIUM, "Pattern used to find the 
matching clients")
+      .define(PushIntervalMs, INT, DEFAULT_PUSH_INTERVAL, MEDIUM, "Interval 
that a client can push the metrics")
+
+    def names = configDef.names
+
+    def validate(subscriptionId :String, properties :Properties): Unit = {

Review Comment:
   Tiny formatting improvement. `(subscriptionId: String, properties: 
Properties)` please.



##########
core/src/main/scala/kafka/metrics/clientmetrics/ClientMetricsConfig.scala:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics.clientmetrics

Review Comment:
   I would move to the parent `metrics` package.



##########
core/src/main/scala/kafka/metrics/clientmetrics/ClientMetricsConfig.scala:
##########
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics.clientmetrics
+
+import org.apache.kafka.common.config.ConfigDef
+import org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM
+import org.apache.kafka.common.config.ConfigDef.Type.{INT, LIST}
+import org.apache.kafka.common.errors.InvalidRequestException
+
+import java.util.Properties
+
+/**
+ * Client metric configuration related parameters and the supporting methods 
like validation and update methods
+ * are defined in this class.
+ * <p>
+ * SubscriptionInfo: Contains the client metric subscription information. 
Supported operations from the CLI are
+ * add/delete/update operations. Every subscription object contains the 
following parameters that are populated
+ * during the creation of the subscription.
+ * <p>
+ * {
+ * <ul>
+ *   <li> subscriptionId: Name/ID supplied by CLI during the creation of the 
client metric subscription.
+ *   <li> subscribedMetrics: List of metric prefixes
+ *   <li> pushIntervalMs: A positive integer value >=0  tells the client that 
how often a client can push the metrics
+ *   <li> matchingPatternsList: List of client matching patterns, that are 
used by broker to match the client instance
+ *   with the subscription.
+ * </ul>
+ * }
+ * <p>
+ * At present, CLI can pass the following parameters in request to 
add/delete/update the client metrics
+ * subscription:
+ * <ul>
+ *  <li> "metrics" value should be comma separated metrics list. An empty list 
means no metrics subscribed.
+ *      A list containing just an empty string means all metrics subscribed.
+ *      Ex: 
"org.apache.kafka.producer.partition.queue.,org.apache.kafka.producer.partition.latency"
+ *  <li> "interval.ms" should be between 100 and 3600000 (1 hour). This is the 
interval at which the client
+ *      should push the metrics to the broker.
+ *  <li> "match" is a comma separated list of client match patterns, in case 
if there is no matching
+ *      pattern specified then broker considers that as all match which means 
the associated metrics
+ *      applies to all the clients. Ex: "client_software_name = Java, 
client_software_version = 11.1.*"
+ * </ul>
+ * For more information please look at kip-714:
+ * 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientmetricsconfiguration
+ */
+object ClientMetricsConfig {
+
+  class SubscriptionInfo(subscriptionId: String,
+                         subscribedMetrics: List[String],
+                         pushIntervalMs: Int,
+                         var matchingPatternsList: List[String]) {
+    def getId: String = subscriptionId
+    def getPushIntervalMs: Int = pushIntervalMs
+    def getClientMatchingPatterns: Map[String, String] = 
ClientMetricsMetadata.parseMatchingPatterns(matchingPatternsList)
+    def getSubscribedMetrics: List[String] = subscribedMetrics
+  }
+
+  object ClientMatchingParams {
+    val CLIENT_ID = "client_id"
+    val CLIENT_INSTANCE_ID = "client_instance_id"
+    val CLIENT_SOFTWARE_NAME = "client_software_name"
+    val CLIENT_SOFTWARE_VERSION = "client_software_version"
+    val CLIENT_SOURCE_ADDRESS = "client_source_address"
+    val CLIENT_SOURCE_PORT = "client_source_port"
+
+    val matchersList = List(CLIENT_ID, CLIENT_INSTANCE_ID, 
CLIENT_SOFTWARE_NAME,
+                            CLIENT_SOFTWARE_VERSION, CLIENT_SOURCE_ADDRESS, 
CLIENT_SOURCE_PORT)
+
+    def isValidParam(param: String) = matchersList.contains(param)
+  }
+
+  object ClientMetrics {
+    // Properties that are used to create the subscription for client_metrics.
+    val SubscriptionMetrics = "metrics"
+    val PushIntervalMs = "interval.ms"
+    val ClientMatchPattern = "match"
+
+    val DEFAULT_PUSH_INTERVAL = 30 * 1000 // 5 minutes
+
+    // Definitions of accepted values
+    val configDef = new ConfigDef()
+      .define(SubscriptionMetrics, LIST, "", MEDIUM, "List of the subscribed 
metrics")
+      .define(ClientMatchPattern, LIST, "", MEDIUM, "Pattern used to find the 
matching clients")
+      .define(PushIntervalMs, INT, DEFAULT_PUSH_INTERVAL, MEDIUM, "Interval 
that a client can push the metrics")
+
+    def names = configDef.names
+
+    def validate(subscriptionId :String, properties :Properties): Unit = {
+      if (subscriptionId.isEmpty) {
+        throw new InvalidRequestException("subscriptionId can't be empty")
+      }
+      validateProperties(properties)
+    }
+
+    def validateProperties(properties: Properties) = {
+      // Make sure that all the properties are valid.
+      properties.keySet().forEach(x =>
+        if (!names.contains(x))
+          throw new InvalidRequestException(s"Unknown client metric 
configuration: $x")
+      )
+
+      // Make sure that push interval is between 100ms and 1 hour.
+      if (properties.containsKey(PushIntervalMs)) {
+        val pushIntervalMs = 
Integer.parseInt(properties.getProperty(PushIntervalMs))
+        if (pushIntervalMs < 100 || pushIntervalMs > 3600000)
+          throw new InvalidRequestException(s"Invalid parameter 
${PushIntervalMs}")

Review Comment:
   This exception would not mention which parameter was invalid. It would say 
something like `Invalid parameter 100`. Probably ought to improve it.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2916,6 +2918,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           authHelper.authorize(originalRequest.context, ALTER_CONFIGS, 
CLUSTER, CLUSTER_NAME)
         case ConfigResource.Type.TOPIC =>
           authHelper.authorize(originalRequest.context, ALTER_CONFIGS, TOPIC, 
resource.name)
+        case ConfigResource.Type.CLIENT_METRICS =>
+          authHelper.authorize(originalRequest.context, ALTER_CONFIGS, 
CLIENT_METRICS, resource.name)

Review Comment:
   And `CLUSTER` again here too.



##########
core/src/main/scala/kafka/server/ControllerApis.scala:
##########
@@ -483,6 +483,12 @@ class ControllerApis(
         } else {
           new ApiError(TOPIC_AUTHORIZATION_FAILED)
         }
+      case ConfigResource.Type.CLIENT_METRICS =>
+        if (authHelper.authorize(requestContext, ALTER_CONFIGS, 
CLIENT_METRICS, resource.name)) {

Review Comment:
   The KIP says that the ACL check is operation `ALTER` and resource `CLUSTER`. 
I think that perhaps the KIP should actually say `ALTER_CONFIGS` but the 
resource type should remain `CLUSTER`. OK?



##########
core/src/main/scala/kafka/metrics/clientmetrics/ClientMetricsMetadata.scala:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.metrics.clientmetrics
+
+import kafka.Kafka.info
+import 
kafka.metrics.clientmetrics.ClientMetricsConfig.ClientMatchingParams.{CLIENT_ID,
 CLIENT_INSTANCE_ID, CLIENT_SOFTWARE_NAME, CLIENT_SOFTWARE_VERSION, 
CLIENT_SOURCE_ADDRESS, CLIENT_SOURCE_PORT, isValidParam}
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.InvalidConfigurationException
+
+import java.util.regex.{Pattern, PatternSyntaxException}
+import scala.collection.mutable
+
+/**
+ * Information from the client's metadata is gathered from the client's 
request.
+ */
+object ClientMetricsMetadata {
+  def apply(request: RequestChannel.Request, clientInstanceId: String): 
ClientMetricsMetadata = {
+    val instance = new ClientMetricsMetadata
+    val ctx = request.context
+    val softwareName = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareName() else ""
+    val softwareVersion = if (ctx.clientInformation != null) 
ctx.clientInformation.softwareVersion() else ""
+    instance.init(clientInstanceId, ctx.clientId(), softwareName, 
softwareVersion,
+                  ctx.clientAddress.getHostAddress, 
ctx.clientAddress.getHostAddress)
+    instance
+  }
+
+  def apply(clientInstanceId: String, clientId: String, softwareName: String,
+            softwareVersion: String, clientHostAddress: String, clientPort: 
String): ClientMetricsMetadata = {
+    val instance = new ClientMetricsMetadata
+    instance.init(clientInstanceId, clientId, softwareName, softwareVersion, 
clientHostAddress, clientPort)
+    instance
+  }
+
+  /**
+   * Parses the client matching patterns and builds a map with entries that has
+   * (PatternName, PatternValue) as the entries.
+   *  Ex: "VERSION=1.2.3" would be converted to a map entry of (Version, 1.2.3)
+   *
+   *  NOTES:
+   *  1. Client match pattern splits the input into two parts separated by 
first
+   *     occurrence of the character '='
+   *  2. '*' is considered as invalid client match pattern
+   * @param patterns List of client matching pattern strings
+   * @return
+   */
+  def parseMatchingPatterns(patterns: List[String]) : Map[String, String] = {
+    val patternsMap = mutable.Map[String, String]()
+    if (patterns != null) {
+      patterns.foreach(x => {
+        val nameValuePair = x.split("=", 2).map(x => x.trim)
+        if (nameValuePair.size == 2 && isValidParam(nameValuePair(0)) && 
validRegExPattern(nameValuePair(1))) {
+          patternsMap += (nameValuePair(0) -> nameValuePair(1))
+        } else {
+          throw new InvalidConfigurationException("Illegal client matching 
pattern: " + x)
+        }
+      })
+    }
+    patternsMap.toMap
+  }
+
+  private def validRegExPattern(inputPattern :String): Boolean = {
+    try {
+      Pattern.compile(inputPattern)

Review Comment:
   The KIP says specifically that the regex is compiled with Google RE2/J. This 
is generally what is used broker-side for regex for reasons of safety.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to