This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 71314739f9b KAFKA-15995: Initial API + make Producer/Consumer plugins Monitorable (#17511)
71314739f9b is described below

commit 71314739f9b51fa5b443258d68019f58583704eb
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Jan 31 10:40:10 2025 +0100

    KAFKA-15995: Initial API + make Producer/Consumer plugins Monitorable (#17511)
    
    
    Reviewers: Greg Harris <[email protected]>, Luke Chen <[email protected]>
---
 checkstyle/import-control.xml                      |   5 +
 .../clients/consumer/ConsumerInterceptor.java      |   2 +
 .../consumer/internals/AsyncKafkaConsumer.java     |  10 +-
 .../consumer/internals/ClassicKafkaConsumer.java   |  10 +-
 .../clients/consumer/internals/CompletedFetch.java |   4 +-
 .../consumer/internals/ConsumerInterceptors.java   |  23 ++--
 .../clients/consumer/internals/Deserializers.java  |  52 ++++---
 .../consumer/internals/ShareCompletedFetch.java    |   4 +-
 .../consumer/internals/ShareConsumerImpl.java      |   8 +-
 .../kafka/clients/producer/KafkaProducer.java      |  60 +++++----
 .../apache/kafka/clients/producer/Partitioner.java |   3 +
 .../clients/producer/ProducerInterceptor.java      |   2 +
 .../producer/internals/ProducerInterceptors.java   |  27 ++--
 .../org/apache/kafka/common/internals/Plugin.java  |  86 ++++++++++++
 .../apache/kafka/common/metrics/Monitorable.java   |  32 +++++
 .../apache/kafka/common/metrics/PluginMetrics.java |  75 +++++++++++
 .../metrics/internals/PluginMetricsImpl.java       | 113 ++++++++++++++++
 .../kafka/common/serialization/Deserializer.java   |   3 +-
 .../kafka/common/serialization/Serializer.java     |   3 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  81 +++++++++++
 .../consumer/internals/AsyncKafkaConsumerTest.java |   9 +-
 .../consumer/internals/CompletedFetchTest.java     |   4 +-
 .../internals/ConsumerInterceptorsTest.java        |   4 +-
 .../consumer/internals/FetchCollectorTest.java     |   5 +-
 .../internals/FetchRequestManagerTest.java         |   2 +-
 .../clients/consumer/internals/FetcherTest.java    |   4 +-
 .../consumer/internals/OffsetFetcherTest.java      |   2 +-
 .../internals/ShareCompletedFetchTest.java         |   4 +-
 .../internals/ShareConsumeRequestManagerTest.java  |   2 +-
 .../internals/ShareFetchCollectorTest.java         |   2 +-
 .../kafka/clients/producer/KafkaProducerTest.java  | 109 ++++++++++++++-
 .../internals/ProducerInterceptorsTest.java        |   6 +-
 .../apache/kafka/common/internals/PluginTest.java  | 149 +++++++++++++++++++++
 .../metrics/internals/PluginMetricsImplTest.java   | 120 +++++++++++++++++
 34 files changed, 908 insertions(+), 117 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5129d174223..32c276aa486 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -87,6 +87,11 @@
       <allow class="org.apache.kafka.common.record.RecordBatch" 
exact-match="true" />
     </subpackage>
 
+    <subpackage name="internals">
+      <allow pkg="org.apache.kafka.common.metrics" />
+      <allow pkg="org.apache.kafka.common.metrics.internals" />
+    </subpackage>
+
     <subpackage name="message">
       <allow pkg="com.fasterxml.jackson" />
       <allow pkg="org.apache.kafka.common.protocol" />
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index c04afccd8aa..206e6d04a2c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -39,6 +39,8 @@ import java.util.Map;
  * {@link 
org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the 
interceptor to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to 
<code>interceptor.classes</code>, and <code>class</code> set to the 
ConsumerInterceptor class name.
  */
 public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable 
{
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 2c65b44a7a0..69067670c30 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -326,12 +326,12 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 
             List<ConsumerInterceptor<K, V>> interceptorList = 
configuredConsumerInterceptors(config);
-            this.interceptors = new ConsumerInterceptors<>(interceptorList);
-            this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer);
+            this.interceptors = new ConsumerInterceptors<>(interceptorList, 
metrics);
+            this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer, metrics);
             this.subscriptions = createSubscriptionState(config, logContext);
             ClusterResourceListeners clusterResourceListeners = 
ClientUtils.configureClusterResourceListeners(metrics.reporters(),
                     interceptorList,
-                    Arrays.asList(deserializers.keyDeserializer, 
deserializers.valueDeserializer));
+                    Arrays.asList(deserializers.keyDeserializer(), 
deserializers.valueDeserializer()));
             this.metadata = metadataFactory.build(config, subscriptions, 
logContext, clusterResourceListeners);
             final List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config);
             metadata.bootstrap(addresses);
@@ -494,13 +494,13 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.autoCommitEnabled = 
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
         this.fetchBuffer = new FetchBuffer(logContext);
         this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
-        this.interceptors = new 
ConsumerInterceptors<>(Collections.emptyList());
         this.time = time;
         this.metrics = new Metrics(time);
+        this.interceptors = new 
ConsumerInterceptors<>(Collections.emptyList(), metrics);
         this.metadata = metadata;
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.defaultApiTimeoutMs = 
Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
-        this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer);
+        this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer, metrics);
         this.clientTelemetryReporter = Optional.empty();
 
         ConsumerMetrics metricsRegistry = new 
ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 28b82f3b3ad..9a8c637d778 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -179,13 +179,13 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
 
             List<ConsumerInterceptor<K, V>> interceptorList = 
configuredConsumerInterceptors(config);
-            this.interceptors = new ConsumerInterceptors<>(interceptorList);
-            this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer);
+            this.interceptors = new ConsumerInterceptors<>(interceptorList, 
metrics);
+            this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer, metrics);
             this.subscriptions = createSubscriptionState(config, logContext);
             ClusterResourceListeners clusterResourceListeners = 
ClientUtils.configureClusterResourceListeners(
                     metrics.reporters(),
                     interceptorList,
-                    Arrays.asList(this.deserializers.keyDeserializer, 
this.deserializers.valueDeserializer));
+                    Arrays.asList(this.deserializers.keyDeserializer(), 
this.deserializers.valueDeserializer()));
             this.metadata = new ConsumerMetadata(config, subscriptions, 
logContext, clusterResourceListeners);
             List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config);
             this.metadata.bootstrap(addresses);
@@ -289,12 +289,12 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.metrics = new Metrics(time);
         this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
         this.groupId = 
Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
-        this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer);
+        this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer, metrics);
         this.isolationLevel = ConsumerUtils.configuredIsolationLevel(config);
         this.defaultApiTimeoutMs = 
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
         this.assignors = assignors;
         this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, 
CONSUMER_METRIC_GROUP_PREFIX);
-        this.interceptors = new 
ConsumerInterceptors<>(Collections.emptyList());
+        this.interceptors = new 
ConsumerInterceptors<>(Collections.emptyList(), metrics);
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.retryBackoffMaxMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
         this.requestTimeoutMs = 
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 2cba76588e5..a505de2dc12 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -319,13 +319,13 @@ public class CompletedFetch {
         K key;
         V value;
         try {
-            key = keyBytes == null ? null : 
deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+            key = keyBytes == null ? null : 
deserializers.keyDeserializer().deserialize(partition.topic(), headers, 
keyBytes);
         } catch (RuntimeException e) {
             log.error("Key Deserializers with error: {}", deserializers);
             throw 
newRecordDeserializationException(DeserializationExceptionOrigin.KEY, 
partition, timestampType, record, e, headers);
         }
         try {
-            value = valueBytes == null ? null : 
deserializers.valueDeserializer.deserialize(partition.topic(), headers, 
valueBytes);
+            value = valueBytes == null ? null : 
deserializers.valueDeserializer().deserialize(partition.topic(), headers, 
valueBytes);
         } catch (RuntimeException e) {
             log.error("Value Deserializers with error: {}", deserializers);
             throw 
newRecordDeserializationException(DeserializationExceptionOrigin.VALUE, 
partition, timestampType, record, e, headers);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
index c56ea1a03e9..c58b60ba0f2 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
@@ -17,10 +17,13 @@
 package org.apache.kafka.clients.consumer.internals;
 
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,15 +38,15 @@ import java.util.Map;
  */
 public class ConsumerInterceptors<K, V> implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ConsumerInterceptors.class);
-    private final List<ConsumerInterceptor<K, V>> interceptors;
+    private final List<Plugin<ConsumerInterceptor<K, V>>> interceptorPlugins;
 
-    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
-        this.interceptors = interceptors;
+    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors, 
Metrics metrics) {
+        this.interceptorPlugins = Plugin.wrapInstances(interceptors, metrics, 
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG);
     }
 
     /** Returns true if no interceptors are defined. All other methods will be 
no-ops in this case. */
     public boolean isEmpty() {
-        return interceptors.isEmpty();
+        return interceptorPlugins.isEmpty();
     }
 
     /**
@@ -62,9 +65,9 @@ public class ConsumerInterceptors<K, V> implements Closeable {
      */
     public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
         ConsumerRecords<K, V> interceptRecords = records;
