This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 770d64d2ccb KAFKA-16143: New JMX metrics for AsyncKafkaConsumer (#17199)
770d64d2ccb is described below

commit 770d64d2ccbc02ba8579c6f50b2901a3f2245928
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Dec 13 20:20:27 2024 +0800

    KAFKA-16143: New JMX metrics for AsyncKafkaConsumer (#17199)
    
    Reviewers: Andrew Schofield <[email protected]>, Kirk True <[email protected]>, Lianet Magrans <[email protected]>
---
 .../consumer/internals/AsyncKafkaConsumer.java     |  84 ++++---
 .../consumer/internals/ConsumerNetworkThread.java  |  22 +-
 .../clients/consumer/internals/ConsumerUtils.java  |   1 +
 .../consumer/internals/NetworkClientDelegate.java  |  31 ++-
 .../consumer/internals/ShareConsumerImpl.java      |  26 +-
 .../internals/events/ApplicationEvent.java         |  16 +-
 .../internals/events/ApplicationEventHandler.java  |  13 +-
 .../consumer/internals/events/BackgroundEvent.java |  16 +-
 .../internals/events/BackgroundEventHandler.java   |  30 ++-
 .../internals/events/CompletableEventReaper.java   |  24 +-
 .../internals/metrics/AsyncConsumerMetrics.java    | 262 +++++++++++++++++++++
 .../internals/ApplicationEventHandlerTest.java     |  73 ++++++
 .../consumer/internals/AsyncKafkaConsumerTest.java |  33 ++-
 .../internals/BackgroundEventHandlerTest.java      |  65 +++++
 .../internals/ConsumerMembershipManagerTest.java   |   3 +-
 .../internals/ConsumerNetworkThreadTest.java       |  99 +++++++-
 .../internals/FetchRequestManagerTest.java         |   3 +-
 .../internals/NetworkClientDelegateTest.java       |  53 ++++-
 .../internals/ShareConsumeRequestManagerTest.java  |  15 +-
 .../consumer/internals/ShareConsumerImplTest.java  |   2 +-
 .../events/CompletableEventReaperTest.java         |  16 +-
 .../metrics/AsyncConsumerMetricsTest.java          | 237 +++++++++++++++++++
 22 files changed, 1036 insertions(+), 88 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 5bbabe61a6c..ff679e5542d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -69,7 +69,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscr
 import 
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
-import 
org.apache.kafka.clients.consumer.internals.metrics.KafkaConsumerMetrics;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
@@ -109,7 +109,6 @@ import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -215,10 +214,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
     private final AtomicReference<Optional<ConsumerGroupMetadata>> 
groupMetadata = new AtomicReference<>(Optional.empty());
-    private final KafkaConsumerMetrics kafkaConsumerMetrics;
+    private final AsyncConsumerMetrics kafkaConsumerMetrics;
     private Logger log;
     private final String clientId;
     private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final BackgroundEventHandler backgroundEventHandler;
     private final BackgroundEventProcessor backgroundEventProcessor;
     private final CompletableEventReaper backgroundEventReaper;
     private final Deserializers<K, V> deserializers;
@@ -320,6 +320,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             this.clientTelemetryReporter = 
CommonClientConfigs.telemetryReporter(clientId, config);
             this.clientTelemetryReporter.ifPresent(reporters::add);
             this.metrics = createMetrics(config, time, reporters);
+            this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
             this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
 
             List<ConsumerInterceptor<K, V>> interceptorList = 
configuredConsumerInterceptors(config);
@@ -339,7 +340,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
             ApiVersions apiVersions = new ApiVersions();
             final BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
-            final BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue);
+            this.backgroundEventHandler = new BackgroundEventHandler(
+                backgroundEventQueue,
+                time,
+                kafkaConsumerMetrics
+            );
 
             // This FetchBuffer is shared between the application and network 
threads.
             this.fetchBuffer = new FetchBuffer(logContext);
@@ -352,7 +357,9 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     fetchMetricsManager.throttleTimeSensor(),
                     
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
                     backgroundEventHandler,
-                    false);
+                    false,
+                    kafkaConsumerMetrics
+            );
             this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
             this.groupMetadata.set(initializeGroupMetadata(config, 
groupRebalanceConfig));
             final Supplier<RequestManagers> requestManagersSupplier = 
RequestManagers.supplier(time,
@@ -382,7 +389,9 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     new CompletableEventReaper(logContext),
                     applicationEventProcessorSupplier,
                     networkClientDelegateSupplier,
-                    requestManagersSupplier);
+                    requestManagersSupplier,
+                    kafkaConsumerMetrics
+            );
 
             this.rebalanceListenerInvoker = new 
ConsumerRebalanceListenerInvoker(
                     logContext,
@@ -402,8 +411,6 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     fetchMetricsManager,
                     time);
 
-            this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, 
CONSUMER_METRIC_GROUP_PREFIX);
-
             if (groupMetadata.get().isPresent() &&
                 
GroupProtocol.of(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG)) == 
GroupProtocol.CONSUMER) {
                 config.ignore(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG); // 
Used by background thread
@@ -460,10 +467,15 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
         this.defaultApiTimeoutMs = Duration.ofMillis(defaultApiTimeoutMs);
         this.deserializers = deserializers;
         this.applicationEventHandler = applicationEventHandler;
-        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, 
"consumer");
+        this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
         this.clientTelemetryReporter = Optional.empty();
         this.autoCommitEnabled = autoCommitEnabled;
         this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
+        this.backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue,
+            time,
+            kafkaConsumerMetrics
+        );
     }
 
     AsyncKafkaConsumer(LogContext logContext,
@@ -498,7 +510,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 deserializers,
                 fetchMetricsManager,
                 time);
-        this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, 
"consumer");
+        this.kafkaConsumerMetrics = new AsyncConsumerMetrics(metrics);
 
         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
             config,
@@ -509,7 +521,11 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
 
         BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
         this.backgroundEventQueue = new LinkedBlockingQueue<>();
-        BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue);
+        this.backgroundEventHandler = new BackgroundEventHandler(
+            backgroundEventQueue,
+            time,
+            kafkaConsumerMetrics
+        );
         this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
             logContext,
             subscriptions,
@@ -524,7 +540,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             client,
             metadata,
             backgroundEventHandler,
-            false
+            false,
+            kafkaConsumerMetrics
         );
         this.offsetCommitCallbackInvoker = new 
OffsetCommitCallbackInvoker(interceptors);
         Supplier<RequestManagers> requestManagersSupplier = 
RequestManagers.supplier(
@@ -556,7 +573,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 new CompletableEventReaper(logContext),
                 applicationEventProcessorSupplier,
                 networkClientDelegateSupplier,
-                requestManagersSupplier);
+                requestManagersSupplier,
+                kafkaConsumerMetrics);
         this.backgroundEventProcessor = new BackgroundEventProcessor();
         this.backgroundEventReaper = new CompletableEventReaper(logContext);
     }
@@ -571,7 +589,8 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
             final CompletableEventReaper applicationEventReaper,
             final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier,
             final Supplier<NetworkClientDelegate> 
networkClientDelegateSupplier,
-            final Supplier<RequestManagers> requestManagersSupplier
+            final Supplier<RequestManagers> requestManagersSupplier,
+            final AsyncConsumerMetrics asyncConsumerMetrics
         );
 
     }
@@ -1941,25 +1960,30 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
      * It is possible that {@link ErrorEvent an error}
      * could occur when processing the events. In such cases, the processor 
will take a reference to the first
      * error, continue to process the remaining events, and then throw the 
first error that occurred.
+     *
+     * Visible for testing.
      */
