This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 71314739f9b KAFKA-15995: Initial API + make Producer/Consumer plugins
Monitorable (#17511)
71314739f9b is described below
commit 71314739f9b51fa5b443258d68019f58583704eb
Author: Mickael Maison <[email protected]>
AuthorDate: Fri Jan 31 10:40:10 2025 +0100
KAFKA-15995: Initial API + make Producer/Consumer plugins Monitorable
(#17511)
Reviewers: Greg Harris <[email protected]>, Luke Chen
<[email protected]>
---
checkstyle/import-control.xml | 5 +
.../clients/consumer/ConsumerInterceptor.java | 2 +
.../consumer/internals/AsyncKafkaConsumer.java | 10 +-
.../consumer/internals/ClassicKafkaConsumer.java | 10 +-
.../clients/consumer/internals/CompletedFetch.java | 4 +-
.../consumer/internals/ConsumerInterceptors.java | 23 ++--
.../clients/consumer/internals/Deserializers.java | 52 ++++---
.../consumer/internals/ShareCompletedFetch.java | 4 +-
.../consumer/internals/ShareConsumerImpl.java | 8 +-
.../kafka/clients/producer/KafkaProducer.java | 60 +++++----
.../apache/kafka/clients/producer/Partitioner.java | 3 +
.../clients/producer/ProducerInterceptor.java | 2 +
.../producer/internals/ProducerInterceptors.java | 27 ++--
.../org/apache/kafka/common/internals/Plugin.java | 86 ++++++++++++
.../apache/kafka/common/metrics/Monitorable.java | 32 +++++
.../apache/kafka/common/metrics/PluginMetrics.java | 75 +++++++++++
.../metrics/internals/PluginMetricsImpl.java | 113 ++++++++++++++++
.../kafka/common/serialization/Deserializer.java | 3 +-
.../kafka/common/serialization/Serializer.java | 3 +-
.../kafka/clients/consumer/KafkaConsumerTest.java | 81 +++++++++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 9 +-
.../consumer/internals/CompletedFetchTest.java | 4 +-
.../internals/ConsumerInterceptorsTest.java | 4 +-
.../consumer/internals/FetchCollectorTest.java | 5 +-
.../internals/FetchRequestManagerTest.java | 2 +-
.../clients/consumer/internals/FetcherTest.java | 4 +-
.../consumer/internals/OffsetFetcherTest.java | 2 +-
.../internals/ShareCompletedFetchTest.java | 4 +-
.../internals/ShareConsumeRequestManagerTest.java | 2 +-
.../internals/ShareFetchCollectorTest.java | 2 +-
.../kafka/clients/producer/KafkaProducerTest.java | 109 ++++++++++++++-
.../internals/ProducerInterceptorsTest.java | 6 +-
.../apache/kafka/common/internals/PluginTest.java | 149 +++++++++++++++++++++
.../metrics/internals/PluginMetricsImplTest.java | 120 +++++++++++++++++
34 files changed, 908 insertions(+), 117 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 5129d174223..32c276aa486 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -87,6 +87,11 @@
<allow class="org.apache.kafka.common.record.RecordBatch"
exact-match="true" />
</subpackage>
+ <subpackage name="internals">
+ <allow pkg="org.apache.kafka.common.metrics" />
+ <allow pkg="org.apache.kafka.common.metrics.internals" />
+ </subpackage>
+
<subpackage name="message">
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.kafka.common.protocol" />
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index c04afccd8aa..206e6d04a2c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -39,6 +39,8 @@ import java.util.Map;
* {@link
org.apache.kafka.clients.consumer.KafkaConsumer#poll(java.time.Duration)}.
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to
receive cluster metadata once it's available. Please see the class
documentation for ClusterResourceListener for more information.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the
interceptor to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to
<code>interceptor.classes</code>, and <code>class</code> set to the
ConsumerInterceptor class name.
*/
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable
{
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 2c65b44a7a0..69067670c30 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -326,12 +326,12 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
List<ConsumerInterceptor<K, V>> interceptorList =
configuredConsumerInterceptors(config);
- this.interceptors = new ConsumerInterceptors<>(interceptorList);
- this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer);
+ this.interceptors = new ConsumerInterceptors<>(interceptorList,
metrics);
+ this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer, metrics);
this.subscriptions = createSubscriptionState(config, logContext);
ClusterResourceListeners clusterResourceListeners =
ClientUtils.configureClusterResourceListeners(metrics.reporters(),
interceptorList,
- Arrays.asList(deserializers.keyDeserializer,
deserializers.valueDeserializer));
+ Arrays.asList(deserializers.keyDeserializer(),
deserializers.valueDeserializer()));
this.metadata = metadataFactory.build(config, subscriptions,
logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);
@@ -494,13 +494,13 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.autoCommitEnabled =
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
this.fetchBuffer = new FetchBuffer(logContext);
this.isolationLevel = IsolationLevel.READ_UNCOMMITTED;
- this.interceptors = new
ConsumerInterceptors<>(Collections.emptyList());
this.time = time;
this.metrics = new Metrics(time);
+ this.interceptors = new
ConsumerInterceptors<>(Collections.emptyList(), metrics);
this.metadata = metadata;
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.defaultApiTimeoutMs =
Duration.ofMillis(config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG));
- this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer);
+ this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer, metrics);
this.clientTelemetryReporter = Optional.empty();
ConsumerMetrics metricsRegistry = new
ConsumerMetrics(CONSUMER_METRIC_GROUP_PREFIX);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 28b82f3b3ad..9a8c637d778 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -179,13 +179,13 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.retryBackoffMaxMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
List<ConsumerInterceptor<K, V>> interceptorList =
configuredConsumerInterceptors(config);
- this.interceptors = new ConsumerInterceptors<>(interceptorList);
- this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer);
+ this.interceptors = new ConsumerInterceptors<>(interceptorList,
metrics);
+ this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer, metrics);
this.subscriptions = createSubscriptionState(config, logContext);
ClusterResourceListeners clusterResourceListeners =
ClientUtils.configureClusterResourceListeners(
metrics.reporters(),
interceptorList,
- Arrays.asList(this.deserializers.keyDeserializer,
this.deserializers.valueDeserializer));
+ Arrays.asList(this.deserializers.keyDeserializer(),
this.deserializers.valueDeserializer()));
this.metadata = new ConsumerMetadata(config, subscriptions,
logContext, clusterResourceListeners);
List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
this.metadata.bootstrap(addresses);
@@ -289,12 +289,12 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.metrics = new Metrics(time);
this.clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
this.groupId =
Optional.ofNullable(config.getString(ConsumerConfig.GROUP_ID_CONFIG));
- this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer);
+ this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer, metrics);
this.isolationLevel = ConsumerUtils.configuredIsolationLevel(config);
this.defaultApiTimeoutMs =
config.getInt(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG);
this.assignors = assignors;
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics,
CONSUMER_METRIC_GROUP_PREFIX);
- this.interceptors = new
ConsumerInterceptors<>(Collections.emptyList());
+ this.interceptors = new
ConsumerInterceptors<>(Collections.emptyList(), metrics);
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.retryBackoffMaxMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
this.requestTimeoutMs =
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
index 2cba76588e5..a505de2dc12 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -319,13 +319,13 @@ public class CompletedFetch {
K key;
V value;
try {
- key = keyBytes == null ? null :
deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+ key = keyBytes == null ? null :
deserializers.keyDeserializer().deserialize(partition.topic(), headers,
keyBytes);
} catch (RuntimeException e) {
log.error("Key Deserializers with error: {}", deserializers);
throw
newRecordDeserializationException(DeserializationExceptionOrigin.KEY,
partition, timestampType, record, e, headers);
}
try {
- value = valueBytes == null ? null :
deserializers.valueDeserializer.deserialize(partition.topic(), headers,
valueBytes);
+ value = valueBytes == null ? null :
deserializers.valueDeserializer().deserialize(partition.topic(), headers,
valueBytes);
} catch (RuntimeException e) {
log.error("Value Deserializers with error: {}", deserializers);
throw
newRecordDeserializationException(DeserializationExceptionOrigin.VALUE,
partition, timestampType, record, e, headers);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
index c56ea1a03e9..c58b60ba0f2 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
@@ -17,10 +17,13 @@
package org.apache.kafka.clients.consumer.internals;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,15 +38,15 @@ import java.util.Map;
*/
public class ConsumerInterceptors<K, V> implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(ConsumerInterceptors.class);
- private final List<ConsumerInterceptor<K, V>> interceptors;
+ private final List<Plugin<ConsumerInterceptor<K, V>>> interceptorPlugins;
- public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
- this.interceptors = interceptors;
+ public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors,
Metrics metrics) {
+ this.interceptorPlugins = Plugin.wrapInstances(interceptors, metrics,
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG);
}
/** Returns true if no interceptors are defined. All other methods will be
no-ops in this case. */
public boolean isEmpty() {
- return interceptors.isEmpty();
+ return interceptorPlugins.isEmpty();
}
/**
@@ -62,9 +65,9 @@ public class ConsumerInterceptors<K, V> implements Closeable {
*/
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
ConsumerRecords<K, V> interceptRecords = records;
- for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin :
this.interceptorPlugins) {
try {
- interceptRecords = interceptor.onConsume(interceptRecords);
+ interceptRecords =
interceptorPlugin.get().onConsume(interceptRecords);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue
calling other interceptors
log.warn("Error executing interceptor onConsume callback", e);
@@ -83,9 +86,9 @@ public class ConsumerInterceptors<K, V> implements Closeable {
* @param offsets A map of offsets by partition with associated metadata
*/
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
- for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin :
this.interceptorPlugins) {
try {
- interceptor.onCommit(offsets);
+ interceptorPlugin.get().onCommit(offsets);
} catch (Exception e) {
// do not propagate interceptor exception, just log
log.warn("Error executing interceptor onCommit callback", e);
@@ -98,9 +101,9 @@ public class ConsumerInterceptors<K, V> implements Closeable
{
*/
@Override
public void close() {
- for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ConsumerInterceptor<K, V>> interceptorPlugin :
this.interceptorPlugins) {
try {
- interceptor.close();
+ interceptorPlugin.close();
} catch (Exception e) {
log.error("Failed to close consumer interceptor ", e);
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
index 5de2a888775..0926c720c0c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Deserializers.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Utils;
@@ -28,44 +30,54 @@ import java.util.concurrent.atomic.AtomicReference;
public class Deserializers<K, V> implements AutoCloseable {
- public final Deserializer<K> keyDeserializer;
- public final Deserializer<V> valueDeserializer;
+ private final Plugin<Deserializer<K>> keyDeserializerPlugin;
+ private final Plugin<Deserializer<V>> valueDeserializerPlugin;
- public Deserializers(Deserializer<K> keyDeserializer, Deserializer<V>
valueDeserializer) {
- this.keyDeserializer = Objects.requireNonNull(keyDeserializer, "Key
deserializer provided to Deserializers should not be null");
- this.valueDeserializer = Objects.requireNonNull(valueDeserializer,
"Value deserializer provided to Deserializers should not be null");
- }
-
- public Deserializers(ConsumerConfig config) {
- this(config, null, null);
+ public Deserializers(Deserializer<K> keyDeserializer, Deserializer<V>
valueDeserializer, Metrics metrics) {
+ this.keyDeserializerPlugin = Plugin.wrapInstance(
+ Objects.requireNonNull(keyDeserializer, "Key deserializer
provided to Deserializers should not be null"),
+ metrics,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+ this.valueDeserializerPlugin = Plugin.wrapInstance(
+ Objects.requireNonNull(valueDeserializer, "Value deserializer
provided to Deserializers should not be null"),
+ metrics,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
}
@SuppressWarnings("unchecked")
- public Deserializers(ConsumerConfig config, Deserializer<K>
keyDeserializer, Deserializer<V> valueDeserializer) {
+ public Deserializers(ConsumerConfig config, Deserializer<K>
keyDeserializer, Deserializer<V> valueDeserializer, Metrics metrics) {
String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
if (keyDeserializer == null) {
- this.keyDeserializer =
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
-
this.keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
clientId)), true);
+ keyDeserializer =
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
+
keyDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
clientId)), true);
} else {
config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
- this.keyDeserializer = keyDeserializer;
}
+ this.keyDeserializerPlugin = Plugin.wrapInstance(keyDeserializer,
metrics, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
if (valueDeserializer == null) {
- this.valueDeserializer =
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
-
this.valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
clientId)), false);
+ valueDeserializer =
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
+
valueDeserializer.configure(config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
clientId)), false);
} else {
config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
- this.valueDeserializer = valueDeserializer;
}
+ this.valueDeserializerPlugin = Plugin.wrapInstance(valueDeserializer,
metrics, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+ }
+
+ public Deserializer<K> keyDeserializer() {
+ return keyDeserializerPlugin.get();
+ }
+
+ public Deserializer<V> valueDeserializer() {
+ return valueDeserializerPlugin.get();
}
@Override
public void close() {
AtomicReference<Throwable> firstException = new AtomicReference<>();
- Utils.closeQuietly(keyDeserializer, "key deserializer",
firstException);
- Utils.closeQuietly(valueDeserializer, "value deserializer",
firstException);
+ Utils.closeQuietly(keyDeserializerPlugin, "key deserializer",
firstException);
+ Utils.closeQuietly(valueDeserializerPlugin, "value deserializer",
firstException);
Throwable exception = firstException.get();
if (exception != null) {
@@ -79,8 +91,8 @@ public class Deserializers<K, V> implements AutoCloseable {
@Override
public String toString() {
return "Deserializers{" +
- "keyDeserializer=" + keyDeserializer +
- ", valueDeserializer=" + valueDeserializer +
+ "keyDeserializer=" + keyDeserializerPlugin.get() +
+ ", valueDeserializer=" + valueDeserializerPlugin.get() +
'}';
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 74760beec6d..838416b8428 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -296,13 +296,13 @@ public class ShareCompletedFetch {
K key;
V value;
try {
- key = keyBytes == null ? null :
deserializers.keyDeserializer.deserialize(partition.topic(), headers, keyBytes);
+ key = keyBytes == null ? null :
deserializers.keyDeserializer().deserialize(partition.topic(), headers,
keyBytes);
} catch (RuntimeException e) {
log.error("Key Deserializers with error: {}", deserializers);
throw
newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.KEY,
partition.topicPartition(), timestampType, record, e, headers);
}
try {
- value = valueBytes == null ? null :
deserializers.valueDeserializer.deserialize(partition.topic(), headers,
valueBytes);
+ value = valueBytes == null ? null :
deserializers.valueDeserializer().deserialize(partition.topic(), headers,
valueBytes);
} catch (RuntimeException e) {
log.error("Value Deserializers with error: {}", deserializers);
throw
newRecordDeserializationException(RecordDeserializationException.DeserializationExceptionOrigin.VALUE,
partition.topicPartition(), timestampType, record, e, headers);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index de737dde3bf..667f2d00033 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -257,12 +257,12 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.metrics = createMetrics(config, time, reporters);
this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
- this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer);
+ this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.subscriptions = createSubscriptionState(config, logContext);
ClusterResourceListeners clusterResourceListeners =
ClientUtils.configureClusterResourceListeners(
metrics.reporters(),
- Arrays.asList(deserializers.keyDeserializer,
deserializers.valueDeserializer));
+ Arrays.asList(deserializers.keyDeserializer(),
deserializers.valueDeserializer()));
this.metadata = new ConsumerMetadata(config, subscriptions,
logContext, clusterResourceListeners);
final List<InetSocketAddress> addresses =
ClientUtils.parseAndValidateAddresses(config);
metadata.bootstrap(addresses);
@@ -363,7 +363,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.time = time;
this.metrics = new Metrics(time);
this.clientTelemetryReporter = Optional.empty();
- this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer);
+ this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.subscriptions = subscriptions;
this.metadata = metadata;
@@ -462,7 +462,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.metrics = metrics;
this.metadata = metadata;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
- this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer);
+ this.deserializers = new Deserializers<>(keyDeserializer,
valueDeserializer, metrics);
this.currentFetch = ShareFetch.empty();
this.applicationEventHandler = applicationEventHandler;
this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 130cfcb7bfb..7ef51fcd2f3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -59,6 +59,7 @@ import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -249,7 +250,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
// Visible for testing
final Metrics metrics;
private final KafkaProducerMetrics producerMetrics;
- private final Partitioner partitioner;
+ private final Plugin<Partitioner> partitionerPlugin;
private final int maxRequestSize;
private final long totalMemorySize;
private final ProducerMetadata metadata;
@@ -259,8 +260,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private final Compression compression;
private final Sensor errors;
private final Time time;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valueSerializer;
+ private final Plugin<Serializer<K>> keySerializerPlugin;
+ private final Plugin<Serializer<V>> valueSerializerPlugin;
private final ProducerConfig producerConfig;
private final long maxBlockTimeMs;
private final boolean partitionerIgnoreKeys;
@@ -366,29 +367,32 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time,
metricsContext);
this.producerMetrics = new KafkaProducerMetrics(metrics);
- this.partitioner = config.getConfiguredInstance(
- ProducerConfig.PARTITIONER_CLASS_CONFIG,
- Partitioner.class,
- Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
clientId));
+ this.partitionerPlugin = Plugin.wrapInstance(
+ config.getConfiguredInstance(
+ ProducerConfig.PARTITIONER_CLASS_CONFIG,
+ Partitioner.class,
+
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)),
+ metrics,
+ ProducerConfig.PARTITIONER_CLASS_CONFIG);
this.partitionerIgnoreKeys =
config.getBoolean(ProducerConfig.PARTITIONER_IGNORE_KEYS_CONFIG);
long retryBackoffMs =
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
long retryBackoffMaxMs =
config.getLong(ProducerConfig.RETRY_BACKOFF_MAX_MS_CONFIG);
if (keySerializer == null) {
- this.keySerializer =
config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-
Serializer.class);
-
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
clientId)), true);
+ keySerializer =
config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
+
keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
- this.keySerializer = keySerializer;
}
+ this.keySerializerPlugin = Plugin.wrapInstance(keySerializer,
metrics, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+
if (valueSerializer == null) {
- this.valueSerializer =
config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-
Serializer.class);
-
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
clientId)), false);
+ valueSerializer =
config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
+
valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
- this.valueSerializer = valueSerializer;
}
+ this.valueSerializerPlugin = Plugin.wrapInstance(valueSerializer,
metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+
List<ProducerInterceptor<K, V>> interceptorList =
ClientUtils.configuredInterceptors(config,
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
@@ -396,11 +400,11 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
if (interceptors != null)
this.interceptors = interceptors;
else
- this.interceptors = new
ProducerInterceptors<>(interceptorList);
+ this.interceptors = new
ProducerInterceptors<>(interceptorList, metrics);
ClusterResourceListeners clusterResourceListeners =
ClientUtils.configureClusterResourceListeners(
interceptorList,
reporters,
- Arrays.asList(this.keySerializer, this.valueSerializer));
+ Arrays.asList(this.keySerializerPlugin.get(),
this.valueSerializerPlugin.get()));
this.maxRequestSize =
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize =
config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compression = configureCompression(config);
@@ -411,7 +415,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.apiVersions = apiVersions;
this.transactionManager = configureTransactionState(config,
logContext);
// There is no need to do work required for adaptive partitioning,
if we use a custom partitioner.
- boolean enableAdaptivePartitioning = partitioner == null &&
+ boolean enableAdaptivePartitioning = partitionerPlugin.get() ==
null &&
config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);
RecordAccumulator.PartitionerConfig partitionerConfig = new
RecordAccumulator.PartitionerConfig(
enableAdaptivePartitioning,
@@ -485,9 +489,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
this.log = logContext.logger(KafkaProducer.class);
this.metrics = metrics;
this.producerMetrics = new KafkaProducerMetrics(metrics);
- this.partitioner = partitioner;
- this.keySerializer = keySerializer;
- this.valueSerializer = valueSerializer;
+ this.partitionerPlugin = Plugin.wrapInstance(partitioner, metrics,
ProducerConfig.PARTITIONER_CLASS_CONFIG);
+ this.keySerializerPlugin = Plugin.wrapInstance(keySerializer, metrics,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+ this.valueSerializerPlugin = Plugin.wrapInstance(valueSerializer,
metrics, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.interceptors = interceptors;
this.maxRequestSize =
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize =
config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
@@ -972,7 +976,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
try {
- serializedKey = keySerializer.serialize(record.topic(),
record.headers(), record.key());
+ serializedKey =
keySerializerPlugin.get().serialize(record.topic(), record.headers(),
record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class "
+ record.key().getClass().getName() +
" to class " +
producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
@@ -980,7 +984,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
}
byte[] serializedValue;
try {
- serializedValue = valueSerializer.serialize(record.topic(),
record.headers(), record.value());
+ serializedValue =
valueSerializerPlugin.get().serialize(record.topic(), record.headers(),
record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class
" + record.value().getClass().getName() +
" to class " +
producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName()
+
@@ -1414,9 +1418,9 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
Utils.closeQuietly(interceptors, "producer interceptors",
firstException);
Utils.closeQuietly(producerMetrics, "producer metrics wrapper",
firstException);
Utils.closeQuietly(metrics, "producer metrics", firstException);
- Utils.closeQuietly(keySerializer, "producer keySerializer",
firstException);
- Utils.closeQuietly(valueSerializer, "producer valueSerializer",
firstException);
- Utils.closeQuietly(partitioner, "producer partitioner",
firstException);
+ Utils.closeQuietly(keySerializerPlugin, "producer keySerializer",
firstException);
+ Utils.closeQuietly(valueSerializerPlugin, "producer valueSerializer",
firstException);
+ Utils.closeQuietly(partitionerPlugin, "producer partitioner",
firstException);
clientTelemetryReporter.ifPresent(reporter ->
Utils.closeQuietly(reporter, "producer telemetry reporter", firstException));
AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
Throwable exception = firstException.get();
@@ -1443,8 +1447,8 @@ public class KafkaProducer<K, V> implements Producer<K,
V> {
if (record.partition() != null)
return record.partition();
- if (partitioner != null) {
- int customPartition = partitioner.partition(
+ if (partitionerPlugin.get() != null) {
+ int customPartition = partitionerPlugin.get().partition(
record.topic(), record.key(), serializedKey, record.value(),
serializedValue, cluster);
if (customPartition < 0) {
throw new IllegalArgumentException(String.format(
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
index 96345d8f8b0..d1d1ad3ac55 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java
@@ -23,6 +23,9 @@ import java.io.Closeable;
/**
* Partitioner Interface
+ * <p>
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the
partitioner to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to
<code>partitioner.class</code>, and <code>class</code> set to the Partitioner
class name.
*/
public interface Partitioner extends Configurable, Closeable {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
index 48caf98d44a..5bc4b2c2c85 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
@@ -33,6 +33,8 @@ import org.apache.kafka.common.Configurable;
* ProducerInterceptor callbacks may be called from multiple threads.
Interceptor implementation must ensure thread-safety, if needed.
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to
receive cluster metadata once it's available. Please see the class
documentation for ClusterResourceListener for more information.
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the
interceptor to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to
<code>interceptor.classes</code>, and <code>class</code> set to the
ProducerInterceptor class name.
*/
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable
{
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index 75bf8485e47..9936eef7609 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -17,10 +17,13 @@
package org.apache.kafka.clients.producer.internals;
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.Plugin;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
@@ -35,10 +38,10 @@ import java.util.List;
*/
public class ProducerInterceptors<K, V> implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(ProducerInterceptors.class);
- private final List<ProducerInterceptor<K, V>> interceptors;
+ private final List<Plugin<ProducerInterceptor<K, V>>> interceptorPlugins;
- public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
- this.interceptors = interceptors;
+ public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors,
Metrics metrics) {
+ this.interceptorPlugins = Plugin.wrapInstances(interceptors, metrics,
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG);
}
/**
@@ -57,9 +60,9 @@ public class ProducerInterceptors<K, V> implements Closeable {
*/
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
- for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin :
this.interceptorPlugins) {
try {
- interceptRecord = interceptor.onSend(interceptRecord);
+ interceptRecord =
interceptorPlugin.get().onSend(interceptRecord);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue
calling other interceptors
// be careful not to throw exception from here
@@ -84,9 +87,9 @@ public class ProducerInterceptors<K, V> implements Closeable {
* @param exception The exception thrown during processing of this record.
Null if no error occurred.
*/
public void onAcknowledgement(RecordMetadata metadata, Exception
exception) {
- for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin :
this.interceptorPlugins) {
try {
- interceptor.onAcknowledgement(metadata, exception);
+ interceptorPlugin.get().onAcknowledgement(metadata, exception);
} catch (Exception e) {
// do not propagate interceptor exceptions, just log
log.warn("Error executing interceptor onAcknowledgement
callback", e);
@@ -105,15 +108,15 @@ public class ProducerInterceptors<K, V> implements
Closeable {
* @param exception The exception thrown during processing of this record.
*/
public void onSendError(ProducerRecord<K, V> record, TopicPartition
interceptTopicPartition, Exception exception) {
- for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin :
this.interceptorPlugins) {
try {
if (record == null && interceptTopicPartition == null) {
- interceptor.onAcknowledgement(null, exception);
+ interceptorPlugin.get().onAcknowledgement(null, exception);
} else {
if (interceptTopicPartition == null) {
interceptTopicPartition =
extractTopicPartition(record);
}
- interceptor.onAcknowledgement(new
RecordMetadata(interceptTopicPartition, -1, -1,
+ interceptorPlugin.get().onAcknowledgement(new
RecordMetadata(interceptTopicPartition, -1, -1,
RecordBatch.NO_TIMESTAMP, -1, -1),
exception);
}
} catch (Exception e) {
@@ -132,9 +135,9 @@ public class ProducerInterceptors<K, V> implements
Closeable {
*/
@Override
public void close() {
- for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+ for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin :
this.interceptorPlugins) {
try {
- interceptor.close();
+ interceptorPlugin.close();
} catch (Exception e) {
log.error("Failed to close producer interceptor ", e);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
new file mode 100644
index 00000000000..620cd0c07ec
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/internals/Plugin.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.internals.PluginMetricsImpl;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+public class Plugin<T> implements Supplier<T>, AutoCloseable {
+
+ private final T instance;
+ private final Optional<PluginMetricsImpl> pluginMetrics;
+
+ private Plugin(T instance, PluginMetricsImpl pluginMetrics) {
+ this.instance = instance;
+ this.pluginMetrics = Optional.ofNullable(pluginMetrics);
+ }
+
+ public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics,
String key) {
+ return wrapInstance(instance, metrics, () -> tags(key, instance));
+ }
+
+ private static <T> Map<String, String> tags(String key, T instance) {
+ Map<String, String> tags = new LinkedHashMap<>();
+ tags.put("config", key);
+ tags.put("class", instance.getClass().getSimpleName());
+ return tags;
+ }
+
+ public static <T> List<Plugin<T>> wrapInstances(List<T> instances, Metrics
metrics, String key) {
+ List<Plugin<T>> plugins = new ArrayList<>();
+ for (T instance : instances) {
+ plugins.add(wrapInstance(instance, metrics, key));
+ }
+ return plugins;
+ }
+
+ public static <T> Plugin<T> wrapInstance(T instance, Metrics metrics,
Supplier<Map<String, String>> tagsSupplier) {
+ PluginMetricsImpl pluginMetrics = null;
+ if (instance instanceof Monitorable && metrics != null) {
+ pluginMetrics = new PluginMetricsImpl(metrics, tagsSupplier.get());
+ ((Monitorable) instance).withPluginMetrics(pluginMetrics);
+ }
+ return new Plugin<>(instance, pluginMetrics);
+ }
+
+ @Override
+ public T get() {
+ return instance;
+ }
+
+ @Override
+ public void close() throws Exception {
+ AtomicReference<Throwable> firstException = new AtomicReference<>();
+ if (instance instanceof AutoCloseable) {
+ Utils.closeQuietly((AutoCloseable) instance,
instance.getClass().getSimpleName(), firstException);
+ }
+ pluginMetrics.ifPresent(metrics -> Utils.closeQuietly(metrics,
"pluginMetrics", firstException));
+ Throwable throwable = firstException.get();
+ if (throwable != null) throw new KafkaException("failed closing
plugin", throwable);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/Monitorable.java
b/clients/src/main/java/org/apache/kafka/common/metrics/Monitorable.java
new file mode 100644
index 00000000000..fa5a292bf31
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Monitorable.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+/**
+ * Plugins can implement this interface to register their own metrics.
+ */
+public interface Monitorable {
+
+ /**
+ * Provides a {@link PluginMetrics} instance from the component that
instantiates the plugin.
+ * PluginMetrics can be used by the plugin to register and unregister
metrics
+ * at any point in its lifecycle prior to its close method being
called.
+ * Any metrics registered will be automatically removed when the plugin is
closed.
+ */
+ void withPluginMetrics(PluginMetrics metrics);
+
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/PluginMetrics.java
b/clients/src/main/java/org/apache/kafka/common/metrics/PluginMetrics.java
new file mode 100644
index 00000000000..d296234105d
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/PluginMetrics.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import org.apache.kafka.common.MetricName;
+
+import java.util.Map;
+
+/**
+ * This allows plugins to register metrics and sensors.
+ * Any metrics registered by the plugin are automatically removed when the
plugin is closed.
+ */
+public interface PluginMetrics {
+
+ /**
+ * Create a {@link MetricName} with the given name, description and tags.
The group will be set to "plugins".
+ * Tags to uniquely identify the plugins are automatically added to the
provided tags.
+ *
+ * @param name The name of the metric
+ * @param description A human-readable description to include in the metric
+ * @param tags Additional tags for the metric
+ * @throws IllegalArgumentException if any of the tag names collide with
the default tags for the plugin
+ */
+ MetricName metricName(String name, String description, Map<String, String>
tags);
+
+ /**
+ * Add a metric to monitor an object that implements {@link
MetricValueProvider}. This metric won't be associated with any
+ * sensor. This is a way to expose existing values as metrics.
+ *
+ * @param metricName The name of the metric
+ * @param metricValueProvider The metric value provider associated with
this metric
+ * @throws IllegalArgumentException if a metric with same name already
exists
+ */
+ void addMetric(MetricName metricName, MetricValueProvider<?>
metricValueProvider);
+
+ /**
+ * Remove a previously registered metric.
+ *
+ * @param metricName The name of the metric
+ * @throws IllegalArgumentException if a metric with this name does not
exist
+ */
+ void removeMetric(MetricName metricName);
+
+ /**
+ * Create a {@link Sensor} with the given unique name. The name must only
be unique for the plugin, so different
+ * plugins can use the same names.
+ *
+ * @param name The sensor name
+ * @return The sensor
+ * @throws IllegalArgumentException if a sensor with same name already
exists for this plugin
+ */
+ Sensor addSensor(String name);
+
+ /**
+ * Remove a {@link Sensor} and its associated metrics.
+ *
+ * @param name The name of the sensor to be removed
+ * @throws IllegalArgumentException if a sensor with this name does not
exist
+ */
+ void removeSensor(String name);
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/metrics/internals/PluginMetricsImpl.java
b/clients/src/main/java/org/apache/kafka/common/metrics/internals/PluginMetricsImpl.java
new file mode 100644
index 00000000000..c109b578962
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/metrics/internals/PluginMetricsImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.MetricValueProvider;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.PluginMetrics;
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class PluginMetricsImpl implements PluginMetrics, Closeable {
+
+ private static final String GROUP = "plugins";
+
+ private final Metrics metrics;
+ private final Map<String, String> tags;
+ private final Set<MetricName> metricNames = ConcurrentHashMap.newKeySet();
+ private final Set<String> sensors = ConcurrentHashMap.newKeySet();
+ private volatile boolean closing = false;
+
+ public PluginMetricsImpl(Metrics metrics, Map<String, String> tags) {
+ this.metrics = metrics;
+ this.tags = tags;
+ }
+
+ @Override
+ public MetricName metricName(String name, String description, Map<String,
String> tags) {
+ if (closing) throw new IllegalStateException("This PluginMetrics
instance is closed");
+ for (String tagName : tags.keySet()) {
+ if (this.tags.containsKey(tagName)) {
+ throw new IllegalArgumentException("Cannot use " + tagName + "
as a tag name");
+ }
+ }
+ Map<String, String> metricsTags = new LinkedHashMap<>(this.tags);
+ metricsTags.putAll(tags);
+ return metrics.metricName(name, GROUP, description, metricsTags);
+ }
+
+ @Override
+ public void addMetric(MetricName metricName, MetricValueProvider<?>
metricValueProvider) {
+ if (closing) throw new IllegalStateException("This PluginMetrics
instance is closed");
+ if (metricNames.contains(metricName)) {
+ throw new IllegalArgumentException("Metric " + metricName + "
already exists");
+ }
+ metrics.addMetric(metricName, metricValueProvider);
+ metricNames.add(metricName);
+ }
+
+ @Override
+ public void removeMetric(MetricName metricName) {
+ if (closing) throw new IllegalStateException("This PluginMetrics
instance is closed");
+ if (metricNames.contains(metricName)) {
+ metrics.removeMetric(metricName);
+ metricNames.remove(metricName);
+ } else {
+ throw new IllegalArgumentException("Unknown metric " + metricName);
+ }
+ }
+
+ @Override
+ public Sensor addSensor(String name) {
+ if (closing) throw new IllegalStateException("This PluginMetrics
instance is closed");
+ if (sensors.contains(name)) {
+ throw new IllegalArgumentException("Sensor " + name + " already
exists");
+ }
+ Sensor sensor = metrics.sensor(name);
+ sensors.add(name);
+ return sensor;
+ }
+
+ @Override
+ public void removeSensor(String name) {
+ if (closing) throw new IllegalStateException("This PluginMetrics
instance is closed");
+ if (sensors.contains(name)) {
+ metrics.removeSensor(name);
+ sensors.remove(name);
+ } else {
+ throw new IllegalArgumentException("Unknown sensor " + name);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ closing = true;
+ for (String sensor : sensors) {
+ metrics.removeSensor(sensor);
+ }
+ for (MetricName metricName : metricNames) {
+ metrics.removeMetric(metricName);
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index ad8d2bfe4d4..1997e66aad2 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -29,7 +29,8 @@ import java.util.Map;
* A class that implements this interface is expected to have a constructor
with no parameters.
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to
receive cluster metadata once it's available. Please see the class
documentation for ClusterResourceListener for more information.
- *
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the
deserializer to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to either
<code>key.deserializer</code> or <code>value.deserializer</code>, and
<code>class</code> set to the Deserializer class name.
* @param <T> Type to be deserialized into.
*/
public interface Deserializer<T> extends Closeable {
diff --git
a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index 144b5ab945e..03eab512aa7 100644
---
a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++
b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -27,7 +27,8 @@ import java.util.Map;
* A class that implements this interface is expected to have a constructor
with no parameter.
* <p>
* Implement {@link org.apache.kafka.common.ClusterResourceListener} to
receive cluster metadata once it's available. Please see the class
documentation for ClusterResourceListener for more information.
- *
+ * Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the
serializer to register metrics. The following tags are automatically added to
+ * all metrics registered: <code>config</code> set to either
<code>key.serializer</code> or <code>value.serializer</code>, and
<code>class</code> set to the Serializer class name.
* @param <T> Type to be serialized from.
*/
public interface Serializer<T> extends Closeable {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 2749563df27..c2a3a4951e7 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -66,6 +66,8 @@ import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.network.Selectable;
@@ -101,6 +103,7 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
+import org.apache.kafka.test.MockDeserializer;
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.TestUtils;
@@ -3685,4 +3688,82 @@ public void
testClosingConsumerUnregistersConsumerMetrics(GroupProtocol groupPro
CLIENT_IDS.add(configs.get(ConsumerConfig.CLIENT_ID_CONFIG).toString());
}
}
+
+ @ParameterizedTest
+ @EnumSource(value = GroupProtocol.class)
+ void testMonitorablePlugins(GroupProtocol groupProtocol) {
+ try {
+ String clientId = "testMonitorablePlugins";
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+ configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
MonitorableDeserializer.class.getName());
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
MonitorableDeserializer.class.getName());
+ configs.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
MonitorableInterceptor.class.getName());
+
+ KafkaConsumer<String, String> consumer = new
KafkaConsumer<>(configs);
+ Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+
+ MetricName expectedKeyDeserializerMetric = expectedMetricName(
+ clientId,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ MonitorableDeserializer.class);
+ assertTrue(metrics.containsKey(expectedKeyDeserializerMetric));
+ assertEquals(VALUE,
metrics.get(expectedKeyDeserializerMetric).metricValue());
+
+ MetricName expectedValueDeserializerMetric = expectedMetricName(
+ clientId,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ MonitorableDeserializer.class);
+ assertTrue(metrics.containsKey(expectedValueDeserializerMetric));
+ assertEquals(VALUE,
metrics.get(expectedValueDeserializerMetric).metricValue());
+
+ MetricName expectedInterceptorMetric = expectedMetricName(
+ clientId,
+ ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+ MonitorableInterceptor.class);
+ assertTrue(metrics.containsKey(expectedInterceptorMetric));
+ assertEquals(VALUE,
metrics.get(expectedInterceptorMetric).metricValue());
+
+ consumer.close(Duration.ZERO);
+ metrics = consumer.metrics();
+ assertFalse(metrics.containsKey(expectedKeyDeserializerMetric));
+ assertFalse(metrics.containsKey(expectedValueDeserializerMetric));
+ assertFalse(metrics.containsKey(expectedInterceptorMetric));
+ } finally {
+ MockConsumerInterceptor.resetCounters();
+ }
+ }
+
+ private MetricName expectedMetricName(String clientId, String config,
Class<?> clazz) {
+ Map<String, String> expectedTags = new LinkedHashMap<>();
+ expectedTags.put("client-id", clientId);
+ expectedTags.put("config", config);
+ expectedTags.put("class", clazz.getSimpleName());
+ expectedTags.putAll(TAGS);
+ return new MetricName(NAME, "plugins", DESCRIPTION, expectedTags);
+ }
+
+ private static final String NAME = "name";
+ private static final String DESCRIPTION = "description";
+ private static final Map<String, String> TAGS =
Collections.singletonMap("k", "v");
+ private static final double VALUE = 123.0;
+
+ public static class MonitorableDeserializer extends MockDeserializer
implements Monitorable {
+
+ @Override
+ public void withPluginMetrics(PluginMetrics metrics) {
+ MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+ metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+ }
+ }
+
+ public static class MonitorableInterceptor extends MockConsumerInterceptor
implements Monitorable {
+
+ @Override
+ public void withPluginMetrics(PluginMetrics metrics) {
+ MetricName name = metrics.metricName(NAME, DESCRIPTION, TAGS);
+ metrics.addMetric(name, (Measurable) (config, now) -> VALUE);
+ }
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 819365e9712..1fb8daae00d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -168,6 +168,7 @@ public class AsyncKafkaConsumerTest {
private AsyncKafkaConsumer<String, String> consumer = null;
private Time time = new MockTime(0);
+ private final Metrics metrics = new Metrics();
private final FetchCollector<String, String> fetchCollector =
mock(FetchCollector.class);
private final ApplicationEventHandler applicationEventHandler =
mock(ApplicationEventHandler.class);
private final ConsumerMetadata metadata = mock(ConsumerMetadata.class);
@@ -248,7 +249,7 @@ public class AsyncKafkaConsumerTest {
return new AsyncKafkaConsumer<>(
new LogContext(),
clientId,
- new Deserializers<>(new StringDeserializer(), new
StringDeserializer()),
+ new Deserializers<>(new StringDeserializer(), new
StringDeserializer(), metrics),
fetchBuffer,
fetchCollector,
interceptors,
@@ -257,7 +258,7 @@ public class AsyncKafkaConsumerTest {
backgroundEventQueue,
backgroundEventReaper,
rebalanceListenerInvoker,
- new Metrics(),
+ metrics,
subscriptions,
metadata,
retryBackoffMs,
@@ -671,7 +672,7 @@ public class AsyncKafkaConsumerTest {
consumer = spy(newConsumer(
mock(FetchBuffer.class),
- new ConsumerInterceptors<>(Collections.emptyList()),
+ new ConsumerInterceptors<>(Collections.emptyList(), metrics),
invoker,
subscriptions,
"group-id",
@@ -1543,7 +1544,7 @@ public class AsyncKafkaConsumerTest {
SubscriptionState subscriptions = new SubscriptionState(new
LogContext(), AutoOffsetResetStrategy.NONE);
consumer = newConsumer(
mock(FetchBuffer.class),
- new ConsumerInterceptors<>(Collections.emptyList()),
+ new ConsumerInterceptors<>(Collections.emptyList(), metrics),
mock(ConsumerRebalanceListenerInvoker.class),
subscriptions,
"group-id",
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
index e12b0121fd4..b804b6b8ae2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -232,11 +232,11 @@ public class CompletedFetchTest {
}
private static Deserializers<UUID, UUID> newUuidDeserializers() {
- return new Deserializers<>(new UUIDDeserializer(), new
UUIDDeserializer());
+ return new Deserializers<>(new UUIDDeserializer(), new
UUIDDeserializer(), null);
}
private static Deserializers<String, String> newStringDeserializers() {
- return new Deserializers<>(new StringDeserializer(), new
StringDeserializer());
+ return new Deserializers<>(new StringDeserializer(), new
StringDeserializer(), null);
}
private static FetchConfig newFetchConfig(IsolationLevel isolationLevel,
boolean checkCrcs) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
index 5a7d85369ea..e6b091e2fd2 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -116,7 +116,7 @@ public class ConsumerInterceptorsTest {
FilterConsumerInterceptor<Integer, Integer> interceptor2 = new
FilterConsumerInterceptor<>(filterPartition2);
interceptorList.add(interceptor1);
interceptorList.add(interceptor2);
- ConsumerInterceptors<Integer, Integer> interceptors = new
ConsumerInterceptors<>(interceptorList);
+ ConsumerInterceptors<Integer, Integer> interceptors = new
ConsumerInterceptors<>(interceptorList, null);
// verify that onConsumer modifies ConsumerRecords
Map<TopicPartition, List<ConsumerRecord<Integer, Integer>>> records =
new HashMap<>();
@@ -177,7 +177,7 @@ public class ConsumerInterceptorsTest {
FilterConsumerInterceptor<Integer, Integer> interceptor2 = new
FilterConsumerInterceptor<>(filterPartition2);
interceptorList.add(interceptor1);
interceptorList.add(interceptor2);
- ConsumerInterceptors<Integer, Integer> interceptors = new
ConsumerInterceptors<>(interceptorList);
+ ConsumerInterceptors<Integer, Integer> interceptors = new
ConsumerInterceptors<>(interceptorList, null);
// verify that onCommit is called for all interceptors in the chain
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
index 68b9ecb528b..d7d53537ae6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchCollectorTest.java
@@ -712,7 +712,7 @@ public class FetchCollectorTest {
mock(ConsumerMetadata.class),
subscriptions,
new FetchConfig(new ConsumerConfig(consumerProps)),
- new Deserializers<>(new StringDeserializer(), new
StringDeserializer()),
+ new Deserializers<>(new StringDeserializer(), new
StringDeserializer(), null),
mock(FetchMetricsManager.class),
new MockTime()
);
@@ -741,12 +741,11 @@ public class FetchCollectorTest {
Properties p = consumerProperties(maxPollRecords);
ConsumerConfig config = new ConsumerConfig(p);
- deserializers = new Deserializers<>(new StringDeserializer(), new
StringDeserializer());
-
subscriptions = createSubscriptionState(config, logContext);
fetchConfig = createFetchConfig(config, isolationLevel);
Metrics metrics = createMetrics(config, time);
metricsManager = createFetchMetricsManager(metrics);
+ deserializers = new Deserializers<>(new StringDeserializer(), new
StringDeserializer(), metrics);
metadata = new ConsumerMetadata(
0,
1000,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index d6f19f4bf58..0ec0633b78b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -3936,7 +3936,7 @@ public class FetchRequestManagerTest {
SubscriptionState subscriptionState,
LogContext logContext) {
buildDependencies(metricConfig, metadataExpireMs, subscriptionState,
logContext);
- Deserializers<K, V> deserializers = new
Deserializers<>(keyDeserializer, valueDeserializer);
+ Deserializers<K, V> deserializers = new
Deserializers<>(keyDeserializer, valueDeserializer, metrics);
FetchConfig fetchConfig = new FetchConfig(
minBytes,
maxBytes,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index bc82aeae9fa..b24475300c8 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2824,7 +2824,7 @@ public class FetcherTest {
isolationLevel,
apiVersions);
- Deserializers<byte[], byte[]> deserializers = new Deserializers<>(new
ByteArrayDeserializer(), new ByteArrayDeserializer());
+ Deserializers<byte[], byte[]> deserializers = new Deserializers<>(new
ByteArrayDeserializer(), new ByteArrayDeserializer(), metrics);
FetchConfig fetchConfig = new FetchConfig(
minBytes,
maxBytes,
@@ -3858,7 +3858,7 @@ public class FetcherTest {
metadata,
subscriptionState,
fetchConfig,
- new Deserializers<>(keyDeserializer, valueDeserializer),
+ new Deserializers<>(keyDeserializer, valueDeserializer,
metrics),
metricsManager,
time,
apiVersions));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index 96d6e5e0b3d..3b7fe70ea4c 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -1249,7 +1249,7 @@ public class OffsetFetcherTest {
metadata,
subscriptions,
fetchConfig,
- new Deserializers<>(new ByteArrayDeserializer(), new
ByteArrayDeserializer()),
+ new Deserializers<>(new ByteArrayDeserializer(), new
ByteArrayDeserializer(), metrics),
new FetchMetricsManager(metrics, metricsRegistry),
time,
apiVersions);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index b117af177b1..3b488d3d002 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -374,11 +374,11 @@ public class ShareCompletedFetchTest {
}
private static Deserializers<UUID, UUID> newUuidDeserializers() {
- return new Deserializers<>(new UUIDDeserializer(), new
UUIDDeserializer());
+ return new Deserializers<>(new UUIDDeserializer(), new
UUIDDeserializer(), null);
}
private static Deserializers<String, String> newStringDeserializers() {
- return new Deserializers<>(new StringDeserializer(), new
StringDeserializer());
+ return new Deserializers<>(new StringDeserializer(), new
StringDeserializer(), null);
}
private Records newRecords(long baseOffset, int count) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 0e74a9768a8..efb190506ea 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -1855,7 +1855,7 @@ public class ShareConsumeRequestManagerTest {
SubscriptionState
subscriptionState,
LogContext logContext) {
buildDependencies(metricConfig, subscriptionState, logContext);
- Deserializers<K, V> deserializers = new
Deserializers<>(keyDeserializer, valueDeserializer);
+ Deserializers<K, V> deserializers = new
Deserializers<>(keyDeserializer, valueDeserializer, metrics);
int maxWaitMs = 0;
int maxBytes = Integer.MAX_VALUE;
int fetchSize = 1000;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
index 893840de4c6..eb63c94673e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareFetchCollectorTest.java
@@ -236,8 +236,8 @@ public class ShareFetchCollectorTest {
ConsumerConfig config = new ConsumerConfig(p);
- deserializers = new Deserializers<>(new StringDeserializer(), new
StringDeserializer());
Metrics metrics = createMetrics(config, Time.SYSTEM);
+ deserializers = new Deserializers<>(new StringDeserializer(), new
StringDeserializer(), metrics);
ShareFetchMetricsManager shareFetchMetricsManager =
createShareFetchMetricsManager(metrics);
Set<TopicPartition> partitionSet = new HashSet<>();
partitionSet.add(topicAPartition0.topicPartition());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 779a42656a9..3af4a7185cc 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -60,6 +60,8 @@ import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.network.Selectable;
@@ -113,6 +115,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -1456,7 +1459,7 @@ public class KafkaProducerTest {
ApiVersions apiVersions = new ApiVersions();
apiVersions.update(NODE.idString(), nodeApiVersions);
- ProducerInterceptors<String, String> interceptor = new
ProducerInterceptors<>(Collections.emptyList());
+ ProducerInterceptors<String, String> interceptor = new
ProducerInterceptors<>(Collections.emptyList(), null);
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"some-txn", NODE));
client.prepareResponse(initProducerIdResponse(1L, (short) 5,
Errors.NONE));
@@ -1730,7 +1733,7 @@ public class KafkaProducerTest {
try (KafkaProducer<String, String> producer = new KafkaProducer<>(
new ProducerConfig(properties), new StringSerializer(), new
StringSerializer(), metadata, client,
- new ProducerInterceptors<>(Collections.emptyList()), apiVersions,
time)) {
+ new ProducerInterceptors<>(Collections.emptyList(), null),
apiVersions, time)) {
producer.initTransactions();
producer.beginTransaction();
producer.sendOffsetsToTransaction(Collections.singletonMap(
@@ -1782,7 +1785,7 @@ public class KafkaProducerTest {
ApiVersions apiVersions = new ApiVersions();
apiVersions.update(NODE.idString(), nodeApiVersions);
- ProducerInterceptors<String, String> interceptor = new
ProducerInterceptors<>(Collections.emptyList());
+ ProducerInterceptors<String, String> interceptor = new
ProducerInterceptors<>(Collections.emptyList(), null);
client.prepareResponse(FindCoordinatorResponse.prepareResponse(Errors.NONE,
"some-txn", NODE));
client.prepareResponse(initProducerIdResponse(1L, (short) 5,
Errors.NONE));
@@ -2310,7 +2313,7 @@ public class KafkaProducerTest {
String invalidTopicName = "topic abc"; // Invalid topic name due to
space
ProducerInterceptors<String, String> producerInterceptors =
- new ProducerInterceptors<>(Collections.singletonList(new
MockProducerInterceptor()));
+ new ProducerInterceptors<>(Collections.singletonList(new
MockProducerInterceptor()), null);
try (Producer<String, String> producer = kafkaProducer(configs, new
StringSerializer(), new StringSerializer(),
producerMetadata, client, producerInterceptors, time)) {
@@ -2605,7 +2608,7 @@ public class KafkaProducerTest {
ProducerConfig producerConfig = new ProducerConfig(
ProducerConfig.appendSerializerToConfig(configs, serializer,
serializer));
- ProducerInterceptors<T, T> interceptors = new
ProducerInterceptors<>(this.interceptors);
+ ProducerInterceptors<T, T> interceptors = new
ProducerInterceptors<>(this.interceptors, metrics);
return new KafkaProducer<>(
producerConfig,
@@ -2761,4 +2764,100 @@ public class KafkaProducerTest {
KafkaMetric streamClientMetricTwo = new KafkaMetric(lock,
metricNameTwo, (Measurable) (m, now) -> 2.0, metricConfig, Time.SYSTEM);
return Map.of(metricNameOne, streamClientMetricOne, metricNameTwo,
streamClientMetricTwo);
}
+
+    /**
+     * Verifies that a key serializer, value serializer, partitioner and interceptor implementing
+     * {@link Monitorable} each get their metric reported by the producer under the name built by
+     * {@code expectedMetricName}, and that those metrics are no longer reported once the producer
+     * has been closed.
+     */
+    @Test
+    void testMonitorablePlugins() {
+        try {
+            String clientId = "testMonitorablePlugins";
+            Map<String, Object> configs = new HashMap<>();
+            configs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, MonitorableSerializer.class.getName());
+            configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, MonitorableSerializer.class.getName());
+            configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MonitorablePartitioner.class.getName());
+            configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MonitorableInterceptor.class.getName());
+            configs.put(MockProducerInterceptor.APPEND_STRING_PROP, "");
+
+            // The expected names do not depend on the producer, so build them before creating it.
+            MetricName expectedKeySerializerMetric = expectedMetricName(
+                clientId,
+                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                MonitorableSerializer.class);
+            MetricName expectedValueSerializerMetric = expectedMetricName(
+                clientId,
+                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                MonitorableSerializer.class);
+            MetricName expectedPartitionerMetric = expectedMetricName(
+                clientId,
+                ProducerConfig.PARTITIONER_CLASS_CONFIG,
+                MonitorablePartitioner.class);
+            MetricName expectedInterceptorMetric = expectedMetricName(
+                clientId,
+                ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                MonitorableInterceptor.class);
+
+            KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
+            try {
+                Map<MetricName, ? extends Metric> metrics = producer.metrics();
+
+                assertTrue(metrics.containsKey(expectedKeySerializerMetric));
+                assertEquals(VALUE, metrics.get(expectedKeySerializerMetric).metricValue());
+
+                assertTrue(metrics.containsKey(expectedValueSerializerMetric));
+                assertEquals(VALUE, metrics.get(expectedValueSerializerMetric).metricValue());
+
+                assertTrue(metrics.containsKey(expectedPartitionerMetric));
+                assertEquals(VALUE, metrics.get(expectedPartitionerMetric).metricValue());
+
+                assertTrue(metrics.containsKey(expectedInterceptorMetric));
+                assertEquals(VALUE, metrics.get(expectedInterceptorMetric).metricValue());
+            } finally {
+                // Close even when an assertion above fails, so a failing run does not leak the producer.
+                producer.close();
+            }
+
+            // After close() none of the plugin metrics may still be reported.
+            Map<MetricName, ? extends Metric> metricsAfterClose = producer.metrics();
+            assertFalse(metricsAfterClose.containsKey(expectedKeySerializerMetric));
+            assertFalse(metricsAfterClose.containsKey(expectedValueSerializerMetric));
+            assertFalse(metricsAfterClose.containsKey(expectedPartitionerMetric));
+            assertFalse(metricsAfterClose.containsKey(expectedInterceptorMetric));
+        } finally {
+            MockProducerInterceptor.resetCounters();
+        }
+    }
+
+    /**
+     * Builds the {@link MetricName} under which a monitorable test plugin's metric is expected.
+     *
+     * @param clientId value of the "client-id" tag
+     * @param config   value of the "config" tag
+     * @param clazz    plugin class whose simple name becomes the value of the "class" tag
+     * @return a name in the "plugins" group carrying the three tags above followed by {@code TAGS}
+     */
+    private MetricName expectedMetricName(String clientId, String config, Class<?> clazz) {
+        String pluginClass = clazz.getSimpleName();
+        Map<String, String> tags = new LinkedHashMap<>();
+        tags.put("client-id", clientId);
+        tags.put("config", config);
+        tags.put("class", pluginClass);
+        TAGS.forEach(tags::put);
+        return new MetricName(NAME, "plugins", DESCRIPTION, tags);
+    }
+
+ // Name and description given to the metric that each monitorable test plugin below registers.
+ private static final String NAME = "name";
+ private static final String DESCRIPTION = "description";
+ // Extra tags each test plugin passes when it builds its metric name.
+ private static final Map<String, String> TAGS =
Collections.singletonMap("k", "v");
+ // Constant value reported by each test plugin's metric.
+ private static final double VALUE = 123.0;
+
+    /**
+     * {@link MockSerializer} that, when handed its {@link PluginMetrics}, registers one metric
+     * named {@code NAME} whose value is always {@code VALUE}.
+     */
+    public static class MonitorableSerializer extends MockSerializer implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            Measurable constantValue = (config, now) -> VALUE;
+            metrics.addMetric(metrics.metricName(NAME, DESCRIPTION, TAGS), constantValue);
+        }
+    }
+
+    /**
+     * {@link MockPartitioner} that, when handed its {@link PluginMetrics}, registers one metric
+     * named {@code NAME} whose value is always {@code VALUE}.
+     */
+    public static class MonitorablePartitioner extends MockPartitioner implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            Measurable constantValue = (config, now) -> VALUE;
+            metrics.addMetric(metrics.metricName(NAME, DESCRIPTION, TAGS), constantValue);
+        }
+    }
+
+    /**
+     * {@link MockProducerInterceptor} that, when handed its {@link PluginMetrics}, registers one
+     * metric named {@code NAME} whose value is always {@code VALUE}.
+     */
+    public static class MonitorableInterceptor extends MockProducerInterceptor implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            Measurable constantValue = (config, now) -> VALUE;
+            metrics.addMetric(metrics.metricName(NAME, DESCRIPTION, TAGS), constantValue);
+        }
+    }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 13d4957a78e..853b27b2551 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -104,7 +104,7 @@ public class ProducerInterceptorsTest {
AppendProducerInterceptor interceptor2 = new
AppendProducerInterceptor("Two");
interceptorList.add(interceptor1);
interceptorList.add(interceptor2);
- ProducerInterceptors<Integer, String> interceptors = new
ProducerInterceptors<>(interceptorList);
+ ProducerInterceptors<Integer, String> interceptors = new
ProducerInterceptors<>(interceptorList, null);
// verify that onSend() mutates the record as expected
ProducerRecord<Integer, String> interceptedRecord =
interceptors.onSend(producerRecord);
@@ -142,7 +142,7 @@ public class ProducerInterceptorsTest {
AppendProducerInterceptor interceptor2 = new
AppendProducerInterceptor("Two");
interceptorList.add(interceptor1);
interceptorList.add(interceptor2);
- ProducerInterceptors<Integer, String> interceptors = new
ProducerInterceptors<>(interceptorList);
+ ProducerInterceptors<Integer, String> interceptors = new
ProducerInterceptors<>(interceptorList, null);
// verify onAck is called on all interceptors
RecordMetadata meta = new RecordMetadata(tp, 0, 0, 0, 0, 0);
@@ -166,7 +166,7 @@ public class ProducerInterceptorsTest {
List<ProducerInterceptor<Integer, String>> interceptorList = new
ArrayList<>();
AppendProducerInterceptor interceptor1 = new
AppendProducerInterceptor("One");
interceptorList.add(interceptor1);
- ProducerInterceptors<Integer, String> interceptors = new
ProducerInterceptors<>(interceptorList);
+ ProducerInterceptors<Integer, String> interceptors = new
ProducerInterceptors<>(interceptorList, null);
// verify that metadata contains both topic and partition
interceptors.onSendError(producerRecord,
diff --git
a/clients/src/test/java/org/apache/kafka/common/internals/PluginTest.java
b/clients/src/test/java/org/apache/kafka/common/internals/PluginTest.java
new file mode 100644
index 00000000000..d414d77bdba
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/internals/PluginTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Monitorable;
+import org.apache.kafka.common.metrics.PluginMetrics;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link Plugin}: wrapping single instances and lists of instances, handing
+ * {@link PluginMetrics} to {@link Monitorable} instances, and closing the wrapped instance.
+ */
+public class PluginTest {
+
+    private static final String CONFIG = "some.config";
+    private static final Metrics METRICS = new Metrics();
+
+    /**
+     * Closeable test plugin that does not implement {@link Monitorable}. It records whether
+     * {@link #close()} completed and can be told to throw from it instead.
+     */
+    static class SomePlugin implements Closeable {
+
+        PluginMetrics pluginMetrics;
+        boolean closed;
+        boolean throwOnClose = false;
+
+        @Override
+        public void close() throws IOException {
+            if (throwOnClose) {
+                throw new RuntimeException("throw on close");
+            }
+            closed = true;
+        }
+    }
+
+    /** Variant of {@link SomePlugin} that remembers the {@link PluginMetrics} it is given. */
+    static class SomeMonitorablePlugin extends SomePlugin implements Monitorable {
+
+        @Override
+        public void withPluginMetrics(PluginMetrics metrics) {
+            pluginMetrics = metrics;
+        }
+    }
+
+    /**
+     * Wraps a monitorable plugin with and without a metrics registry, then a non-monitorable
+     * plugin without one, checking in each case whether plugin metrics were handed over and
+     * that closing the wrapper closes the instance.
+     */
+    @Test
+    void testWrapInstance() throws Exception {
+        SomeMonitorablePlugin monitored = new SomeMonitorablePlugin();
+        Plugin<SomeMonitorablePlugin> monitoredWrapper = Plugin.wrapInstance(monitored, METRICS, CONFIG);
+        checkPlugin(monitoredWrapper, monitored, true);
+
+        SomeMonitorablePlugin unmonitored = new SomeMonitorablePlugin();
+        assertFalse(unmonitored.closed);
+        Plugin<SomeMonitorablePlugin> unmonitoredWrapper = Plugin.wrapInstance(unmonitored, null, CONFIG);
+        checkPlugin(unmonitoredWrapper, unmonitored, false);
+
+        SomePlugin plain = new SomePlugin();
+        assertFalse(plain.closed);
+        Plugin<SomePlugin> plainWrapper = Plugin.wrapInstance(plain, null, CONFIG);
+        assertSame(plain, plainWrapper.get());
+        assertNull(plain.pluginMetrics);
+        plainWrapper.close();
+        assertTrue(plain.closed);
+    }
+
+    /**
+     * List counterpart of {@link #testWrapInstance()}: monitorable plugins with and without a
+     * metrics registry, then non-monitorable plugins with a registry.
+     */
+    @Test
+    void testWrapInstances() throws Exception {
+        List<SomeMonitorablePlugin> monitored = Arrays.asList(new SomeMonitorablePlugin(), new SomeMonitorablePlugin());
+        checkPlugins(Plugin.wrapInstances(monitored, METRICS, CONFIG), monitored, true);
+
+        List<SomeMonitorablePlugin> unmonitored = Arrays.asList(new SomeMonitorablePlugin(), new SomeMonitorablePlugin());
+        checkPlugins(Plugin.wrapInstances(unmonitored, null, CONFIG), unmonitored, false);
+
+        List<SomePlugin> plain = Arrays.asList(new SomePlugin(), new SomePlugin());
+        List<Plugin<SomePlugin>> plainWrappers = Plugin.wrapInstances(plain, METRICS, CONFIG);
+        assertEquals(plain.size(), plainWrappers.size());
+        int index = 0;
+        for (Plugin<SomePlugin> wrapper : plainWrappers) {
+            SomePlugin instance = plain.get(index++);
+            assertSame(instance, wrapper.get());
+            // Not Monitorable, so no plugin metrics are set even though a registry was supplied.
+            assertNull(instance.pluginMetrics);
+            wrapper.close();
+            assertTrue(instance.closed);
+        }
+    }
+
+    /**
+     * A {@link RuntimeException} thrown by the wrapped instance's {@code close()} must surface
+     * from {@link Plugin#close()} as a {@link KafkaException}.
+     */
+    @Test
+    public void testCloseThrows() {
+        SomePlugin failing = new SomePlugin();
+        failing.throwOnClose = true;
+        Plugin<SomePlugin> wrapper = Plugin.wrapInstance(failing, METRICS, CONFIG);
+        assertThrows(KafkaException.class, wrapper::close);
+    }
+
+    /**
+     * Once the wrapper is closed, the {@link PluginMetrics} operations exercised here must all
+     * fail with {@link IllegalStateException}.
+     */
+    @Test
+    public void testUsePluginMetricsAfterClose() throws Exception {
+        Plugin<SomeMonitorablePlugin> wrapper = Plugin.wrapInstance(new SomeMonitorablePlugin(), METRICS, CONFIG);
+        PluginMetrics pluginMetrics = wrapper.get().pluginMetrics;
+        wrapper.close();
+        assertThrows(IllegalStateException.class, () -> pluginMetrics.metricName("", "", Collections.emptyMap()));
+        assertThrows(IllegalStateException.class, () -> pluginMetrics.addMetric(null, null));
+        assertThrows(IllegalStateException.class, () -> pluginMetrics.removeMetric(null));
+        assertThrows(IllegalStateException.class, () -> pluginMetrics.addSensor(""));
+        assertThrows(IllegalStateException.class, () -> pluginMetrics.removeSensor(""));
+    }
+
+    /** Applies {@link #checkPlugin} pairwise after asserting both lists have the same size. */
+    private void checkPlugins(List<Plugin<SomeMonitorablePlugin>> wrappers,
+                              List<SomeMonitorablePlugin> instances,
+                              boolean metricsSet) throws Exception {
+        assertEquals(instances.size(), wrappers.size());
+        for (int i = 0; i < wrappers.size(); i++) {
+            checkPlugin(wrappers.get(i), instances.get(i), metricsSet);
+        }
+    }
+
+    /**
+     * Asserts that {@code plugin} wraps {@code instance}, that plugin metrics were (or were not)
+     * handed to it as indicated by {@code metricsSet}, and that closing the wrapper closes it.
+     */
+    private void checkPlugin(Plugin<SomeMonitorablePlugin> plugin, SomeMonitorablePlugin instance, boolean metricsSet) throws Exception {
+        assertSame(instance, plugin.get());
+        if (!metricsSet) {
+            assertNull(instance.pluginMetrics);
+        } else {
+            assertNotNull(instance.pluginMetrics);
+        }
+        plugin.close();
+        assertTrue(instance.closed);
+    }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/metrics/internals/PluginMetricsImplTest.java
b/clients/src/test/java/org/apache/kafka/common/metrics/internals/PluginMetricsImplTest.java
new file mode 100644
index 00000000000..0ff349e361d
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/metrics/internals/PluginMetricsImplTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Rate;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link PluginMetricsImpl}: metric name construction, adding and removing metrics
+ * and sensors, and cleanup on close.
+ */
+public class PluginMetricsImplTest {
+
+    // Tags supplied per metric, on top of the base tags given to the PluginMetricsImpl constructor.
+    private final Map<String, String> extraTags = Collections.singletonMap("my-tag", "my-value");
+    // Base tags passed to every PluginMetricsImpl under test.
+    private Map<String, String> tags;
+    private Metrics metrics;
+    // Number of metrics in the fresh registry, used as the baseline for count assertions.
+    private int initialMetrics;
+
+    @BeforeEach
+    void setup() {
+        metrics = new Metrics();
+        initialMetrics = metrics.metrics().size();
+        tags = new LinkedHashMap<>();
+        tags.put("k1", "v1");
+        tags.put("k2", "v2");
+    }
+
+    /** Closes the per-test registry so that no test leaves an open {@link Metrics} behind. */
+    @AfterEach
+    void tearDown() {
+        metrics.close();
+    }
+
+    /**
+     * The built name carries the given name and description, the "plugins" group, and the
+     * base tags combined with the extra tags.
+     */
+    @Test
+    void testMetricName() {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        MetricName metricName = pmi.metricName("name", "description", extraTags);
+        assertEquals("name", metricName.name());
+        assertEquals("plugins", metricName.group());
+        assertEquals("description", metricName.description());
+        Map<String, String> expectedTags = new LinkedHashMap<>(tags);
+        expectedTags.putAll(extraTags);
+        assertEquals(expectedTags, metricName.tags());
+    }
+
+    /** An extra tag whose key ("k1") is already one of the base tags is rejected. */
+    @Test
+    void testDuplicateTagName() {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        assertThrows(IllegalArgumentException.class,
+            () -> pmi.metricName("name", "description", Collections.singletonMap("k1", "value")));
+    }
+
+    /**
+     * Adding a metric registers it in the underlying registry and removing it unregisters it;
+     * adding the same name twice or removing an unknown name is rejected.
+     */
+    @Test
+    void testAddRemoveMetrics() {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        MetricName metricName = pmi.metricName("name", "description", extraTags);
+        pmi.addMetric(metricName, (Measurable) (config, now) -> 0.0);
+        assertEquals(initialMetrics + 1, metrics.metrics().size());
+
+        assertThrows(IllegalArgumentException.class, () -> pmi.addMetric(metricName, (Measurable) (config, now) -> 0.0));
+
+        pmi.removeMetric(metricName);
+        assertEquals(initialMetrics, metrics.metrics().size());
+
+        assertThrows(IllegalArgumentException.class, () -> pmi.removeMetric(metricName));
+    }
+
+    /**
+     * A sensor contributes no metric until a stat is added to it; removing the sensor removes
+     * its metric. Adding the same sensor name twice or removing an unknown sensor is rejected.
+     */
+    @Test
+    void testAddRemoveSensor() {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        String sensorName = "my-sensor";
+        MetricName metricName = pmi.metricName("name", "description", extraTags);
+        Sensor sensor = pmi.addSensor(sensorName);
+        assertEquals(initialMetrics, metrics.metrics().size());
+        sensor.add(metricName, new Rate());
+        // Second add reuses the same metric name: the assertion below expects the registry to
+        // have grown by only one metric (presumably Sensor ignores the duplicate -- confirm).
+        sensor.add(metricName, new Max());
+        assertEquals(initialMetrics + 1, metrics.metrics().size());
+
+        assertThrows(IllegalArgumentException.class, () -> pmi.addSensor(sensorName));
+
+        pmi.removeSensor(sensorName);
+        assertEquals(initialMetrics, metrics.metrics().size());
+
+        assertThrows(IllegalArgumentException.class, () -> pmi.removeSensor(sensorName));
+    }
+
+    /** Closing removes both the sensor-backed metric and the directly added metric. */
+    @Test
+    void testClose() throws IOException {
+        PluginMetricsImpl pmi = new PluginMetricsImpl(metrics, tags);
+        String sensorName = "my-sensor";
+        MetricName metricName1 = pmi.metricName("name1", "description", extraTags);
+        Sensor sensor = pmi.addSensor(sensorName);
+        sensor.add(metricName1, new Rate());
+        MetricName metricName2 = pmi.metricName("name2", "description", extraTags);
+        pmi.addMetric(metricName2, (Measurable) (config, now) -> 1.0);
+
+        assertEquals(initialMetrics + 2, metrics.metrics().size());
+        pmi.close();
+        assertEquals(initialMetrics, metrics.metrics().size());
+    }
+}