-        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin : 
this.interceptorPlugins) {
             try {
-                interceptRecords = interceptor.onConsume(interceptRecords);
+                interceptRecords = 
interceptorPlugin.get().onConsume(interceptRecords);
             } catch (Exception e) {
                 // do not propagate interceptor exception, log and continue 
calling other interceptors
                 log.warn("Error executing interceptor onConsume callback", e);
@@ -83,9 +86,9 @@ public class ConsumerInterceptors<K, V> implements Closeable {
      * @param offsets A map of offsets by partition with associated metadata
      */
     public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
-        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin : 
this.interceptorPlugins) {
             try {
-                interceptor.onCommit(offsets);
+                interceptorPlugin.get().onCommit(offsets);
             } catch (Exception e) {
                 // do not propagate interceptor exception, just log
                 log.warn("Error executing interceptor onCommit callback", e);
@@ -98,9 +101,9 @@ public class ConsumerInterceptors<K, V> implements Closeable 
{
      */
     @Override
     public void close() {
-        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin : 
this.interceptorPlugins) {
             try {
-                interceptor.close();
+                interceptorPlugin.close();
             } catch (Exception e) {
                 log.error("Failed to close consumer interceptor ", e);
             }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
index 5de2a888775..0926c720c0c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.Utils;
 
@@ -28,44 +30,54 @@ import java.util.concurrent.atomic.AtomicReference;
 
 public class Deserializers<K, V> implements AutoCloseable {
 
-    public final Deserializer<K> keyDeserializer;
-    public final Deserializer<V> valueDeserializer;
+    private final Plugin<Deserializer<K>> keyDeserializerPlugin;
+    private final Plugin<Deserializer<V>> valueDeserializerPlugin;
 
-    public Deserializers(Deserializer<K> keyDeserializer, Deserializer<V> 
valueDeserializer) {
-        this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "Key 
deserializer provided to Deserializers should not be null");
-        this.valueDeserializer = Objects.requireNonNull(valueDeserializer, 
"Value deserializer provided to Deserializers should not be null");
-    }
-
-    public Deserializers(ConsumerConfig config) {
-        this(config, null, null);
+    public Deserializers(Deserializer<K> keyDeserializer, Deserializer<V> 
valueDeserializer, Metrics metrics) {
+        this.keyDeserializerPlugin = Plugin.wrapInstance(
+                Objects.requireNonNull(keyDeserializer, "Key deserializer 
provided to Deserializers should not be null"),
+                metrics,
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+        this.valueDeserializerPlugin = Plugin.wrapInstance(
+                Objects.requireNonNull(valueDeserializer, "Value deserializer 
provided to Deserializers should not be null"),
+                metrics,
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
     }
 
     @SuppressWarnings("unchecked")
-    public Deserializers(ConsumerConfig config, Deserializer<K> 
keyDeserializer, Deserializer<V> valueDeserializer) {
+    public Deserializers(ConsumerConfig config, Deserializer<K> 
keyDeserializer, Deserializer<V> valueDeserializer, Metrics metrics) {
         String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
 
         if (keyDeserializer == null) {
-            this.keyDeserializer = 
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
Deserializer.class);
-            
this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
 clientId)), true);
+            keyDeserializer = 
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
Deserializer.class);
+            
keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
 clientId)), true);
         } else {
             config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-            this.keyDeserializer = keyDeserializer;
         }
+        this.keyDeserializerPlugin = Plugin.wrapInstance(keyDeserializer, 
metrics, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
 
         if (valueDeserializer == null) {
-            this.valueDeserializer = 
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
Deserializer.class);
-            
this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
 clientId)), false);
+            valueDeserializer = 
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
Deserializer.class);
+            
valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
 clientId)), false);
         } else {
             config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-            this.valueDeserializer = valueDeserializer;
         }
+        this.valueDeserializerPlugin = Plugin.wrapInstance(valueDeserializer, 
metrics, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+    }
+
+    public Deserializer<K> keyDeserializer() {
+        return keyDeserializerPlugin.get();
+    }
+
+    public Deserializer<V> valueDeserializer() {
+        return valueDeserializerPlugin.get();
     }
 
     @Override
     public void close() {
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        Utils.closeQuietly(keyDeserializer, "key deserializer", 
firstException);
-        Utils.closeQuietly(valueDeserializer, "value deserializer", 
firstException);
+        Utils.closeQuietly(keyDeserializerPlugin, "key deserializer", 
firstException);
+        Utils.closeQuietly(valueDeserializerPlugin, "value deserializer", 
firstException);
         Throwable exception = firstException.get();
 
         if (exception != null) {
@@ -79,8 +91,8 @@ public class Deserializers<K, V> implements AutoCloseable {
     @Override
     public String toString() {
         return "Deserializers{" +
-                "keyDeserializer=" + keyDeserializer +
-                ", valueDeserializer=" + valueDeserializer +
+                "keyDeserializer=" + keyDeserializerPlugin.get() +
+                ", valueDeserializer=" + valueDeserializerPlugin.get() +
                 '}';
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 74760beec6d..838416b8428 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -296,13 +296,13 @@ public class ShareCompletedFetch {
         K key;
         V value;
         try {
-            key = keyBytes == null ? null : 
deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+            key = keyBytes == null ? null : 
deserializers.keyDeserializer().deserialize(partition.topic(), headers, 
keyBytes);
         } catch (RuntimeException e) {
             log.error("Key Deserializers with error: {}", deserializers);
             throw 
newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY,
 partition.topicPartition(), timestampType, record, e, headers);
         }
         try {
-            value = valueBytes == null ? null : 
deserializers.valueDeserializer.deserialize(partition.topic(), headers, 
valueBytes);
+            value = valueBytes == null ? null : 
deserializers.valueDeserializer().deserialize(partition.topic(), headers, 
valueBytes);
         } catch (RuntimeException e) {
             log.error("Value Deserializers with error: {}", deserializers);
             throw 
newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE,
 partition.topicPartition(), timestampType, record, e, headers);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index de737dde3bf..667f2d00033 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -257,12 +257,12 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             this.metrics = createMetrics(config, time, reporters);
             this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
 
-            this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer);
+            this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer, metrics);
             this.currentFetch = ShareFetch.empty();
             this.subscriptions = createSubscriptionState(config, logContext);
             ClusterResourceListeners clusterResourceListeners = 
ClientUtils.configureClusterResourceListeners(
                     metrics.reporters(),
-                    Arrays.asList(deserializers.keyDeserializer, 
deserializers.valueDeserializer));
+                    Arrays.asList(deserializers.keyDeserializer(), 
deserializers.valueDeserializer()));
             this.metadata = new ConsumerMetadata(config, subscriptions, 
logContext, clusterResourceListeners);
             final List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config);
             metadata.bootstrap(addresses);
@@ -363,7 +363,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.time = time;
         this.metrics = new Metrics(time);
         this.clientTelemetryReporter = Optional.empty();
-        this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer);
+        this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.subscriptions = subscriptions;
         this.metadata = metadata;
@@ -462,7 +462,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.metrics = metrics;
         this.metadata = metadata;
         this.defaultApiTimeoutMs = defaultApiTimeoutMs;
-        this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer);
+        this.deserializers = new Deserializers<>(keyDeserializer, 
valueDeserializer, metrics);
         this.currentFetch = ShareFetch.empty();
         this.applicationEventHandler = applicationEventHandler;
         this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 130cfcb7bfb..7ef51fcd2f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.KafkaMetricsContext;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -249,7 +250,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     // Visible for testing
     final Metrics metrics;
     private final KafkaProducerMetrics producerMetrics;
-    private final Partitioner partitioner;
+    private final Plugin<Partitioner> partitionerPlugin;
     private final int maxRequestSize;
     private final long totalMemorySize;
     private final ProducerMetadata metadata;
@@ -259,8 +260,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final Compression compression;
     private final Sensor errors;
     private final Time time;
-    private final Serializer<K> keySerializer;
-    private final Serializer<V> valueSerializer;
+    private final Plugin<Serializer<K>> keySerializerPlugin;
+    private final Plugin<Serializer<V>> valueSerializerPlugin;
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
     private final boolean partitionerIgnoreKeys;