-    private boolean processBackgroundEvents() {
+    boolean processBackgroundEvents() {
         AtomicReference<KafkaException> firstError = new AtomicReference<>();
 
-        LinkedList<BackgroundEvent> events = new LinkedList<>();
-        backgroundEventQueue.drainTo(events);
-
-        for (BackgroundEvent event : events) {
-            try {
-                if (event instanceof CompletableEvent)
-                    backgroundEventReaper.add((CompletableEvent<?>) event);
-
-                backgroundEventProcessor.process(event);
-            } catch (Throwable t) {
-                KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
-
-                if (!firstError.compareAndSet(null, e))
-                    log.warn("An error occurred when processing the background 
event: {}", e.getMessage(), e);
+        List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
+        if (!events.isEmpty()) {
+            long startMs = time.milliseconds();
+            for (BackgroundEvent event : events) {
+                
kafkaConsumerMetrics.recordBackgroundEventQueueTime(time.milliseconds() - 
event.enqueuedMs());
+                try {
+                    if (event instanceof CompletableEvent)
+                        backgroundEventReaper.add((CompletableEvent<?>) event);
+
+                    backgroundEventProcessor.process(event);
+                } catch (Throwable t) {
+                    KafkaException e = 
ConsumerUtils.maybeWrapAsKafkaException(t);
+
+                    if (!firstError.compareAndSet(null, e))
+                        log.warn("An error occurred when processing the 
background event: {}", e.getMessage(), e);
+                }
             }
+            
kafkaConsumerMetrics.recordBackgroundEventQueueProcessingTime(time.milliseconds()
 - startMs);
         }
 
         backgroundEventReaper.reap(time.milliseconds());
@@ -2088,7 +2112,7 @@ public class AsyncKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
     }
 
     @Override
-    public KafkaConsumerMetrics kafkaConsumerMetrics() {
+    public AsyncConsumerMetrics kafkaConsumerMetrics() {
         return kafkaConsumerMetrics;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
index 7ca01ce2578..0e7b58acc21 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThread.java
@@ -23,6 +23,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
 import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.utils.KafkaThread;
@@ -62,6 +63,7 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
     private final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier;
     private final Supplier<NetworkClientDelegate> 
networkClientDelegateSupplier;
     private final Supplier<RequestManagers> requestManagersSupplier;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
     private ApplicationEventProcessor applicationEventProcessor;
     private NetworkClientDelegate networkClientDelegate;
     private RequestManagers requestManagers;
@@ -69,6 +71,7 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
     private final IdempotentCloser closer = new IdempotentCloser();
     private volatile Duration closeTimeout = 
Duration.ofMillis(DEFAULT_CLOSE_TIMEOUT_MS);
     private volatile long cachedMaximumTimeToWait = MAX_POLL_TIMEOUT_MS;
+    private long lastPollTimeMs = 0L;
 
     public ConsumerNetworkThread(LogContext logContext,
                                  Time time,
@@ -76,7 +79,8 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
                                  CompletableEventReaper applicationEventReaper,
                                  Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier,
                                  Supplier<NetworkClientDelegate> 
networkClientDelegateSupplier,
-                                 Supplier<RequestManagers> 
requestManagersSupplier) {
+                                 Supplier<RequestManagers> 
requestManagersSupplier,
+                                 AsyncConsumerMetrics asyncConsumerMetrics) {
         super(BACKGROUND_THREAD_NAME, true);
         this.time = time;
         this.log = logContext.logger(getClass());
@@ -86,6 +90,7 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
         this.networkClientDelegateSupplier = networkClientDelegateSupplier;
         this.requestManagersSupplier = requestManagersSupplier;
         this.running = true;
+        this.asyncConsumerMetrics = asyncConsumerMetrics;
     }
 
     @Override
@@ -141,6 +146,11 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
         processApplicationEvents();
 
         final long currentTimeMs = time.milliseconds();
+        if (lastPollTimeMs != 0L) {
+            
asyncConsumerMetrics.recordTimeBetweenNetworkThreadPoll(currentTimeMs - 
lastPollTimeMs);
+        }
+        lastPollTimeMs = currentTimeMs;
+
         final long pollWaitTimeMs = requestManagers.entries().stream()
                 .filter(Optional::isPresent)
                 .map(Optional::get)
@@ -166,8 +176,13 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
     private void processApplicationEvents() {
         LinkedList<ApplicationEvent> events = new LinkedList<>();
         applicationEventQueue.drainTo(events);
+        if (events.isEmpty())
+            return;
 
+        asyncConsumerMetrics.recordApplicationEventQueueSize(0);
+        long startMs = time.milliseconds();
         for (ApplicationEvent event : events) {
+            
asyncConsumerMetrics.recordApplicationEventQueueTime(time.milliseconds() - 
event.enqueuedMs());
             try {
                 if (event instanceof CompletableEvent) {
                     applicationEventReaper.add((CompletableEvent<?>) event);
@@ -181,6 +196,7 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
                 log.warn("Error processing event {}", t.getMessage(), t);
             }
         }
+        
asyncConsumerMetrics.recordApplicationEventQueueProcessingTime(time.milliseconds()
 - startMs);
     }
 
     /**
@@ -189,7 +205,7 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
      * is given least one attempt to satisfy any network requests 
<em>before</em> checking if a timeout has expired.
      */
     private void reapExpiredApplicationEvents(long currentTimeMs) {
-        applicationEventReaper.reap(currentTimeMs);
+        
asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(currentTimeMs));
     }
 
     /**
@@ -326,7 +342,7 @@ public class ConsumerNetworkThread extends KafkaThread 
implements Closeable {
             log.error("Unexpected error during shutdown. Proceed with 
closing.", e);
         } finally {
             sendUnsentRequests(timer);
-            applicationEventReaper.reap(applicationEventQueue);
+            
asyncConsumerMetrics.recordApplicationEventExpiredSize(applicationEventReaper.reap(applicationEventQueue));
 
             closeQuietly(requestManagers, "request managers");
             closeQuietly(networkClientDelegate, "network client delegate");
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
index d803b66780b..e4b0fa924c0 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerUtils.java
@@ -68,6 +68,7 @@ public final class ConsumerUtils {
     public static final String CONSUMER_SHARE_METRIC_GROUP_PREFIX = 
"consumer-share";
     public static final String COORDINATOR_METRICS_SUFFIX = 
"-coordinator-metrics";
     public static final String CONSUMER_METRICS_SUFFIX = "-metrics";
+    public static final String CONSUMER_METRIC_GROUP = 
CONSUMER_METRIC_GROUP_PREFIX + CONSUMER_METRICS_SUFFIX;
 
     /**
      * A fixed, large enough value will suffice for max.
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index 6c1ab43a4f2..3c280e39d02 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
@@ -71,6 +72,7 @@ public class NetworkClientDelegate implements AutoCloseable {
     private final long retryBackoffMs;
     private Optional<Exception> metadataError;
     private final boolean notifyMetadataErrorsViaErrorQueue;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
 
     public NetworkClientDelegate(
             final Time time,
@@ -79,7 +81,8 @@ public class NetworkClientDelegate implements AutoCloseable {
             final KafkaClient client,
             final Metadata metadata,
             final BackgroundEventHandler backgroundEventHandler,
-            final boolean notifyMetadataErrorsViaErrorQueue) {
+            final boolean notifyMetadataErrorsViaErrorQueue,
+            final AsyncConsumerMetrics asyncConsumerMetrics) {
         this.time = time;
         this.client = client;
         this.metadata = metadata;
@@ -90,6 +93,7 @@ public class NetworkClientDelegate implements AutoCloseable {
         this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
         this.metadataError = Optional.empty();
         this.notifyMetadataErrorsViaErrorQueue = 
notifyMetadataErrorsViaErrorQueue;
+        this.asyncConsumerMetrics = asyncConsumerMetrics;
     }
 
     // Visible for testing
@@ -149,6 +153,7 @@ public class NetworkClientDelegate implements AutoCloseable 
{
         this.client.poll(pollTimeoutMs, currentTimeMs);
         maybePropagateMetadataError();
         checkDisconnects(currentTimeMs);
+        
asyncConsumerMetrics.recordUnsentRequestsQueueSize(unsentRequests.size(), 
currentTimeMs);
     }
 
     private void maybePropagateMetadataError() {
@@ -182,6 +187,7 @@ public class NetworkClientDelegate implements AutoCloseable 
{
             unsent.timer.update(currentTimeMs);
             if (unsent.timer.isExpired()) {
                 iterator.remove();
+                
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - 
unsent.enqueueTimeMs());
                 unsent.handler.onFailure(currentTimeMs, new TimeoutException(
                     "Failed to send request after " + unsent.timer.timeoutMs() 
+ " ms."));
                 continue;
@@ -192,6 +198,7 @@ public class NetworkClientDelegate implements AutoCloseable 
{
                 continue;
             }
             iterator.remove();
+            
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - 
unsent.enqueueTimeMs());
         }
     }
 
@@ -219,6 +226,7 @@ public class NetworkClientDelegate implements AutoCloseable 
{
             UnsentRequest u = iter.next();
             if (u.node.isPresent() && client.connectionFailed(u.node.get())) {
                 iter.remove();
+                
asyncConsumerMetrics.recordUnsentRequestsQueueTime(time.milliseconds() - 
u.enqueueTimeMs());
                 AuthenticationException authenticationException = 
client.authenticationException(u.node.get());
                 u.handler.onFailure(currentTimeMs, authenticationException);
             }
@@ -282,6 +290,7 @@ public class NetworkClientDelegate implements AutoCloseable 
{
     public void add(final UnsentRequest r) {
         Objects.requireNonNull(r);
         r.setTimer(this.time, this.requestTimeoutMs);
+        r.setEnqueueTimeMs(time.milliseconds());
         unsentRequests.add(r);
     }
 
@@ -315,6 +324,7 @@ public class NetworkClientDelegate implements AutoCloseable 
{
         private final Optional<Node> node; // empty if random node can be 
chosen
 
         private Timer timer;
+        private long enqueueTimeMs; // time when the request was enqueued to 
unsentRequests, not duration in the queue.
 
         public UnsentRequest(final AbstractRequest.Builder<?> requestBuilder,
                              final Optional<Node> node) {
@@ -332,6 +342,20 @@ public class NetworkClientDelegate implements 
AutoCloseable {
             return timer;
         }
 
+        /**
+         * Set the time when the request was enqueued to {@link 
NetworkClientDelegate#unsentRequests}.
+         */
+        private void setEnqueueTimeMs(final long enqueueTimeMs) {
+            this.enqueueTimeMs = enqueueTimeMs;
+        }
+
+        /**
+         * Return the time when the request was enqueued to {@link 
NetworkClientDelegate#unsentRequests}.
+         */
+        private long enqueueTimeMs() {
+            return enqueueTimeMs;
+        }
+
         CompletableFuture<ClientResponse> future() {
             return handler.future;
         }
@@ -428,7 +452,8 @@ public class NetworkClientDelegate implements AutoCloseable 
{
                                                            final Sensor 
throttleTimeSensor,
                                                            final 
ClientTelemetrySender clientTelemetrySender,
                                                            final 
BackgroundEventHandler backgroundEventHandler,
-                                                           final boolean 
notifyMetadataErrorsViaErrorQueue) {
+                                                           final boolean 
notifyMetadataErrorsViaErrorQueue,
+                                                           final 
AsyncConsumerMetrics asyncConsumerMetrics) {
         return new CachedSupplier<>() {
             @Override
             protected NetworkClientDelegate create() {
@@ -442,7 +467,7 @@ public class NetworkClientDelegate implements AutoCloseable 
{
                         metadata,
                         throttleTimeSensor,
                         clientTelemetrySender);
-                return new NetworkClientDelegate(time, config, logContext, 
client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
+                return new NetworkClientDelegate(time, config, logContext, 
client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, 
asyncConsumerMetrics);
             }
         };
     }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
index 0ca371253bc..4a39c75745e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImpl.java
@@ -47,6 +47,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCo
 import org.apache.kafka.clients.consumer.internals.events.ShareFetchEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareSubscriptionChangeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareUnsubscribeEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import 
org.apache.kafka.clients.consumer.internals.metrics.KafkaShareConsumerMetrics;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -160,6 +161,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
     private final ApplicationEventHandler applicationEventHandler;
     private final Time time;
     private final KafkaShareConsumerMetrics kafkaShareConsumerMetrics;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
     private Logger log;
     private final String clientId;
     private final String groupId;
@@ -252,6 +254,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             this.clientTelemetryReporter = 
CommonClientConfigs.telemetryReporter(clientId, config);
             this.clientTelemetryReporter.ifPresent(reporters::add);
             this.metrics = createMetrics(config, time, reporters);
+            this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
 
             this.deserializers = new Deserializers<>(config, keyDeserializer, 
valueDeserializer);
             this.currentFetch = ShareFetch.empty();
@@ -266,7 +269,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             ShareFetchMetricsManager shareFetchMetricsManager = 
createShareFetchMetricsManager(metrics);
             ApiVersions apiVersions = new ApiVersions();
             final BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
-            final BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue);
+            final BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(
+                backgroundEventQueue, time, asyncConsumerMetrics);
 
             // This FetchBuffer is shared between the application and network 
threads.
             this.fetchBuffer = new ShareFetchBuffer(logContext);
@@ -280,7 +284,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                     shareFetchMetricsManager.throttleTimeSensor(),
                     
clientTelemetryReporter.map(ClientTelemetryReporter::telemetrySender).orElse(null),
                     backgroundEventHandler,
-                    true
+                    true,
+                    asyncConsumerMetrics
             );
             this.completedAcknowledgements = new LinkedList<>();
 
@@ -311,7 +316,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                     new CompletableEventReaper(logContext),
                     applicationEventProcessorSupplier,
                     networkClientDelegateSupplier,
-                    requestManagersSupplier);
+                    requestManagersSupplier,
+                    asyncConsumerMetrics);
 
             this.backgroundEventProcessor = new BackgroundEventProcessor();
             this.backgroundEventReaper = 
backgroundEventReaperFactory.build(logContext);
@@ -373,13 +379,15 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 new FetchConfig(config),
                 deserializers);
         this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
+        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
 
         final BlockingQueue<ApplicationEvent> applicationEventQueue = new 
LinkedBlockingQueue<>();
         final BlockingQueue<BackgroundEvent> backgroundEventQueue = new 
LinkedBlockingQueue<>();
-        final BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue);
+        final BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(
+            backgroundEventQueue, time, asyncConsumerMetrics);
 
         final Supplier<NetworkClientDelegate> networkClientDelegateSupplier =
-                () -> new NetworkClientDelegate(time, config, logContext, 
client, metadata, backgroundEventHandler, true);
+                () -> new NetworkClientDelegate(time, config, logContext, 
client, metadata, backgroundEventHandler, true, asyncConsumerMetrics);
 
         GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
                 config,
@@ -412,7 +420,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 new CompletableEventReaper(logContext),
                 applicationEventProcessorSupplier,
                 networkClientDelegateSupplier,
-                requestManagersSupplier);
+                requestManagersSupplier,
+                asyncConsumerMetrics);
 
         this.backgroundEventQueue = new LinkedBlockingQueue<>();
         this.backgroundEventProcessor = new BackgroundEventProcessor();
