This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c74c0f2facd KAFKA-14758: Extract inner classes from Fetcher for reuse 
in refactoring (#13301)
c74c0f2facd is described below

commit c74c0f2facde2b392ab745144d6ad520575ab9ef
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Fri Mar 10 10:17:14 2023 -0800

    KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring 
(#13301)
    
    The Fetcher class is used internally by the KafkaConsumer to fetch records 
from the brokers. There is ongoing work to create a new consumer implementation 
with a significantly refactored threading model. The threading refactor work 
requires a similarly refactored Fetcher.
    
    This task refactors Fetcher by extracting its inner classes into 
top-level classes (still in the internals package) so that they can be 
referenced by the forthcoming refactored fetch logic.
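    
    For orientation, a condensed sketch (not part of this patch) of how the 
extracted classes are wired together inside the refactored Fetcher. It mirrors 
the Fetcher.java hunks below; the surrounding names (metrics, metricsRegistry, 
subscriptions, the deserializers, partitions, fetchOffset, requestVersion) are 
the existing Fetcher fields and locals shown in those hunks:
    
        // One metrics manager per consumer, wrapping the fetch-related Sensors
        FetchMetricsManager metricsManager =
                new FetchMetricsManager(metrics, metricsRegistry);
        // One aggregator per fetch response, over the partitions it returned
        FetchMetricsAggregator metricAggregator =
                new FetchMetricsAggregator(metricsManager, partitions);
        // One CompletedFetch per partition in the response, holding its
        // partition data and record-parsing state
        CompletedFetch<K, V> completedFetch = new CompletedFetch<>(logContext,
                subscriptions, checkCrcs, decompressionBufferSupplier,
                keyDeserializer, valueDeserializer, isolationLevel, partition,
                partitionData, metricAggregator, fetchOffset, requestVersion);
        completedFetches.add(completedFetch);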
    
    Reviewers: Philip Nee <philip...@gmail.com>, Guozhang Wang 
<wangg...@gmail.com>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |   4 +-
 .../clients/consumer/internals/CompletedFetch.java | 365 ++++++++++++++
 .../consumer/internals/ConsumerMetrics.java        |   4 +-
 .../consumer/internals/FetchMetricsAggregator.java |  95 ++++
 .../consumer/internals/FetchMetricsManager.java    | 203 ++++++++
 ...ricsRegistry.java => FetchMetricsRegistry.java} |   8 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 557 ++-------------------
 .../clients/consumer/internals/SensorBuilder.java  | 115 +++++
 .../consumer/internals/CompletedFetchTest.java     | 304 +++++++++++
 .../internals/FetchMetricsManagerTest.java         | 171 +++++++
 .../clients/consumer/internals/FetcherTest.java    |   4 +-
 .../consumer/internals/OffsetFetcherTest.java      |   2 +-
 12 files changed, 1300 insertions(+), 532 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3f966121d69..11ac675cb4b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -28,7 +28,7 @@ import 
org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetch;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
+import org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry;
 import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
 import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.OffsetFetcher;
@@ -737,7 +737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.metadata.bootstrap(addresses);
             String metricGrpPrefix = "consumer";
 
-            FetcherMetricsRegistry metricsRegistry = new 
FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), 
metricGrpPrefix);
+            FetchMetricsRegistry metricsRegistry = new 
FetchMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), 
metricGrpPrefix);
             ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config, time, logContext);
             this.isolationLevel = IsolationLevel.valueOf(
                     
config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
new file mode 100644
index 00000000000..6a11b846810
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link 
Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state 
between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+    final TopicPartition partition;
+    final FetchResponseData.PartitionData partitionData;
+    final short requestVersion;
+
+    long nextFetchOffset;
+    Optional<Integer> lastEpoch;
+    boolean isConsumed = false;
+    boolean initialized = false;
+
+    private final Logger log;
+    private final SubscriptionState subscriptions;
+    private final boolean checkCrcs;
+    private final BufferSupplier decompressionBufferSupplier;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+    private final Iterator<? extends RecordBatch> batches;
+    private final Set<Long> abortedProducerIds;
+    private final PriorityQueue<FetchResponseData.AbortedTransaction> 
abortedTransactions;
+    private final FetchMetricsAggregator metricAggregator;
+
+    private int recordsRead;
+    private int bytesRead;
+    private RecordBatch currentBatch;
+    private Record lastRecord;
+    private CloseableIterator<Record> records;
+    private Exception cachedRecordException = null;
+    private boolean corruptLastRecord = false;
+
+    CompletedFetch(LogContext logContext,
+                   SubscriptionState subscriptions,
+                   boolean checkCrcs,
+                   BufferSupplier decompressionBufferSupplier,
+                   Deserializer<K> keyDeserializer,
+                   Deserializer<V> valueDeserializer,
+                   IsolationLevel isolationLevel,
+                   TopicPartition partition,
+                   FetchResponseData.PartitionData partitionData,
+                   FetchMetricsAggregator metricAggregator,
+                   Long fetchOffset,
+                   short requestVersion) {
+        this.log = logContext.logger(CompletedFetch.class);
+        this.subscriptions = subscriptions;
+        this.checkCrcs = checkCrcs;
+        this.decompressionBufferSupplier = decompressionBufferSupplier;
+        this.keyDeserializer = keyDeserializer;
+        this.valueDeserializer = valueDeserializer;
+        this.isolationLevel = isolationLevel;
+        this.partition = partition;
+        this.partitionData = partitionData;
+        this.metricAggregator = metricAggregator;
+        this.batches = 
FetchResponse.recordsOrFail(partitionData).batches().iterator();
+        this.nextFetchOffset = fetchOffset;
+        this.requestVersion = requestVersion;
+        this.lastEpoch = Optional.empty();
+        this.abortedProducerIds = new HashSet<>();
+        this.abortedTransactions = abortedTransactions(partitionData);
+    }
+
+    /**
+     * After each partition is parsed, we update the current metric totals 
with the total bytes
+     * and number of records parsed. After all partitions have reported, we 
write the metric.
+     */
+    void recordAggregatedMetrics(int bytes, int records) {
+        metricAggregator.record(partition, bytes, records);
+    }
+
+    /**
+     * Draining a {@link CompletedFetch} will signal that the data has been 
consumed and the underlying resources
+     * are closed. This is somewhat analogous to {@link Closeable#close() 
closing}, though no error will result if a
+     * caller invokes {@link #fetchRecords(int)}; an empty {@link List list} 
will be returned instead.
+     */
+    void drain() {
+        if (!isConsumed) {
+            maybeCloseRecordStream();
+            cachedRecordException = null;
+            this.isConsumed = true;
+            recordAggregatedMetrics(bytesRead, recordsRead);
+
+            // we move the partition to the end if we received some bytes. 
This way, it's more likely that partitions
+            // for the same topic can remain together (allowing for more 
efficient serialization).
+            if (bytesRead > 0)
+                subscriptions.movePartitionToEnd(partition);
+        }
+    }
+
+    private void maybeEnsureValid(RecordBatch batch) {
+        if (checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+            try {
+                batch.ensureValid();
+            } catch (CorruptRecordException e) {
+                throw new KafkaException("Record batch for partition " + 
partition + " at offset " +
+                        batch.baseOffset() + " is invalid, cause: " + 
e.getMessage());
+            }
+        }
+    }
+
+    private void maybeEnsureValid(Record record) {
+        if (checkCrcs) {
+            try {
+                record.ensureValid();
+            } catch (CorruptRecordException e) {
+                throw new KafkaException("Record for partition " + partition + 
" at offset " + record.offset()
+                        + " is invalid, cause: " + e.getMessage());
+            }
+        }
+    }
+
+    private void maybeCloseRecordStream() {
+        if (records != null) {
+            records.close();
+            records = null;
+        }
+    }
+
+    private Record nextFetchedRecord() {
+        while (true) {
+            if (records == null || !records.hasNext()) {
+                maybeCloseRecordStream();
+
+                if (!batches.hasNext()) {
+                    // Message format v2 preserves the last offset in a batch 
even if the last record is removed
+                    // through compaction. By using the next offset computed 
from the last offset in the batch,
+                    // we ensure that the offset of the next fetch will point 
to the next batch, which avoids
+                    // unnecessary re-fetching of the same batch (in the worst 
case, the consumer could get stuck
+                    // fetching the same batch repeatedly).
+                    if (currentBatch != null)
+                        nextFetchOffset = currentBatch.nextOffset();
+                    drain();
+                    return null;
+                }
+
+                currentBatch = batches.next();
+                lastEpoch = 
maybeLeaderEpoch(currentBatch.partitionLeaderEpoch());
+                maybeEnsureValid(currentBatch);
+
+                if (isolationLevel == IsolationLevel.READ_COMMITTED && 
currentBatch.hasProducerId()) {
+                    // remove from the aborted transaction queue all aborted 
transactions which have begun
+                    // before the current batch's last offset and add the 
associated producerIds to the
+                    // aborted producer set
+                    consumeAbortedTransactionsUpTo(currentBatch.lastOffset());
+
+                    long producerId = currentBatch.producerId();
+                    if (containsAbortMarker(currentBatch)) {
+                        abortedProducerIds.remove(producerId);
+                    } else if (isBatchAborted(currentBatch)) {
+                        log.debug("Skipping aborted record batch from 
partition {} with producerId {} and " +
+                                        "offsets {} to {}",
+                                partition, producerId, 
currentBatch.baseOffset(), currentBatch.lastOffset());
+                        nextFetchOffset = currentBatch.nextOffset();
+                        continue;
+                    }
+                }
+
+                records = 
currentBatch.streamingIterator(decompressionBufferSupplier);
+            } else {
+                Record record = records.next();
+                // skip any records out of range
+                if (record.offset() >= nextFetchOffset) {
+                    // we only do validation when the message should not be 
skipped.
+                    maybeEnsureValid(record);
+
+                    // control records are not returned to the user
+                    if (!currentBatch.isControlBatch()) {
+                        return record;
+                    } else {
+                        // Increment the next fetch offset when we skip a 
control batch.
+                        nextFetchOffset = record.offset() + 1;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * The {@link RecordBatch batch} of {@link Record records} is converted to 
a {@link List list} of
+     * {@link ConsumerRecord consumer records} and returned. {@link 
BufferSupplier Decompression} and
+     * {@link Deserializer deserialization} of the {@link Record record's} key 
and value are performed in
+     * this step.
+     *
+     * @param maxRecords The number of records to return; the number returned 
may be {@code 0 <= maxRecords}
+     * @return {@link ConsumerRecord Consumer records}
+     */
+    List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
+        // Error when fetching the next record before deserialization.
+        if (corruptLastRecord)
+            throw new KafkaException("Received exception when fetching the 
next record from " + partition
+                    + ". If needed, please seek past the record to "
+                    + "continue consumption.", cachedRecordException);
+
+        if (isConsumed)
+            return Collections.emptyList();
+
+        List<ConsumerRecord<K, V>> records = new ArrayList<>();
+
+        try {
+            for (int i = 0; i < maxRecords; i++) {
+                // Only move to next record if there was no exception in the 
last fetch. Otherwise, we should
+                // use the last record to do deserialization again.
+                if (cachedRecordException == null) {
+                    corruptLastRecord = true;
+                    lastRecord = nextFetchedRecord();
+                    corruptLastRecord = false;
+                }
+
+                if (lastRecord == null)
+                    break;
+
+                Optional<Integer> leaderEpoch = 
maybeLeaderEpoch(currentBatch.partitionLeaderEpoch());
+                TimestampType timestampType = currentBatch.timestampType();
+                ConsumerRecord<K, V> record = parseRecord(partition, 
leaderEpoch, timestampType, lastRecord);
+                records.add(record);
+                recordsRead++;
+                bytesRead += lastRecord.sizeInBytes();
+                nextFetchOffset = lastRecord.offset() + 1;
+                // In some cases, the deserialization may have thrown an 
exception and the retry may succeed,
+                // we allow user to move forward in this case.
+                cachedRecordException = null;
+            }
+        } catch (SerializationException se) {
+            cachedRecordException = se;
+            if (records.isEmpty())
+                throw se;
+        } catch (KafkaException e) {
+            cachedRecordException = e;
+            if (records.isEmpty())
+                throw new KafkaException("Received exception when fetching the 
next record from " + partition
+                        + ". If needed, please seek past the record to "
+                        + "continue consumption.", e);
+        }
+        return records;
+    }
+
+    /**
+     * Parse the record entry, deserializing the key / value fields if 
necessary
+     */
+    ConsumerRecord<K, V> parseRecord(TopicPartition partition,
+                                     Optional<Integer> leaderEpoch,
+                                     TimestampType timestampType,
+                                     Record record) {
+        try {
+            long offset = record.offset();
+            long timestamp = record.timestamp();
+            Headers headers = new RecordHeaders(record.headers());
+            ByteBuffer keyBytes = record.key();
+            byte[] keyByteArray = keyBytes == null ? null : 
org.apache.kafka.common.utils.Utils.toArray(keyBytes);
+            K key = keyBytes == null ? null : 
this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
+            ByteBuffer valueBytes = record.value();
+            byte[] valueByteArray = valueBytes == null ? null : 
Utils.toArray(valueBytes);
+            V value = valueBytes == null ? null : 
this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+            return new ConsumerRecord<>(partition.topic(), 
partition.partition(), offset,
+                    timestamp, timestampType,
+                    keyByteArray == null ? ConsumerRecord.NULL_SIZE : 
keyByteArray.length,
+                    valueByteArray == null ? ConsumerRecord.NULL_SIZE : 
valueByteArray.length,
+                    key, value, headers, leaderEpoch);
+        } catch (RuntimeException e) {
+            throw new RecordDeserializationException(partition, 
record.offset(),
+                    "Error deserializing key/value for partition " + partition 
+
+                            " at offset " + record.offset() + ". If needed, 
please seek past the record to continue consumption.", e);
+        }
+    }
+
+    private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
+        return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? 
Optional.empty() : Optional.of(leaderEpoch);
+    }
+
+    private void consumeAbortedTransactionsUpTo(long offset) {
+        if (abortedTransactions == null)
+            return;
+
+        while (!abortedTransactions.isEmpty() && 
abortedTransactions.peek().firstOffset() <= offset) {
+            FetchResponseData.AbortedTransaction abortedTransaction = 
abortedTransactions.poll();
+            abortedProducerIds.add(abortedTransaction.producerId());
+        }
+    }
+
+    private boolean isBatchAborted(RecordBatch batch) {
+        return batch.isTransactional() && 
abortedProducerIds.contains(batch.producerId());
+    }
+
+    private PriorityQueue<FetchResponseData.AbortedTransaction> 
abortedTransactions(FetchResponseData.PartitionData partition) {
+        if (partition.abortedTransactions() == null || 
partition.abortedTransactions().isEmpty())
+            return null;
+
+        PriorityQueue<FetchResponseData.AbortedTransaction> 
abortedTransactions = new PriorityQueue<>(
+                partition.abortedTransactions().size(), 
Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset)
+        );
+        abortedTransactions.addAll(partition.abortedTransactions());
+        return abortedTransactions;
+    }
+
+    private boolean containsAbortMarker(RecordBatch batch) {
+        if (!batch.isControlBatch())
+            return false;
+
+        Iterator<Record> batchIterator = batch.iterator();
+        if (!batchIterator.hasNext())
+            return false;
+
+        Record firstRecord = batchIterator.next();
+        return ControlRecordType.ABORT == 
ControlRecordType.parse(firstRecord.key());
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
index e58db82ee3c..e119153af01 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
@@ -26,10 +26,10 @@ import org.apache.kafka.common.metrics.Metrics;
 
 public class ConsumerMetrics {
     
-    public FetcherMetricsRegistry fetcherMetrics;
+    public FetchMetricsRegistry fetcherMetrics;
     
     public ConsumerMetrics(Set<String> metricsTags, String metricGrpPrefix) {
-        this.fetcherMetrics = new FetcherMetricsRegistry(metricsTags, 
metricGrpPrefix);
+        this.fetcherMetrics = new FetchMetricsRegistry(metricsTags, 
metricGrpPrefix);
     }
 
     public ConsumerMetrics(String metricGroupPrefix) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsAggregator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsAggregator.java
new file mode 100644
index 00000000000..4fe8199734d
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsAggregator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Since we parse the message data for each partition from each fetch response 
lazily, fetch-level
+ * metrics need to be aggregated as the messages from each partition are 
parsed. This class is used
+ * to facilitate this incremental aggregation.
+ */
+class FetchMetricsAggregator {
+
+    private final FetchMetricsManager metricsManager;
+    private final Set<TopicPartition> unrecordedPartitions;
+    private final FetchMetrics fetchFetchMetrics = new FetchMetrics();
+    private final Map<String, FetchMetrics> perTopicFetchMetrics = new 
HashMap<>();
+
+    FetchMetricsAggregator(FetchMetricsManager metricsManager, 
Set<TopicPartition> partitions) {
+        this.metricsManager = metricsManager;
+        this.unrecordedPartitions = new HashSet<>(partitions);
+    }
+
+    /**
+     * After each partition is parsed, we update the current metric totals 
with the total bytes
+     * and number of records parsed. After all partitions have reported, we 
write the metric.
+     */
+    void record(TopicPartition partition, int bytes, int records) {
+        // Aggregate the metrics at the fetch level
+        fetchFetchMetrics.increment(bytes, records);
+
+        // Also aggregate the metrics on a per-topic basis.
+        perTopicFetchMetrics.computeIfAbsent(partition.topic(), t -> new 
FetchMetrics())
+                        .increment(bytes, records);
+
+        maybeRecordMetrics(partition);
+    }
+
+    /**
+     * Once we've detected that all of the {@link TopicPartition partitions} 
for the fetch have been handled, we
+     * can then record the aggregated metrics values. This is done at the 
fetch level and on a per-topic basis.
+     *
+     * @param partition {@link TopicPartition}
+     */
+    private void maybeRecordMetrics(TopicPartition partition) {
+        unrecordedPartitions.remove(partition);
+
+        if (!unrecordedPartitions.isEmpty())
+            return;
+
+        // Record the metrics aggregated at the fetch level.
+        metricsManager.recordBytesFetched(fetchFetchMetrics.bytes);
+        metricsManager.recordRecordsFetched(fetchFetchMetrics.records);
+
+        // Also record the metrics aggregated on a per-topic basis.
+        for (Map.Entry<String, FetchMetrics> entry: 
perTopicFetchMetrics.entrySet()) {
+            String topic = entry.getKey();
+            FetchMetrics fetchMetrics = entry.getValue();
+            metricsManager.recordBytesFetched(topic, fetchMetrics.bytes);
+            metricsManager.recordRecordsFetched(topic, fetchMetrics.records);
+        }
+    }
+
+    private static class FetchMetrics {
+
+        private int bytes;
+        private int records;
+
+        private void increment(int bytes, int records) {
+            this.bytes += bytes;
+            this.records += records;
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
new file mode 100644
index 00000000000..63bd7650701
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchMetricsManager} class provides wrapper methods to record 
lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated 
to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchMetricsManager {
+
+    private final Metrics metrics;
+    private final FetchMetricsRegistry metricsRegistry;
+    private final Sensor bytesFetched;
+    private final Sensor recordsFetched;
+    private final Sensor fetchLatency;
+    private final Sensor recordsLag;
+    private final Sensor recordsLead;
+
+    private int assignmentId = 0;
+    private Set<TopicPartition> assignedPartitions = Collections.emptySet();
+
+    FetchMetricsManager(Metrics metrics, FetchMetricsRegistry metricsRegistry) 
{
+        this.metrics = metrics;
+        this.metricsRegistry = metricsRegistry;
+
+        this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched")
+                .withAvg(metricsRegistry.fetchSizeAvg)
+                .withMax(metricsRegistry.fetchSizeMax)
+                .withMeter(metricsRegistry.bytesConsumedRate, 
metricsRegistry.bytesConsumedTotal)
+                .build();
+        this.recordsFetched = new SensorBuilder(metrics, "records-fetched")
+                .withAvg(metricsRegistry.recordsPerRequestAvg)
+                .withMeter(metricsRegistry.recordsConsumedRate, 
metricsRegistry.recordsConsumedTotal)
+                .build();
+        this.fetchLatency = new SensorBuilder(metrics, "fetch-latency")
+                .withAvg(metricsRegistry.fetchLatencyAvg)
+                .withMax(metricsRegistry.fetchLatencyMax)
+                .withMeter(new WindowedCount(), 
metricsRegistry.fetchRequestRate, metricsRegistry.fetchRequestTotal)
+                .build();
+        this.recordsLag = new SensorBuilder(metrics, "records-lag")
+                .withMax(metricsRegistry.recordsLagMax)
+                .build();
+        this.recordsLead = new SensorBuilder(metrics, "records-lead")
+                .withMin(metricsRegistry.recordsLeadMin)
+                .build();
+    }
+
+    void recordLatency(long requestLatencyMs) {
+        fetchLatency.record(requestLatencyMs);
+    }
+
+    void recordBytesFetched(int bytes) {
+        bytesFetched.record(bytes);
+    }
+
+    void recordRecordsFetched(int records) {
+        recordsFetched.record(records);
+    }
+
+    void recordBytesFetched(String topic, int bytes) {
+        String name = topicBytesFetchedMetricName(topic);
+        Sensor bytesFetched = new SensorBuilder(metrics, name, () -> 
topicTags(topic))
+                .withAvg(metricsRegistry.topicFetchSizeAvg)
+                .withMax(metricsRegistry.topicFetchSizeMax)
+                .withMeter(metricsRegistry.topicBytesConsumedRate, 
metricsRegistry.topicBytesConsumedTotal)
+                .build();
+        bytesFetched.record(bytes);
+    }
+
+    void recordRecordsFetched(String topic, int records) {
+        String name = topicRecordsFetchedMetricName(topic);
+        Sensor recordsFetched = new SensorBuilder(metrics, name, () -> 
topicTags(topic))
+                .withAvg(metricsRegistry.topicRecordsPerRequestAvg)
+                .withMeter(metricsRegistry.topicRecordsConsumedRate, 
metricsRegistry.topicRecordsConsumedTotal)
+                .build();
+        recordsFetched.record(records);
+    }
+
+    void recordPartitionLag(TopicPartition tp, long lag) {
+        this.recordsLag.record(lag);
+
+        String name = partitionRecordsLagMetricName(tp);
+        Sensor recordsLag = new SensorBuilder(metrics, name, () -> 
topicPartitionTags(tp))
+                .withValue(metricsRegistry.partitionRecordsLag)
+                .withMax(metricsRegistry.partitionRecordsLagMax)
+                .withAvg(metricsRegistry.partitionRecordsLagAvg)
+                .build();
+
+        recordsLag.record(lag);
+    }
+
+    void recordPartitionLead(TopicPartition tp, long lead) {
+        this.recordsLead.record(lead);
+
+        String name = partitionRecordsLeadMetricName(tp);
+        Sensor recordsLead = new SensorBuilder(metrics, name, () -> 
topicPartitionTags(tp))
+                .withValue(metricsRegistry.partitionRecordsLead)
+                .withMin(metricsRegistry.partitionRecordsLeadMin)
+                .withAvg(metricsRegistry.partitionRecordsLeadAvg)
+                .build();
+
+        recordsLead.record(lead);
+    }
+
+    /**
+     * This method is called by the {@link Fetch fetch} logic before it 
requests fetches in order to update the
+     * internal set of metrics that are tracked.
+     *
+     * @param subscription {@link SubscriptionState} that contains the set of 
assigned partitions
+     * @see SubscriptionState#assignmentId()
+     */
+    void maybeUpdateAssignment(SubscriptionState subscription) {
+        int newAssignmentId = subscription.assignmentId();
+
+        if (this.assignmentId != newAssignmentId) {
+            Set<TopicPartition> newAssignedPartitions = 
subscription.assignedPartitions();
+
+            for (TopicPartition tp : this.assignedPartitions) {
+                if (!newAssignedPartitions.contains(tp)) {
+                    metrics.removeSensor(partitionRecordsLagMetricName(tp));
+                    metrics.removeSensor(partitionRecordsLeadMetricName(tp));
+                    metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
+                }
+            }
+
+            for (TopicPartition tp : newAssignedPartitions) {
+                if (!this.assignedPartitions.contains(tp)) {
+                    MetricName metricName = 
partitionPreferredReadReplicaMetricName(tp);
+                    metrics.addMetricIfAbsent(
+                            metricName,
+                            null,
+                            (Gauge<Integer>) (config, now) -> 
subscription.preferredReadReplica(tp, 0L).orElse(-1)
+                    );
+                }
+            }
+
+            this.assignedPartitions = newAssignedPartitions;
+            this.assignmentId = newAssignmentId;
+        }
+    }
+
+    static String topicBytesFetchedMetricName(String topic) {
+        return "topic." + topic + ".bytes-fetched";
+    }
+
+    private static String topicRecordsFetchedMetricName(String topic) {
+        return "topic." + topic + ".records-fetched";
+    }
+
+    private static String partitionRecordsLeadMetricName(TopicPartition tp) {
+        return tp + ".records-lead";
+    }
+
+    private static String partitionRecordsLagMetricName(TopicPartition tp) {
+        return tp + ".records-lag";
+    }
+
+    private MetricName partitionPreferredReadReplicaMetricName(TopicPartition 
tp) {
+        Map<String, String> metricTags = topicPartitionTags(tp);
+        return 
this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, 
metricTags);
+    }
+
+    static Map<String, String> topicTags(String topic) {
+        Map<String, String> metricTags = new HashMap<>(1);
+        metricTags.put("topic", topic.replace('.', '_'));
+        return metricTags;
+    }
+
+    static Map<String, String> topicPartitionTags(TopicPartition tp) {
+        Map<String, String> metricTags = new HashMap<>(2);
+        metricTags.put("topic", tp.topic().replace('.', '_'));
+        metricTags.put("partition", String.valueOf(tp.partition()));
+        return metricTags;
+    }
+
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
similarity index 97%
rename from 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
rename to 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
index f76a92462d5..fc4ac1d665e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
@@ -24,7 +24,7 @@ import java.util.Set;
 
 import org.apache.kafka.common.MetricNameTemplate;
 
-public class FetcherMetricsRegistry {
+public class FetchMetricsRegistry {
 
     public MetricNameTemplate fetchSizeAvg;
     public MetricNameTemplate fetchSizeMax;
@@ -56,15 +56,15 @@ public class FetcherMetricsRegistry {
     public MetricNameTemplate partitionRecordsLeadAvg;
     public MetricNameTemplate partitionPreferredReadReplica;
 
-    public FetcherMetricsRegistry() {
+    public FetchMetricsRegistry() {
         this(new HashSet<String>(), "");
     }
 
-    public FetcherMetricsRegistry(String metricGrpPrefix) {
+    public FetchMetricsRegistry(String metricGrpPrefix) {
         this(new HashSet<String>(), metricGrpPrefix);
     }
 
-    public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
+    public FetchMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
 
         /***** Client level *****/
         String groupName = metricGrpPrefix + "-fetch-manager-metrics";
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ae7973aa808..29cd7972cc6 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -25,38 +25,23 @@ import 
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPositi
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.message.FetchResponseData;
-import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
-import org.apache.kafka.common.metrics.stats.Min;
-import org.apache.kafka.common.metrics.stats.Value;
-import org.apache.kafka.common.metrics.stats.WindowedCount;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.BufferSupplier;
-import org.apache.kafka.common.record.ControlRecordType;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
@@ -65,12 +50,10 @@ import org.slf4j.Logger;
 import org.slf4j.helpers.MessageFormatter;
 
 import java.io.Closeable;
-import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -78,7 +61,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -116,9 +98,9 @@ public class Fetcher<K, V> implements Closeable {
     private final boolean checkCrcs;
     private final String clientRackId;
     private final ConsumerMetadata metadata;
-    private final FetchManagerMetrics sensors;
+    private final FetchMetricsManager metricsManager;
     private final SubscriptionState subscriptions;
-    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
+    private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
     private final BufferSupplier decompressionBufferSupplier = 
BufferSupplier.create();
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
@@ -126,7 +108,7 @@ public class Fetcher<K, V> implements Closeable {
     private final Map<Integer, FetchSessionHandler> sessionHandlers;
     private final Set<Integer> nodesWithPendingFetchRequests;
     private final AtomicBoolean isClosed = new AtomicBoolean(false);
-    private CompletedFetch nextInLineFetch = null;
+    private CompletedFetch<K, V> nextInLineFetch = null;
 
     public Fetcher(LogContext logContext,
                    ConsumerNetworkClient client,
@@ -142,7 +124,7 @@ public class Fetcher<K, V> implements Closeable {
                    ConsumerMetadata metadata,
                    SubscriptionState subscriptions,
                    Metrics metrics,
-                   FetcherMetricsRegistry metricsRegistry,
+                   FetchMetricsRegistry metricsRegistry,
                    Time time,
                    IsolationLevel isolationLevel) {
         this.log = logContext.logger(Fetcher.class);
@@ -161,7 +143,7 @@ public class Fetcher<K, V> implements Closeable {
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
         this.completedFetches = new ConcurrentLinkedQueue<>();
-        this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
+        this.metricsManager = new FetchMetricsManager(metrics, 
metricsRegistry);
         this.isolationLevel = isolationLevel;
         this.sessionHandlers = new HashMap<>();
         this.nodesWithPendingFetchRequests = new HashSet<>();
@@ -191,7 +173,7 @@ public class Fetcher<K, V> implements Closeable {
      */
     public synchronized int sendFetches() {
         // Update metrics in case there was an assignment change
-        sensors.maybeUpdateAssignment(subscriptions);
+        metricsManager.maybeUpdateAssignment(subscriptions);
 
         Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap = 
prepareFetchRequests();
         for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : 
fetchRequestMap.entrySet()) {
@@ -224,7 +206,7 @@ public class Fetcher<K, V> implements Closeable {
 
                             Map<TopicPartition, 
FetchResponseData.PartitionData> responseData = 
response.responseData(handler.sessionTopicNames(), 
resp.requestHeader().apiVersion());
                             Set<TopicPartition> partitions = new 
HashSet<>(responseData.keySet());
-                            FetchResponseMetricAggregator metricAggregator = 
new FetchResponseMetricAggregator(sensors, partitions);
+                            FetchMetricsAggregator metricAggregator = new 
FetchMetricsAggregator(metricsManager, partitions);
 
                             for (Map.Entry<TopicPartition, 
FetchResponseData.PartitionData> entry : responseData.entrySet()) {
                                 TopicPartition partition = entry.getKey();
@@ -245,20 +227,29 @@ public class Fetcher<K, V> implements Closeable {
                                     throw new IllegalStateException(message);
                                 } else {
                                     long fetchOffset = requestData.fetchOffset;
+                                    short requestVersion = 
resp.requestHeader().apiVersion();
                                     FetchResponseData.PartitionData 
partitionData = entry.getValue();
 
                                     log.debug("Fetch {} at offset {} for 
partition {} returned fetch data {}",
                                             isolationLevel, fetchOffset, 
partition, partitionData);
 
-                                    Iterator<? extends RecordBatch> batches = 
FetchResponse.recordsOrFail(partitionData).batches().iterator();
-                                    short responseVersion = 
resp.requestHeader().apiVersion();
-
-                                    completedFetches.add(new 
CompletedFetch(partition, partitionData,
-                                            metricAggregator, batches, 
fetchOffset, responseVersion));
+                                    CompletedFetch<K, V> completedFetch = new 
CompletedFetch<>(logContext,
+                                            subscriptions,
+                                            checkCrcs,
+                                            decompressionBufferSupplier,
+                                            keyDeserializer,
+                                            valueDeserializer,
+                                            isolationLevel,
+                                            partition,
+                                            partitionData,
+                                            metricAggregator,
+                                            fetchOffset,
+                                            requestVersion);
+                                    completedFetches.add(completedFetch);
                                 }
                             }
 
-                            sensors.fetchLatency.record(resp.requestLatencyMs());
+                            metricsManager.recordLatency(resp.requestLatencyMs());
                         } finally {
                             
nodesWithPendingFetchRequests.remove(fetchTarget.id());
                         }
@@ -328,16 +319,16 @@ public class Fetcher<K, V> implements Closeable {
      */
     public Fetch<K, V> collectFetch() {
         Fetch<K, V> fetch = Fetch.empty();
-        Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
+        Queue<CompletedFetch<K, V>> pausedCompletedFetches = new 
ArrayDeque<>();
         int recordsRemaining = maxPollRecords;
 
         try {
             while (recordsRemaining > 0) {
                 if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
-                    CompletedFetch records = completedFetches.peek();
+                    CompletedFetch<K, V> records = completedFetches.peek();
                     if (records == null) break;
 
-                    if (records.notInitialized()) {
+                    if (!records.initialized) {
                         try {
                             nextInLineFetch = 
initializeCompletedFetch(records);
                         } catch (Exception e) {
@@ -380,7 +371,7 @@ public class Fetcher<K, V> implements Closeable {
         return fetch;
     }
 
-    private Fetch<K, V> fetchRecords(CompletedFetch completedFetch, int 
maxRecords) {
+    private Fetch<K, V> fetchRecords(CompletedFetch<K, V> completedFetch, int 
maxRecords) {
         if (!subscriptions.isAssigned(completedFetch.partition)) {
             // this can happen when a rebalance happened before fetched 
records are returned to the consumer's poll call
             log.debug("Not returning fetched records for partition {} since it 
is no longer assigned",
@@ -417,11 +408,11 @@ public class Fetcher<K, V> implements Closeable {
 
                 Long partitionLag = 
subscriptions.partitionLag(completedFetch.partition, isolationLevel);
                 if (partitionLag != null)
-                    this.sensors.recordPartitionLag(completedFetch.partition, partitionLag);
+                    this.metricsManager.recordPartitionLag(completedFetch.partition, partitionLag);
 
                 Long lead = 
subscriptions.partitionLead(completedFetch.partition);
                 if (lead != null) {
-                    this.sensors.recordPartitionLead(completedFetch.partition, lead);
+                    this.metricsManager.recordPartitionLead(completedFetch.partition, lead);
                 }
 
                 return Fetch.forPartition(completedFetch.partition, 
partRecords, positionAdvanced);
@@ -444,7 +435,7 @@ public class Fetcher<K, V> implements Closeable {
         if (nextInLineFetch != null && !nextInLineFetch.isConsumed) {
             exclude.add(nextInLineFetch.partition);
         }
-        for (CompletedFetch completedFetch : completedFetches) {
+        for (CompletedFetch<K, V> completedFetch : completedFetches) {
             exclude.add(completedFetch.partition);
         }
         return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
@@ -537,11 +528,11 @@ public class Fetcher<K, V> implements Closeable {
     /**
      * Initialize a CompletedFetch object.
      */
-    private CompletedFetch initializeCompletedFetch(CompletedFetch 
nextCompletedFetch) {
+    private CompletedFetch<K, V> initializeCompletedFetch(CompletedFetch<K, V> 
nextCompletedFetch) {
         TopicPartition tp = nextCompletedFetch.partition;
         FetchResponseData.PartitionData partition = 
nextCompletedFetch.partitionData;
         long fetchOffset = nextCompletedFetch.nextFetchOffset;
-        CompletedFetch completedFetch = null;
+        CompletedFetch<K, V> completedFetch = null;
         Errors error = Errors.forCode(partition.errorCode());
 
         try {
@@ -564,7 +555,7 @@ public class Fetcher<K, V> implements Closeable {
                 completedFetch = nextCompletedFetch;
 
                 if (!batches.hasNext() && FetchResponse.recordsSize(partition) 
> 0) {
-                    if (completedFetch.responseVersion < 3) {
+                    if (completedFetch.requestVersion < 3) {
                         // Implement the pre KIP-74 behavior of throwing a 
RecordTooLargeException.
                         Map<TopicPartition, Long> recordTooLargePartitions = 
Collections.singletonMap(tp, fetchOffset);
                         throw new RecordTooLargeException("There are some 
messages at [Partition=Offset]: " +
@@ -660,7 +651,7 @@ public class Fetcher<K, V> implements Closeable {
             }
         } finally {
             if (completedFetch == null)
-                nextCompletedFetch.metricAggregator.record(tp, 0, 0);
+                nextCompletedFetch.recordAggregatedMetrics(0, 0);
 
             if (error != Errors.NONE)
                 // we move the partition to the end if there was an error. 
This way, it's more likely that partitions for
@@ -683,49 +674,15 @@ public class Fetcher<K, V> implements Closeable {
         }
     }
 
-    /**
-     * Parse the record entry, deserializing the key / value fields if 
necessary
-     */
-    private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
-                                             RecordBatch batch,
-                                             Record record) {
-        try {
-            long offset = record.offset();
-            long timestamp = record.timestamp();
-            Optional<Integer> leaderEpoch = 
maybeLeaderEpoch(batch.partitionLeaderEpoch());
-            TimestampType timestampType = batch.timestampType();
-            Headers headers = new RecordHeaders(record.headers());
-            ByteBuffer keyBytes = record.key();
-            byte[] keyByteArray = keyBytes == null ? null : 
Utils.toArray(keyBytes);
-            K key = keyBytes == null ? null : 
this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
-            ByteBuffer valueBytes = record.value();
-            byte[] valueByteArray = valueBytes == null ? null : 
Utils.toArray(valueBytes);
-            V value = valueBytes == null ? null : 
this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
-            return new ConsumerRecord<>(partition.topic(), 
partition.partition(), offset,
-                                        timestamp, timestampType,
-                                        keyByteArray == null ? 
ConsumerRecord.NULL_SIZE : keyByteArray.length,
-                                        valueByteArray == null ? 
ConsumerRecord.NULL_SIZE : valueByteArray.length,
-                                        key, value, headers, leaderEpoch);
-        } catch (RuntimeException e) {
-            throw new RecordDeserializationException(partition, 
record.offset(),
-                "Error deserializing key/value for partition " + partition +
-                    " at offset " + record.offset() + ". If needed, please 
seek past the record to continue consumption.", e);
-        }
-    }
-
-    private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
-        return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ? 
Optional.empty() : Optional.of(leaderEpoch);
-    }
-
     /**
      * Clear the buffered data which are not a part of newly assigned 
partitions
      *
      * @param assignedPartitions  newly assigned {@link TopicPartition}
      */
     public void 
clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> 
assignedPartitions) {
-        Iterator<CompletedFetch> completedFetchesItr = 
completedFetches.iterator();
+        Iterator<CompletedFetch<K, V>> completedFetchesItr = 
completedFetches.iterator();
         while (completedFetchesItr.hasNext()) {
-            CompletedFetch records = completedFetchesItr.next();
+            CompletedFetch<K, V> records = completedFetchesItr.next();
             TopicPartition tp = records.partition;
             if (!assignedPartitions.contains(tp)) {
                 records.drain();
@@ -759,7 +716,7 @@ public class Fetcher<K, V> implements Closeable {
         return sessionHandlers.get(node);
     }
 
-    public static Sensor throttleTimeSensor(Metrics metrics, 
FetcherMetricsRegistry metricsRegistry) {
+    public static Sensor throttleTimeSensor(Metrics metrics, 
FetchMetricsRegistry metricsRegistry) {
         Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
         
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg),
 new Avg());
 
@@ -768,448 +725,6 @@ public class Fetcher<K, V> implements Closeable {
         return fetchThrottleTimeSensor;
     }
 
-    private class CompletedFetch {
-        private final TopicPartition partition;
-        private final Iterator<? extends RecordBatch> batches;
-        private final Set<Long> abortedProducerIds;
-        private final PriorityQueue<FetchResponseData.AbortedTransaction> 
abortedTransactions;
-        private final FetchResponseData.PartitionData partitionData;
-        private final FetchResponseMetricAggregator metricAggregator;
-        private final short responseVersion;
-
-        private int recordsRead;
-        private int bytesRead;
-        private RecordBatch currentBatch;
-        private Record lastRecord;
-        private CloseableIterator<Record> records;
-        private long nextFetchOffset;
-        private Optional<Integer> lastEpoch;
-        private boolean isConsumed = false;
-        private Exception cachedRecordException = null;
-        private boolean corruptLastRecord = false;
-        private boolean initialized = false;
-
-        private CompletedFetch(TopicPartition partition,
-                               FetchResponseData.PartitionData partitionData,
-                               FetchResponseMetricAggregator metricAggregator,
-                               Iterator<? extends RecordBatch> batches,
-                               Long fetchOffset,
-                               short responseVersion) {
-            this.partition = partition;
-            this.partitionData = partitionData;
-            this.metricAggregator = metricAggregator;
-            this.batches = batches;
-            this.nextFetchOffset = fetchOffset;
-            this.responseVersion = responseVersion;
-            this.lastEpoch = Optional.empty();
-            this.abortedProducerIds = new HashSet<>();
-            this.abortedTransactions = abortedTransactions(partitionData);
-        }
-
-        private void drain() {
-            if (!isConsumed) {
-                maybeCloseRecordStream();
-                cachedRecordException = null;
-                this.isConsumed = true;
-                this.metricAggregator.record(partition, bytesRead, 
recordsRead);
-
-                // we move the partition to the end if we received some bytes. This way, it's more likely that partitions
-                // for the same topic can remain together (allowing for more efficient serialization).
-                if (bytesRead > 0)
-                    subscriptions.movePartitionToEnd(partition);
-            }
-        }
-
-        private void maybeEnsureValid(RecordBatch batch) {
-            if (checkCrcs && currentBatch.magic() >= 
RecordBatch.MAGIC_VALUE_V2) {
-                try {
-                    batch.ensureValid();
-                } catch (CorruptRecordException e) {
-                    throw new KafkaException("Record batch for partition " + 
partition + " at offset " +
-                            batch.baseOffset() + " is invalid, cause: " + 
e.getMessage());
-                }
-            }
-        }
-
-        private void maybeEnsureValid(Record record) {
-            if (checkCrcs) {
-                try {
-                    record.ensureValid();
-                } catch (CorruptRecordException e) {
-                    throw new KafkaException("Record for partition " + 
partition + " at offset " + record.offset()
-                            + " is invalid, cause: " + e.getMessage());
-                }
-            }
-        }
-
-        private void maybeCloseRecordStream() {
-            if (records != null) {
-                records.close();
-                records = null;
-            }
-        }
-
-        private Record nextFetchedRecord() {
-            while (true) {
-                if (records == null || !records.hasNext()) {
-                    maybeCloseRecordStream();
-
-                    if (!batches.hasNext()) {
-                        // Message format v2 preserves the last offset in a batch even if the last record is removed
-                        // through compaction. By using the next offset computed from the last offset in the batch,
-                        // we ensure that the offset of the next fetch will point to the next batch, which avoids
-                        // unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck
-                        // fetching the same batch repeatedly).
-                        if (currentBatch != null)
-                            nextFetchOffset = currentBatch.nextOffset();
-                        drain();
-                        return null;
-                    }
-
-                    currentBatch = batches.next();
-                    lastEpoch = currentBatch.partitionLeaderEpoch() == 
RecordBatch.NO_PARTITION_LEADER_EPOCH ?
-                            Optional.empty() : 
Optional.of(currentBatch.partitionLeaderEpoch());
-
-                    maybeEnsureValid(currentBatch);
-
-                    if (isolationLevel == IsolationLevel.READ_COMMITTED && 
currentBatch.hasProducerId()) {
-                        // remove from the aborted transaction queue all aborted transactions which have begun
-                        // before the current batch's last offset and add the associated producerIds to the
-                        // aborted producer set
-                        
consumeAbortedTransactionsUpTo(currentBatch.lastOffset());
-
-                        long producerId = currentBatch.producerId();
-                        if (containsAbortMarker(currentBatch)) {
-                            abortedProducerIds.remove(producerId);
-                        } else if (isBatchAborted(currentBatch)) {
-                            log.debug("Skipping aborted record batch from 
partition {} with producerId {} and " +
-                                          "offsets {} to {}",
-                                      partition, producerId, 
currentBatch.baseOffset(), currentBatch.lastOffset());
-                            nextFetchOffset = currentBatch.nextOffset();
-                            continue;
-                        }
-                    }
-
-                    records = 
currentBatch.streamingIterator(decompressionBufferSupplier);
-                } else {
-                    Record record = records.next();
-                    // skip any records out of range
-                    if (record.offset() >= nextFetchOffset) {
-                        // we only do validation when the message should not be skipped.
-                        maybeEnsureValid(record);
-
-                        // control records are not returned to the user
-                        if (!currentBatch.isControlBatch()) {
-                            return record;
-                        } else {
-                            // Increment the next fetch offset when we skip a 
control batch.
-                            nextFetchOffset = record.offset() + 1;
-                        }
-                    }
-                }
-            }
-        }
-
-        private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
-            // Error when fetching the next record before deserialization.
-            if (corruptLastRecord)
-                throw new KafkaException("Received exception when fetching the 
next record from " + partition
-                                             + ". If needed, please seek past 
the record to "
-                                             + "continue consumption.", 
cachedRecordException);
-
-            if (isConsumed)
-                return Collections.emptyList();
-
-            List<ConsumerRecord<K, V>> records = new ArrayList<>();
-            try {
-                for (int i = 0; i < maxRecords; i++) {
-                    // Only move to next record if there was no exception in the last fetch. Otherwise we should
-                    // use the last record to do deserialization again.
-                    if (cachedRecordException == null) {
-                        corruptLastRecord = true;
-                        lastRecord = nextFetchedRecord();
-                        corruptLastRecord = false;
-                    }
-                    if (lastRecord == null)
-                        break;
-                    records.add(parseRecord(partition, currentBatch, 
lastRecord));
-                    recordsRead++;
-                    bytesRead += lastRecord.sizeInBytes();
-                    nextFetchOffset = lastRecord.offset() + 1;
-                    // In some cases, the deserialization may have thrown an exception and the retry may succeed,
-                    // we allow user to move forward in this case.
-                    cachedRecordException = null;
-                }
-            } catch (SerializationException se) {
-                cachedRecordException = se;
-                if (records.isEmpty())
-                    throw se;
-            } catch (KafkaException e) {
-                cachedRecordException = e;
-                if (records.isEmpty())
-                    throw new KafkaException("Received exception when fetching 
the next record from " + partition
-                                                 + ". If needed, please seek 
past the record to "
-                                                 + "continue consumption.", e);
-            }
-            return records;
-        }
-
-        private void consumeAbortedTransactionsUpTo(long offset) {
-            if (abortedTransactions == null)
-                return;
-
-            while (!abortedTransactions.isEmpty() && 
abortedTransactions.peek().firstOffset() <= offset) {
-                FetchResponseData.AbortedTransaction abortedTransaction = 
abortedTransactions.poll();
-                abortedProducerIds.add(abortedTransaction.producerId());
-            }
-        }
-
-        private boolean isBatchAborted(RecordBatch batch) {
-            return batch.isTransactional() && 
abortedProducerIds.contains(batch.producerId());
-        }
-
-        private PriorityQueue<FetchResponseData.AbortedTransaction> 
abortedTransactions(FetchResponseData.PartitionData partition) {
-            if (partition.abortedTransactions() == null || 
partition.abortedTransactions().isEmpty())
-                return null;
-
-            PriorityQueue<FetchResponseData.AbortedTransaction> 
abortedTransactions = new PriorityQueue<>(
-                    partition.abortedTransactions().size(), 
Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset)
-            );
-            abortedTransactions.addAll(partition.abortedTransactions());
-            return abortedTransactions;
-        }
-
-        private boolean containsAbortMarker(RecordBatch batch) {
-            if (!batch.isControlBatch())
-                return false;
-
-            Iterator<Record> batchIterator = batch.iterator();
-            if (!batchIterator.hasNext())
-                return false;
-
-            Record firstRecord = batchIterator.next();
-            return ControlRecordType.ABORT == 
ControlRecordType.parse(firstRecord.key());
-        }
-
-        private boolean notInitialized() {
-            return !this.initialized;
-        }
-    }
-
-    /**
-     * Since we parse the message data for each partition from each fetch response lazily, fetch-level
-     * metrics need to be aggregated as the messages from each partition are parsed. This class is used
-     * to facilitate this incremental aggregation.
-     */
-    private static class FetchResponseMetricAggregator {
-        private final FetchManagerMetrics sensors;
-        private final Set<TopicPartition> unrecordedPartitions;
-
-        private final FetchMetrics fetchMetrics = new FetchMetrics();
-        private final Map<String, FetchMetrics> topicFetchMetrics = new 
HashMap<>();
-
-        private FetchResponseMetricAggregator(FetchManagerMetrics sensors,
-                                              Set<TopicPartition> partitions) {
-            this.sensors = sensors;
-            this.unrecordedPartitions = partitions;
-        }
-
-        /**
-         * After each partition is parsed, we update the current metric totals with the total bytes
-         * and number of records parsed. After all partitions have reported, we write the metric.
-         */
-        public void record(TopicPartition partition, int bytes, int records) {
-            this.unrecordedPartitions.remove(partition);
-            this.fetchMetrics.increment(bytes, records);
-
-            // collect and aggregate per-topic metrics
-            String topic = partition.topic();
-            FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);
-            if (topicFetchMetric == null) {
-                topicFetchMetric = new FetchMetrics();
-                this.topicFetchMetrics.put(topic, topicFetchMetric);
-            }
-            topicFetchMetric.increment(bytes, records);
-
-            if (this.unrecordedPartitions.isEmpty()) {
-                // once all expected partitions from the fetch have reported in, record the metrics
-                this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes);
-                
this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords);
-
-                // also record per-topic metrics
-                for (Map.Entry<String, FetchMetrics> entry: 
this.topicFetchMetrics.entrySet()) {
-                    FetchMetrics metric = entry.getValue();
-                    this.sensors.recordTopicFetchMetrics(entry.getKey(), 
metric.fetchBytes, metric.fetchRecords);
-                }
-            }
-        }
-
-        private static class FetchMetrics {
-            private int fetchBytes;
-            private int fetchRecords;
-
-            protected void increment(int bytes, int records) {
-                this.fetchBytes += bytes;
-                this.fetchRecords += records;
-            }
-        }
-    }
-
-    private static class FetchManagerMetrics {
-        private final Metrics metrics;
-        private final FetcherMetricsRegistry metricsRegistry;
-        private final Sensor bytesFetched;
-        private final Sensor recordsFetched;
-        private final Sensor fetchLatency;
-        private final Sensor recordsFetchLag;
-        private final Sensor recordsFetchLead;
-
-        private int assignmentId = 0;
-        private Set<TopicPartition> assignedPartitions = 
Collections.emptySet();
-
-        private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry 
metricsRegistry) {
-            this.metrics = metrics;
-            this.metricsRegistry = metricsRegistry;
-
-            this.bytesFetched = metrics.sensor("bytes-fetched");
-            
this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new 
Avg());
-            
this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new 
Max());
-            this.bytesFetched.add(new 
Meter(metrics.metricInstance(metricsRegistry.bytesConsumedRate),
-                    
metrics.metricInstance(metricsRegistry.bytesConsumedTotal)));
-
-            this.recordsFetched = metrics.sensor("records-fetched");
-            
this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg),
 new Avg());
-            this.recordsFetched.add(new 
Meter(metrics.metricInstance(metricsRegistry.recordsConsumedRate),
-                    
metrics.metricInstance(metricsRegistry.recordsConsumedTotal)));
-
-            this.fetchLatency = metrics.sensor("fetch-latency");
-            
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg), 
new Avg());
-            
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax), 
new Max());
-            this.fetchLatency.add(new Meter(new WindowedCount(), 
metrics.metricInstance(metricsRegistry.fetchRequestRate),
-                    
metrics.metricInstance(metricsRegistry.fetchRequestTotal)));
-
-            this.recordsFetchLag = metrics.sensor("records-lag");
-            
this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax), 
new Max());
-
-            this.recordsFetchLead = metrics.sensor("records-lead");
-            
this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin),
 new Min());
-        }
-
-        private void recordTopicFetchMetrics(String topic, int bytes, int 
records) {
-            // record bytes fetched
-            String name = "topic." + topic + ".bytes-fetched";
-            Sensor bytesFetched = this.metrics.getSensor(name);
-            if (bytesFetched == null) {
-                Map<String, String> metricTags = 
Collections.singletonMap("topic", topic.replace('.', '_'));
-
-                bytesFetched = this.metrics.sensor(name);
-                
bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeAvg,
-                        metricTags), new Avg());
-                
bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeMax,
-                        metricTags), new Max());
-                bytesFetched.add(new 
Meter(this.metrics.metricInstance(metricsRegistry.topicBytesConsumedRate, 
metricTags),
-                        
this.metrics.metricInstance(metricsRegistry.topicBytesConsumedTotal, 
metricTags)));
-            }
-            bytesFetched.record(bytes);
-
-            // record records fetched
-            name = "topic." + topic + ".records-fetched";
-            Sensor recordsFetched = this.metrics.getSensor(name);
-            if (recordsFetched == null) {
-                Map<String, String> metricTags = new HashMap<>(1);
-                metricTags.put("topic", topic.replace('.', '_'));
-
-                recordsFetched = this.metrics.sensor(name);
-                
recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsPerRequestAvg,
-                        metricTags), new Avg());
-                recordsFetched.add(new 
Meter(this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedRate, 
metricTags),
-                        
this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedTotal, 
metricTags)));
-            }
-            recordsFetched.record(records);
-        }
-
-        private void maybeUpdateAssignment(SubscriptionState subscription) {
-            int newAssignmentId = subscription.assignmentId();
-            if (this.assignmentId != newAssignmentId) {
-                Set<TopicPartition> newAssignedPartitions = 
subscription.assignedPartitions();
-                for (TopicPartition tp : this.assignedPartitions) {
-                    if (!newAssignedPartitions.contains(tp)) {
-                        metrics.removeSensor(partitionLagMetricName(tp));
-                        metrics.removeSensor(partitionLeadMetricName(tp));
-                        
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
-                    }
-                }
-
-                for (TopicPartition tp : newAssignedPartitions) {
-                    if (!this.assignedPartitions.contains(tp)) {
-                        MetricName metricName = 
partitionPreferredReadReplicaMetricName(tp);
-                        metrics.addMetricIfAbsent(
-                            metricName,
-                            null,
-                            (Gauge<Integer>) (config, now) -> 
subscription.preferredReadReplica(tp, 0L).orElse(-1)
-                        );
-                    }
-                }
-
-                this.assignedPartitions = newAssignedPartitions;
-                this.assignmentId = newAssignmentId;
-            }
-        }
-
-        private void recordPartitionLead(TopicPartition tp, long lead) {
-            this.recordsFetchLead.record(lead);
-
-            String name = partitionLeadMetricName(tp);
-            Sensor recordsLead = this.metrics.getSensor(name);
-            if (recordsLead == null) {
-                Map<String, String> metricTags = topicPartitionTags(tp);
-
-                recordsLead = this.metrics.sensor(name);
-
-                
recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLead,
 metricTags), new Value());
-                
recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadMin,
 metricTags), new Min());
-                
recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadAvg,
 metricTags), new Avg());
-            }
-            recordsLead.record(lead);
-        }
-
-        private void recordPartitionLag(TopicPartition tp, long lag) {
-            this.recordsFetchLag.record(lag);
-
-            String name = partitionLagMetricName(tp);
-            Sensor recordsLag = this.metrics.getSensor(name);
-            if (recordsLag == null) {
-                Map<String, String> metricTags = topicPartitionTags(tp);
-                recordsLag = this.metrics.sensor(name);
-
-                
recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag, 
metricTags), new Value());
-                
recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagMax,
 metricTags), new Max());
-                
recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagAvg,
 metricTags), new Avg());
-            }
-            recordsLag.record(lag);
-        }
-
-        private static String partitionLagMetricName(TopicPartition tp) {
-            return tp + ".records-lag";
-        }
-
-        private static String partitionLeadMetricName(TopicPartition tp) {
-            return tp + ".records-lead";
-        }
-
-        private MetricName 
partitionPreferredReadReplicaMetricName(TopicPartition tp) {
-            Map<String, String> metricTags = topicPartitionTags(tp);
-            return 
this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica, 
metricTags);
-        }
-
-        private Map<String, String> topicPartitionTags(TopicPartition tp) {
-            Map<String, String> metricTags = new HashMap<>(2);
-            metricTags.put("topic", tp.topic().replace('.', '_'));
-            metricTags.put("partition", String.valueOf(tp.partition()));
-            return metricTags;
-        }
-    }
-
     // Visible for testing
     void maybeCloseFetchSessions(final Timer timer) {
         final Cluster cluster = metadata.fetch();
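
The state that the old inner CompletedFetch silently borrowed from Fetcher's fields (the subscription state, CRC checking flag, decompression buffer supplier, deserializers and isolation level) now has to be handed to the extracted class explicitly. A minimal sketch of the new wiring, based on the constructor exercised by CompletedFetchTest further below; the variable values are illustrative only, and `records` stands in for whatever MemoryRecords the fetch response carried:

    // Sketch only: constructing the now top-level CompletedFetch<K, V> outside of Fetcher.
    LogContext logContext = new LogContext();
    SubscriptionState subscriptions = new SubscriptionState(logContext, OffsetResetStrategy.NONE);
    BufferSupplier bufferSupplier = BufferSupplier.create();
    FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry();
    FetchMetricsManager metricsManager = new FetchMetricsManager(new Metrics(), metricsRegistry);
    TopicPartition tp = new TopicPartition("test", 0);
    FetchMetricsAggregator metricAggregator = new FetchMetricsAggregator(metricsManager, Collections.singleton(tp));
    FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData().setRecords(records);

    CompletedFetch<String, String> completedFetch = new CompletedFetch<>(logContext,
            subscriptions,
            true,                              // checkCrcs, previously a Fetcher field
            bufferSupplier,                    // decompression buffer supplier
            new StringDeserializer(),          // key deserializer
            new StringDeserializer(),          // value deserializer
            IsolationLevel.READ_UNCOMMITTED,
            tp,
            partitionData,                     // partition data from the fetch response
            metricAggregator,                  // shared across all partitions of one response
            0L,                                // fetch offset the request was issued at
            ApiKeys.FETCH.latestVersion());

    // Records are still drained in bounded chunks, as before:
    List<ConsumerRecord<String, String>> batch = completedFetch.fetchRecords(500);
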
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
new file mode 100644
index 00000000000..2272ee5c0a3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.SampledStat;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * {@code SensorBuilder} takes a bit of the boilerplate out of creating {@link Sensor sensors} for recording
+ * {@link Metric metrics}.
+ */
+public class SensorBuilder {
+
+    private final Metrics metrics;
+
+    private final Sensor sensor;
+
+    private final boolean prexisting;
+
+    private final Map<String, String> tags;
+
+    public SensorBuilder(Metrics metrics, String name) {
+        this(metrics, name, Collections::emptyMap);
+    }
+
+    public SensorBuilder(Metrics metrics, String name, Supplier<Map<String, String>> tagsSupplier) {
+        this.metrics = metrics;
+        Sensor s = metrics.getSensor(name);
+
+        if (s != null) {
+            sensor = s;
+            tags = Collections.emptyMap();
+            prexisting = true;
+        } else {
+            sensor = metrics.sensor(name);
+            tags = tagsSupplier.get();
+            prexisting = false;
+        }
+    }
+
+    SensorBuilder withAvg(MetricNameTemplate name) {
+        if (!prexisting)
+            sensor.add(metrics.metricInstance(name, tags), new Avg());
+
+        return this;
+    }
+
+    SensorBuilder withMin(MetricNameTemplate name) {
+        if (!prexisting)
+            sensor.add(metrics.metricInstance(name, tags), new Min());
+
+        return this;
+    }
+
+    SensorBuilder withMax(MetricNameTemplate name) {
+        if (!prexisting)
+            sensor.add(metrics.metricInstance(name, tags), new Max());
+
+        return this;
+    }
+
+    SensorBuilder withValue(MetricNameTemplate name) {
+        if (!prexisting)
+            sensor.add(metrics.metricInstance(name, tags), new Value());
+
+        return this;
+    }
+
+    SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate totalName) {
+        if (!prexisting) {
+            sensor.add(new Meter(metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags)));
+        }
+        }
+
+        return this;
+    }
+
+    SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate rateName, MetricNameTemplate totalName) {
+        if (!prexisting) {
+            sensor.add(new Meter(sampledStat, metrics.metricInstance(rateName, tags), metrics.metricInstance(totalName, tags)));
+        }
+
+        return this;
+    }
+
+    Sensor build() {
+        return sensor;
+    }
+
+}
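
SensorBuilder mostly removes the look-up-then-register boilerplate that the removed FetchManagerMetrics repeated for every sensor: if a sensor with the given name already exists it is returned untouched, otherwise the requested stats are added. As a rough sketch, and not necessarily how FetchMetricsManager itself is written, the old "bytes-fetched" sensor could be expressed as follows, assuming a Metrics instance and a FetchMetricsRegistry are already in scope:

    // Equivalent of the removed FetchManagerMetrics wiring for the bytes-fetched sensor.
    Sensor bytesFetched = new SensorBuilder(metrics, "bytes-fetched")
            .withAvg(metricsRegistry.fetchSizeAvg)                                        // fetch-size-avg
            .withMax(metricsRegistry.fetchSizeMax)                                        // fetch-size-max
            .withMeter(metricsRegistry.bytesConsumedRate, metricsRegistry.bytesConsumedTotal)
            .build();
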
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
new file mode 100644
index 00000000000..b420852852a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.UUIDDeserializer;
+import org.apache.kafka.common.serialization.UUIDSerializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class CompletedFetchTest {
+
+    private final static String TOPIC_NAME = "test";
+    private final static TopicPartition TP = new TopicPartition(TOPIC_NAME, 0);
+    private final static long PRODUCER_ID = 1000L;
+    private final static short PRODUCER_EPOCH = 0;
+
+    private BufferSupplier bufferSupplier;
+
+    @BeforeEach
+    public void setup() {
+        bufferSupplier = BufferSupplier.create();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (bufferSupplier != null)
+            bufferSupplier.close();
+    }
+
+    @Test
+    public void testSimple() {
+        long fetchOffset = 5;
+        int startingOffset = 10;
+        int numRecords = 11;        // Records for 10-20
+        FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset, numRecords, 
fetchOffset));
+
+        CompletedFetch<String, String> completedFetch = 
newCompletedFetch(fetchOffset, partitionData);
+
+        List<ConsumerRecord<String, String>> records = 
completedFetch.fetchRecords(10);
+        assertEquals(10, records.size());
+        ConsumerRecord<String, String> record = records.get(0);
+        assertEquals(10, record.offset());
+
+        records = completedFetch.fetchRecords(10);
+        assertEquals(1, records.size());
+        record = records.get(0);
+        assertEquals(20, record.offset());
+
+        records = completedFetch.fetchRecords(10);
+        assertEquals(0, records.size());
+    }
+
+    @Test
+    public void testAbortedTransactionRecordsRemoved() {
+        int numRecords = 10;
+        Records rawRecords = newTranscactionalRecords(ControlRecordType.ABORT, 
numRecords);
+
+        FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                .setRecords(rawRecords)
+                .setAbortedTransactions(newAbortedTransactions());
+
+        CompletedFetch<String, String> completedFetch = 
newCompletedFetch(IsolationLevel.READ_COMMITTED,
+                OffsetResetStrategy.NONE,
+                true,
+                0,
+                partitionData);
+        List<ConsumerRecord<String, String>> records = 
completedFetch.fetchRecords(10);
+        assertEquals(0, records.size());
+
+        completedFetch = newCompletedFetch(IsolationLevel.READ_UNCOMMITTED,
+                OffsetResetStrategy.NONE,
+                true,
+                0,
+                partitionData);
+        records = completedFetch.fetchRecords(10);
+        assertEquals(numRecords, records.size());
+    }
+
+    @Test
+    public void testCommittedTransactionRecordsIncluded() {
+        int numRecords = 10;
+        Records rawRecords = 
newTranscactionalRecords(ControlRecordType.COMMIT, numRecords);
+        FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                .setRecords(rawRecords);
+        CompletedFetch<String, String> completedFetch = 
newCompletedFetch(IsolationLevel.READ_COMMITTED,
+                OffsetResetStrategy.NONE,
+                true,
+                0,
+                partitionData);
+        List<ConsumerRecord<String, String>> records = 
completedFetch.fetchRecords(10);
+        assertEquals(10, records.size());
+    }
+
+    @Test
+    public void testNegativeFetchCount() {
+        long fetchOffset = 0;
+        int startingOffset = 0;
+        int numRecords = 10;
+        FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset, numRecords, 
fetchOffset));
+
+        CompletedFetch<String, String> completedFetch = 
newCompletedFetch(fetchOffset, partitionData);
+
+        List<ConsumerRecord<String, String>> records = 
completedFetch.fetchRecords(-10);
+        assertEquals(0, records.size());
+    }
+
+    @Test
+    public void testNoRecordsInFetch() {
+        FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                .setPartitionIndex(0)
+                .setHighWatermark(10)
+                .setLastStableOffset(20)
+                .setLogStartOffset(0);
+
+        CompletedFetch<String, String> completedFetch = 
newCompletedFetch(IsolationLevel.READ_UNCOMMITTED,
+                OffsetResetStrategy.NONE,
+                false,
+                1,
+                partitionData);
+
+        List<ConsumerRecord<String, String>> records = 
completedFetch.fetchRecords(10);
+        assertEquals(0, records.size());
+    }
+
+    @Test
+    public void testCorruptedMessage() {
+        // Create one good record and then one "corrupted" record.
+        MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME, 0);
+        builder.append(new SimpleRecord(new 
UUIDSerializer().serialize(TOPIC_NAME, UUID.randomUUID())));
+        builder.append(0L, "key".getBytes(), "value".getBytes());
+        Records records = builder.build();
+
+        FetchResponseData.PartitionData partitionData = new 
FetchResponseData.PartitionData()
+                .setPartitionIndex(0)
+                .setHighWatermark(10)
+                .setLastStableOffset(20)
+                .setLogStartOffset(0)
+                .setRecords(records);
+
+        CompletedFetch<UUID, UUID> completedFetch = newCompletedFetch(new 
UUIDDeserializer(),
+                new UUIDDeserializer(),
+                IsolationLevel.READ_COMMITTED,
+                OffsetResetStrategy.NONE,
+                false,
+                0,
+                partitionData);
+
+        completedFetch.fetchRecords(10);
+
+        assertThrows(RecordDeserializationException.class, () -> 
completedFetch.fetchRecords(10));
+    }
+
+    private CompletedFetch<String, String> newCompletedFetch(long fetchOffset,
+                                                             
FetchResponseData.PartitionData partitionData) {
+        return newCompletedFetch(
+                IsolationLevel.READ_UNCOMMITTED,
+                OffsetResetStrategy.NONE,
+                true,
+                fetchOffset,
+                partitionData);
+    }
+
+    private CompletedFetch<String, String> newCompletedFetch(IsolationLevel 
isolationLevel,
+                                                             
OffsetResetStrategy offsetResetStrategy,
+                                                             boolean checkCrcs,
+                                                             long fetchOffset,
+                                                             
FetchResponseData.PartitionData partitionData) {
+        return newCompletedFetch(new StringDeserializer(),
+                new StringDeserializer(),
+                isolationLevel,
+                offsetResetStrategy,
+                checkCrcs,
+                fetchOffset,
+                partitionData);
+    }
+
+    private <K, V> CompletedFetch<K, V> newCompletedFetch(Deserializer<K> 
keyDeserializer,
+                                                          Deserializer<V> 
valueDeserializer,
+                                                          IsolationLevel 
isolationLevel,
+                                                          OffsetResetStrategy 
offsetResetStrategy,
+                                                          boolean checkCrcs,
+                                                          long fetchOffset,
+                                                          
FetchResponseData.PartitionData partitionData) {
+        LogContext logContext = new LogContext();
+        SubscriptionState subscriptions = new SubscriptionState(logContext, 
offsetResetStrategy);
+        FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry();
+        FetchMetricsManager metrics = new FetchMetricsManager(new Metrics(), 
metricsRegistry);
+        FetchMetricsAggregator metricAggregator = new 
FetchMetricsAggregator(metrics, Collections.singleton(TP));
+
+        return new CompletedFetch<>(logContext,
+                subscriptions,
+                checkCrcs,
+                bufferSupplier,
+                keyDeserializer,
+                valueDeserializer,
+                isolationLevel,
+                TP,
+                partitionData,
+                metricAggregator,
+                fetchOffset,
+                ApiKeys.FETCH.latestVersion());
+    }
+
+    private Records newRecords(long baseOffset, int count, long 
firstMessageId) {
+        MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, 
TimestampType.CREATE_TIME, baseOffset);
+        for (int i = 0; i < count; i++)
+            builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId + 
i)).getBytes());
+        return builder.build();
+    }
+
+    private Records newTranscactionalRecords(ControlRecordType 
controlRecordType, int numRecords) {
+        Time time = new MockTime();
+        ByteBuffer buffer = ByteBuffer.allocate(1024);
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+                RecordBatch.CURRENT_MAGIC_VALUE,
+                CompressionType.NONE,
+                TimestampType.CREATE_TIME,
+                0,
+                time.milliseconds(),
+                PRODUCER_ID,
+                PRODUCER_EPOCH,
+                0,
+                true,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH);
+
+        for (int i = 0; i < numRecords; i++)
+            builder.append(new SimpleRecord(time.milliseconds(), 
"key".getBytes(), "value".getBytes()));
+
+        builder.build();
+        writeTransactionMarker(buffer, controlRecordType, numRecords, time);
+        buffer.flip();
+
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    private void writeTransactionMarker(ByteBuffer buffer,
+                                        ControlRecordType controlRecordType,
+                                        int offset,
+                                        Time time) {
+        MemoryRecords.writeEndTransactionalMarker(buffer,
+                offset,
+                time.milliseconds(),
+                0,
+                PRODUCER_ID,
+                PRODUCER_EPOCH,
+                new EndTransactionMarker(controlRecordType, 0));
+    }
+
+    private List<FetchResponseData.AbortedTransaction> 
newAbortedTransactions() {
+        FetchResponseData.AbortedTransaction abortedTransaction = new 
FetchResponseData.AbortedTransaction();
+        abortedTransaction.setFirstOffset(0);
+        abortedTransaction.setProducerId(PRODUCER_ID);
+        return Collections.singletonList(abortedTransaction);
+    }
+
+}
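
Read together with the fetcher changes above, the division of labour among the extracted metrics classes appears to be: FetchMetricsRegistry holds the metric name templates, FetchMetricsManager owns the sensors, and FetchMetricsAggregator takes over the role of the removed FetchResponseMetricAggregator, buffering per-partition byte and record counts until every partition in a fetch response has reported. A hedged wiring sketch, with class roles inferred from this patch and illustrative names:

    Metrics metrics = new Metrics();
    FetchMetricsRegistry registry = new FetchMetricsRegistry();                  // metric name templates
    FetchMetricsManager manager = new FetchMetricsManager(metrics, registry);    // owns the sensors
    Set<TopicPartition> partitionsInResponse = Collections.singleton(new TopicPartition("test", 0));
    FetchMetricsAggregator aggregator = new FetchMetricsAggregator(manager, partitionsInResponse);
    // Each drained CompletedFetch reports its bytes/records into the aggregator; once every
    // partition in the response has reported, the totals are pushed through the manager's sensors.
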
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java
new file mode 100644
index 00000000000..dc00ce33366
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static 
org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicPartitionTags;
+import static 
org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicTags;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class FetchMetricsManagerTest {
+
+    private static final double EPSILON = 0.0001;
+
+    private final Time time = new MockTime(1, 0, 0);
+    private final static String TOPIC_NAME = "test";
+    private final static TopicPartition TP = new TopicPartition(TOPIC_NAME, 0);
+
+    private Metrics metrics;
+    private FetchMetricsRegistry metricsRegistry;
+    private FetchMetricsManager metricsManager;
+
+
+    @BeforeEach
+    public void setup() {
+        metrics = new Metrics(time);
+        metricsRegistry = new 
FetchMetricsRegistry(metrics.config().tags().keySet(), "test");
+        metricsManager = new FetchMetricsManager(metrics, metricsRegistry);
+    }
+
+    @AfterEach
+    public void tearDown() {
+        if (metrics != null) {
+            metrics.close();
+            metrics = null;
+        }
+
+        metricsManager = null;
+    }
+
+    @Test
+    public void testLatency() {
+        metricsManager.recordLatency(123);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordLatency(456);
+
+        assertEquals(289.5, metricValue(metricsRegistry.fetchLatencyAvg), 
EPSILON);
+        assertEquals(456, metricValue(metricsRegistry.fetchLatencyMax), 
EPSILON);
+    }
+
+    @Test
+    public void testBytesFetched() {
+        metricsManager.recordBytesFetched(2);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordBytesFetched(10);
+
+        assertEquals(6, metricValue(metricsRegistry.fetchSizeAvg), EPSILON);
+        assertEquals(10, metricValue(metricsRegistry.fetchSizeMax), EPSILON);
+    }
+
+    @Test
+    public void testBytesFetchedTopic() {
+        String topicName1 = TOPIC_NAME;
+        String topicName2 = "another-topic";
+        Map<String, String> tags1 = topicTags(topicName1);
+        Map<String, String> tags2 = topicTags(topicName2);
+
+        metricsManager.recordBytesFetched(topicName1, 2);
+        metricsManager.recordBytesFetched(topicName2, 1);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordBytesFetched(topicName1, 10);
+        metricsManager.recordBytesFetched(topicName2, 5);
+
+        assertEquals(6, metricValue(metricsRegistry.topicFetchSizeAvg, tags1), 
EPSILON);
+        assertEquals(10, metricValue(metricsRegistry.topicFetchSizeMax, 
tags1), EPSILON);
+        assertEquals(3, metricValue(metricsRegistry.topicFetchSizeAvg, tags2), 
EPSILON);
+        assertEquals(5, metricValue(metricsRegistry.topicFetchSizeMax, tags2), 
EPSILON);
+    }
+
+    @Test
+    public void testRecordsFetched() {
+        metricsManager.recordRecordsFetched(3);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordRecordsFetched(15);
+
+        assertEquals(9, metricValue(metricsRegistry.recordsPerRequestAvg), 
EPSILON);
+    }
+
+    @Test
+    public void testRecordsFetchedTopic() {
+        String topicName1 = TOPIC_NAME;
+        String topicName2 = "another-topic";
+        Map<String, String> tags1 = topicTags(topicName1);
+        Map<String, String> tags2 = topicTags(topicName2);
+
+        metricsManager.recordRecordsFetched(topicName1, 2);
+        metricsManager.recordRecordsFetched(topicName2, 1);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordRecordsFetched(topicName1, 10);
+        metricsManager.recordRecordsFetched(topicName2, 5);
+
+        assertEquals(6, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
tags1), EPSILON);
+        assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg, 
tags2), EPSILON);
+    }
+
+    @Test
+    public void testPartitionLag() {
+        Map<String, String> tags = topicPartitionTags(TP);
+        metricsManager.recordPartitionLag(TP, 14);
+        metricsManager.recordPartitionLag(TP, 8);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordPartitionLag(TP, 5);
+
+        assertEquals(14, metricValue(metricsRegistry.recordsLagMax), EPSILON);
+        assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag, 
tags), EPSILON);
+        assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax, 
tags), EPSILON);
+        assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg, 
tags), EPSILON);
+    }
+
+    @Test
+    public void testPartitionLead() {
+        Map<String, String> tags = topicPartitionTags(TP);
+        metricsManager.recordPartitionLead(TP, 15);
+        metricsManager.recordPartitionLead(TP, 11);
+        time.sleep(metrics.config().timeWindowMs() + 1);
+        metricsManager.recordPartitionLead(TP, 13);
+
+        assertEquals(11, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
+        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead, 
tags), EPSILON);
+        assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin, 
tags), EPSILON);
+        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, 
tags), EPSILON);
+    }
+
+    private double metricValue(MetricNameTemplate name) {
+        MetricName metricName = metrics.metricInstance(name);
+        KafkaMetric metric = metrics.metric(metricName);
+        return (Double) metric.metricValue();
+    }
+
+    private double metricValue(MetricNameTemplate name, Map<String, String> 
tags) {
+        MetricName metricName = metrics.metricInstance(name, tags);
+        KafkaMetric metric = metrics.metric(metricName);
+        return (Double) metric.metricValue();
+    }
+
+}
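
The test above also outlines the recording surface the fetch path is expected to call as responses are parsed. A condensed sketch, with arbitrary values and with metrics, metricsRegistry and tp set up the same way as in the test:

    FetchMetricsManager metricsManager = new FetchMetricsManager(metrics, metricsRegistry);
    TopicPartition tp = new TopicPartition("test", 0);
    metricsManager.recordLatency(123);                 // fetch-latency-avg / fetch-latency-max
    metricsManager.recordBytesFetched("test", 2);      // per-topic fetch size
    metricsManager.recordRecordsFetched("test", 10);   // per-topic records per request
    metricsManager.recordPartitionLag(tp, 14);         // records-lag for one partition
    metricsManager.recordPartitionLead(tp, 15);        // records-lead for one partition
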
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a22739e1413..e60edbfb6c1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -175,7 +175,7 @@ public class FetcherTest {
     private MockTime time = new MockTime(1);
     private SubscriptionState subscriptions;
     private ConsumerMetadata metadata;
-    private FetcherMetricsRegistry metricsRegistry;
+    private FetchMetricsRegistry metricsRegistry;
     private MockClient client;
     private Metrics metrics;
     private ApiVersions apiVersions = new ApiVersions();
@@ -3686,7 +3686,7 @@ public class FetcherTest {
         metrics = new Metrics(metricConfig, time);
         consumerClient = spy(new ConsumerNetworkClient(logContext, client, metadata, time,
                 100, 1000, Integer.MAX_VALUE));
-        metricsRegistry = new FetcherMetricsRegistry(metricConfig.tags().keySet(), "consumer" + groupId);
+        metricsRegistry = new FetchMetricsRegistry(metricConfig.tags().keySet(), "consumer" + groupId);
     }
 
     private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index d57acc243e1..ffaeab4709d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -1244,7 +1244,7 @@ public class OffsetFetcherTest {
 
         buildFetcher(metricConfig, isolationLevel, metadataExpireMs, subscriptionState, logContext);
 
-        FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group");
+        FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group");
         Fetcher<byte[], byte[]> fetcher = new Fetcher<>(
                 logContext,
                 consumerClient,