@@ -366,29 +367,32 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
                     
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time, 
metricsContext);
             this.producerMetrics = new KafkaProducerMetrics(metrics);
-            this.partitioner = config.getConfiguredInstance(
-                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
-                    Partitioner.class,
-                    Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, 
clientId));
+            this.partitionerPlugin = Plugin.wrapInstance(
+                    config.getConfiguredInstance(
+                        ProducerConfig.PARTITIONER_CLASS_CONFIG,
+                        Partitioner.class,
+                        
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
+                    metrics,
+                    ProducerConfig.PARTITIONER_CLASS_CONFIG);
             this.partitionerIgnoreKeys = 
config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
             long retryBackoffMs = 
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
             long retryBackoffMaxMs = 
config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
             if (keySerializer == null) {
-                this.keySerializer = 
config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                                                                               
          Serializer.class);
-                
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
 clientId)), true);
+                keySerializer = 
config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
Serializer.class);
+                
keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
 clientId)), true);
             } else {
                 config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-                this.keySerializer = keySerializer;
             }
+            this.keySerializerPlugin = Plugin.wrapInstance(keySerializer, 
metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+
             if (valueSerializer == null) {
-                this.valueSerializer = 
config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                                                                               
            Serializer.class);
-                
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
 clientId)), false);
+                valueSerializer = 
config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
Serializer.class);
+                
valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
 clientId)), false);
             } else {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-                this.valueSerializer = valueSerializer;
             }
