This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 770d64d2ccb KAFKA-16143: New JMX metrics for AsyncKafkaConsumer
(#17199)
770d64d2ccb is described below
commit 770d64d2ccbc02ba8579c6f50b2901a3f2245928
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Dec 13 20:20:27 2024 +0800
KAFKA-16143: New JMX metrics for AsyncKafkaConsumer (#17199)
Reviewers: Andrew Schofield <[email protected]>, Kirk True
<[email protected]>, Lianet Magrans <[email protected]>
---
.../consumer/internals/AsyncKafkaConsumer.java | 84 ++++---
.../consumer/internals/ConsumerNetworkThread.java | 22 +-
.../clients/consumer/internals/ConsumerUtils.java | 1 +
.../consumer/internals/NetworkClientDelegate.java | 31 ++-
.../consumer/internals/ShareConsumerImpl.java | 26 +-
.../internals/events/ApplicationEvent.java | 16 +-
.../internals/events/ApplicationEventHandler.java | 13 +-
.../consumer/internals/events/BackgroundEvent.java | 16 +-
.../internals/events/BackgroundEventHandler.java | 30 ++-
.../internals/events/CompletableEventReaper.java | 24 +-
.../internals/metrics/AsyncConsumerMetrics.java | 262 +++++++++++++++++++++
.../internals/ApplicationEventHandlerTest.java | 73 ++++++
.../consumer/internals/AsyncKafkaConsumerTest.java | 33 ++-
.../internals/BackgroundEventHandlerTest.java | 65 +++++
.../internals/ConsumerMembershipManagerTest.java | 3 +-
.../internals/ConsumerNetworkThreadTest.java | 99 +++++++-
.../internals/FetchRequestManagerTest.java | 3 +-
.../internals/NetworkClientDelegateTest.java | 53 ++++-
.../internals/ShareConsumeRequestManagerTest.java | 15 +-
.../consumer/internals/ShareConsumerImplTest.java | 2 +-
.../events/CompletableEventReaperTest.java | 16 +-
.../metrics/AsyncConsumerMetricsTest.java | 237 +++++++++++++++++++
22 files changed, 1036 insertions(+), 88 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 5bbabe61a6c..ff679e5542d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -69,7 +69,7 @@ import
org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscr
import
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
-import
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
@@ -109,7 +109,6 @@ import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -215,10 +214,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private final AtomicReference<Optional<ConsumerGroupMetadata>>
groupMetadata = new AtomicReference<>(Optional.empty());
- private final KafkaConsumerMetrics kafkaConsumerMetrics;
+ private final AsyncConsumerMetrics kafkaConsumerMetrics;
private Logger log;
private final String clientId;
private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+ private final BackgroundEventHandler backgroundEventHandler;
private final BackgroundEventProcessor backgroundEventProcessor;
private final CompletableEventReaper backgroundEventReaper;
private final Deserializers<K, V> deserializers;
@@ -320,6 +320,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.clientTelemetryReporter =
CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metrics = createMetrics(config, time, reporters);
+ this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
List<ConsumerInterceptor<K, V>> interceptorList =
configuredConsumerInterceptors(config);
@@ -339,7 +340,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
ApiVersions apiVersions = new ApiVersions();
final BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
- final BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue);
+ this.backgroundEventHandler = new BackgroundEventHandler(
+ backgroundEventQueue,
+ time,
+ kafkaConsumerMetrics
+ );
// This FetchBuffer is shared between the application and network
threads.
this.fetchBuffer = new FetchBuffer(logContext);
@@ -352,7 +357,9 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
fetchMetricsManager.throttleTimeSensor(),
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
backgroundEventHandler,
- false);
+ false,
+ kafkaConsumerMetrics
+ );
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
this.groupMetadata.set(initializeGroupMetadata(config,
groupRebalanceConfig));
final Supplier<RequestManagers> requestManagersSupplier =
RequestManagers.supplier(time,
@@ -382,7 +389,9 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
- requestManagersSupplier);
+ requestManagersSupplier,
+ kafkaConsumerMetrics
+ );
this.rebalanceListenerInvoker = new
ConsumerRebalanceListenerInvoker(
logContext,
@@ -402,8 +411,6 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
fetchMetricsManager,
time);
- this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics,
CONSUMER_METRIC_GROUP_PREFIX);
-
if (groupMetadata.get().isPresent() &&
GroupProtocol.of(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)) ==
GroupProtocol.CONSUMER) {
config.ignore(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG); //
Used by background thread
@@ -460,10 +467,15 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
this.deserializers = deserializers;
this.applicationEventHandler = applicationEventHandler;
- this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics,
"consumer");
+ this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
this.clientTelemetryReporter = Optional.empty();
this.autoCommitEnabled = autoCommitEnabled;
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
+ this.backgroundEventHandler = new BackgroundEventHandler(
+ backgroundEventQueue,
+ time,
+ kafkaConsumerMetrics
+ );
}
AsyncKafkaConsumer(LogContext logContext,
@@ -498,7 +510,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
deserializers,
fetchMetricsManager,
time);
- this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics,
"consumer");
+ this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,
@@ -509,7 +521,11 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
this.backgroundEventQueue = new LinkedBlockingQueue<>();
- BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue);
+ this.backgroundEventHandler = new BackgroundEventHandler(
+ backgroundEventQueue,
+ time,
+ kafkaConsumerMetrics
+ );
this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
logContext,
subscriptions,
@@ -524,7 +540,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
client,
metadata,
backgroundEventHandler,
- false
+ false,
+ kafkaConsumerMetrics
);
this.offsetCommitCallbackInvoker = new
OffsetCommitCallbackInvoker(interceptors);
Supplier<RequestManagers> requestManagersSupplier =
RequestManagers.supplier(
@@ -556,7 +573,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
- requestManagersSupplier);
+ requestManagersSupplier,
+ kafkaConsumerMetrics);
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = new CompletableEventReaper(logContext);
}
@@ -571,7 +589,8 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier,
final Supplier<NetworkClientDelegate>
networkClientDelegateSupplier,
- final Supplier<RequestManagers> requestManagersSupplier
+ final Supplier<RequestManagers> requestManagersSupplier,
+ final AsyncConsumerMetrics asyncConsumerMetrics
);
}
@@ -1941,25 +1960,30 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
* It is possible that {@link ErrorEvent an error}
* could occur when processing the events. In such cases, the processor
will take a reference to the first
* error, continue to process the remaining events, and then throw the
first error that occurred.
+ *
+ * Visible for testing.
*/
- private boolean processBackgroundEvents() {
+ boolean processBackgroundEvents() {
AtomicReference<KafkaException> firstError = new AtomicReference<>();
- LinkedList<BackgroundEvent> events = new LinkedList<>();
- backgroundEventQueue.drainTo(events);
-
- for (BackgroundEvent event : events) {
- try {
- if (event instanceof CompletableEvent)
- backgroundEventReaper.add((CompletableEvent<?>) event);
-
- backgroundEventProcessor.process(event);
- } catch (Throwable t) {
- KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
- if (!firstError.compareAndSet(null, e))
- log.warn("An error occurred when processing the background
event: {}", e.getMessage(), e);
+ List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
+ if (!events.isEmpty()) {
+ long startMs = time.milliseconds();
+ for (BackgroundEvent event : events) {
+
kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() -
event.enqueuedMs());
+ try {
+ if (event instanceof CompletableEvent)
+ backgroundEventReaper.add((CompletableEvent<?>) event);
+
+ backgroundEventProcessor.process(event);
+ } catch (Throwable t) {
+ KafkaException e =
ConsumerUtils.maybeWrapAsKafkaException(t);
+
+ if (!firstError.compareAndSet(null, e))
+ log.warn("An error occurred when processing the
background event: {}", e.getMessage(), e);
+ }
}
+
kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
- startMs);
}
backgroundEventReaper.reap(time.milliseconds());
@@ -2088,7 +2112,7 @@ public class AsyncKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
}
@Override
- public KafkaConsumerMetrics kafkaConsumerMetrics() {
+ public AsyncConsumerMetrics kafkaConsumerMetrics() {
return kafkaConsumerMetrics;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 7ca01ce2578..0e7b58acc21 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -23,6 +23,7 @@ import
org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.KafkaThread;
@@ -62,6 +63,7 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
private final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier;
private final Supplier<NetworkClientDelegate>
networkClientDelegateSupplier;
private final Supplier<RequestManagers> requestManagersSupplier;
+ private final AsyncConsumerMetrics asyncConsumerMetrics;
private ApplicationEventProcessor applicationEventProcessor;
private NetworkClientDelegate networkClientDelegate;
private RequestManagers requestManagers;
@@ -69,6 +71,7 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
private final IdempotentCloser closer = new IdempotentCloser();
private volatile Duration closeTimeout =
Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
+ private long lastPollTimeMs = 0L;
public ConsumerNetworkThread(LogContext logContext,
Time time,
@@ -76,7 +79,8 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
CompletableEventReaper applicationEventReaper,
Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier,
Supplier<NetworkClientDelegate>
networkClientDelegateSupplier,
- Supplier<RequestManagers>
requestManagersSupplier) {
+ Supplier<RequestManagers>
requestManagersSupplier,
+ AsyncConsumerMetrics asyncConsumerMetrics) {
super(BACKGROUND_THREAD_NAME, true);
this.time = time;
this.log = logContext.logger(getClass());
@@ -86,6 +90,7 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
this.networkClientDelegateSupplier = networkClientDelegateSupplier;
this.requestManagersSupplier = requestManagersSupplier;
this.running = true;
+ this.asyncConsumerMetrics = asyncConsumerMetrics;
}
@Override
@@ -141,6 +146,11 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
processApplicationEvents();
final long currentTimeMs = time.milliseconds();
+ if (lastPollTimeMs != 0L) {
+
asyncConsumerMetrics.recordTimeBetweenNetworkThreadPoll(currentTimeMs -
lastPollTimeMs);
+ }
+ lastPollTimeMs = currentTimeMs;
+
final long pollWaitTimeMs = requestManagers.entries().stream()
.filter(Optional::isPresent)
.map(Optional::get)
@@ -166,8 +176,13 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
private void processApplicationEvents() {
LinkedList<ApplicationEvent> events = new LinkedList<>();
applicationEventQueue.drainTo(events);
+ if (events.isEmpty())
+ return;
+ asyncConsumerMetrics.recordApplicationEventQueueSize(0);
+ long startMs = time.milliseconds();
for (ApplicationEvent event : events) {
+
asyncConsumerMetrics.recordApplicationEventQueueTime(time.milliseconds() -
event.enqueuedMs());
try {
if (event instanceof CompletableEvent) {
applicationEventReaper.add((CompletableEvent<?>) event);
@@ -181,6 +196,7 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
log.warn("Error processing event {}", t.getMessage(), t);
}
}
+
asyncConsumerMetrics.recordApplicationEventQueueProcessingTime(time.milliseconds()
- startMs);
}
/**
@@ -189,7 +205,7 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
* is given least one attempt to satisfy any network requests
<em>before</em> checking if a timeout has expired.
*/
private void reapExpiredApplicationEvents(long currentTimeMs) {
- applicationEventReaper.reap(currentTimeMs);
+
asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(currentTimeMs));
}
/**
@@ -326,7 +342,7 @@ public class ConsumerNetworkThread extends KafkaThread
implements Closeable {
log.error("Unexpected error during shutdown. Proceed with
closing.", e);
} finally {
sendUnsentRequests(timer);
- applicationEventReaper.reap(applicationEventQueue);
+
asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(applicationEventQueue));
closeQuietly(requestManagers, "request managers");
closeQuietly(networkClientDelegate, "network client delegate");
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index d803b66780b..e4b0fa924c0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -68,6 +68,7 @@ public final class ConsumerUtils {
public static final String CONSUMER_SHARE_METRIC_GROUP_PREFIX =
"consumer-share";
public static final String COORDINATOR_METRICS_SUFFIX =
"-coordinator-metrics";
public static final String CONSUMER_METRICS_SUFFIX = "-metrics";
+ public static final String CONSUMER_METRIC_GROUP =
CONSUMER_METRIC_GROUP_PREFIX + CONSUMER_METRICS_SUFFIX;
/**
* A fixed, large enough value will suffice for max.
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 6c1ab43a4f2..3c280e39d02 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
@@ -71,6 +72,7 @@ public class NetworkClientDelegate implements AutoCloseable {
private final long retryBackoffMs;
private Optional<Exception> metadataError;
private final boolean notifyMetadataErrorsViaErrorQueue;
+ private final AsyncConsumerMetrics asyncConsumerMetrics;
public NetworkClientDelegate(
final Time time,
@@ -79,7 +81,8 @@ public class NetworkClientDelegate implements AutoCloseable {
final KafkaClient client,
final Metadata metadata,
final BackgroundEventHandler backgroundEventHandler,
- final boolean notifyMetadataErrorsViaErrorQueue) {
+ final boolean notifyMetadataErrorsViaErrorQueue,
+ final AsyncConsumerMetrics asyncConsumerMetrics) {
this.time = time;
this.client = client;
this.metadata = metadata;
@@ -90,6 +93,7 @@ public class NetworkClientDelegate implements AutoCloseable {
this.retryBackoffMs =
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
this.metadataError = Optional.empty();
this.notifyMetadataErrorsViaErrorQueue =
notifyMetadataErrorsViaErrorQueue;
+ this.asyncConsumerMetrics = asyncConsumerMetrics;
}
// Visible for testing
@@ -149,6 +153,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
this.client.poll(pollTimeoutMs, currentTimeMs);
maybePropagateMetadataError();
checkDisconnects(currentTimeMs);
+
asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(),
currentTimeMs);
}
private void maybePropagateMetadataError() {
@@ -182,6 +187,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
unsent.timer.update(currentTimeMs);
if (unsent.timer.isExpired()) {
iterator.remove();
+
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() -
unsent.enqueueTimeMs());
unsent.handler.onFailure(currentTimeMs, new TimeoutException(
"Failed to send request after " + unsent.timer.timeoutMs()
+ " ms."));
continue;
@@ -192,6 +198,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
continue;
}
iterator.remove();
+
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() -
unsent.enqueueTimeMs());
}
}
@@ -219,6 +226,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
UnsentRequest u = iter.next();
if (u.node.isPresent() && client.connectionFailed(u.node.get())) {
iter.remove();
+
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() -
u.enqueueTimeMs());
AuthenticationException authenticationException =
client.authenticationException(u.node.get());
u.handler.onFailure(currentTimeMs, authenticationException);
}
@@ -282,6 +290,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
public void add(final UnsentRequest r) {
Objects.requireNonNull(r);
r.setTimer(this.time, this.requestTimeoutMs);
+ r.setEnqueueTimeMs(time.milliseconds());
unsentRequests.add(r);
}
@@ -315,6 +324,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
private final Optional<Node> node; // empty if random node can be
chosen
private Timer timer;
+ private long enqueueTimeMs; // time when the request was enqueued to
unsentRequests, not duration in the queue.
public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
final Optional<Node> node) {
@@ -332,6 +342,20 @@ public class NetworkClientDelegate implements
AutoCloseable {
return timer;
}
+ /**
+ * Set the time when the request was enqueued to {@link
NetworkClientDelegate#unsentRequests}.
+ */
+ private void setEnqueueTimeMs(final long enqueueTimeMs) {
+ this.enqueueTimeMs = enqueueTimeMs;
+ }
+
+ /**
+ * Return the time when the request was enqueued to {@link
NetworkClientDelegate#unsentRequests}.
+ */
+ private long enqueueTimeMs() {
+ return enqueueTimeMs;
+ }
+
CompletableFuture<ClientResponse> future() {
return handler.future;
}
@@ -428,7 +452,8 @@ public class NetworkClientDelegate implements AutoCloseable
{
final Sensor
throttleTimeSensor,
final
ClientTelemetrySender clientTelemetrySender,
final
BackgroundEventHandler backgroundEventHandler,
- final boolean
notifyMetadataErrorsViaErrorQueue) {
+ final boolean
notifyMetadataErrorsViaErrorQueue,
+ final
AsyncConsumerMetrics asyncConsumerMetrics) {
return new CachedSupplier<>() {
@Override
protected NetworkClientDelegate create() {
@@ -442,7 +467,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
metadata,
throttleTimeSensor,
clientTelemetrySender);
- return new NetworkClientDelegate(time, config, logContext,
client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
+ return new NetworkClientDelegate(time, config, logContext,
client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue,
asyncConsumerMetrics);
}
};
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 0ca371253bc..4a39c75745e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -47,6 +47,7 @@ import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCo
import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
import
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import
org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
@@ -160,6 +161,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
private final ApplicationEventHandler applicationEventHandler;
private final Time time;
private final KafkaShareConsumerMetrics kafkaShareConsumerMetrics;
+ private final AsyncConsumerMetrics asyncConsumerMetrics;
private Logger log;
private final String clientId;
private final String groupId;
@@ -252,6 +254,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.clientTelemetryReporter =
CommonClientConfigs.telemetryReporter(clientId, config);
this.clientTelemetryReporter.ifPresent(reporters::add);
this.metrics = createMetrics(config, time, reporters);
+ this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
this.deserializers = new Deserializers<>(config, keyDeserializer,
valueDeserializer);
this.currentFetch = ShareFetch.empty();
@@ -266,7 +269,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
ShareFetchMetricsManager shareFetchMetricsManager =
createShareFetchMetricsManager(metrics);
ApiVersions apiVersions = new ApiVersions();
final BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
- final BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue);
+ final BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(
+ backgroundEventQueue, time, asyncConsumerMetrics);
// This FetchBuffer is shared between the application and network
threads.
this.fetchBuffer = new ShareFetchBuffer(logContext);
@@ -280,7 +284,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
shareFetchMetricsManager.throttleTimeSensor(),
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
backgroundEventHandler,
- true
+ true,
+ asyncConsumerMetrics
);
this.completedAcknowledgements = new LinkedList<>();
@@ -311,7 +316,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
- requestManagersSupplier);
+ requestManagersSupplier,
+ asyncConsumerMetrics);
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper =
backgroundEventReaperFactory.build(logContext);
@@ -373,13 +379,15 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
new FetchConfig(config),
deserializers);
this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+ this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
final BlockingQueue<ApplicationEvent> applicationEventQueue = new
LinkedBlockingQueue<>();
final BlockingQueue<BackgroundEvent> backgroundEventQueue = new
LinkedBlockingQueue<>();
- final BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue);
+ final BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(
+ backgroundEventQueue, time, asyncConsumerMetrics);
final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
- () -> new NetworkClientDelegate(time, config, logContext,
client, metadata, backgroundEventHandler, true);
+ () -> new NetworkClientDelegate(time, config, logContext,
client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
config,
@@ -412,7 +420,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
new CompletableEventReaper(logContext),
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
- requestManagersSupplier);
+ requestManagersSupplier,
+ asyncConsumerMetrics);
this.backgroundEventQueue = new LinkedBlockingQueue<>();
this.backgroundEventProcessor = new BackgroundEventProcessor();
@@ -458,6 +467,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
this.kafkaShareConsumerMetrics = new
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
this.clientTelemetryReporter = Optional.empty();
this.completedAcknowledgements = Collections.emptyList();
+ this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
}
// auxiliary interface for testing
@@ -470,7 +480,8 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
final CompletableEventReaper applicationEventReaper,
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier,
final Supplier<NetworkClientDelegate>
networkClientDelegateSupplier,
- final Supplier<RequestManagers> requestManagersSupplier
+ final Supplier<RequestManagers> requestManagersSupplier,
+ final AsyncConsumerMetrics asyncConsumerMetrics
);
}
@@ -855,6 +866,7 @@ public class ShareConsumerImpl<K, V> implements
ShareConsumerDelegate<K, V> {
backgroundEventReaper.reap(backgroundEventQueue);
closeQuietly(kafkaShareConsumerMetrics, "kafka share consumer
metrics", firstException);
+ closeQuietly(asyncConsumerMetrics, "kafka async consumer metrics",
firstException);
closeQuietly(metrics, "consumer metrics", firstException);
closeQuietly(deserializers, "consumer deserializers", firstException);
clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter,
"consumer telemetry reporter", firstException));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index c30c0c2edde..dfb775f8947 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -51,6 +51,12 @@ public abstract class ApplicationEvent {
*/
private final Uuid id;
+ /**
+ * The time in milliseconds when this event was enqueued.
+ * This field can be changed after the event is created, so it should not
be used in hashCode or equals.
+ */
+ private long enqueuedMs;
+
protected ApplicationEvent(Type type) {
this.type = Objects.requireNonNull(type);
this.id = Uuid.randomUuid();
@@ -64,6 +70,14 @@ public abstract class ApplicationEvent {
return id;
}
+ public void setEnqueuedMs(long enqueuedMs) {
+ this.enqueuedMs = enqueuedMs;
+ }
+
+ public long enqueuedMs() {
+ return enqueuedMs;
+ }
+
@Override
public final boolean equals(Object o) {
if (this == o) return true;
@@ -78,7 +92,7 @@ public abstract class ApplicationEvent {
}
protected String toStringBase() {
- return "type=" + type + ", id=" + id;
+ return "type=" + type + ", id=" + id + ", enqueuedMs=" + enqueuedMs;
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index 0baafcd3038..dd6a1666c7b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -20,6 +20,7 @@ import
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.internals.IdempotentCloser;
import org.apache.kafka.common.utils.LogContext;
@@ -42,9 +43,11 @@ import java.util.function.Supplier;
public class ApplicationEventHandler implements Closeable {
private final Logger log;
+ private final Time time;
private final BlockingQueue<ApplicationEvent> applicationEventQueue;
private final ConsumerNetworkThread networkThread;
private final IdempotentCloser closer = new IdempotentCloser();
+ private final AsyncConsumerMetrics asyncConsumerMetrics;
public ApplicationEventHandler(final LogContext logContext,
final Time time,
@@ -52,16 +55,20 @@ public class ApplicationEventHandler implements Closeable {
final CompletableEventReaper
applicationEventReaper,
final Supplier<ApplicationEventProcessor>
applicationEventProcessorSupplier,
final Supplier<NetworkClientDelegate>
networkClientDelegateSupplier,
- final Supplier<RequestManagers>
requestManagersSupplier) {
+ final Supplier<RequestManagers>
requestManagersSupplier,
+ final AsyncConsumerMetrics
asyncConsumerMetrics) {
this.log = logContext.logger(ApplicationEventHandler.class);
+ this.time = time;
this.applicationEventQueue = applicationEventQueue;
+ this.asyncConsumerMetrics = asyncConsumerMetrics;
this.networkThread = new ConsumerNetworkThread(logContext,
time,
applicationEventQueue,
applicationEventReaper,
applicationEventProcessorSupplier,
networkClientDelegateSupplier,
- requestManagersSupplier);
+ requestManagersSupplier,
+ asyncConsumerMetrics);
this.networkThread.start();
}
@@ -73,7 +80,9 @@ public class ApplicationEventHandler implements Closeable {
*/
public void add(final ApplicationEvent event) {
Objects.requireNonNull(event, "ApplicationEvent provided to add must
be non-null");
+ event.setEnqueuedMs(time.milliseconds());
applicationEventQueue.add(event);
+
asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size());
wakeupNetworkThread();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 7e9fdaed2d8..02fc4b4a29b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -38,6 +38,12 @@ public abstract class BackgroundEvent {
*/
private final Uuid id;
+ /**
+ * The time in milliseconds when this event was enqueued.
+ * This field can be changed after the event is created, so it should not
be used in hashCode or equals.
+ */
+ private long enqueuedMs;
+
protected BackgroundEvent(Type type) {
this.type = Objects.requireNonNull(type);
this.id = Uuid.randomUuid();
@@ -51,6 +57,14 @@ public abstract class BackgroundEvent {
return id;
}
+ public void setEnqueuedMs(long enqueuedMs) {
+ this.enqueuedMs = enqueuedMs;
+ }
+
+ public long enqueuedMs() {
+ return enqueuedMs;
+ }
+
@Override
public final boolean equals(Object o) {
if (this == o) return true;
@@ -65,7 +79,7 @@ public abstract class BackgroundEvent {
}
protected String toStringBase() {
- return "type=" + type + ", id=" + id;
+ return "type=" + type + ", id=" + id + ", enqueuedMs=" + enqueuedMs;
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
index f6ded0bf735..adc621d5f2e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
@@ -17,9 +17,13 @@
package org.apache.kafka.clients.consumer.internals.events;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.utils.Time;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Objects;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
/**
* An event handler that receives {@link BackgroundEvent background events}
from the
@@ -29,10 +33,16 @@ import java.util.Queue;
public class BackgroundEventHandler {
- private final Queue<BackgroundEvent> backgroundEventQueue;
+ private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+ private final Time time;
+ private final AsyncConsumerMetrics asyncConsumerMetrics;
- public BackgroundEventHandler(final Queue<BackgroundEvent>
backgroundEventQueue) {
+ public BackgroundEventHandler(final BlockingQueue<BackgroundEvent>
backgroundEventQueue,
+ final Time time,
+ final AsyncConsumerMetrics
asyncConsumerMetrics) {
this.backgroundEventQueue = backgroundEventQueue;
+ this.time = time;
+ this.asyncConsumerMetrics = asyncConsumerMetrics;
}
/**
@@ -42,6 +52,20 @@ public class BackgroundEventHandler {
*/
public void add(BackgroundEvent event) {
Objects.requireNonNull(event, "BackgroundEvent provided to add must be
non-null");
+ event.setEnqueuedMs(time.milliseconds());
backgroundEventQueue.add(event);
+
asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
+ }
+
+ /**
+ * Drain all the {@link BackgroundEvent events} from the handler.
+ *
+ * @return A list of {@link BackgroundEvent events} that were drained
+ */
+ public List<BackgroundEvent> drainEvents() {
+ List<BackgroundEvent> events = new ArrayList<>();
+ backgroundEventQueue.drainTo(events);
+ asyncConsumerMetrics.recordBackgroundEventQueueSize(0);
+ return events;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
index 019526836af..5a0358df896 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
@@ -82,8 +82,9 @@ public class CompletableEventReaper {
*
* @param currentTimeMs <em>Current</em> time with which to compare
against the
* <em>{@link CompletableEvent#deadlineMs()
expiration time}</em>
+ * @return The number of events that were expired
*/
- public void reap(long currentTimeMs) {
+ public long reap(long currentTimeMs) {
Consumer<CompletableEvent<?>> expireEvent = event -> {
long pastDueMs = currentTimeMs - event.deadlineMs();
TimeoutException error = new TimeoutException(String.format("%s
was %s ms past its expiration of %s", event.getClass().getSimpleName(),
pastDueMs, event.deadlineMs()));
@@ -96,13 +97,16 @@ public class CompletableEventReaper {
};
// First, complete (exceptionally) any events that have passed their
deadline AND aren't already complete.
- tracked.stream()
+ long count = tracked.stream()
.filter(e -> !e.future().isDone())
.filter(e -> currentTimeMs >= e.deadlineMs())
- .forEach(expireEvent);
+ .peek(expireEvent)
+ .count();
// Second, remove any events that are already complete, just to make
sure we don't hold references. This will
// include any events that finished successfully as well as any events
we just completed exceptionally above.
tracked.removeIf(e -> e.future().isDone());
+
+ return count;
}
/**
@@ -122,8 +126,9 @@ public class CompletableEventReaper {
* don't take the deadline into consideration, just close it regardless.
*
* @param events Events from a queue that have not yet been tracked that
also need to be reviewed
+ * @return The number of events that were expired
*/
- public void reap(Collection<?> events) {
+ public long reap(Collection<?> events) {
Objects.requireNonNull(events, "Event queue to reap must be non-null");
Consumer<CompletableEvent<?>> expireEvent = event -> {
@@ -136,17 +141,20 @@ public class CompletableEventReaper {
}
};
- tracked.stream()
+ long trackedExpiredCount = tracked.stream()
.filter(e -> !e.future().isDone())
- .forEach(expireEvent);
+ .peek(expireEvent)
+ .count();
tracked.clear();
- events.stream()
+ long eventExpiredCount = events.stream()
.filter(e -> e instanceof CompletableEvent<?>)
.map(e -> (CompletableEvent<?>) e)
.filter(e -> !e.future().isDone())
- .forEach(expireEvent);
+ .peek(expireEvent)
+ .count();
events.clear();
+ return trackedExpiredCount + eventExpiredCount;
}
public int size() {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
new file mode 100644
index 00000000000..09e84cbe985
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+
+public class AsyncConsumerMetrics extends KafkaConsumerMetrics implements
AutoCloseable {
+ private final Metrics metrics;
+
+ public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME =
"time-between-network-thread-poll";
+ public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME =
"application-event-queue-size";
+ public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME =
"application-event-queue-time";
+ public static final String
APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME =
"application-event-queue-processing-time";
+ public static final String APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME =
"application-events-expired-count";
+ public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME =
"background-event-queue-size";
+ public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME =
"background-event-queue-time";
+ public static final String
BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME =
"background-event-queue-processing-time";
+ public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME =
"unsent-requests-queue-size";
+ public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME =
"unsent-requests-queue-time";
+ private final Sensor timeBetweenNetworkThreadPollSensor;
+ private final Sensor applicationEventQueueSizeSensor;
+ private final Sensor applicationEventQueueTimeSensor;
+ private final Sensor applicationEventQueueProcessingTimeSensor;
+ private final Sensor applicationEventExpiredSizeSensor;
+ private final Sensor backgroundEventQueueSizeSensor;
+ private final Sensor backgroundEventQueueTimeSensor;
+ private final Sensor backgroundEventQueueProcessingTimeSensor;
+ private final Sensor unsentRequestsQueueSizeSensor;
+ private final Sensor unsentRequestsQueueTimeSensor;
+
+ public AsyncConsumerMetrics(Metrics metrics) {
+ super(metrics, CONSUMER_METRIC_GROUP_PREFIX);
+
+ this.metrics = metrics;
+ this.timeBetweenNetworkThreadPollSensor =
metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
+ this.timeBetweenNetworkThreadPollSensor.add(
+ metrics.metricName(
+ "time-between-network-thread-poll-avg",
+ CONSUMER_METRIC_GROUP,
+ "The average time taken, in milliseconds, between each poll in
the network thread."
+ ),
+ new Avg()
+ );
+ this.timeBetweenNetworkThreadPollSensor.add(
+ metrics.metricName(
+ "time-between-network-thread-poll-max",
+ CONSUMER_METRIC_GROUP,
+ "The maximum time taken, in milliseconds, between each poll in
the network thread."
+ ),
+ new Max()
+ );
+
+ this.applicationEventQueueSizeSensor =
metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME);
+ this.applicationEventQueueSizeSensor.add(
+ metrics.metricName(
+ APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
+ CONSUMER_METRIC_GROUP,
+ "The current number of events in the queue to send from the
application thread to the background thread."
+ ),
+ new Value()
+ );
+
+ this.applicationEventQueueTimeSensor =
metrics.sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME);
+ this.applicationEventQueueTimeSensor.add(
+ metrics.metricName(
+ "application-event-queue-time-avg",
+ CONSUMER_METRIC_GROUP,
+ "The average time, in milliseconds, that application events
are taking to be dequeued."
+ ),
+ new Avg()
+ );
+ this.applicationEventQueueTimeSensor.add(
+ metrics.metricName(
+ "application-event-queue-time-max",
+ CONSUMER_METRIC_GROUP,
+ "The maximum time, in milliseconds, that an application event
took to be dequeued."
+ ),
+ new Max()
+ );
+
+ this.applicationEventQueueProcessingTimeSensor =
metrics.sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+ this.applicationEventQueueProcessingTimeSensor.add(
+ metrics.metricName(
+ "application-event-queue-processing-time-avg",
+ CONSUMER_METRIC_GROUP,
+ "The average time, in milliseconds, that the background thread
takes to process all available application events."
+ ),
+ new Avg()
+ );
+ this.applicationEventQueueProcessingTimeSensor.add(
+ metrics.metricName("application-event-queue-processing-time-max",
+ CONSUMER_METRIC_GROUP,
+ "The maximum time, in milliseconds, that the background thread
took to process all available application events."
+ ),
+ new Max()
+ );
+
+ this.applicationEventExpiredSizeSensor =
metrics.sensor(APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME);
+ this.applicationEventExpiredSizeSensor.add(
+ metrics.metricName(
+ APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME,
+ CONSUMER_METRIC_GROUP,
+ "The current number of expired application events."
+ ),
+ new Value()
+ );
+
+ this.unsentRequestsQueueSizeSensor =
metrics.sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME);
+ this.unsentRequestsQueueSizeSensor.add(
+ metrics.metricName(
+ UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME,
+ CONSUMER_METRIC_GROUP,
+ "The current number of unsent requests in the background
thread."
+ ),
+ new Value()
+ );
+
+ this.unsentRequestsQueueTimeSensor =
metrics.sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME);
+ this.unsentRequestsQueueTimeSensor.add(
+ metrics.metricName(
+ "unsent-requests-queue-time-avg",
+ CONSUMER_METRIC_GROUP,
+ "The average time, in milliseconds, that requests are taking
to be sent in the background thread."
+ ),
+ new Avg()
+ );
+ this.unsentRequestsQueueTimeSensor.add(
+ metrics.metricName(
+ "unsent-requests-queue-time-max",
+ CONSUMER_METRIC_GROUP,
+ "The maximum time, in milliseconds, that a request remained
unsent in the background thread."
+ ),
+ new Max()
+ );
+
+ this.backgroundEventQueueSizeSensor =
metrics.sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME);
+ this.backgroundEventQueueSizeSensor.add(
+ metrics.metricName(
+ BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME,
+ CONSUMER_METRIC_GROUP,
+ "The current number of events in the queue to send from the
background thread to the application thread."
+ ),
+ new Value()
+ );
+
+ this.backgroundEventQueueTimeSensor =
metrics.sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME);
+ this.backgroundEventQueueTimeSensor.add(
+ metrics.metricName(
+ "background-event-queue-time-avg",
+ CONSUMER_METRIC_GROUP,
+ "The average time, in milliseconds, that background events are
taking to be dequeued."
+ ),
+ new Avg()
+ );
+ this.backgroundEventQueueTimeSensor.add(
+ metrics.metricName(
+ "background-event-queue-time-max",
+ CONSUMER_METRIC_GROUP,
+ "The maximum time, in milliseconds, that background events are
taking to be dequeued."
+ ),
+ new Max()
+ );
+
+ this.backgroundEventQueueProcessingTimeSensor =
metrics.sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+ this.backgroundEventQueueProcessingTimeSensor.add(
+ metrics.metricName(
+ "background-event-queue-processing-time-avg",
+ CONSUMER_METRIC_GROUP,
+ "The average time, in milliseconds, that the consumer took to
process all available background events."
+ ),
+ new Avg()
+ );
+ this.backgroundEventQueueProcessingTimeSensor.add(
+ metrics.metricName(
+ "background-event-queue-processing-time-max",
+ CONSUMER_METRIC_GROUP,
+ "The maximum time, in milliseconds, that the consumer took to
process all available background events."
+ ),
+ new Max()
+ );
+ }
+
+ public void recordTimeBetweenNetworkThreadPoll(long
timeBetweenNetworkThreadPoll) {
+
this.timeBetweenNetworkThreadPollSensor.record(timeBetweenNetworkThreadPoll);
+ }
+
+ public void recordApplicationEventQueueSize(int size) {
+ this.applicationEventQueueSizeSensor.record(size);
+ }
+
+ public void recordApplicationEventQueueTime(long time) {
+ this.applicationEventQueueTimeSensor.record(time);
+ }
+
+ public void recordApplicationEventQueueProcessingTime(long processingTime)
{
+ this.applicationEventQueueProcessingTimeSensor.record(processingTime);
+ }
+
+ public void recordApplicationEventExpiredSize(long size) {
+ this.applicationEventExpiredSizeSensor.record(size);
+ }
+
+ public void recordUnsentRequestsQueueSize(int size, long timeMs) {
+ this.unsentRequestsQueueSizeSensor.record(size, timeMs);
+ }
+
+ public void recordUnsentRequestsQueueTime(long time) {
+ this.unsentRequestsQueueTimeSensor.record(time);
+ }
+
+ public void recordBackgroundEventQueueSize(int size) {
+ this.backgroundEventQueueSizeSensor.record(size);
+ }
+
+ public void recordBackgroundEventQueueTime(long time) {
+ this.backgroundEventQueueTimeSensor.record(time);
+ }
+
+ public void recordBackgroundEventQueueProcessingTime(long processingTime) {
+ this.backgroundEventQueueProcessingTimeSensor.record(processingTime);
+ }
+
+ @Override
+ public void close() {
+ Arrays.asList(
+ timeBetweenNetworkThreadPollSensor.name(),
+ applicationEventQueueSizeSensor.name(),
+ applicationEventQueueTimeSensor.name(),
+ applicationEventQueueProcessingTimeSensor.name(),
+ applicationEventExpiredSizeSensor.name(),
+ backgroundEventQueueSizeSensor.name(),
+ backgroundEventQueueTimeSensor.name(),
+ backgroundEventQueueProcessingTimeSensor.name(),
+ unsentRequestsQueueSizeSensor.name(),
+ unsentRequestsQueueTimeSensor.name()
+ ).forEach(metrics::removeSensor);
+ super.close();
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
new file mode 100644
index 00000000000..a8ce990a23d
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class ApplicationEventHandlerTest {
+ private final Time time = new MockTime();
+ private final BlockingQueue<ApplicationEvent> applicationEventsQueue =
new LinkedBlockingQueue<>();
+ private final ApplicationEventProcessor applicationEventProcessor =
mock(ApplicationEventProcessor.class);
+ private final NetworkClientDelegate networkClientDelegate =
mock(NetworkClientDelegate.class);
+ private final RequestManagers requestManagers =
mock(RequestManagers.class);
+ private final CompletableEventReaper applicationEventReaper =
mock(CompletableEventReaper.class);
+
+ @Test
+ public void testRecordApplicationEventQueueSize() {
+ try (Metrics metrics = new Metrics();
+ AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics);
+ ApplicationEventHandler applicationEventHandler = new
ApplicationEventHandler(
+ new LogContext(),
+ time,
+ applicationEventsQueue,
+ applicationEventReaper,
+ () -> applicationEventProcessor,
+ () -> networkClientDelegate,
+ () -> requestManagers,
+ asyncConsumerMetrics
+ )) {
+ // add event
+ applicationEventHandler.add(new PollEvent(time.milliseconds()));
+ assertEquals(
+ 1,
+ (double) metrics.metric(
+ metrics.metricName(
+
AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
+ ConsumerUtils.CONSUMER_METRIC_GROUP
+ )
+ ).metricValue()
+ );
+ }
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 5239482bdd0..819365e9712 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -57,6 +57,7 @@ import
org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscr
import
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
@@ -133,6 +134,7 @@ import static
org.apache.kafka.clients.consumer.internals.AbstractMembershipMana
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
import static
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -211,7 +213,7 @@ public class AsyncKafkaConsumerTest {
new StringDeserializer(),
new StringDeserializer(),
time,
- (a, b, c, d, e, f, g) -> applicationEventHandler,
+ (a, b, c, d, e, f, g, h) -> applicationEventHandler,
a -> backgroundEventReaper,
(a, b, c, d, e, f, g) -> fetchCollector,
(a, b, c, d) -> metadata,
@@ -225,7 +227,7 @@ public class AsyncKafkaConsumerTest {
new StringDeserializer(),
new StringDeserializer(),
time,
- (a, b, c, d, e, f, g) -> applicationEventHandler,
+ (a, b, c, d, e, f, g, h) -> applicationEventHandler,
a -> backgroundEventReaper,
(a, b, c, d, e, f, g) -> fetchCollector,
(a, b, c, d) -> metadata,
@@ -358,7 +360,7 @@ public class AsyncKafkaConsumerTest {
assertEquals(topicPartitionOffsets,
consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
final Metric metric = consumer.metrics()
-
.get(consumer.metricsRegistry().metricName("committed-time-ns-total",
"consumer-metrics"));
+
.get(consumer.metricsRegistry().metricName("committed-time-ns-total",
CONSUMER_METRIC_GROUP));
assertTrue((double) metric.metricValue() > 0);
}
@@ -1915,6 +1917,31 @@ public class AsyncKafkaConsumerTest {
}, "Consumer did not throw the expected UnsupportedVersionException on
poll");
}
+ @Test
+ public void
testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() {
+ consumer = newConsumer(
+ mock(FetchBuffer.class),
+ mock(ConsumerInterceptors.class),
+ mock(ConsumerRebalanceListenerInvoker.class),
+ mock(SubscriptionState.class),
+ "group-id",
+ "client-id",
+ false);
+ Metrics metrics = consumer.metricsRegistry();
+ AsyncConsumerMetrics kafkaConsumerMetrics =
consumer.kafkaConsumerMetrics();
+
+ ConsumerRebalanceListenerCallbackNeededEvent event = new
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED,
Collections.emptySortedSet());
+ event.setEnqueuedMs(time.milliseconds());
+ backgroundEventQueue.add(event);
+ kafkaConsumerMetrics.recordBackgroundEventQueueSize(1);
+
+ time.sleep(10);
+ consumer.processBackgroundEvents();
+ assertEquals(0, (double)
metrics.metric(metrics.metricName("background-event-queue-size",
CONSUMER_METRIC_GROUP)).metricValue());
+ assertEquals(10, (double)
metrics.metric(metrics.metricName("background-event-queue-time-avg",
CONSUMER_METRIC_GROUP)).metricValue());
+ assertEquals(10, (double)
metrics.metric(metrics.metricName("background-event-queue-time-max",
CONSUMER_METRIC_GROUP)).metricValue());
+ }
+
private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
new file mode 100644
index 00000000000..63269b6f554
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics.BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BackgroundEventHandlerTest {
+ private final BlockingQueue<BackgroundEvent> backgroundEventsQueue = new
LinkedBlockingQueue<>();
+
+ @Test
+ public void testRecordBackgroundEventQueueSize() {
+ try (Metrics metrics = new Metrics();
+ AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics)) {
+ BackgroundEventHandler backgroundEventHandler = new
BackgroundEventHandler(
+ backgroundEventsQueue,
+ new MockTime(0),
+ asyncConsumerMetrics);
+ // add event
+ backgroundEventHandler.add(new ErrorEvent(new Throwable()));
+ assertEquals(
+ 1,
+ (double) metrics.metric(
+
metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME,
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+
+ // drain event
+ backgroundEventHandler.drainEvents();
+ assertEquals(
+ 0,
+ (double) metrics.metric(
+
metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME,
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+ }
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 9d09aee2697..9517e04e054 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -21,6 +21,7 @@ import
org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import
org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
import
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
import org.apache.kafka.common.KafkaException;
@@ -117,8 +118,8 @@ public class ConsumerMembershipManagerTest {
subscriptionState = mock(SubscriptionState.class);
commitRequestManager = mock(CommitRequestManager.class);
backgroundEventQueue = new LinkedBlockingQueue<>();
- backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue);
time = new MockTime(0);
+ backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue, time,
mock(AsyncConsumerMetrics.class));
metrics = new Metrics(time);
rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 0e85fe2d799..520279fc8d4 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -40,6 +43,7 @@ import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -53,7 +57,7 @@ import static org.mockito.Mockito.when;
public class ConsumerNetworkThreadTest {
private final Time time;
- private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+ private final BlockingQueue<ApplicationEvent> applicationEventQueue;
private final ApplicationEventProcessor applicationEventProcessor;
private final OffsetsRequestManager offsetsRequestManager;
private final ConsumerHeartbeatRequestManager heartbeatRequestManager;
@@ -62,6 +66,7 @@ public class ConsumerNetworkThreadTest {
private final NetworkClientDelegate networkClientDelegate;
private final RequestManagers requestManagers;
private final CompletableEventReaper applicationEventReaper;
+ private final AsyncConsumerMetrics asyncConsumerMetrics;
ConsumerNetworkThreadTest() {
this.networkClientDelegate = mock(NetworkClientDelegate.class);
@@ -72,17 +77,19 @@ public class ConsumerNetworkThreadTest {
this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
this.applicationEventReaper = mock(CompletableEventReaper.class);
this.time = new MockTime();
- this.applicationEventsQueue = new LinkedBlockingQueue<>();
+ this.applicationEventQueue = new LinkedBlockingQueue<>();
+ this.asyncConsumerMetrics = mock(AsyncConsumerMetrics.class);
LogContext logContext = new LogContext();
this.consumerNetworkThread = new ConsumerNetworkThread(
logContext,
time,
- applicationEventsQueue,
+ applicationEventQueue,
applicationEventReaper,
() -> applicationEventProcessor,
() -> networkClientDelegate,
- () -> requestManagers
+ () -> requestManagers,
+ asyncConsumerMetrics
);
}
@@ -183,14 +190,18 @@ public class ConsumerNetworkThreadTest {
public void testCleanupInvokesReaper() {
LinkedList<NetworkClientDelegate.UnsentRequest> queue = new
LinkedList<>();
when(networkClientDelegate.unsentRequests()).thenReturn(queue);
+
when(applicationEventReaper.reap(applicationEventQueue)).thenReturn(1L);
consumerNetworkThread.cleanup();
- verify(applicationEventReaper).reap(applicationEventsQueue);
+ verify(applicationEventReaper).reap(applicationEventQueue);
+ verify(asyncConsumerMetrics).recordApplicationEventExpiredSize(1L);
}
@Test
public void testRunOnceInvokesReaper() {
+ when(applicationEventReaper.reap(any(Long.class))).thenReturn(1L);
consumerNetworkThread.runOnce();
verify(applicationEventReaper).reap(any(Long.class));
+ verify(asyncConsumerMetrics).recordApplicationEventExpiredSize(1L);
}
@Test
@@ -199,4 +210,82 @@ public class ConsumerNetworkThreadTest {
consumerNetworkThread.cleanup();
verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
}
+
+ @Test
+ public void testRunOnceRecordTimeBetweenNetworkThreadPoll() {
+ try (Metrics metrics = new Metrics();
+ AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics);
+ ConsumerNetworkThread consumerNetworkThread = new
ConsumerNetworkThread(
+ new LogContext(),
+ time,
+ applicationEventQueue,
+ applicationEventReaper,
+ () -> applicationEventProcessor,
+ () -> networkClientDelegate,
+ () -> requestManagers,
+ asyncConsumerMetrics
+ )) {
+ consumerNetworkThread.initializeResources();
+
+ consumerNetworkThread.runOnce();
+ time.sleep(10);
+ consumerNetworkThread.runOnce();
+ assertEquals(
+ 10,
+ (double) metrics.metric(
+ metrics.metricName("time-between-network-thread-poll-avg",
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+ assertEquals(
+ 10,
+ (double) metrics.metric(
+ metrics.metricName("time-between-network-thread-poll-max",
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+ }
+ }
+
+ @Test
+ public void
testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTime() {
+ try (Metrics metrics = new Metrics();
+ AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics);
+ ConsumerNetworkThread consumerNetworkThread = new
ConsumerNetworkThread(
+ new LogContext(),
+ time,
+ applicationEventQueue,
+ applicationEventReaper,
+ () -> applicationEventProcessor,
+ () -> networkClientDelegate,
+ () -> requestManagers,
+ asyncConsumerMetrics
+ )) {
+ consumerNetworkThread.initializeResources();
+
+ PollEvent event = new PollEvent(0);
+ event.setEnqueuedMs(time.milliseconds());
+ applicationEventQueue.add(event);
+ asyncConsumerMetrics.recordApplicationEventQueueSize(1);
+
+ time.sleep(10);
+ consumerNetworkThread.runOnce();
+ assertEquals(
+ 0,
+ (double) metrics.metric(
+ metrics.metricName("application-event-queue-size",
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+ assertEquals(
+ 10,
+ (double) metrics.metric(
+ metrics.metricName("application-event-queue-time-avg",
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+ assertEquals(
+ 10,
+ (double) metrics.metric(
+ metrics.metricName("application-event-queue-time-max",
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+ }
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index e97efd2b75f..ab665aac695 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
@@ -3779,7 +3780,7 @@ public class FetchRequestManagerTest {
Metadata metadata,
BackgroundEventHandler
backgroundEventHandler,
boolean
notifyMetadataErrorsViaErrorQueue) {
- super(time, config, logContext, client, metadata,
backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
+ super(time, config, logContext, client, metadata,
backgroundEventHandler, notifyMetadataErrorsViaErrorQueue,
mock(AsyncConsumerMetrics.class));
}
@Override
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
index 4177419fa53..81eb5187fec 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
@@ -23,11 +23,13 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
@@ -41,15 +43,17 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -228,8 +232,8 @@ public class NetworkClientDelegateTest {
AuthenticationException authException = new
AuthenticationException("Test Auth Exception");
doThrow(authException).when(metadata).maybeThrowAnyException();
- LinkedList<BackgroundEvent> backgroundEventQueue = new LinkedList<>();
- this.backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue);
+ BlockingQueue<BackgroundEvent> backgroundEventQueue = new
LinkedBlockingQueue<>();
+ this.backgroundEventHandler = new
BackgroundEventHandler(backgroundEventQueue, time,
mock(AsyncConsumerMetrics.class));
NetworkClientDelegate networkClientDelegate =
newNetworkClientDelegate(true);
assertEquals(0, backgroundEventQueue.size());
@@ -242,20 +246,59 @@ public class NetworkClientDelegateTest {
assertEquals(authException, ((ErrorEvent) event).error());
}
+ @Test
+ public void testRecordUnsentRequestsQueueTime() throws Exception {
+ try (Metrics metrics = new Metrics();
+ AsyncConsumerMetrics asyncConsumerMetrics = new
AsyncConsumerMetrics(metrics);
+ NetworkClientDelegate networkClientDelegate =
newNetworkClientDelegate(false, asyncConsumerMetrics)) {
+ NetworkClientDelegate.UnsentRequest unsentRequest =
newUnsentFindCoordinatorRequest();
+ networkClientDelegate.add(unsentRequest);
+ asyncConsumerMetrics.recordUnsentRequestsQueueSize(1,
time.milliseconds());
+
+ time.sleep(10);
+ long timeMs = time.milliseconds();
+ networkClientDelegate.poll(0, timeMs);
+ assertEquals(
+ 0,
+ (double) metrics.metric(
+ metrics.metricName("unsent-requests-queue-size",
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+ assertEquals(
+ 10,
+ (double) metrics.metric(
+ metrics.metricName("unsent-requests-queue-time-avg",
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+ assertEquals(
+ 10,
+ (double) metrics.metric(
+ metrics.metricName("unsent-requests-queue-time-max",
CONSUMER_METRIC_GROUP)
+ ).metricValue()
+ );
+ }
+ }
+
public NetworkClientDelegate newNetworkClientDelegate(boolean
notifyMetadataErrorsViaErrorQueue) {
+ return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue,
mock(AsyncConsumerMetrics.class));
+ }
+
+ public NetworkClientDelegate newNetworkClientDelegate(boolean
notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) {
LogContext logContext = new LogContext();
Properties properties = new Properties();
properties.put(KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
properties.put(VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
properties.put(GROUP_ID_CONFIG, GROUP_ID);
properties.put(REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS);
- return new NetworkClientDelegate(this.time,
+ return new NetworkClientDelegate(time,
new ConsumerConfig(properties),
logContext,
this.client,
this.metadata,
this.backgroundEventHandler,
- notifyMetadataErrorsViaErrorQueue);
+ notifyMetadataErrorsViaErrorQueue,
+ asyncConsumerMetrics
+ );
}
public NetworkClientDelegate.UnsentRequest
newUnsentFindCoordinatorRequest() {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 7231e0b6390..1b1ed587203 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
+import
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
@@ -113,6 +114,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -1656,7 +1658,7 @@ public class ShareConsumeRequestManagerTest {
subscriptions,
fetchConfig,
deserializers);
- BackgroundEventHandler backgroundEventHandler = new
TestableBackgroundEventHandler(completedAcknowledgements);
+ BackgroundEventHandler backgroundEventHandler = new
TestableBackgroundEventHandler(time, completedAcknowledgements);
shareConsumeRequestManager = spy(new
TestableShareConsumeRequestManager<>(
logContext,
groupId,
@@ -1687,8 +1689,9 @@ public class ShareConsumeRequestManagerTest {
properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,
String.valueOf(requestTimeoutMs));
properties.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG,
String.valueOf(retryBackoffMs));
ConsumerConfig config = new ConsumerConfig(properties);
- networkClientDelegate = spy(new TestableNetworkClientDelegate(time,
config, logContext, client, metadata,
- new BackgroundEventHandler(new LinkedBlockingQueue<>()),
false));
+ networkClientDelegate = spy(new TestableNetworkClientDelegate(
+ time, config, logContext, client, metadata,
+ new BackgroundEventHandler(new LinkedBlockingQueue<>(), time,
mock(AsyncConsumerMetrics.class)), false));
}
private class TestableShareConsumeRequestManager<K, V> extends
ShareConsumeRequestManager {
@@ -1742,7 +1745,7 @@ public class ShareConsumeRequestManagerTest {
Metadata metadata,
BackgroundEventHandler
backgroundEventHandler,
boolean
notifyMetadataErrorsViaErrorQueue) {
- super(time, config, logContext, client, metadata,
backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
+ super(time, config, logContext, client, metadata,
backgroundEventHandler, notifyMetadataErrorsViaErrorQueue,
mock(AsyncConsumerMetrics.class));
}
@Override
@@ -1837,8 +1840,8 @@ public class ShareConsumeRequestManagerTest {
private static class TestableBackgroundEventHandler extends
BackgroundEventHandler {
List<Map<TopicIdPartition, Acknowledgements>>
completedAcknowledgements;
- public TestableBackgroundEventHandler(List<Map<TopicIdPartition,
Acknowledgements>> completedAcknowledgements) {
- super(new LinkedBlockingQueue<>());
+ public TestableBackgroundEventHandler(Time time,
List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements) {
+ super(new LinkedBlockingQueue<>(), time,
mock(AsyncConsumerMetrics.class));
this.completedAcknowledgements = completedAcknowledgements;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 7fd0bc3fe60..04db229c8df 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -122,7 +122,7 @@ public class ShareConsumerImplTest {
new StringDeserializer(),
new StringDeserializer(),
time,
- (a, b, c, d, e, f, g) -> applicationEventHandler,
+ (a, b, c, d, e, f, g, h) -> applicationEventHandler,
a -> backgroundEventReaper,
(a, b, c, d, e) -> fetchCollector,
backgroundEventQueue
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
index eabcb8773e1..71e44631ee8 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
@@ -49,7 +49,7 @@ public class CompletableEventReaperTest {
// Without any time passing, we check the reaper and verify that the
event is not done amd is still
// being tracked.
- reaper.reap(time.milliseconds());
+ assertEquals(0, reaper.reap(time.milliseconds()));
assertFalse(event.future().isDone());
assertEquals(1, reaper.size());
@@ -62,7 +62,7 @@ public class CompletableEventReaperTest {
// Call the reaper and validate that the event is now "done"
(expired), the correct exception type is
// thrown, and the event is no longer tracked.
- reaper.reap(time.milliseconds());
+ assertEquals(1, reaper.reap(time.milliseconds()));
assertTrue(event.future().isDone());
assertThrows(TimeoutException.class, () ->
ConsumerUtils.getResult(event.future()));
assertEquals(0, reaper.size());
@@ -77,7 +77,7 @@ public class CompletableEventReaperTest {
// Without any time passing, we check the reaper and verify that the
event is not done amd is still
// being tracked.
- reaper.reap(time.milliseconds());
+ assertEquals(0, reaper.reap(time.milliseconds()));
assertFalse(event.future().isDone());
assertEquals(1, reaper.size());
@@ -91,7 +91,7 @@ public class CompletableEventReaperTest {
time.sleep(timeoutMs + 1);
// Call the reaper and validate that the event is not considered
expired, but is still no longer tracked.
- reaper.reap(time.milliseconds());
+ assertEquals(0, reaper.reap(time.milliseconds()));
assertTrue(event.future().isDone());
assertNull(ConsumerUtils.getResult(event.future()));
assertEquals(0, reaper.size());
@@ -108,7 +108,7 @@ public class CompletableEventReaperTest {
// Without any time passing, we check the reaper and verify that the
event is not done amd is still
// being tracked.
- reaper.reap(time.milliseconds());
+ assertEquals(0, reaper.reap(time.milliseconds()));
assertFalse(event1.future().isDone());
assertFalse(event2.future().isDone());
assertEquals(2, reaper.size());
@@ -124,7 +124,7 @@ public class CompletableEventReaperTest {
// Validate that the first (completed) event is not expired, but the
second one is expired. In either case,
// both should be completed and neither should be tracked anymore.
- reaper.reap(time.milliseconds());
+ assertEquals(1, reaper.reap(time.milliseconds()));
assertTrue(event1.future().isDone());
assertTrue(event2.future().isDone());
assertNull(ConsumerUtils.getResult(event1.future()));
@@ -150,7 +150,7 @@ public class CompletableEventReaperTest {
assertEquals(2, queue.size());
// Go ahead and reap the incomplete from the queue.
- reaper.reap(queue);
+ assertEquals(1, reaper.reap(queue));
// The first event was completed, so we didn't expire it in the reaper.
assertTrue(event1.future().isDone());
@@ -186,7 +186,7 @@ public class CompletableEventReaperTest {
assertEquals(2, reaper.size());
// Go ahead and reap the incomplete events. Both sets should be zero
after that.
- reaper.reap(queue);
+ assertEquals(1, reaper.reap(queue));
assertEquals(0, reaper.size());
assertEquals(0, queue.size());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
new file mode 100644
index 00000000000..2913bcfad70
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AsyncConsumerMetricsTest {
+ private static final long METRIC_VALUE = 123L;
+
+ private final Metrics metrics = new Metrics();
+ private AsyncConsumerMetrics consumerMetrics;
+
+ @AfterEach
+ public void tearDown() {
+ if (consumerMetrics != null) {
+ consumerMetrics.close();
+ }
+ metrics.close();
+ }
+
+ @Test
+ public void shouldMetricNames() {
+ // create
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ HashSet<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
+ metrics.metricName("last-poll-seconds-ago", CONSUMER_METRIC_GROUP),
+ metrics.metricName("time-between-poll-avg", CONSUMER_METRIC_GROUP),
+ metrics.metricName("time-between-poll-max", CONSUMER_METRIC_GROUP),
+ metrics.metricName("poll-idle-ratio-avg", CONSUMER_METRIC_GROUP),
+ metrics.metricName("commit-sync-time-ns-total",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("committed-time-ns-total",
CONSUMER_METRIC_GROUP)
+ ));
+ expectedMetrics.forEach(
+ metricName -> assertTrue(
+ metrics.metrics().containsKey(metricName),
+ "Missing metric: " + metricName
+ )
+ );
+
+ HashSet<MetricName> expectedConsumerMetrics = new
HashSet<>(Arrays.asList(
+ metrics.metricName("time-between-network-thread-poll-avg",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("time-between-network-thread-poll-max",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("application-event-queue-size",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("application-event-queue-time-avg",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("application-event-queue-time-max",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("application-event-queue-processing-time-avg",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("application-event-queue-processing-time-max",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("unsent-requests-queue-size",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("unsent-requests-queue-time-avg",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("unsent-requests-queue-time-max",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("background-event-queue-size",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("background-event-queue-time-avg",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("background-event-queue-time-max",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("background-event-queue-processing-time-avg",
CONSUMER_METRIC_GROUP),
+ metrics.metricName("background-event-queue-processing-time-max",
CONSUMER_METRIC_GROUP)
+ ));
+ expectedConsumerMetrics.forEach(
+ metricName -> assertTrue(
+ metrics.metrics().containsKey(metricName),
+ "Missing metric: " + metricName
+ )
+ );
+
+ // close
+ consumerMetrics.close();
+ expectedMetrics.forEach(
+ metricName -> assertFalse(
+ metrics.metrics().containsKey(metricName),
+ "Metric present after close: " + metricName
+ )
+ );
+ expectedConsumerMetrics.forEach(
+ metricName -> assertFalse(
+ metrics.metrics().containsKey(metricName),
+ "Metric present after close: " + metricName
+ )
+ );
+ }
+
+ @Test
+ public void shouldRecordTimeBetweenNetworkThreadPoll() {
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ // When:
+ consumerMetrics.recordTimeBetweenNetworkThreadPoll(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue("time-between-network-thread-poll-avg");
+ assertMetricValue("time-between-network-thread-poll-max");
+ }
+
+ @Test
+ public void shouldRecordApplicationEventQueueSize() {
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ // When:
+ consumerMetrics.recordApplicationEventQueueSize(10);
+
+ // Then:
+ assertEquals(
+ metrics.metric(
+ metrics.metricName(
+ "application-event-queue-size",
+ CONSUMER_METRIC_GROUP
+ )
+ ).metricValue(),
+ (double) 10
+ );
+ }
+
+ @Test
+ public void shouldRecordApplicationEventQueueTime() {
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ // When:
+ consumerMetrics.recordApplicationEventQueueTime(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue("application-event-queue-time-avg");
+ assertMetricValue("application-event-queue-time-max");
+ }
+
+ @Test
+ public void shouldRecordApplicationEventQueueProcessingTime() {
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ // When:
+
consumerMetrics.recordApplicationEventQueueProcessingTime(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue("application-event-queue-processing-time-avg");
+ assertMetricValue("application-event-queue-processing-time-max");
+ }
+
+ @Test
+ public void shouldRecordUnsentRequestsQueueSize() {
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ // When:
+ consumerMetrics.recordUnsentRequestsQueueSize(10, 100);
+
+ // Then:
+ assertEquals(
+ metrics.metric(
+ metrics.metricName(
+ "unsent-requests-queue-size",
+ CONSUMER_METRIC_GROUP
+ )
+ ).metricValue(),
+ (double) 10
+ );
+ }
+
+ @Test
+ public void shouldRecordUnsentRequestsQueueTime() {
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ // When:
+ consumerMetrics.recordUnsentRequestsQueueTime(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue("unsent-requests-queue-time-avg");
+ assertMetricValue("unsent-requests-queue-time-max");
+ }
+
+ @Test
+ public void shouldRecordBackgroundEventQueueSize() {
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ // When:
+ consumerMetrics.recordBackgroundEventQueueSize(10);
+
+ // Then:
+ assertEquals(
+ metrics.metric(
+ metrics.metricName(
+ "background-event-queue-size",
+ CONSUMER_METRIC_GROUP
+ )
+ ).metricValue(),
+ (double) 10
+ );
+ }
+
+ @Test
+ public void shouldRecordBackgroundEventQueueTime() {
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ // When:
+ consumerMetrics.recordBackgroundEventQueueTime(METRIC_VALUE);
+
+ // Then:
+ assertMetricValue("background-event-queue-time-avg");
+ assertMetricValue("background-event-queue-time-max");
+ }
+
+ @Test
+ public void shouldRecordBackgroundEventQueueProcessingTime() {
+ consumerMetrics = new AsyncConsumerMetrics(metrics);
+ // When:
+ consumerMetrics.recordBackgroundEventQueueProcessingTime(METRIC_VALUE);
+
+ // Then:
+        assertMetricValue("background-event-queue-processing-time-avg");
+        assertMetricValue("background-event-queue-processing-time-max");
+ }
+
+ private void assertMetricValue(final String name) {
+ assertEquals(
+ metrics.metric(
+ metrics.metricName(
+ name,
+ CONSUMER_METRIC_GROUP
+ )
+ ).metricValue(),
+ (double) METRIC_VALUE
+ );
+ }
+}