@@ -458,6 +467,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
         this.kafkaShareConsumerMetrics = new 
KafkaShareConsumerMetrics(metrics, CONSUMER_SHARE_METRIC_GROUP_PREFIX);
         this.clientTelemetryReporter = Optional.empty();
         this.completedAcknowledgements = Collections.emptyList();
+        this.asyncConsumerMetrics = new AsyncConsumerMetrics(metrics);
     }
 
     // auxiliary interface for testing
@@ -470,7 +480,8 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
                 final CompletableEventReaper applicationEventReaper,
                 final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier,
                 final Supplier<NetworkClientDelegate> 
networkClientDelegateSupplier,
-                final Supplier<RequestManagers> requestManagersSupplier
+                final Supplier<RequestManagers> requestManagersSupplier,
+                final AsyncConsumerMetrics asyncConsumerMetrics
         );
     }
 
@@ -855,6 +866,7 @@ public class ShareConsumerImpl<K, V> implements 
ShareConsumerDelegate<K, V> {
             backgroundEventReaper.reap(backgroundEventQueue);
 
         closeQuietly(kafkaShareConsumerMetrics, "kafka share consumer 
metrics", firstException);
+        closeQuietly(asyncConsumerMetrics, "kafka async consumer metrics", 
firstException);
         closeQuietly(metrics, "consumer metrics", firstException);
         closeQuietly(deserializers, "consumer deserializers", firstException);
         clientTelemetryReporter.ifPresent(reporter -> closeQuietly(reporter, 
"consumer telemetry reporter", firstException));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
index c30c0c2edde..dfb775f8947 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEvent.java
@@ -51,6 +51,12 @@ public abstract class ApplicationEvent {
      */
     private final Uuid id;
 
+    /**
+     * The time in milliseconds when this event was enqueued.
+     * This field can be changed after the event is created, so it should not 
be used in hashCode or equals.
+     */
+    private long enqueuedMs;
+
     protected ApplicationEvent(Type type) {
         this.type = Objects.requireNonNull(type);
         this.id = Uuid.randomUuid();
@@ -64,6 +70,14 @@ public abstract class ApplicationEvent {
         return id;
     }
 
+    /**
+     * Stores the time at which this event was enqueued.
+     *
+     * @param enqueuedMs the enqueue time, in milliseconds
+     */
+    public void setEnqueuedMs(long enqueuedMs) {
+        this.enqueuedMs = enqueuedMs;
+    }
+
+    /**
+     * Returns the enqueue time last stored through
+     * {@link #setEnqueuedMs(long)}.
+     *
+     * @return the stored enqueue time, in milliseconds
+     */
+    public long enqueuedMs() {
+        return enqueuedMs;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (this == o) return true;
@@ -78,7 +92,7 @@ public abstract class ApplicationEvent {
     }
 
+    /**
+     * Builds the comma-separated {@code key=value} fragment describing this
+     * event: its type, its id and its enqueue time.
+     *
+     * @return the string fragment for this event
+     */
     protected String toStringBase() {
-        return "type=" + type + ", id=" + id;
+        return "type=" + type + ", id=" + id + ", enqueuedMs=" + enqueuedMs;
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
index 0baafcd3038..dd6a1666c7b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventHandler.java
@@ -20,6 +20,7 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
 import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
 import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
 import org.apache.kafka.clients.consumer.internals.RequestManagers;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.internals.IdempotentCloser;
 import org.apache.kafka.common.utils.LogContext;
@@ -42,9 +43,11 @@ import java.util.function.Supplier;
 public class ApplicationEventHandler implements Closeable {
 
     private final Logger log;
+    private final Time time;
     private final BlockingQueue<ApplicationEvent> applicationEventQueue;
     private final ConsumerNetworkThread networkThread;
     private final IdempotentCloser closer = new IdempotentCloser();
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
 
     public ApplicationEventHandler(final LogContext logContext,
                                    final Time time,
@@ -52,16 +55,20 @@ public class ApplicationEventHandler implements Closeable {
                                    final CompletableEventReaper 
applicationEventReaper,
                                    final Supplier<ApplicationEventProcessor> 
applicationEventProcessorSupplier,
                                    final Supplier<NetworkClientDelegate> 
networkClientDelegateSupplier,
-                                   final Supplier<RequestManagers> 
requestManagersSupplier) {
+                                   final Supplier<RequestManagers> 
requestManagersSupplier,
+                                   final AsyncConsumerMetrics 
asyncConsumerMetrics) {
         this.log = logContext.logger(ApplicationEventHandler.class);
+        this.time = time;
         this.applicationEventQueue = applicationEventQueue;
+        this.asyncConsumerMetrics = asyncConsumerMetrics;
         this.networkThread = new ConsumerNetworkThread(logContext,
                 time,
                 applicationEventQueue,
                 applicationEventReaper,
                 applicationEventProcessorSupplier,
                 networkClientDelegateSupplier,
-                requestManagersSupplier);
+                requestManagersSupplier,
+                asyncConsumerMetrics);
         this.networkThread.start();
     }
 
@@ -73,7 +80,9 @@ public class ApplicationEventHandler implements Closeable {
      */
     public void add(final ApplicationEvent event) {
         Objects.requireNonNull(event, "ApplicationEvent provided to add must 
be non-null");
+        // Stamp the event with the current time before it is queued.
+        event.setEnqueuedMs(time.milliseconds());
         applicationEventQueue.add(event);
+        // Report the queue size as sampled right after the insert.
+        
asyncConsumerMetrics.recordApplicationEventQueueSize(applicationEventQueue.size());
         wakeupNetworkThread();
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
index 7e9fdaed2d8..02fc4b4a29b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEvent.java
@@ -38,6 +38,12 @@ public abstract class BackgroundEvent {
      */
     private final Uuid id;
 
+    /**
+     * The time in milliseconds when this event was enqueued.
+     * This field can be changed after the event is created, so it should not 
be used in hashCode or equals.
+     */
+    private long enqueuedMs;
+
     protected BackgroundEvent(Type type) {
         this.type = Objects.requireNonNull(type);
         this.id = Uuid.randomUuid();
@@ -51,6 +57,14 @@ public abstract class BackgroundEvent {
         return id;
     }
 
+    /**
+     * Stores the time at which this event was enqueued.
+     *
+     * @param enqueuedMs the enqueue time, in milliseconds
+     */
+    public void setEnqueuedMs(long enqueuedMs) {
+        this.enqueuedMs = enqueuedMs;
+    }
+
+    /**
+     * Returns the enqueue time last stored through
+     * {@link #setEnqueuedMs(long)}.
+     *
+     * @return the stored enqueue time, in milliseconds
+     */
+    public long enqueuedMs() {
+        return enqueuedMs;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (this == o) return true;
@@ -65,7 +79,7 @@ public abstract class BackgroundEvent {
     }
 
+    /**
+     * Builds the comma-separated {@code key=value} fragment describing this
+     * event: its type, its id and its enqueue time.
+     *
+     * @return the string fragment for this event
+     */
     protected String toStringBase() {
-        return "type=" + type + ", id=" + id;
+        return "type=" + type + ", id=" + id + ", enqueuedMs=" + enqueuedMs;
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
index f6ded0bf735..adc621d5f2e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/BackgroundEventHandler.java
@@ -17,9 +17,13 @@
 package org.apache.kafka.clients.consumer.internals.events;
 
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.utils.Time;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
-import java.util.Queue;
+import java.util.concurrent.BlockingQueue;
 
 /**
  * An event handler that receives {@link BackgroundEvent background events} 
from the
@@ -29,10 +33,16 @@ import java.util.Queue;
 
 public class BackgroundEventHandler {
 
-    private final Queue<BackgroundEvent> backgroundEventQueue;
+    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
+    private final Time time;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
 
-    public BackgroundEventHandler(final Queue<BackgroundEvent> 
backgroundEventQueue) {
+    /**
+     * Creates a handler that adds events to, and drains events from, the
+     * given queue.
+     *
+     * @param backgroundEventQueue queue the events are added to and
+     *                             drained from
+     * @param time                 clock used to stamp each event's enqueue
+     *                             time
+     * @param asyncConsumerMetrics metrics on which the background event
+     *                             queue size is recorded
+     */
+    public BackgroundEventHandler(final BlockingQueue<BackgroundEvent> 
backgroundEventQueue,
+                                  final Time time,
+                                  final AsyncConsumerMetrics 
asyncConsumerMetrics) {
         this.backgroundEventQueue = backgroundEventQueue;
+        this.time = time;
+        this.asyncConsumerMetrics = asyncConsumerMetrics;
     }
 
     /**
@@ -42,6 +52,20 @@ public class BackgroundEventHandler {
      */
     public void add(BackgroundEvent event) {
         Objects.requireNonNull(event, "BackgroundEvent provided to add must be 
non-null");
+        // Stamp the event with the current time before it is queued.
+        event.setEnqueuedMs(time.milliseconds());
         backgroundEventQueue.add(event);
+        // Report the queue size as sampled right after the insert.
+        
asyncConsumerMetrics.recordBackgroundEventQueueSize(backgroundEventQueue.size());
+    }
+
+    /**
+     * Drain all the {@link BackgroundEvent events} currently in the queue
+     * into a new list and record a background event queue size of zero.
+     *
+     * @return A list of {@link BackgroundEvent events} that were drained;
+     *         empty if the queue held no events
+     */
+    public List<BackgroundEvent> drainEvents() {
+        List<BackgroundEvent> events = new ArrayList<>();
+        backgroundEventQueue.drainTo(events);
+        // The size is recorded as a literal 0, not re-read from the queue.
+        asyncConsumerMetrics.recordBackgroundEventQueueSize(0);
+        return events;
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
index 019526836af..5a0358df896 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaper.java
@@ -82,8 +82,9 @@ public class CompletableEventReaper {
      *
      * @param currentTimeMs <em>Current</em> time with which to compare 
against the
      *                      <em>{@link CompletableEvent#deadlineMs() 
expiration time}</em>
+     * @return The number of events that were expired
      */
-    public void reap(long currentTimeMs) {
+    public long reap(long currentTimeMs) {
         Consumer<CompletableEvent<?>> expireEvent = event -> {
             long pastDueMs = currentTimeMs - event.deadlineMs();
             TimeoutException error = new TimeoutException(String.format("%s 
was %s ms past its expiration of %s", event.getClass().getSimpleName(), 
pastDueMs, event.deadlineMs()));
@@ -96,13 +97,16 @@ public class CompletableEventReaper {
         };
 
         // First, complete (exceptionally) any events that have passed their 
deadline AND aren't already complete.
-        tracked.stream()
+        long count = tracked.stream()
             .filter(e -> !e.future().isDone())
             .filter(e -> currentTimeMs >= e.deadlineMs())
-            .forEach(expireEvent);
+            .peek(expireEvent)
+            .count();
         // Second, remove any events that are already complete, just to make 
sure we don't hold references. This will
         // include any events that finished successfully as well as any events 
we just completed exceptionally above.
         tracked.removeIf(e -> e.future().isDone());
+
+        return count;
     }
 
     /**
@@ -122,8 +126,9 @@ public class CompletableEventReaper {
      * don't take the deadline into consideration, just close it regardless.
      *
      * @param events Events from a queue that have not yet been tracked that 
also need to be reviewed
+     * @return The number of events that were expired
      */
-    public void reap(Collection<?> events) {
+    public long reap(Collection<?> events) {
         Objects.requireNonNull(events, "Event queue to reap must be non-null");
 
         Consumer<CompletableEvent<?>> expireEvent = event -> {
@@ -136,17 +141,20 @@ public class CompletableEventReaper {
             }
         };
 
-        tracked.stream()
+        long trackedExpiredCount = tracked.stream()
             .filter(e -> !e.future().isDone())
-            .forEach(expireEvent);
+            .peek(expireEvent)
+            .count();
         tracked.clear();
 
-        events.stream()
+        long eventExpiredCount = events.stream()
             .filter(e -> e instanceof CompletableEvent<?>)
             .map(e -> (CompletableEvent<?>) e)
             .filter(e -> !e.future().isDone())
-            .forEach(expireEvent);
+            .peek(expireEvent)
+            .count();
         events.clear();
+        return trackedExpiredCount + eventExpiredCount;
     }
 
     public int size() {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
new file mode 100644
index 00000000000..09e84cbe985
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetrics.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Arrays;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP_PREFIX;
+
+/**
+ * Sensors and metrics for the internals of the asynchronous consumer, all
+ * registered under {@code CONSUMER_METRIC_GROUP}: the time between network
+ * thread polls, the size, queue time and processing time of the application
+ * and background event queues, the number of expired application events,
+ * and the size and queue time of the unsent requests queue.
+ * <p>
+ * {@link #close()} removes every sensor created here and then delegates to
+ * the superclass.
+ */
+public class AsyncConsumerMetrics extends KafkaConsumerMetrics implements 
AutoCloseable {
+    private final Metrics metrics;
+
+    // Sensor names. The two "size" sensors and the expired-count sensor also
+    // use their sensor name as the name of their single metric.
+    public static final String TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME = 
"time-between-network-thread-poll";
+    public static final String APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME = 
"application-event-queue-size";
+    public static final String APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME = 
"application-event-queue-time";
+    public static final String 
APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = 
"application-event-queue-processing-time";
+    public static final String APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME = 
"application-events-expired-count";
+    public static final String BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME = 
"background-event-queue-size";
+    public static final String BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME = 
"background-event-queue-time";
+    public static final String 
BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME = 
"background-event-queue-processing-time";
+    public static final String UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME = 
"unsent-requests-queue-size";
+    public static final String UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME = 
"unsent-requests-queue-time";
+    private final Sensor timeBetweenNetworkThreadPollSensor;
+    private final Sensor applicationEventQueueSizeSensor;
+    private final Sensor applicationEventQueueTimeSensor;
+    private final Sensor applicationEventQueueProcessingTimeSensor;
+    private final Sensor applicationEventExpiredSizeSensor;
+    private final Sensor backgroundEventQueueSizeSensor;
+    private final Sensor backgroundEventQueueTimeSensor;
+    private final Sensor backgroundEventQueueProcessingTimeSensor;
+    private final Sensor unsentRequestsQueueSizeSensor;
+    private final Sensor unsentRequestsQueueTimeSensor;
+
+    /**
+     * Creates one sensor per measurement on the given registry and attaches
+     * its metrics: an {@link Avg} and a {@link Max} for each time-based
+     * sensor, a single {@link Value} for each size or count sensor.
+     *
+     * @param metrics the registry the sensors and metrics are added to
+     */
+    public AsyncConsumerMetrics(Metrics metrics) {
+        super(metrics, CONSUMER_METRIC_GROUP_PREFIX);
+
+        this.metrics = metrics;
+        this.timeBetweenNetworkThreadPollSensor = 
metrics.sensor(TIME_BETWEEN_NETWORK_THREAD_POLL_SENSOR_NAME);
+        this.timeBetweenNetworkThreadPollSensor.add(
+            metrics.metricName(
+                "time-between-network-thread-poll-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time taken, in milliseconds, between each poll in 
the network thread."
+            ),
+            new Avg()
+        );
+        this.timeBetweenNetworkThreadPollSensor.add(
+            metrics.metricName(
+                "time-between-network-thread-poll-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time taken, in milliseconds, between each poll in 
the network thread."
+            ),
+            new Max()
+        );
+
+        this.applicationEventQueueSizeSensor = 
metrics.sensor(APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME);
+        this.applicationEventQueueSizeSensor.add(
+            metrics.metricName(
+                APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
+                CONSUMER_METRIC_GROUP,
+                "The current number of events in the queue to send from the 
application thread to the background thread."
+            ),
+            new Value()
+        );
+
+        this.applicationEventQueueTimeSensor = 
metrics.sensor(APPLICATION_EVENT_QUEUE_TIME_SENSOR_NAME);
+        this.applicationEventQueueTimeSensor.add(
+            metrics.metricName(
+                "application-event-queue-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that application events 
are taking to be dequeued."
+            ),
+            new Avg()
+        );
+        this.applicationEventQueueTimeSensor.add(
+            metrics.metricName(
+                "application-event-queue-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that an application event 
took to be dequeued."
+            ),
+            new Max()
+        );
+
+        this.applicationEventQueueProcessingTimeSensor = 
metrics.sensor(APPLICATION_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+        this.applicationEventQueueProcessingTimeSensor.add(
+            metrics.metricName(
+                "application-event-queue-processing-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that the background thread 
takes to process all available application events."
+            ),
+            new Avg()
+        );
+        this.applicationEventQueueProcessingTimeSensor.add(
+            metrics.metricName("application-event-queue-processing-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that the background thread 
took to process all available application events."
+            ),
+            new Max()
+        );
+
+        this.applicationEventExpiredSizeSensor = 
metrics.sensor(APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME);
+        this.applicationEventExpiredSizeSensor.add(
+            metrics.metricName(
+                APPLICATION_EVENT_EXPIRED_SIZE_SENSOR_NAME,
+                CONSUMER_METRIC_GROUP,
+                "The current number of expired application events."
+            ),
+            new Value()
+        );
+
+        this.unsentRequestsQueueSizeSensor = 
metrics.sensor(UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME);
+        this.unsentRequestsQueueSizeSensor.add(
+            metrics.metricName(
+                UNSENT_REQUESTS_QUEUE_SIZE_SENSOR_NAME,
+                CONSUMER_METRIC_GROUP,
+                "The current number of unsent requests in the background 
thread."
+            ),
+            new Value()
+        );
+
+        this.unsentRequestsQueueTimeSensor = 
metrics.sensor(UNSENT_REQUESTS_QUEUE_TIME_SENSOR_NAME);
+        this.unsentRequestsQueueTimeSensor.add(
+            metrics.metricName(
+                "unsent-requests-queue-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that requests are taking 
to be sent in the background thread."
+            ),
+            new Avg()
+        );
+        this.unsentRequestsQueueTimeSensor.add(
+            metrics.metricName(
+                "unsent-requests-queue-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that a request remained 
unsent in the background thread."
+            ),
+            new Max()
+        );
+
+        this.backgroundEventQueueSizeSensor = 
metrics.sensor(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME);
+        this.backgroundEventQueueSizeSensor.add(
+            metrics.metricName(
+                BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME,
+                CONSUMER_METRIC_GROUP,
+                "The current number of events in the queue to send from the 
background thread to the application thread."
+            ),
+            new Value()
+        );
+
+        this.backgroundEventQueueTimeSensor = 
metrics.sensor(BACKGROUND_EVENT_QUEUE_TIME_SENSOR_NAME);
+        this.backgroundEventQueueTimeSensor.add(
+            metrics.metricName(
+                "background-event-queue-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that background events are 
taking to be dequeued."
+            ),
+            new Avg()
+        );
+        this.backgroundEventQueueTimeSensor.add(
+            metrics.metricName(
+                "background-event-queue-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that background events are 
taking to be dequeued."
+            ),
+            new Max()
+        );
+
+        this.backgroundEventQueueProcessingTimeSensor = 
metrics.sensor(BACKGROUND_EVENT_QUEUE_PROCESSING_TIME_SENSOR_NAME);
+        this.backgroundEventQueueProcessingTimeSensor.add(
+            metrics.metricName(
+                "background-event-queue-processing-time-avg",
+                CONSUMER_METRIC_GROUP,
+                "The average time, in milliseconds, that the consumer took to 
process all available background events."
+            ),
+            new Avg()
+        );
+        this.backgroundEventQueueProcessingTimeSensor.add(
+            metrics.metricName(
+                "background-event-queue-processing-time-max",
+                CONSUMER_METRIC_GROUP,
+                "The maximum time, in milliseconds, that the consumer took to 
process all available background events."
+            ),
+            new Max()
+        );
+    }
+
+    /** Records a sample on the time-between-network-thread-poll sensor. */
+    public void recordTimeBetweenNetworkThreadPoll(long 
timeBetweenNetworkThreadPoll) {
+        
this.timeBetweenNetworkThreadPollSensor.record(timeBetweenNetworkThreadPoll);
+    }
+
+    /** Records a sample on the application event queue size sensor. */
+    public void recordApplicationEventQueueSize(int size) {
+        this.applicationEventQueueSizeSensor.record(size);
+    }
+
+    /** Records a sample on the application event queue time sensor. */
+    public void recordApplicationEventQueueTime(long time) {
+        this.applicationEventQueueTimeSensor.record(time);
+    }
+
+    /** Records a sample on the application event processing time sensor. */
+    public void recordApplicationEventQueueProcessingTime(long processingTime) 
{
+        this.applicationEventQueueProcessingTimeSensor.record(processingTime);
+    }
+
+    /** Records a sample on the expired application events sensor. */
+    public void recordApplicationEventExpiredSize(long size) {
+        this.applicationEventExpiredSizeSensor.record(size);
+    }
+
+    /**
+     * Records a sample on the unsent requests queue size sensor. Unlike the
+     * other record methods, {@code timeMs} is passed through to the sensor
+     * together with the value.
+     */
+    public void recordUnsentRequestsQueueSize(int size, long timeMs) {
+        this.unsentRequestsQueueSizeSensor.record(size, timeMs);
+    }
+
+    /** Records a sample on the unsent requests queue time sensor. */
+    public void recordUnsentRequestsQueueTime(long time) {
+        this.unsentRequestsQueueTimeSensor.record(time);
+    }
+
+    /** Records a sample on the background event queue size sensor. */
+    public void recordBackgroundEventQueueSize(int size) {
+        this.backgroundEventQueueSizeSensor.record(size);
+    }
+
+    /** Records a sample on the background event queue time sensor. */
+    public void recordBackgroundEventQueueTime(long time) {
+        this.backgroundEventQueueTimeSensor.record(time);
+    }
+
+    /** Records a sample on the background event processing time sensor. */
+    public void recordBackgroundEventQueueProcessingTime(long processingTime) {
+        this.backgroundEventQueueProcessingTimeSensor.record(processingTime);
+    }
+
+    /**
+     * Removes every sensor created by this class from the registry, then
+     * calls {@code super.close()}.
+     */
+    @Override
+    public void close() {
+        Arrays.asList(
+            timeBetweenNetworkThreadPollSensor.name(),
+            applicationEventQueueSizeSensor.name(),
+            applicationEventQueueTimeSensor.name(),
+            applicationEventQueueProcessingTimeSensor.name(),
+            applicationEventExpiredSizeSensor.name(),
+            backgroundEventQueueSizeSensor.name(),
+            backgroundEventQueueTimeSensor.name(),
+            backgroundEventQueueProcessingTimeSensor.name(),
+            unsentRequestsQueueSizeSensor.name(),
+            unsentRequestsQueueTimeSensor.name()
+        ).forEach(metrics::removeSensor);
+        super.close();
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
new file mode 100644
index 00000000000..a8ce990a23d
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ApplicationEventHandlerTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
+import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
+import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Checks that {@link ApplicationEventHandler#add(ApplicationEvent)} records
+ * the application event queue size on {@link AsyncConsumerMetrics}.
+ */
+public class ApplicationEventHandlerTest {
+    private final Time time = new MockTime();
+    private final BlockingQueue<ApplicationEvent> applicationEventsQueue =  
new LinkedBlockingQueue<>();
+    private final ApplicationEventProcessor applicationEventProcessor = 
mock(ApplicationEventProcessor.class);
+    private final NetworkClientDelegate networkClientDelegate = 
mock(NetworkClientDelegate.class);
+    private final RequestManagers requestManagers = 
mock(RequestManagers.class);
+    private final CompletableEventReaper applicationEventReaper = 
mock(CompletableEventReaper.class);
+
+    /**
+     * Adds a single event through the handler and expects the queue-size
+     * metric to read 1.
+     */
+    @Test
+    public void testRecordApplicationEventQueueSize() {
+        // NOTE(review): the handler starts its network thread when it is
+        // constructed; confirm that thread cannot take the event off the
+        // queue before add() samples the queue size.
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics);
+             ApplicationEventHandler applicationEventHandler = new 
ApplicationEventHandler(
+                     new LogContext(),
+                     time,
+                     applicationEventsQueue,
+                     applicationEventReaper,
+                     () -> applicationEventProcessor,
+                     () -> networkClientDelegate,
+                     () -> requestManagers,
+                     asyncConsumerMetrics
+             )) {
+            // add event
+            applicationEventHandler.add(new PollEvent(time.milliseconds()));
+            assertEquals(
+                1,
+                (double) metrics.metric(
+                    metrics.metricName(
+                        
AsyncConsumerMetrics.APPLICATION_EVENT_QUEUE_SIZE_SENSOR_NAME,
+                        ConsumerUtils.CONSUMER_METRIC_GROUP
+                    )
+                ).metricValue()
+            );
+        }
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 5239482bdd0..819365e9712 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -57,6 +57,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscr
 import 
org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
 import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.Node;
@@ -133,6 +134,7 @@ import static 
org.apache.kafka.clients.consumer.internals.AbstractMembershipMana
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
@@ -211,7 +213,7 @@ public class AsyncKafkaConsumerTest {
             new StringDeserializer(),
             new StringDeserializer(),
             time,
-            (a, b, c, d, e, f, g) -> applicationEventHandler,
+            (a, b, c, d, e, f, g, h) -> applicationEventHandler,
             a -> backgroundEventReaper,
             (a, b, c, d, e, f, g) -> fetchCollector,
             (a, b, c, d) -> metadata,
@@ -225,7 +227,7 @@ public class AsyncKafkaConsumerTest {
             new StringDeserializer(),
             new StringDeserializer(),
             time,
-            (a, b, c, d, e, f, g) -> applicationEventHandler,
+            (a, b, c, d, e, f, g, h) -> applicationEventHandler,
             a -> backgroundEventReaper,
             (a, b, c, d, e, f, g) -> fetchCollector,
             (a, b, c, d) -> metadata,
@@ -358,7 +360,7 @@ public class AsyncKafkaConsumerTest {
         assertEquals(topicPartitionOffsets, 
consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000)));
         
verify(applicationEventHandler).addAndGet(ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
         final Metric metric = consumer.metrics()
-            
.get(consumer.metricsRegistry().metricName("committed-time-ns-total", 
"consumer-metrics"));
+            
.get(consumer.metricsRegistry().metricName("committed-time-ns-total", 
CONSUMER_METRIC_GROUP));
         assertTrue((double) metric.metricValue() > 0);
     }
 
@@ -1915,6 +1917,31 @@ public class AsyncKafkaConsumerTest {
         }, "Consumer did not throw the expected UnsupportedVersionException on 
poll");
     }
 
+    @Test
+    public void 
testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() {
+        consumer = newConsumer(
+                mock(FetchBuffer.class),
+                mock(ConsumerInterceptors.class),
+                mock(ConsumerRebalanceListenerInvoker.class),
+                mock(SubscriptionState.class),
+                "group-id",
+                "client-id",
+                false);
+        Metrics metrics = consumer.metricsRegistry();
+        AsyncConsumerMetrics kafkaConsumerMetrics = 
consumer.kafkaConsumerMetrics();
+
+        ConsumerRebalanceListenerCallbackNeededEvent event = new 
ConsumerRebalanceListenerCallbackNeededEvent(ON_PARTITIONS_REVOKED, 
Collections.emptySortedSet());
+        event.setEnqueuedMs(time.milliseconds());
+        backgroundEventQueue.add(event);
+        kafkaConsumerMetrics.recordBackgroundEventQueueSize(1);
+
+        time.sleep(10);
+        consumer.processBackgroundEvents();
+        assertEquals(0, (double) 
metrics.metric(metrics.metricName("background-event-queue-size", 
CONSUMER_METRIC_GROUP)).metricValue());
+        assertEquals(10, (double) 
metrics.metric(metrics.metricName("background-event-queue-time-avg", 
CONSUMER_METRIC_GROUP)).metricValue());
+        assertEquals(10, (double) 
metrics.metric(metrics.metricName("background-event-queue-time-max", 
CONSUMER_METRIC_GROUP)).metricValue());
+    }
+
     private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
         final TopicPartition t0 = new TopicPartition("t0", 2);
         final TopicPartition t1 = new TopicPartition("t0", 3);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
new file mode 100644
index 00000000000..63269b6f554
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BackgroundEventHandlerTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
+import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics.BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class BackgroundEventHandlerTest {
+    private final BlockingQueue<BackgroundEvent> backgroundEventsQueue =  new 
LinkedBlockingQueue<>();
+
+    @Test
+    public void testRecordBackgroundEventQueueSize() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics)) {
+            BackgroundEventHandler backgroundEventHandler = new 
BackgroundEventHandler(
+                backgroundEventsQueue,
+                new MockTime(0),
+                asyncConsumerMetrics);
+            // add event
+            backgroundEventHandler.add(new ErrorEvent(new Throwable()));
+            assertEquals(
+                1,
+                (double) metrics.metric(
+                    
metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+
+            // drain event
+            backgroundEventHandler.drainEvents();
+            assertEquals(
+                0,
+                (double) metrics.metric(
+                    
metrics.metricName(BACKGROUND_EVENT_QUEUE_SIZE_SENSOR_NAME, 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+        }
+    }
+}
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 9d09aee2697..9517e04e054 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -21,6 +21,7 @@ import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import 
org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
 import 
org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
 import org.apache.kafka.common.KafkaException;
@@ -117,8 +118,8 @@ public class ConsumerMembershipManagerTest {
         subscriptionState = mock(SubscriptionState.class);
         commitRequestManager = mock(CommitRequestManager.class);
         backgroundEventQueue = new LinkedBlockingQueue<>();
-        backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue);
         time = new MockTime(0);
+        backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue, time, 
mock(AsyncConsumerMetrics.class));
         metrics = new Metrics(time);
         rebalanceMetricsManager = new ConsumerRebalanceMetricsManager(metrics);
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
index 0e85fe2d799..520279fc8d4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java
@@ -19,6 +19,9 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
 import 
org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
+import org.apache.kafka.clients.consumer.internals.events.PollEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -40,6 +43,7 @@ import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -53,7 +57,7 @@ import static org.mockito.Mockito.when;
 
 public class ConsumerNetworkThreadTest {
     private final Time time;
-    private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
+    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
     private final ApplicationEventProcessor applicationEventProcessor;
     private final OffsetsRequestManager offsetsRequestManager;
     private final ConsumerHeartbeatRequestManager heartbeatRequestManager;
@@ -62,6 +66,7 @@ public class ConsumerNetworkThreadTest {
     private final NetworkClientDelegate networkClientDelegate;
     private final RequestManagers requestManagers;
     private final CompletableEventReaper applicationEventReaper;
+    private final AsyncConsumerMetrics asyncConsumerMetrics;
 
     ConsumerNetworkThreadTest() {
         this.networkClientDelegate = mock(NetworkClientDelegate.class);
@@ -72,17 +77,19 @@ public class ConsumerNetworkThreadTest {
         this.applicationEventProcessor = mock(ApplicationEventProcessor.class);
         this.applicationEventReaper = mock(CompletableEventReaper.class);
         this.time = new MockTime();
-        this.applicationEventsQueue = new LinkedBlockingQueue<>();
+        this.applicationEventQueue = new LinkedBlockingQueue<>();
+        this.asyncConsumerMetrics = mock(AsyncConsumerMetrics.class);
         LogContext logContext = new LogContext();
 
         this.consumerNetworkThread = new ConsumerNetworkThread(
                 logContext,
                 time,
-                applicationEventsQueue,
+                applicationEventQueue,
                 applicationEventReaper,
                 () -> applicationEventProcessor,
                 () -> networkClientDelegate,
-                () -> requestManagers
+                () -> requestManagers,
+                asyncConsumerMetrics
         );
     }
 
@@ -183,14 +190,18 @@ public class ConsumerNetworkThreadTest {
     public void testCleanupInvokesReaper() {
         LinkedList<NetworkClientDelegate.UnsentRequest> queue = new 
LinkedList<>();
         when(networkClientDelegate.unsentRequests()).thenReturn(queue);
+        
when(applicationEventReaper.reap(applicationEventQueue)).thenReturn(1L);
         consumerNetworkThread.cleanup();
-        verify(applicationEventReaper).reap(applicationEventsQueue);
+        verify(applicationEventReaper).reap(applicationEventQueue);
+        verify(asyncConsumerMetrics).recordApplicationEventExpiredSize(1L);
     }
 
     @Test
     public void testRunOnceInvokesReaper() {
+        when(applicationEventReaper.reap(any(Long.class))).thenReturn(1L);
         consumerNetworkThread.runOnce();
         verify(applicationEventReaper).reap(any(Long.class));
+        verify(asyncConsumerMetrics).recordApplicationEventExpiredSize(1L);
     }
 
     @Test
@@ -199,4 +210,82 @@ public class ConsumerNetworkThreadTest {
         consumerNetworkThread.cleanup();
         verify(networkClientDelegate, times(2)).poll(anyLong(), anyLong());
     }
+
+    @Test
+    public void testRunOnceRecordTimeBetweenNetworkThreadPoll() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics);
+             ConsumerNetworkThread consumerNetworkThread = new 
ConsumerNetworkThread(
+                     new LogContext(),
+                     time,
+                     applicationEventQueue,
+                     applicationEventReaper,
+                     () -> applicationEventProcessor,
+                     () -> networkClientDelegate,
+                     () -> requestManagers,
+                     asyncConsumerMetrics
+             )) {
+            consumerNetworkThread.initializeResources();
+
+            consumerNetworkThread.runOnce();
+            time.sleep(10);
+            consumerNetworkThread.runOnce();
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("time-between-network-thread-poll-avg", 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("time-between-network-thread-poll-max", 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+        }
+    }
+
+    @Test
+    public void 
testRunOnceRecordApplicationEventQueueSizeAndApplicationEventQueueTime() {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics);
+             ConsumerNetworkThread consumerNetworkThread = new 
ConsumerNetworkThread(
+                     new LogContext(),
+                     time,
+                     applicationEventQueue,
+                     applicationEventReaper,
+                     () -> applicationEventProcessor,
+                     () -> networkClientDelegate,
+                     () -> requestManagers,
+                     asyncConsumerMetrics
+             )) {
+            consumerNetworkThread.initializeResources();
+
+            PollEvent event = new PollEvent(0);
+            event.setEnqueuedMs(time.milliseconds());
+            applicationEventQueue.add(event);
+            asyncConsumerMetrics.recordApplicationEventQueueSize(1);
+
+            time.sleep(10);
+            consumerNetworkThread.runOnce();
+            assertEquals(
+                0,
+                (double) metrics.metric(
+                    metrics.metricName("application-event-queue-size", 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("application-event-queue-time-avg", 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("application-event-queue-time-max", 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+        }
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index e97efd2b75f..ab665aac695 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
@@ -3779,7 +3780,7 @@ public class FetchRequestManagerTest {
                                              Metadata metadata,
                                              BackgroundEventHandler 
backgroundEventHandler,
                                              boolean 
notifyMetadataErrorsViaErrorQueue) {
-            super(time, config, logContext, client, metadata, 
backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
+            super(time, config, logContext, client, metadata, 
backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, 
mock(AsyncConsumerMetrics.class));
         }
 
         @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
index 4177419fa53..81eb5187fec 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegateTest.java
@@ -23,11 +23,13 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.message.FindCoordinatorRequestData;
+import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
 import org.apache.kafka.common.requests.FindCoordinatorResponse;
@@ -41,15 +43,17 @@ import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -228,8 +232,8 @@ public class NetworkClientDelegateTest {
         AuthenticationException authException = new 
AuthenticationException("Test Auth Exception");
         doThrow(authException).when(metadata).maybeThrowAnyException();
 
-        LinkedList<BackgroundEvent> backgroundEventQueue = new LinkedList<>();
-        this.backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue);
+        BlockingQueue<BackgroundEvent> backgroundEventQueue = new 
LinkedBlockingQueue<>();
+        this.backgroundEventHandler = new 
BackgroundEventHandler(backgroundEventQueue, time, 
mock(AsyncConsumerMetrics.class));
         NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(true);
 
         assertEquals(0, backgroundEventQueue.size());
@@ -242,20 +246,59 @@ public class NetworkClientDelegateTest {
         assertEquals(authException, ((ErrorEvent) event).error());
     }
 
+    @Test
+    public void testRecordUnsentRequestsQueueTime() throws Exception {
+        try (Metrics metrics = new Metrics();
+             AsyncConsumerMetrics asyncConsumerMetrics = new 
AsyncConsumerMetrics(metrics);
+             NetworkClientDelegate networkClientDelegate = 
newNetworkClientDelegate(false, asyncConsumerMetrics)) {
+            NetworkClientDelegate.UnsentRequest unsentRequest = 
newUnsentFindCoordinatorRequest();
+            networkClientDelegate.add(unsentRequest);
+            asyncConsumerMetrics.recordUnsentRequestsQueueSize(1, 
time.milliseconds());
+
+            time.sleep(10);
+            long timeMs = time.milliseconds();
+            networkClientDelegate.poll(0, timeMs);
+            assertEquals(
+                0,
+                (double) metrics.metric(
+                    metrics.metricName("unsent-requests-queue-size", 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("unsent-requests-queue-time-avg", 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+            assertEquals(
+                10,
+                (double) metrics.metric(
+                    metrics.metricName("unsent-requests-queue-time-max", 
CONSUMER_METRIC_GROUP)
+                ).metricValue()
+            );
+        }
+    }
+
     public NetworkClientDelegate newNetworkClientDelegate(boolean 
notifyMetadataErrorsViaErrorQueue) {
+        return newNetworkClientDelegate(notifyMetadataErrorsViaErrorQueue, 
mock(AsyncConsumerMetrics.class));
+    }
+
+    public NetworkClientDelegate newNetworkClientDelegate(boolean 
notifyMetadataErrorsViaErrorQueue, AsyncConsumerMetrics asyncConsumerMetrics) {
         LogContext logContext = new LogContext();
         Properties properties = new Properties();
         properties.put(KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
         properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
         properties.put(GROUP_ID_CONFIG, GROUP_ID);
         properties.put(REQUEST_TIMEOUT_MS_CONFIG, REQUEST_TIMEOUT_MS);
-        return new NetworkClientDelegate(this.time,
+        return new NetworkClientDelegate(time,
                 new ConsumerConfig(properties),
                 logContext,
                 this.client,
                 this.metadata,
                 this.backgroundEventHandler,
-                notifyMetadataErrorsViaErrorQueue);
+                notifyMetadataErrorsViaErrorQueue,
+                asyncConsumerMetrics
+        );
     }
 
     public NetworkClientDelegate.UnsentRequest 
newUnsentFindCoordinatorRequest() {
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
index 7231e0b6390..1b1ed587203 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
 import 
org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
 import 
org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
+import 
org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
@@ -113,6 +114,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -1656,7 +1658,7 @@ public class ShareConsumeRequestManagerTest {
                 subscriptions,
                 fetchConfig,
                 deserializers);
-        BackgroundEventHandler backgroundEventHandler = new 
TestableBackgroundEventHandler(completedAcknowledgements);
+        BackgroundEventHandler backgroundEventHandler = new 
TestableBackgroundEventHandler(time, completedAcknowledgements);
         shareConsumeRequestManager = spy(new 
TestableShareConsumeRequestManager<>(
                 logContext,
                 groupId,
@@ -1687,8 +1689,9 @@ public class ShareConsumeRequestManagerTest {
         properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 
String.valueOf(requestTimeoutMs));
         properties.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 
String.valueOf(retryBackoffMs));
         ConsumerConfig config = new ConsumerConfig(properties);
-        networkClientDelegate = spy(new TestableNetworkClientDelegate(time, 
config, logContext, client, metadata,
-                new BackgroundEventHandler(new LinkedBlockingQueue<>()), 
false));
+        networkClientDelegate = spy(new TestableNetworkClientDelegate(
+            time, config, logContext, client, metadata,
+            new BackgroundEventHandler(new LinkedBlockingQueue<>(), time, 
mock(AsyncConsumerMetrics.class)), false));
     }
 
     private class TestableShareConsumeRequestManager<K, V> extends 
ShareConsumeRequestManager {
@@ -1742,7 +1745,7 @@ public class ShareConsumeRequestManagerTest {
                                              Metadata metadata,
                                              BackgroundEventHandler 
backgroundEventHandler,
                                              boolean 
notifyMetadataErrorsViaErrorQueue) {
-            super(time, config, logContext, client, metadata, 
backgroundEventHandler, notifyMetadataErrorsViaErrorQueue);
+            super(time, config, logContext, client, metadata, 
backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, 
mock(AsyncConsumerMetrics.class));
         }
 
         @Override
@@ -1837,8 +1840,8 @@ public class ShareConsumeRequestManagerTest {
     private static class TestableBackgroundEventHandler extends 
BackgroundEventHandler {
         List<Map<TopicIdPartition, Acknowledgements>> 
completedAcknowledgements;
 
-        public TestableBackgroundEventHandler(List<Map<TopicIdPartition, 
Acknowledgements>> completedAcknowledgements) {
-            super(new LinkedBlockingQueue<>());
+        public TestableBackgroundEventHandler(Time time, 
List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements) {
+            super(new LinkedBlockingQueue<>(), time, 
mock(AsyncConsumerMetrics.class));
             this.completedAcknowledgements = completedAcknowledgements;
         }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
index 7fd0bc3fe60..04db229c8df 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumerImplTest.java
@@ -122,7 +122,7 @@ public class ShareConsumerImplTest {
                 new StringDeserializer(),
                 new StringDeserializer(),
                 time,
-                (a, b, c, d, e, f, g) -> applicationEventHandler,
+                (a, b, c, d, e, f, g, h) -> applicationEventHandler,
                 a -> backgroundEventReaper,
                 (a, b, c, d, e) -> fetchCollector,
                 backgroundEventQueue
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
index eabcb8773e1..71e44631ee8 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/CompletableEventReaperTest.java
@@ -49,7 +49,7 @@ public class CompletableEventReaperTest {
 
         // Without any time passing, we check the reaper and verify that the 
event is not done amd is still
         // being tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(0, reaper.reap(time.milliseconds()));
         assertFalse(event.future().isDone());
         assertEquals(1, reaper.size());
 
@@ -62,7 +62,7 @@ public class CompletableEventReaperTest {
 
         // Call the reaper and validate that the event is now "done" 
(expired), the correct exception type is
         // thrown, and the event is no longer tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(1, reaper.reap(time.milliseconds()));
         assertTrue(event.future().isDone());
         assertThrows(TimeoutException.class, () -> 
ConsumerUtils.getResult(event.future()));
         assertEquals(0, reaper.size());
@@ -77,7 +77,7 @@ public class CompletableEventReaperTest {
 
         // Without any time passing, we check the reaper and verify that the 
event is not done amd is still
         // being tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(0, reaper.reap(time.milliseconds()));
         assertFalse(event.future().isDone());
         assertEquals(1, reaper.size());
 
@@ -91,7 +91,7 @@ public class CompletableEventReaperTest {
         time.sleep(timeoutMs + 1);
 
         // Call the reaper and validate that the event is not considered 
expired, but is still no longer tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(0, reaper.reap(time.milliseconds()));
         assertTrue(event.future().isDone());
         assertNull(ConsumerUtils.getResult(event.future()));
         assertEquals(0, reaper.size());
@@ -108,7 +108,7 @@ public class CompletableEventReaperTest {
 
         // Without any time passing, we check the reaper and verify that the 
event is not done amd is still
         // being tracked.
-        reaper.reap(time.milliseconds());
+        assertEquals(0, reaper.reap(time.milliseconds()));
         assertFalse(event1.future().isDone());
         assertFalse(event2.future().isDone());
         assertEquals(2, reaper.size());
@@ -124,7 +124,7 @@ public class CompletableEventReaperTest {
 
         // Validate that the first (completed) event is not expired, but the 
second one is expired. In either case,
         // both should be completed and neither should be tracked anymore.
-        reaper.reap(time.milliseconds());
+        assertEquals(1, reaper.reap(time.milliseconds()));
         assertTrue(event1.future().isDone());
         assertTrue(event2.future().isDone());
         assertNull(ConsumerUtils.getResult(event1.future()));
@@ -150,7 +150,7 @@ public class CompletableEventReaperTest {
         assertEquals(2, queue.size());
 
         // Go ahead and reap the incomplete from the queue.
-        reaper.reap(queue);
+        assertEquals(1, reaper.reap(queue));
 
         // The first event was completed, so we didn't expire it in the reaper.
         assertTrue(event1.future().isDone());
@@ -186,7 +186,7 @@ public class CompletableEventReaperTest {
         assertEquals(2, reaper.size());
 
         // Go ahead and reap the incomplete events. Both sets should be zero 
after that.
-        reaper.reap(queue);
+        assertEquals(1, reaper.reap(queue));
         assertEquals(0, reaper.size());
         assertEquals(0, queue.size());
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
new file mode 100644
index 00000000000..2913bcfad70
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/metrics/AsyncConsumerMetricsTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Metrics;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+
+import static 
org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_METRIC_GROUP;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AsyncConsumerMetricsTest {
+    private static final long METRIC_VALUE = 123L;
+
+    private final Metrics metrics = new Metrics();
+    private AsyncConsumerMetrics consumerMetrics;
+
+    @AfterEach
+    public void tearDown() { // runs after every test to release both metrics objects
+        if (consumerMetrics != null) { // consumerMetrics is only assigned inside the test methods
+            consumerMetrics.close();
+        }
+        metrics.close(); // metrics is a final field initialised at construction, so no null guard
+    }
+
+    @Test
+    public void shouldMetricNames() {
+        // create: after construction every metric name listed below must be a key of metrics.metrics()
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        HashSet<MetricName> expectedMetrics = new HashSet<>(Arrays.asList(
+            metrics.metricName("last-poll-seconds-ago", CONSUMER_METRIC_GROUP),
+            metrics.metricName("time-between-poll-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("time-between-poll-max", CONSUMER_METRIC_GROUP),
+            metrics.metricName("poll-idle-ratio-avg", CONSUMER_METRIC_GROUP),
+            metrics.metricName("commit-sync-time-ns-total", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("committed-time-ns-total", 
CONSUMER_METRIC_GROUP)
+        ));
+        expectedMetrics.forEach(
+            metricName -> assertTrue(
+                metrics.metrics().containsKey(metricName),
+                "Missing metric: " + metricName
+            )
+        );
+
+        HashSet<MetricName> expectedConsumerMetrics = new 
HashSet<>(Arrays.asList(
+            metrics.metricName("time-between-network-thread-poll-avg", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("time-between-network-thread-poll-max", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-size", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-time-avg", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-time-max", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-processing-time-avg", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("application-event-queue-processing-time-max", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("unsent-requests-queue-size", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("unsent-requests-queue-time-avg", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("unsent-requests-queue-time-max", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-size", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-time-avg", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-time-max", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-processing-time-avg", 
CONSUMER_METRIC_GROUP),
+            metrics.metricName("background-event-queue-processing-time-max", 
CONSUMER_METRIC_GROUP)
+        ));
+        expectedConsumerMetrics.forEach(
+            metricName -> assertTrue(
+                metrics.metrics().containsKey(metricName),
+                "Missing metric: " + metricName
+            )
+        );
+
+        // close: after close() none of the names in either set may remain a key of metrics.metrics()
+        consumerMetrics.close();
+        expectedMetrics.forEach(
+            metricName -> assertFalse(
+                metrics.metrics().containsKey(metricName),
+                "Metric present after close: " + metricName
+            )
+        );
+        expectedConsumerMetrics.forEach(
+            metricName -> assertFalse(
+                metrics.metrics().containsKey(metricName),
+                "Metric present after close: " + metricName
+            )
+        );
+    }
+
+    @Test
+    public void shouldRecordTimeBetweenNetworkThreadPoll() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When: a single value, METRIC_VALUE, is recorded as the time between network thread polls
+        consumerMetrics.recordTimeBetweenNetworkThreadPoll(METRIC_VALUE);
+
+        // Then: with one sample recorded, both the avg and the max metric must equal METRIC_VALUE
+        assertMetricValue("time-between-network-thread-poll-avg");
+        assertMetricValue("time-between-network-thread-poll-max");
+    }
+
+    @Test
+    public void shouldRecordApplicationEventQueueSize() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When: an application event queue size of 10 is recorded
+        consumerMetrics.recordApplicationEventQueueSize(10);
+
+        // Then: the size metric reports 10 (expected value first, as JUnit's assertEquals requires)
+        assertEquals(
+            (double) 10,
+            metrics.metric(
+                metrics.metricName(
+                    "application-event-queue-size",
+                    CONSUMER_METRIC_GROUP
+                )
+            ).metricValue()
+        );
+    }
+
+    @Test
+    public void shouldRecordApplicationEventQueueTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When: a single value, METRIC_VALUE, is recorded as application event queue time
+        consumerMetrics.recordApplicationEventQueueTime(METRIC_VALUE);
+
+        // Then: with one sample recorded, both the avg and the max metric must equal METRIC_VALUE
+        assertMetricValue("application-event-queue-time-avg");
+        assertMetricValue("application-event-queue-time-max");
+    }
+
+    @Test
+    public void shouldRecordApplicationEventQueueProcessingTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When: a single value, METRIC_VALUE, is recorded as application event processing time
+        
consumerMetrics.recordApplicationEventQueueProcessingTime(METRIC_VALUE);
+
+        // Then: with one sample recorded, both the avg and the max metric must equal METRIC_VALUE
+        assertMetricValue("application-event-queue-processing-time-avg");
+        assertMetricValue("application-event-queue-processing-time-max");
+    }
+
+    @Test
+    public void shouldRecordUnsentRequestsQueueSize() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When: a size of 10 is recorded (second argument 100 is presumably a timestamp; TODO confirm)
+        consumerMetrics.recordUnsentRequestsQueueSize(10, 100);
+
+        // Then: the size metric reports 10 (expected value first, as JUnit's assertEquals requires)
+        assertEquals(
+            (double) 10,
+            metrics.metric(
+                metrics.metricName(
+                    "unsent-requests-queue-size",
+                    CONSUMER_METRIC_GROUP
+                )
+            ).metricValue()
+        );
+    }
+
+    @Test
+    public void shouldRecordUnsentRequestsQueueTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When: a single value, METRIC_VALUE, is recorded as unsent requests queue time
+        consumerMetrics.recordUnsentRequestsQueueTime(METRIC_VALUE);
+
+        // Then: with one sample recorded, both the avg and the max metric must equal METRIC_VALUE
+        assertMetricValue("unsent-requests-queue-time-avg");
+        assertMetricValue("unsent-requests-queue-time-max");
+    }
+
+    @Test
+    public void shouldRecordBackgroundEventQueueSize() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When: a background event queue size of 10 is recorded
+        consumerMetrics.recordBackgroundEventQueueSize(10);
+
+        // Then: the size metric reports 10 (expected value first, as JUnit's assertEquals requires)
+        assertEquals(
+            (double) 10,
+            metrics.metric(
+                metrics.metricName(
+                    "background-event-queue-size",
+                    CONSUMER_METRIC_GROUP
+                )
+            ).metricValue()
+        );
+    }
+
+    @Test
+    public void shouldRecordBackgroundEventQueueTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When: a single value, METRIC_VALUE, is recorded as background event queue time
+        consumerMetrics.recordBackgroundEventQueueTime(METRIC_VALUE);
+
+        // Then: with one sample recorded, both the avg and the max metric must equal METRIC_VALUE
+        assertMetricValue("background-event-queue-time-avg");
+        assertMetricValue("background-event-queue-time-max");
+    }
+
+    @Test
+    public void shouldRecordBackgroundEventQueueProcessingTime() {
+        consumerMetrics = new AsyncConsumerMetrics(metrics);
+        // When: a single value, METRIC_VALUE, is recorded as background event processing time
+        consumerMetrics.recordBackgroundEventQueueProcessingTime(METRIC_VALUE);
+
+        // Then: with one sample recorded, both the avg and the max metric must equal METRIC_VALUE
+        assertMetricValue("background-event-queue-processing-time-avg");
+        assertMetricValue("background-event-queue-processing-time-max");
+    }
+
+    private void assertMetricValue(final String name) { // checks metric `name` in CONSUMER_METRIC_GROUP
+        assertEquals(
+            (double) METRIC_VALUE, // expected value goes first in JUnit's assertEquals(expected, actual)
+            metrics.metric(
+                metrics.metricName(
+                    name,
+                    CONSUMER_METRIC_GROUP
+                )
+            ).metricValue()
+        );
+    }
+}

Reply via email to