+            this.valueSerializerPlugin = Plugin.wrapInstance(valueSerializer, 
metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+
 
             List<ProducerInterceptor<K, V>> interceptorList = 
ClientUtils.configuredInterceptors(config,
                     ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
@@ -396,11 +400,11 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
             if (interceptors != null)
                 this.interceptors = interceptors;
             else
-                this.interceptors = new 
ProducerInterceptors<>(interceptorList);
+                this.interceptors = new 
ProducerInterceptors<>(interceptorList, metrics);
             ClusterResourceListeners clusterResourceListeners = 
ClientUtils.configureClusterResourceListeners(
                     interceptorList,
                     reporters,
-                    Arrays.asList(this.keySerializer, this.valueSerializer));
+                    Arrays.asList(this.keySerializerPlugin.get(), 
this.valueSerializerPlugin.get()));
             this.maxRequestSize = 
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = 
config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compression = configureCompression(config);
@@ -411,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.apiVersions = apiVersions;
             this.transactionManager = configureTransactionState(config, 
logContext);
             // There is no need to do work required for adaptive partitioning, 
if we use a custom partitioner.
-            boolean enableAdaptivePartitioning = partitioner == null &&
+            boolean enableAdaptivePartitioning = partitionerPlugin.get() == 
null &&
                 
config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
             RecordAccumulator.PartitionerConfig partitionerConfig = new 
RecordAccumulator.PartitionerConfig(
                 enableAdaptivePartitioning,
@@ -485,9 +489,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         this.log = logContext.logger(KafkaProducer.class);
         this.metrics = metrics;
         this.producerMetrics = new KafkaProducerMetrics(metrics);
-        this.partitioner = partitioner;
-        this.keySerializer = keySerializer;
-        this.valueSerializer = valueSerializer;
+        this.partitionerPlugin = Plugin.wrapInstance(partitioner, metrics, 
ProducerConfig.PARTITIONER_CLASS_CONFIG);
+        this.keySerializerPlugin = Plugin.wrapInstance(keySerializer, metrics, 
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+        this.valueSerializerPlugin = Plugin.wrapInstance(valueSerializer, 
metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
         this.interceptors = interceptors;
         this.maxRequestSize = 
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
         this.totalMemorySize = 
config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
@@ -972,7 +976,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             Cluster cluster = clusterAndWaitTime.cluster;
             byte[] serializedKey;
             try {
-                serializedKey = keySerializer.serialize(record.topic(), 
record.headers(), record.key());
+                serializedKey = 
keySerializerPlugin.get().serialize(record.topic(), record.headers(), 
record.key());
             } catch (ClassCastException cce) {
                 throw new SerializationException("Can't convert key of class " 
+ record.key().getClass().getName() +
                         " to class " + 
producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
@@ -980,7 +984,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
             byte[] serializedValue;
             try {
-                serializedValue = valueSerializer.serialize(record.topic(), 
record.headers(), record.value());
+                serializedValue = 
valueSerializerPlugin.get().serialize(record.topic(), record.headers(), 
record.value());
             } catch (ClassCastException cce) {
                 throw new SerializationException("Can't convert value of class 
" + record.value().getClass().getName() +
                         " to class " + 
producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() 
+
@@ -1414,9 +1418,9 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
         Utils.closeQuietly(interceptors, "producer interceptors", 
firstException);
         Utils.closeQuietly(producerMetrics, "producer metrics wrapper", 
firstException);
         Utils.closeQuietly(metrics, "producer metrics", firstException);
-        Utils.closeQuietly(keySerializer, "producer keySerializer", 
firstException);
-        Utils.closeQuietly(valueSerializer, "producer valueSerializer", 
firstException);
-        Utils.closeQuietly(partitioner, "producer partitioner", 
firstException);
+        Utils.closeQuietly(keySerializerPlugin, "producer keySerializer", 
firstException);
+        Utils.closeQuietly(valueSerializerPlugin, "producer valueSerializer", 
firstException);
+        Utils.closeQuietly(partitionerPlugin, "producer partitioner", 
firstException);
         clientTelemetryReporter.ifPresent(reporter -> 
Utils.closeQuietly(reporter, "producer telemetry reporter", firstException));
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         Throwable exception = firstException.get();
@@ -1443,8 +1447,8 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
         if (record.partition() != null)
             return record.partition();
 
-        if (partitioner != null) {
-            int customPartition = partitioner.partition(
+        if (partitionerPlugin.get() != null) {
+            int customPartition = partitionerPlugin.get().partition(
                 record.topic(), record.key(), serializedKey, record.value(), 
serializedValue, cluster);
             if (customPartition < 0) {
                 throw new IllegalArgumentException(String.format(
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
index 96345d8f8b0..d1d1ad3ac55 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -23,6 +23,9 @@ import java.io.Closeable;
 
 /**
  * Partitioner Interface
+ * <br/>
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the 
partitioner to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to 
<code>partitioner.class</code>, and <code>class</code> set to the Partitioner 
class name.
  */
 public interface Partitioner extends Configurable, Closeable {
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
index 48caf98d44a..5bc4b2c2c85 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.Configurable;
  * ProducerInterceptor callbacks may be called from multiple threads. 
Interceptor implementation must ensure thread-safety, if needed.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the 
interceptor to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to 
<code>interceptor.classes</code>, and <code>class</code> set to the 
ProducerInterceptor class name.
  */
 public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable 
{
     /**
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index 75bf8485e47..9936eef7609 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -17,10 +17,13 @@
 package org.apache.kafka.clients.producer.internals;
 
 
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerInterceptor;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.RecordBatch;
 
 import org.slf4j.Logger;
@@ -35,10 +38,10 @@ import java.util.List;
  */
 public class ProducerInterceptors<K, V> implements Closeable {
     private static final Logger log = 
LoggerFactory.getLogger(ProducerInterceptors.class);
-    private final List<ProducerInterceptor<K, V>> interceptors;
+    private final List<Plugin<ProducerInterceptor<K, V>>> interceptorPlugins;
 
-    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
-        this.interceptors = interceptors;
+    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors, 
Metrics metrics) {
+        this.interceptorPlugins = Plugin.wrapInstances(interceptors, metrics, 
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG);
     }
 
     /**
@@ -57,9 +60,9 @@ public class ProducerInterceptors<K, V> implements Closeable {
      */
     public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
         ProducerRecord<K, V> interceptRecord = record;
-        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : 
this.interceptorPlugins) {
             try {
-                interceptRecord = interceptor.onSend(interceptRecord);
+                interceptRecord = 
interceptorPlugin.get().onSend(interceptRecord);
             } catch (Exception e) {
                 // do not propagate interceptor exception, log and continue 
calling other interceptors
                 // be careful not to throw exception from here
@@ -84,9 +87,9 @@ public class ProducerInterceptors<K, V> implements Closeable {
      * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
      */
     public void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {
-        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : 
this.interceptorPlugins) {
             try {
-                interceptor.onAcknowledgement(metadata, exception);
+                interceptorPlugin.get().onAcknowledgement(metadata, exception);
             } catch (Exception e) {
                 // do not propagate interceptor exceptions, just log
                 log.warn("Error executing interceptor onAcknowledgement 
callback", e);
@@ -105,15 +108,15 @@ public class ProducerInterceptors<K, V> implements 
Closeable {
      * @param exception The exception thrown during processing of this record.
      */
     public void onSendError(ProducerRecord<K, V> record, TopicPartition 
interceptTopicPartition, Exception exception) {
-        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : 
this.interceptorPlugins) {
             try {
                 if (record == null && interceptTopicPartition == null) {
-                    interceptor.onAcknowledgement(null, exception);
+                    interceptorPlugin.get().onAcknowledgement(null, exception);
                 } else {
                     if (interceptTopicPartition == null) {
                         interceptTopicPartition = 
extractTopicPartition(record);
                     }
-                    interceptor.onAcknowledgement(new 
RecordMetadata(interceptTopicPartition, -1, -1,
+                    interceptorPlugin.get().onAcknowledgement(new 
RecordMetadata(interceptTopicPartition, -1, -1,
                                     RecordBatch.NO_TIMESTAMP, -1, -1), 
exception);
                 }
             } catch (Exception e) {
@@ -132,9 +135,9 @@ public class ProducerInterceptors<K, V> implements 
Closeable {
      */
     @Override
     public void close() {
-        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+        for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : 
this.interceptorPlugins) {
             try {
-                interceptor.close();
+                interceptorPlugin.close();
             } catch (Exception e) {
                 log.error("Failed to close producer interceptor ", e);
             }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java 
b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
new file mode 100644
index 00000000000..620cd0c07ec
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+public class Plugin<T> implements Supplier<T>, AutoCloseable {
+
+    private final T instance;
+    private final Optional<PluginMetricsImpl> pluginMetrics;
+
+    private Plugin(T instance, PluginMetricsImpl pluginMetrics) {
+        this.instance = instance;
+        this.pluginMetrics = Optional.ofNullable(pluginMetrics);
+    }
+
+    public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, 
String key) {
+        return wrapInstance(instance, metrics, () -> tags(key, instance));
+    }
+
+    private static <T> Map<String, String> tags(String key, T instance) {
+        Map<String, String> tags = new LinkedHashMap<>();
+        tags.put("config", key);
+        tags.put("class", instance.getClass().getSimpleName());
+        return tags;
+    }
+
+    public static <T> List<Plugin<T>> wrapInstances(List<T> instances, Metrics 
metrics, String key) {
+        List<Plugin<T>> plugins = new ArrayList<>();
+        for (T instance : instances) {
+            plugins.add(wrapInstance(instance, metrics, key));
+        }
+        return plugins;
+    }
+
+    public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics, 
Supplier<Map<String, String>> tagsSupplier) {
+        PluginMetricsImpl pluginMetrics = null;
+        if (instance instanceof Monitorable && metrics != null) {
+            pluginMetrics = new PluginMetricsImpl(metrics, tagsSupplier.get());
+            ((Monitorable) instance).withPluginMetrics(pluginMetrics);
+        }
+        return new Plugin<>(instance, pluginMetrics);
+    }
+
+    @Override
+    public T get() {
+        return instance;
+    }
+
+    @Override
+    public void close() throws Exception {
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
+        if (instance instanceof AutoCloseable) {
+            Utils.closeQuietly((AutoCloseable) instance, 
instance.getClass().getSimpleName(), firstException);
+        }
+        pluginMetrics.ifPresent(metrics -> Utils.closeQuietly(metrics, 
"pluginMetrics", firstException));
+        Throwable throwable = firstException.get();
+        if (throwable != null) throw new KafkaException("failed closing 
plugin", throwable);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/Monitorable.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Monitorable.java
new file mode 100644
index 00000000000..fa5a292bf31
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Monitorable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+/**
+ * Plugins can implement this interface to register their own metrics.
+ */
+public interface Monitorable {
+
+    /**
+     * Provides a {@link PluginMetrics} instance from the component that 
instantiates the plugin.
+     * PluginMetrics can be used by the plugin to register and unregister 
metrics
+     * at any point in their lifecycle prior to their close method being 
called.
+     * Any metrics registered will be automatically removed when the plugin is 
closed.
+     */
+    void withPluginMetrics(PluginMetrics metrics);
+
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/PluginMetrics.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/PluginMetrics.java
new file mode 100644
index 00000000000..d296234105d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/PluginMetrics.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Map;
+
+/**
+ * This allows plugins to register metrics and sensors.
+ * Any metrics registered by the plugin are automatically removed when the 
plugin is closed.
+ */
+public interface PluginMetrics {
+
+    /**
+     * Create a {@link MetricName} with the given name, description and tags. 
The group will be set to "plugins".
+     * Tags to uniquely identify the plugins are automatically added to the 
provided tags
+     *
+     * @param name        The name of the metric
+     * @param description A human-readable description to include in the metric
+     * @param tags        Additional tags for the metric
+     * @throws IllegalArgumentException if any of the tag names collide with 
the default tags for the plugin
+     */
+    MetricName metricName(String name, String description, Map<String, String> 
tags);
+
+    /**
+     * Add a metric to monitor an object that implements {@link 
MetricValueProvider}. This metric won't be associated with any
+     * sensor. This is a way to expose existing values as metrics.
+     *
+     * @param metricName The name of the metric
+     * @param metricValueProvider The metric value provider associated with 
this metric
+     * @throws IllegalArgumentException if a metric with same name already 
exists
+     */
+    void addMetric(MetricName metricName, MetricValueProvider<?> 
metricValueProvider);
+
+    /**
+     * Remove a previously registered metric.
+     *
+     * @param metricName The name of the metric
+     * @throws IllegalArgumentException if a metric with this name does not 
exist
+     */
+    void removeMetric(MetricName metricName);
+
+    /**
+     * Create a {@link Sensor} with the given unique name. The name must only 
be unique for the plugin, so different
+     * plugins can use the same names.
+     *
+     * @param name The sensor name
+     * @return The sensor
+     * @throws IllegalArgumentException if a sensor with same name already 
exists for this plugin
+     */
+    Sensor addSensor(String name);
+
+    /**
+     * Remove a {@link Sensor} and its associated metrics.
+     *
+     * @param name The name of the sensor to be removed
+     * @throws IllegalArgumentException if a sensor with this name does not 
exist
+     */
+    void removeSensor(String name);
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/internals/PluginMetricsImpl.java
 
b/clients/src/main/java/org/apache/kafka/common/metrics/internals/PluginMetricsImpl.java
new file mode 100644
index 00000000000..c109b578962
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/metrics/internals/PluginMetricsImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.PluginMetrics;
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PluginMetricsImpl implements PluginMetrics, Closeable {
+
+    private static final String GROUP = "plugins";
+
+    private final Metrics metrics;
+    private final Map<String, String> tags;
+    private final Set<MetricName> metricNames = ConcurrentHashMap.newKeySet();
+    private final Set<String> sensors = ConcurrentHashMap.newKeySet();
+    private volatile boolean closing = false;
+
+    public PluginMetricsImpl(Metrics metrics, Map<String, String> tags) {
+        this.metrics = metrics;
+        this.tags = tags;
+    }
+
+    @Override
+    public MetricName metricName(String name, String description, Map<String, 
String> tags) {
+        if (closing) throw new IllegalStateException("This PluginMetrics 
instance is closed");
+        for (String tagName : tags.keySet()) {
+            if (this.tags.containsKey(tagName)) {
+                throw new IllegalArgumentException("Cannot use " + tagName + " 
as a tag name");
+            }
+        }
+        Map<String, String> metricsTags = new LinkedHashMap<>(this.tags);
+        metricsTags.putAll(tags);
+        return metrics.metricName(name, GROUP, description, metricsTags);
+    }
+
+    @Override
+    public void addMetric(MetricName metricName, MetricValueProvider<?> 
metricValueProvider) {
+        if (closing) throw new IllegalStateException("This PluginMetrics 
instance is closed");
+        if (metricNames.contains(metricName)) {
+            throw new IllegalArgumentException("Metric " + metricName + " 
already exists");
+        }
+        metrics.addMetric(metricName, metricValueProvider);
+        metricNames.add(metricName);
+    }
+
+    @Override
+    public void removeMetric(MetricName metricName) {
+        if (closing) throw new IllegalStateException("This PluginMetrics 
instance is closed");
+        if (metricNames.contains(metricName)) {
+            metrics.removeMetric(metricName);
+            metricNames.remove(metricName);
+        } else {
+            throw new IllegalArgumentException("Unknown metric " + metricName);
+        }
+    }
+
+    @Override
+    public Sensor addSensor(String name) {
+        if (closing) throw new IllegalStateException("This PluginMetrics 
instance is closed");
+        if (sensors.contains(name)) {
+            throw new IllegalArgumentException("Sensor " + name + " already 
exists");
+        }
+        Sensor sensor = metrics.sensor(name);
+        sensors.add(name);
+        return sensor;
+    }
+
+    @Override
+    public void removeSensor(String name) {
+        if (closing) throw new IllegalStateException("This PluginMetrics 
instance is closed");
+        if (sensors.contains(name)) {
+            metrics.removeSensor(name);
+            sensors.remove(name);
+        } else {
+            throw new IllegalArgumentException("Unknown sensor " + name);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        closing = true;
+        for (String sensor : sensors) {
+            metrics.removeSensor(sensor);
+        }
+        for (MetricName metricName : metricNames) {
+            metrics.removeMetric(metricName);
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index ad8d2bfe4d4..1997e66aad2 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -29,7 +29,8 @@ import java.util.Map;
  * A class that implements this interface is expected to have a constructor 
with no parameters.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
- *
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the 
deserializer to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to either 
<code>key.deserializer</code> or <code>value.deserializer</code>, and 
<code>class</code> set to the Deserializer class name.
  * @param <T> Type to be deserialized into.
  */
 public interface Deserializer<T> extends Closeable {
diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index 144b5ab945e..03eab512aa7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -27,7 +27,8 @@ import java.util.Map;
  * A class that implements this interface is expected to have a constructor 
with no parameter.
  * <p>
  * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
- *
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the 
serializer to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to either 
<code>key.serializer</code> or <code>value.serializer</code>, and 
<code>class</code> set to the Serializer class name.
  * @param <T> Type to be serialized from.
  */
 public interface Serializer<T> extends Closeable {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2749563df27..c2a3a4951e7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -66,6 +66,8 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.network.Selectable;
@@ -101,6 +103,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.MockDeserializer;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.TestUtils;
 
@@ -3685,4 +3688,82 @@ public void 
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
             
CLIENT_IDS.add(configs.get(ConsumerConfig.CLIENT_ID_CONFIG).toString());
         }
     }
+
+    @ParameterizedTest
+    @EnumSource(value = GroupProtocol.class)
+    void testMonitorablePlugins(GroupProtocol groupProtocol) {
+        try {
+            String clientId = "testMonitorablePlugins";
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+            configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+            configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
MonitorableDeserializer.class.getName());
+            configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
MonitorableDeserializer.class.getName());
+            configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MonitorableInterceptor.class.getName());
+
+            KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(configs);
+            Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+
+            MetricName expectedKeyDeserializerMetric = expectedMetricName(
+                    clientId,
+                    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                    MonitorableDeserializer.class);
+            assertTrue(metrics.containsKey(expectedKeyDeserializerMetric));
+            assertEquals(VALUE, 
metrics.get(expectedKeyDeserializerMetric).metricValue());
+
+            MetricName expectedValueDeserializerMetric = expectedMetricName(
+                    clientId,
+                    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                    MonitorableDeserializer.class);
+            assertTrue(metrics.containsKey(expectedValueDeserializerMetric));
+            assertEquals(VALUE, 
metrics.get(expectedValueDeserializerMetric).metricValue());
+
+            MetricName expectedInterceptorMetric = expectedMetricName(
+                    clientId,
+                    ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    MonitorableInterceptor.class);
+            assertTrue(metrics.containsKey(expectedInterceptorMetric));
+            assertEquals(VALUE, 
metrics.get(expectedInterceptorMetric).metricValue());
+
+            consumer.close(Duration.ZERO);
+            metrics = consumer.metrics();
+            assertFalse(metrics.containsKey(expectedKeyDeserializerMetric));
+            assertFalse(metrics.containsKey(expectedValueDeserializerMetric));
+            assertFalse(metrics.containsKey(expectedInterceptorMetric));
+        } finally {
+            MockConsumerInterceptor.resetCounters();
+        }
+    }
+
+    private MetricName expectedMetricName(String clientId, String config, 
Class<?> clazz) {
+        Map<String, String> expectedTags = new LinkedHashMap<>();
+        expectedTags.put("client-id", clientId);
+        expectedTags.put("config", config);
+        expectedTags.put("class", clazz.getSimpleName());
+        expectedTags.putAll(TAGS);
+        return new MetricName(NAME, "plugins", DESCRIPTION, expectedTags);
+    }
+
+    private static final String NAME = "name";
+    private static final String DESCRIPTION = "description";
+    private static final Map<String, String> TAGS = 
Collections.singletonMap("k", "v");
+    private static final double VALUE = 123.0;
+
+    public static class MonitorableDeserializer extends MockDeserializer 
implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
+
+    public static class MonitorableInterceptor extends MockConsumerInterceptor 
implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 819365e9712..1fb8daae00d 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -168,6 +168,7 @@ public class AsyncKafkaConsumerTest {
 
     private AsyncKafkaConsumer<String, String> consumer = null;
     private Time time = new MockTime(0);
+    private final Metrics metrics = new Metrics();
     private final FetchCollector<String, String> fetchCollector = 
mock(FetchCollector.class);
     private final ApplicationEventHandler applicationEventHandler = 
mock(ApplicationEventHandler.class);
     private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
@@ -248,7 +249,7 @@ public class AsyncKafkaConsumerTest {
         return new AsyncKafkaConsumer<>(
             new LogContext(),
             clientId,
-            new Deserializers<>(new StringDeserializer(), new 
StringDeserializer()),
+            new Deserializers<>(new StringDeserializer(), new 
StringDeserializer(), metrics),
             fetchBuffer,
             fetchCollector,
             interceptors,
@@ -257,7 +258,7 @@ public class AsyncKafkaConsumerTest {
             backgroundEventQueue,
             backgroundEventReaper,
             rebalanceListenerInvoker,
-            new Metrics(),
+            metrics,
             subscriptions,
             metadata,
             retryBackoffMs,
@@ -671,7 +672,7 @@ public class AsyncKafkaConsumerTest {
 
         consumer = spy(newConsumer(
             mock(FetchBuffer.class),
-            new ConsumerInterceptors<>(Collections.emptyList()),
+            new ConsumerInterceptors<>(Collections.emptyList(), metrics),
             invoker,
             subscriptions,
             "group-id",
@@ -1543,7 +1544,7 @@ public class AsyncKafkaConsumerTest {
         SubscriptionState subscriptions = new SubscriptionState(new 
LogContext(), AutoOffsetResetStrategy.NONE);
         consumer = newConsumer(
                 mock(FetchBuffer.class),
-                new ConsumerInterceptors<>(Collections.emptyList()),
+                new ConsumerInterceptors<>(Collections.emptyList(), metrics),
                 mock(ConsumerRebalanceListenerInvoker.class),
                 subscriptions,
                 "group-id",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
index e12b0121fd4..b804b6b8ae2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -232,11 +232,11 @@ public class CompletedFetchTest {
     }
 
     private static Deserializers<UUID, UUID> newUuidDeserializers() {
-        return new Deserializers<>(new UUIDDeserializer(), new 
UUIDDeserializer());
+        return new Deserializers<>(new UUIDDeserializer(), new 
UUIDDeserializer(), null);
     }
 
     private static Deserializers<String, String> newStringDeserializers() {
-        return new Deserializers<>(new StringDeserializer(), new 
StringDeserializer());
+        return new Deserializers<>(new StringDeserializer(), new 
StringDeserializer(), null);
     }
 
     private static FetchConfig newFetchConfig(IsolationLevel isolationLevel, 
boolean checkCrcs) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
index 5a7d85369ea..e6b091e2fd2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -116,7 +116,7 @@ public class ConsumerInterceptorsTest {
         FilterConsumerInterceptor<Integer, Integer> interceptor2 = new 
FilterConsumerInterceptor<>(filterPartition2);
         interceptorList.add(interceptor1);
         interceptorList.add(interceptor2);
-        ConsumerInterceptors<Integer, Integer> interceptors = new 
ConsumerInterceptors<>(interceptorList);
+        ConsumerInterceptors<Integer, Integer> interceptors = new 
ConsumerInterceptors<>(interceptorList, null);
 
         // verify that onConsumer modifies ConsumerRecords
         Map<TopicPartition, List<ConsumerRecord<Integer, Integer>>> records = 
new HashMap<>();
@@ -177,7 +177,7 @@ public class ConsumerInterceptorsTest {
         FilterConsumerInterceptor<Integer, Integer> interceptor2 = new 
FilterConsumerInterceptor<>(filterPartition2);
         interceptorList.add(interceptor1);
         interceptorList.add(interceptor2);
-        ConsumerInterceptors<Integer, Integer> interceptors = new 
ConsumerInterceptors<>(interceptorList);
+        ConsumerInterceptors<Integer, Integer> interceptors = new 
ConsumerInterceptors<>(interceptorList, null);
 
         // verify that onCommit is called for all interceptors in the chain
         Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
index 68b9ecb528b..d7d53537ae6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -712,7 +712,7 @@ public class FetchCollectorTest {
             mock(ConsumerMetadata.class),
             subscriptions,
             new FetchConfig(new ConsumerConfig(consumerProps)),
-            new Deserializers<>(new StringDeserializer(), new 
StringDeserializer()),
+            new Deserializers<>(new StringDeserializer(), new 
StringDeserializer(), null),
             mock(FetchMetricsManager.class),
             new MockTime()
         );
@@ -741,12 +741,11 @@ public class FetchCollectorTest {
         Properties p = consumerProperties(maxPollRecords);
         ConsumerConfig config = new ConsumerConfig(p);
 
-        deserializers = new Deserializers<>(new StringDeserializer(), new 
StringDeserializer());
-
         subscriptions = createSubscriptionState(config, logContext);
         fetchConfig = createFetchConfig(config, isolationLevel);
         Metrics metrics = createMetrics(config, time);
         metricsManager = createFetchMetricsManager(metrics);
+        deserializers = new Deserializers<>(new StringDeserializer(), new 
StringDeserializer(), metrics);
         metadata = new ConsumerMetadata(
                 0,
                 1000,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index d6f19f4bf58..0ec0633b78b 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -3936,7 +3936,7 @@ public class FetchRequestManagerTest {
                                      SubscriptionState subscriptionState,
                                      LogContext logContext) {
         buildDependencies(metricConfig, metadataExpireMs, subscriptionState, 
logContext);
-        Deserializers<K, V> deserializers = new 
Deserializers<>(keyDeserializer, valueDeserializer);
+        Deserializers<K, V> deserializers = new 
Deserializers<>(keyDeserializer, valueDeserializer, metrics);
         FetchConfig fetchConfig = new FetchConfig(
                 minBytes,
                 maxBytes,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index bc82aeae9fa..b24475300c8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2824,7 +2824,7 @@ public class FetcherTest {
                 isolationLevel,
                 apiVersions);
 
-        Deserializers<byte[], byte[]> deserializers = new Deserializers<>(new 
ByteArrayDeserializer(), new ByteArrayDeserializer());
+        Deserializers<byte[], byte[]> deserializers = new Deserializers<>(new 
ByteArrayDeserializer(), new ByteArrayDeserializer(), metrics);
         FetchConfig fetchConfig = new FetchConfig(
                 minBytes,
                 maxBytes,
@@ -3858,7 +3858,7 @@ public class FetcherTest {
                 metadata,
                 subscriptionState,
                 fetchConfig,
-                new Deserializers<>(keyDeserializer, valueDeserializer),
+                new Deserializers<>(keyDeserializer, valueDeserializer, 
metrics),
                 metricsManager,
                 time,
                 apiVersions));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index 96d6e5e0b3d..3b7fe70ea4c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -1249,7 +1249,7 @@ public class OffsetFetcherTest {
                 metadata,
                 subscriptions,
                 fetchConfig,
-                new Deserializers<>(new ByteArrayDeserializer(), new 
ByteArrayDeserializer()),
+                new Deserializers<>(new ByteArrayDeserializer(), new 
ByteArrayDeserializer(), metrics),
                 new FetchMetricsManager(metrics, metricsRegistry),
                 time,
                 apiVersions);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index b117af177b1..3b488d3d002 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -374,11 +374,11 @@ public class ShareCompletedFetchTest {
     }
 
     private static Deserializers<UUID, UUID> newUuidDeserializers() {
-        return new Deserializers<>(new UUIDDeserializer(), new 
UUIDDeserializer());
+        return new Deserializers<>(new UUIDDeserializer(), new 
UUIDDeserializer(), null);
     }
 
     private static Deserializers<String, String> newStringDeserializers() {
-        return new Deserializers<>(new StringDeserializer(), new 
StringDeserializer());
+        return new Deserializers<>(new StringDeserializer(), new 
StringDeserializer(), null);
     }
 
     private Records newRecords(long baseOffset, int count) {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 0e74a9768a8..efb190506ea 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -1855,7 +1855,7 @@ public class ShareConsumeRequestManagerTest {
                                             SubscriptionState 
subscriptionState,
                                             LogContext logContext) {
         buildDependencies(metricConfig, subscriptionState, logContext);
-        Deserializers<K, V> deserializers = new 
Deserializers<>(keyDeserializer, valueDeserializer);
+        Deserializers<K, V> deserializers = new 
Deserializers<>(keyDeserializer, valueDeserializer, metrics);
         int maxWaitMs = 0;
         int maxBytes = Integer.MAX_VALUE;
         int fetchSize = 1000;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index 893840de4c6..eb63c94673e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -236,8 +236,8 @@ public class ShareFetchCollectorTest {
 
         ConsumerConfig config = new ConsumerConfig(p);
 
-        deserializers = new Deserializers<>(new StringDeserializer(), new 
StringDeserializer());
         Metrics metrics = createMetrics(config, Time.SYSTEM);
+        deserializers = new Deserializers<>(new StringDeserializer(), new 
StringDeserializer(), metrics);
         ShareFetchMetricsManager shareFetchMetricsManager = 
createShareFetchMetricsManager(metrics);
         Set<TopicPartition> partitionSet = new HashSet<>();
         partitionSet.add(topicAPartition0.topicPartition());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 779a42656a9..3af4a7185cc 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -60,6 +60,8 @@ import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.network.Selectable;
@@ -113,6 +115,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -1456,7 +1459,7 @@ public class KafkaProducerTest {
         ApiVersions apiVersions = new ApiVersions();
         apiVersions.update(NODE.idString(), nodeApiVersions);
 
-        ProducerInterceptors<String, String> interceptor = new 
ProducerInterceptors<>(Collections.emptyList());
+        ProducerInterceptors<String, String> interceptor = new 
ProducerInterceptors<>(Collections.emptyList(), null);
 
         
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"some-txn", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, 
Errors.NONE));
@@ -1730,7 +1733,7 @@ public class KafkaProducerTest {
 
         try (KafkaProducer<String, String> producer = new KafkaProducer<>(
             new ProducerConfig(properties), new StringSerializer(), new 
StringSerializer(), metadata, client,
-            new ProducerInterceptors<>(Collections.emptyList()), apiVersions, 
time)) {
+            new ProducerInterceptors<>(Collections.emptyList(), null), 
apiVersions, time)) {
             producer.initTransactions();
             producer.beginTransaction();
             producer.sendOffsetsToTransaction(Collections.singletonMap(
@@ -1782,7 +1785,7 @@ public class KafkaProducerTest {
         ApiVersions apiVersions = new ApiVersions();
         apiVersions.update(NODE.idString(), nodeApiVersions);
 
-        ProducerInterceptors<String, String> interceptor = new 
ProducerInterceptors<>(Collections.emptyList());
+        ProducerInterceptors<String, String> interceptor = new 
ProducerInterceptors<>(Collections.emptyList(), null);
 
         
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE, 
"some-txn", NODE));
         client.prepareResponse(initProducerIdResponse(1L, (short) 5, 
Errors.NONE));
@@ -2310,7 +2313,7 @@ public class KafkaProducerTest {
         String invalidTopicName = "topic abc"; // Invalid topic name due to 
space
 
         ProducerInterceptors<String, String> producerInterceptors =
-                new ProducerInterceptors<>(Collections.singletonList(new 
MockProducerInterceptor()));
+                new ProducerInterceptors<>(Collections.singletonList(new 
MockProducerInterceptor()), null);
 
         try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(), new StringSerializer(),
                 producerMetadata, client, producerInterceptors, time)) {
@@ -2605,7 +2608,7 @@ public class KafkaProducerTest {
             ProducerConfig producerConfig = new ProducerConfig(
                 ProducerConfig.appendSerializerToConfig(configs, serializer, 
serializer));
 
-            ProducerInterceptors<T, T> interceptors = new 
ProducerInterceptors<>(this.interceptors);
+            ProducerInterceptors<T, T> interceptors = new 
ProducerInterceptors<>(this.interceptors, metrics);
 
             return new KafkaProducer<>(
                 producerConfig,
@@ -2761,4 +2764,100 @@ public class KafkaProducerTest {
         KafkaMetric streamClientMetricTwo = new KafkaMetric(lock, 
metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
         return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo, 
streamClientMetricTwo);
     }
+
+    @Test
+    void testMonitorablePlugins() {
+        try {
+            String clientId = "testMonitorablePlugins";
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
MonitorableSerializer.class.getName());
+            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
MonitorableSerializer.class.getName());
+            configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
MonitorablePartitioner.class.getName());
+            configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, 
MonitorableInterceptor.class.getName());
+            configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "");
+
+            KafkaProducer<String, String> producer = new 
KafkaProducer<>(configs);
+            Map<MetricName, ? extends Metric> metrics = producer.metrics();
+
+            MetricName expectedKeySerializerMetric = expectedMetricName(
+                    clientId,
+                    ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                    MonitorableSerializer.class);
+            assertTrue(metrics.containsKey(expectedKeySerializerMetric));
+            assertEquals(VALUE, 
metrics.get(expectedKeySerializerMetric).metricValue());
+
+            MetricName expectedValueSerializerMetric = expectedMetricName(
+                    clientId,
+                    ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                    MonitorableSerializer.class);
+            assertTrue(metrics.containsKey(expectedValueSerializerMetric));
+            assertEquals(VALUE, 
metrics.get(expectedValueSerializerMetric).metricValue());
+
+            MetricName expectedPartitionerMetric = expectedMetricName(
+                    clientId,
+                    ProducerConfig.PARTITIONER_CLASS_CONFIG,
+                    MonitorablePartitioner.class);
+            assertTrue(metrics.containsKey(expectedPartitionerMetric));
+            assertEquals(VALUE, 
metrics.get(expectedPartitionerMetric).metricValue());
+
+            MetricName expectedInterceptorMetric = expectedMetricName(
+                    clientId,
+                    ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    MonitorableInterceptor.class);
+            assertTrue(metrics.containsKey(expectedInterceptorMetric));
+            assertEquals(VALUE, 
metrics.get(expectedInterceptorMetric).metricValue());
+
+            producer.close();
+            metrics = producer.metrics();
+            assertFalse(metrics.containsKey(expectedKeySerializerMetric));
+            assertFalse(metrics.containsKey(expectedValueSerializerMetric));
+            assertFalse(metrics.containsKey(expectedPartitionerMetric));
+            assertFalse(metrics.containsKey(expectedInterceptorMetric));
+        } finally {
+            MockProducerInterceptor.resetCounters();
+        }
+    }
+
+    private MetricName expectedMetricName(String clientId, String config, 
Class<?> clazz) {
+        Map<String, String> expectedTags = new LinkedHashMap<>();
+        expectedTags.put("client-id", clientId);
+        expectedTags.put("config", config);
+        expectedTags.put("class", clazz.getSimpleName());
+        expectedTags.putAll(TAGS);
+        return new MetricName(NAME, "plugins", DESCRIPTION, expectedTags);
+    }
+
+    private static final String NAME = "name";
+    private static final String DESCRIPTION = "description";
+    private static final Map<String, String> TAGS = 
Collections.singletonMap("k", "v");
+    private static final double VALUE = 123.0;
+
+    public static class MonitorableSerializer extends MockSerializer 
implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
+
+    public static class MonitorablePartitioner extends MockPartitioner 
implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
+
+    public static class MonitorableInterceptor extends MockProducerInterceptor 
implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+            metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+        }
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 13d4957a78e..853b27b2551 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -104,7 +104,7 @@ public class ProducerInterceptorsTest {
         AppendProducerInterceptor interceptor2 = new 
AppendProducerInterceptor("Two");
         interceptorList.add(interceptor1);
         interceptorList.add(interceptor2);
-        ProducerInterceptors<Integer, String> interceptors = new 
ProducerInterceptors<>(interceptorList);
+        ProducerInterceptors<Integer, String> interceptors = new 
ProducerInterceptors<>(interceptorList, null);
 
         // verify that onSend() mutates the record as expected
         ProducerRecord<Integer, String> interceptedRecord = 
interceptors.onSend(producerRecord);
@@ -142,7 +142,7 @@ public class ProducerInterceptorsTest {
         AppendProducerInterceptor interceptor2 = new 
AppendProducerInterceptor("Two");
         interceptorList.add(interceptor1);
         interceptorList.add(interceptor2);
-        ProducerInterceptors<Integer, String> interceptors = new 
ProducerInterceptors<>(interceptorList);
+        ProducerInterceptors<Integer, String> interceptors = new 
ProducerInterceptors<>(interceptorList, null);
 
         // verify onAck is called on all interceptors
         RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0);
@@ -166,7 +166,7 @@ public class ProducerInterceptorsTest {
         List<ProducerInterceptor<Integer, String>> interceptorList = new 
ArrayList<>();
         AppendProducerInterceptor interceptor1 = new 
AppendProducerInterceptor("One");
         interceptorList.add(interceptor1);
-        ProducerInterceptors<Integer, String> interceptors = new 
ProducerInterceptors<>(interceptorList);
+        ProducerInterceptors<Integer, String> interceptors = new 
ProducerInterceptors<>(interceptorList, null);
 
         // verify that metadata contains both topic and partition
         interceptors.onSendError(producerRecord,
diff --git 
a/clients/src/test/java/org/apache/kafka/common/internals/PluginTest.java 
b/clients/src/test/java/org/apache/kafka/common/internals/PluginTest.java
new file mode 100644
index 00000000000..d414d77bdba
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/internals/PluginTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link Plugin}: wrapping single instances and lists of
+ * instances, whether {@link Monitorable} plugins receive a
+ * {@link PluginMetrics}, and the behaviour of close().
+ */
+public class PluginTest {
+
+    private static final String CONFIG = "some.config";
+    // One registry for the whole class; tests pass it (or null) to Plugin.
+    private static final Metrics METRICS = new Metrics();
+
+    /**
+     * Closeable plugin that does not implement {@link Monitorable}. It
+     * records whether close() completed and can be made to fail on close.
+     */
+    static class SomePlugin implements Closeable {
+
+        // In this file only SomeMonitorablePlugin assigns this field.
+        PluginMetrics pluginMetrics;
+        boolean closed;
+        boolean throwOnClose = false;
+
+        @Override
+        public void close() throws IOException {
+            // Fails before the flag is set, so closed stays false.
+            if (throwOnClose) throw new RuntimeException("throw on close");
+            closed = true;
+        }
+    }
+
+    /**
+     * {@link SomePlugin} that also implements {@link Monitorable} and keeps
+     * the {@link PluginMetrics} it is given so tests can inspect it.
+     */
+    static class SomeMonitorablePlugin extends SomePlugin implements 
Monitorable  {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            pluginMetrics = metrics;
+        }
+    }
+
+    /**
+     * Wraps one instance at a time and checks three combinations:
+     * monitorable plugin with metrics, monitorable plugin with null
+     * metrics, and non-monitorable plugin with null metrics.
+     */
+    @Test
+    void testWrapInstance() throws Exception {
+        // Monitorable + registry: the plugin must have received metrics.
+        SomeMonitorablePlugin someMonitorablePlugin = new 
SomeMonitorablePlugin();
+        Plugin<SomeMonitorablePlugin> pluginMonitorable = 
Plugin.wrapInstance(someMonitorablePlugin, METRICS, CONFIG);
+        checkPlugin(pluginMonitorable, someMonitorablePlugin, true);
+
+        // Monitorable + null registry: pluginMetrics must remain null.
+        someMonitorablePlugin = new SomeMonitorablePlugin();
+        assertFalse(someMonitorablePlugin.closed);
+        pluginMonitorable = Plugin.wrapInstance(someMonitorablePlugin, null, 
CONFIG);
+        checkPlugin(pluginMonitorable, someMonitorablePlugin, false);
+
+        // Not monitorable: the wrapper exposes the same instance and
+        // closing the wrapper closes it.
+        SomePlugin somePlugin = new SomePlugin();
+        assertFalse(somePlugin.closed);
+        Plugin<SomePlugin> plugin = Plugin.wrapInstance(somePlugin, null, 
CONFIG);
+        assertSame(somePlugin, plugin.get());
+        assertNull(somePlugin.pluginMetrics);
+        plugin.close();
+        assertTrue(somePlugin.closed);
+    }
+
+    /**
+     * List variant of the checks above. The returned list must have one
+     * wrapper per input and the i-th wrapper must hold the i-th input.
+     */
+    @Test
+    void testWrapInstances() throws Exception {
+        // Monitorable plugins + registry: each one must receive metrics.
+        List<SomeMonitorablePlugin> someMonitorablePlugins = Arrays.asList(new 
SomeMonitorablePlugin(), new SomeMonitorablePlugin());
+        List<Plugin<SomeMonitorablePlugin>> pluginsMonitorable = 
Plugin.wrapInstances(someMonitorablePlugins, METRICS, CONFIG);
+        assertEquals(someMonitorablePlugins.size(), pluginsMonitorable.size());
+        for (int i = 0; i < pluginsMonitorable.size(); i++) {
+            Plugin<SomeMonitorablePlugin> plugin = pluginsMonitorable.get(i);
+            SomeMonitorablePlugin somePlugin = someMonitorablePlugins.get(i);
+            checkPlugin(plugin, somePlugin, true);
+        }
+
+        // Monitorable plugins + null registry: none may receive metrics.
+        someMonitorablePlugins = Arrays.asList(new SomeMonitorablePlugin(), 
new SomeMonitorablePlugin());
+        pluginsMonitorable = Plugin.wrapInstances(someMonitorablePlugins, 
null, CONFIG);
+        assertEquals(someMonitorablePlugins.size(), pluginsMonitorable.size());
+        for (int i = 0; i < pluginsMonitorable.size(); i++) {
+            Plugin<SomeMonitorablePlugin> plugin = pluginsMonitorable.get(i);
+            SomeMonitorablePlugin somePlugin = someMonitorablePlugins.get(i);
+            checkPlugin(plugin, somePlugin, false);
+        }
+
+        // Non-monitorable plugins: pluginMetrics stays null even though a
+        // registry is supplied here.
+        List<SomePlugin> somePlugins = Arrays.asList(new SomePlugin(), new 
SomePlugin());
+        List<Plugin<SomePlugin>> plugins = Plugin.wrapInstances(somePlugins, 
METRICS, CONFIG);
+        assertEquals(somePlugins.size(), plugins.size());
+        for (int i = 0; i < plugins.size(); i++) {
+            Plugin<SomePlugin> plugin = plugins.get(i);
+            SomePlugin somePlugin = somePlugins.get(i);
+            assertSame(somePlugin, plugin.get());
+            assertNull(somePlugin.pluginMetrics);
+            plugin.close();
+            assertTrue(somePlugin.closed);
+        }
+    }
+
+    /**
+     * The wrapped instance throws a RuntimeException from close(); the
+     * wrapper's close() is expected to throw a {@link KafkaException}.
+     */
+    @Test
+    public void testCloseThrows() {
+        SomePlugin somePlugin = new SomePlugin();
+        somePlugin.throwOnClose = true;
+        Plugin<SomePlugin> plugin = Plugin.wrapInstance(somePlugin, METRICS, 
CONFIG);
+        assertThrows(KafkaException.class, plugin::close);
+    }
+
+    /**
+     * Once the wrapper is closed, every call made here on the
+     * {@link PluginMetrics} captured beforehand is expected to throw
+     * IllegalStateException.
+     */
+    @Test
+    public void testUsePluginMetricsAfterClose() throws Exception {
+        Plugin<SomeMonitorablePlugin> plugin = Plugin.wrapInstance(new 
SomeMonitorablePlugin(), METRICS, CONFIG);
+        // Keep a reference obtained before close() to use afterwards.
+        PluginMetrics pluginMetrics = plugin.get().pluginMetrics;
+        plugin.close();
+        assertThrows(IllegalStateException.class, () -> 
pluginMetrics.metricName("", "", Collections.emptyMap()));
+        assertThrows(IllegalStateException.class, () -> 
pluginMetrics.addMetric(null, null));
+        assertThrows(IllegalStateException.class, () -> 
pluginMetrics.removeMetric(null));
+        assertThrows(IllegalStateException.class, () -> 
pluginMetrics.addSensor(""));
+        assertThrows(IllegalStateException.class, () -> 
pluginMetrics.removeSensor(""));
+    }
+
+    /**
+     * Shared assertions for a wrapped monitorable plugin.
+     * Note that this helper closes the wrapper as part of the check.
+     *
+     * @param plugin     the wrapper under test
+     * @param instance   the instance the wrapper is expected to hold
+     * @param metricsSet whether the instance should have received metrics
+     */
+    private void checkPlugin(Plugin<SomeMonitorablePlugin> plugin, 
SomeMonitorablePlugin instance, boolean metricsSet) throws Exception {
+        assertSame(instance, plugin.get());
+        if (metricsSet) {
+            assertNotNull(instance.pluginMetrics);
+        } else {
+            assertNull(instance.pluginMetrics);
+        }
+        plugin.close();
+        assertTrue(instance.closed);
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/common/metrics/internals/PluginMetricsImplTest.java
 
b/clients/src/test/java/org/apache/kafka/common/metrics/internals/PluginMetricsImplTest.java
new file mode 100644
index 00000000000..0ff349e361d
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/metrics/internals/PluginMetricsImplTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link PluginMetricsImpl}: building metric names from base and
+ * extra tags, adding and removing metrics and sensors, and close().
+ */
+public class PluginMetricsImplTest {
+
+    // Tags supplied per metricName() call, on top of the base tags below.
+    private final Map<String, String> extraTags = 
Collections.singletonMap("my-tag", "my-value");
+    // Base tags given to the PluginMetricsImpl constructor in each test.
+    private Map<String, String> tags;
+    private Metrics metrics;
+    // Registry size right after creation; assertions are relative to it.
+    private int initialMetrics;
+
+    /**
+     * Creates a fresh registry and the base tags (k1, k2) before each test.
+     */
+    @BeforeEach
+    void setup() {
+        metrics = new Metrics();
+        initialMetrics = metrics.metrics().size();
+        tags = new LinkedHashMap<>();
+        tags.put("k1", "v1");
+        tags.put("k2", "v2");
+    }
+
+    /**
+     * metricName() takes no group argument; the resulting name is expected
+     * in group "plugins" with the base tags followed by the extra tags.
+     */
+    @Test
+    void testMetricName() {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        MetricName metricName = pmi.metricName("name", "description", 
extraTags);
+        assertEquals("name", metricName.name());
+        assertEquals("plugins", metricName.group());
+        assertEquals("description", metricName.description());
+        Map<String, String> expectedTags = new LinkedHashMap<>(tags);
+        expectedTags.putAll(extraTags);
+        assertEquals(expectedTags, metricName.tags());
+    }
+
+    /**
+     * An extra tag whose key ("k1") is already a base tag is expected to be
+     * rejected with IllegalArgumentException.
+     */
+    @Test
+    void testDuplicateTagName() {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        assertThrows(IllegalArgumentException.class,
+                () -> pmi.metricName("name", "description", 
Collections.singletonMap("k1", "value")));
+    }
+
+    /**
+     * Adding a metric grows the registry by one; adding the same name again
+     * or removing it a second time is expected to throw
+     * IllegalArgumentException.
+     */
+    @Test
+    void testAddRemoveMetrics() {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        MetricName metricName = pmi.metricName("name", "description", 
extraTags);
+        pmi.addMetric(metricName, (Measurable) (config, now) -> 0.0);
+        assertEquals(initialMetrics + 1, metrics.metrics().size());
+
+        assertThrows(IllegalArgumentException.class, () -> 
pmi.addMetric(metricName, (Measurable) (config, now) -> 0.0));
+
+        pmi.removeMetric(metricName);
+        assertEquals(initialMetrics, metrics.metrics().size());
+
+        assertThrows(IllegalArgumentException.class, () -> 
pmi.removeMetric(metricName));
+    }
+
+    /**
+     * Creating a sensor alone does not change the registry size; removing
+     * the sensor brings the size back to the initial value. Adding the same
+     * sensor name twice or removing it twice is expected to throw
+     * IllegalArgumentException.
+     */
+    @Test
+    void testAddRemoveSensor() {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        String sensorName = "my-sensor";
+        MetricName metricName = pmi.metricName("name", "description", 
extraTags);
+        Sensor sensor = pmi.addSensor(sensorName);
+        assertEquals(initialMetrics, metrics.metrics().size());
+        // NOTE(review): both stats use the same MetricName and only one new
+        // metric is asserted; presumably the second add is a no-op, so Max
+        // is never registered. Confirm against Sensor.add.
+        sensor.add(metricName, new Rate());
+        sensor.add(metricName, new Max());
+        assertEquals(initialMetrics + 1, metrics.metrics().size());
+
+        assertThrows(IllegalArgumentException.class, () -> 
pmi.addSensor(sensorName));
+
+        pmi.removeSensor(sensorName);
+        assertEquals(initialMetrics, metrics.metrics().size());
+
+        assertThrows(IllegalArgumentException.class, () -> 
pmi.removeSensor(sensorName));
+    }
+
+    /**
+     * Registers one metric through a sensor and one directly, then checks
+     * that close() brings the registry back to its initial size.
+     */
+    @Test
+    void testClose() throws IOException {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        String sensorName = "my-sensor";
+        MetricName metricName1 = pmi.metricName("name1", "description", 
extraTags);
+        Sensor sensor = pmi.addSensor(sensorName);
+        sensor.add(metricName1, new Rate());
+        MetricName metricName2 = pmi.metricName("name2", "description", 
extraTags);
+        pmi.addMetric(metricName2, (Measurable) (config, now) -> 1.0);
+
+        assertEquals(initialMetrics + 2, metrics.metrics().size());
+        pmi.close();
+        assertEquals(initialMetrics, metrics.metrics().size());
+    }
+}

Reply via email to