This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c74c0f2facd KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring (#13301)
c74c0f2facd is described below
commit c74c0f2facde2b392ab745144d6ad520575ab9ef
Author: Kirk True <[email protected]>
AuthorDate: Fri Mar 10 10:17:14 2023 -0800
KAFKA-14758: Extract inner classes from Fetcher for reuse in refactoring
(#13301)
The Fetcher class is used internally by the KafkaConsumer to fetch records
from the brokers. There is ongoing work to create a new consumer implementation
with a significantly refactored threading model. The threading refactor work
requires a similarly refactored Fetcher.
This task includes refactoring Fetcher by extracting out the inner classes
into top-level (though still in internal) so that those classes can be
referenced by forthcoming refactored fetch logic.
Reviewers: Philip Nee <[email protected]>, Guozhang Wang
<[email protected]>
---
.../kafka/clients/consumer/KafkaConsumer.java | 4 +-
.../clients/consumer/internals/CompletedFetch.java | 365 ++++++++++++++
.../consumer/internals/ConsumerMetrics.java | 4 +-
.../consumer/internals/FetchMetricsAggregator.java | 95 ++++
.../consumer/internals/FetchMetricsManager.java | 203 ++++++++
...ricsRegistry.java => FetchMetricsRegistry.java} | 8 +-
.../kafka/clients/consumer/internals/Fetcher.java | 557 ++-------------------
.../clients/consumer/internals/SensorBuilder.java | 115 +++++
.../consumer/internals/CompletedFetchTest.java | 304 +++++++++++
.../internals/FetchMetricsManagerTest.java | 171 +++++++
.../clients/consumer/internals/FetcherTest.java | 4 +-
.../consumer/internals/OffsetFetcherTest.java | 2 +-
12 files changed, 1300 insertions(+), 532 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3f966121d69..11ac675cb4b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -28,7 +28,7 @@ import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.consumer.internals.Fetch;
 import org.apache.kafka.clients.consumer.internals.Fetcher;
-import org.apache.kafka.clients.consumer.internals.FetcherMetricsRegistry;
+import org.apache.kafka.clients.consumer.internals.FetchMetricsRegistry;
 import org.apache.kafka.clients.consumer.internals.KafkaConsumerMetrics;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.OffsetFetcher;
@@ -737,7 +737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.metadata.bootstrap(addresses);
             String metricGrpPrefix = "consumer";
-            FetcherMetricsRegistry metricsRegistry = new FetcherMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
+            FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry(Collections.singleton(CLIENT_ID_METRIC_TAG), metricGrpPrefix);
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config, time, logContext);
             this.isolationLevel = IsolationLevel.valueOf(
                     config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
new file mode 100644
index 00000000000..6a11b846810
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CompletedFetch.java
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.CloseableIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * {@link CompletedFetch} represents a {@link RecordBatch batch} of {@link
Record records} that was returned from the
+ * broker via a {@link FetchRequest}. It contains logic to maintain state
between calls to {@link #fetchRecords(int)}.
+ *
+ * @param <K> Record key type
+ * @param <V> Record value type
+ */
+class CompletedFetch<K, V> {
+
+ final TopicPartition partition;
+ final FetchResponseData.PartitionData partitionData;
+ final short requestVersion;
+
+ long nextFetchOffset;
+ Optional<Integer> lastEpoch;
+ boolean isConsumed = false;
+ boolean initialized = false;
+
+ private final Logger log;
+ private final SubscriptionState subscriptions;
+ private final boolean checkCrcs;
+ private final BufferSupplier decompressionBufferSupplier;
+ private final Deserializer<K> keyDeserializer;
+ private final Deserializer<V> valueDeserializer;
+ private final IsolationLevel isolationLevel;
+ private final Iterator<? extends RecordBatch> batches;
+ private final Set<Long> abortedProducerIds;
+ private final PriorityQueue<FetchResponseData.AbortedTransaction>
abortedTransactions;
+ private final FetchMetricsAggregator metricAggregator;
+
+ private int recordsRead;
+ private int bytesRead;
+ private RecordBatch currentBatch;
+ private Record lastRecord;
+ private CloseableIterator<Record> records;
+ private Exception cachedRecordException = null;
+ private boolean corruptLastRecord = false;
+
+ CompletedFetch(LogContext logContext,
+ SubscriptionState subscriptions,
+ boolean checkCrcs,
+ BufferSupplier decompressionBufferSupplier,
+ Deserializer<K> keyDeserializer,
+ Deserializer<V> valueDeserializer,
+ IsolationLevel isolationLevel,
+ TopicPartition partition,
+ FetchResponseData.PartitionData partitionData,
+ FetchMetricsAggregator metricAggregator,
+ Long fetchOffset,
+ short requestVersion) {
+ this.log = logContext.logger(CompletedFetch.class);
+ this.subscriptions = subscriptions;
+ this.checkCrcs = checkCrcs;
+ this.decompressionBufferSupplier = decompressionBufferSupplier;
+ this.keyDeserializer = keyDeserializer;
+ this.valueDeserializer = valueDeserializer;
+ this.isolationLevel = isolationLevel;
+ this.partition = partition;
+ this.partitionData = partitionData;
+ this.metricAggregator = metricAggregator;
+ this.batches =
FetchResponse.recordsOrFail(partitionData).batches().iterator();
+ this.nextFetchOffset = fetchOffset;
+ this.requestVersion = requestVersion;
+ this.lastEpoch = Optional.empty();
+ this.abortedProducerIds = new HashSet<>();
+ this.abortedTransactions = abortedTransactions(partitionData);
+ }
+
+ /**
+ * After each partition is parsed, we update the current metric totals
with the total bytes
+ * and number of records parsed. After all partitions have reported, we
write the metric.
+ */
+ void recordAggregatedMetrics(int bytes, int records) {
+ metricAggregator.record(partition, bytes, records);
+ }
+
+ /**
+ * Draining a {@link CompletedFetch} will signal that the data has been
consumed and the underlying resources
+ * are closed. This is somewhat analogous to {@link Closeable#close()
closing}, though no error will result if a
+ * caller invokes {@link #fetchRecords(int)}; an empty {@link List list}
will be returned instead.
+ */
+ void drain() {
+ if (!isConsumed) {
+ maybeCloseRecordStream();
+ cachedRecordException = null;
+ this.isConsumed = true;
+ recordAggregatedMetrics(bytesRead, recordsRead);
+
+ // we move the partition to the end if we received some bytes.
This way, it's more likely that partitions
+ // for the same topic can remain together (allowing for more
efficient serialization).
+ if (bytesRead > 0)
+ subscriptions.movePartitionToEnd(partition);
+ }
+ }
+
+ private void maybeEnsureValid(RecordBatch batch) {
+ if (checkCrcs && batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
+ try {
+ batch.ensureValid();
+ } catch (CorruptRecordException e) {
+ throw new KafkaException("Record batch for partition " +
partition + " at offset " +
+ batch.baseOffset() + " is invalid, cause: " +
e.getMessage());
+ }
+ }
+ }
+
+ private void maybeEnsureValid(Record record) {
+ if (checkCrcs) {
+ try {
+ record.ensureValid();
+ } catch (CorruptRecordException e) {
+ throw new KafkaException("Record for partition " + partition +
" at offset " + record.offset()
+ + " is invalid, cause: " + e.getMessage());
+ }
+ }
+ }
+
+ private void maybeCloseRecordStream() {
+ if (records != null) {
+ records.close();
+ records = null;
+ }
+ }
+
+ private Record nextFetchedRecord() {
+ while (true) {
+ if (records == null || !records.hasNext()) {
+ maybeCloseRecordStream();
+
+ if (!batches.hasNext()) {
+ // Message format v2 preserves the last offset in a batch
even if the last record is removed
+ // through compaction. By using the next offset computed
from the last offset in the batch,
+ // we ensure that the offset of the next fetch will point
to the next batch, which avoids
+ // unnecessary re-fetching of the same batch (in the worst
case, the consumer could get stuck
+ // fetching the same batch repeatedly).
+ if (currentBatch != null)
+ nextFetchOffset = currentBatch.nextOffset();
+ drain();
+ return null;
+ }
+
+ currentBatch = batches.next();
+ lastEpoch =
maybeLeaderEpoch(currentBatch.partitionLeaderEpoch());
+ maybeEnsureValid(currentBatch);
+
+ if (isolationLevel == IsolationLevel.READ_COMMITTED &&
currentBatch.hasProducerId()) {
+ // remove from the aborted transaction queue all aborted
transactions which have begun
+ // before the current batch's last offset and add the
associated producerIds to the
+ // aborted producer set
+ consumeAbortedTransactionsUpTo(currentBatch.lastOffset());
+
+ long producerId = currentBatch.producerId();
+ if (containsAbortMarker(currentBatch)) {
+ abortedProducerIds.remove(producerId);
+ } else if (isBatchAborted(currentBatch)) {
+ log.debug("Skipping aborted record batch from
partition {} with producerId {} and " +
+ "offsets {} to {}",
+ partition, producerId,
currentBatch.baseOffset(), currentBatch.lastOffset());
+ nextFetchOffset = currentBatch.nextOffset();
+ continue;
+ }
+ }
+
+ records =
currentBatch.streamingIterator(decompressionBufferSupplier);
+ } else {
+ Record record = records.next();
+ // skip any records out of range
+ if (record.offset() >= nextFetchOffset) {
+ // we only do validation when the message should not be
skipped.
+ maybeEnsureValid(record);
+
+ // control records are not returned to the user
+ if (!currentBatch.isControlBatch()) {
+ return record;
+ } else {
+ // Increment the next fetch offset when we skip a
control batch.
+ nextFetchOffset = record.offset() + 1;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * The {@link RecordBatch batch} of {@link Record records} is converted to
a {@link List list} of
+ * {@link ConsumerRecord consumer records} and returned. {@link
BufferSupplier Decompression} and
+ * {@link Deserializer deserialization} of the {@link Record record's} key
and value are performed in
+ * this step.
+ *
+ * @param maxRecords The number of records to return; the number returned
may be {@code 0 <= maxRecords}
+ * @return {@link ConsumerRecord Consumer records}
+ */
+ List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
+ // Error when fetching the next record before deserialization.
+ if (corruptLastRecord)
+ throw new KafkaException("Received exception when fetching the
next record from " + partition
+ + ". If needed, please seek past the record to "
+ + "continue consumption.", cachedRecordException);
+
+ if (isConsumed)
+ return Collections.emptyList();
+
+ List<ConsumerRecord<K, V>> records = new ArrayList<>();
+
+ try {
+ for (int i = 0; i < maxRecords; i++) {
+ // Only move to next record if there was no exception in the
last fetch. Otherwise, we should
+ // use the last record to do deserialization again.
+ if (cachedRecordException == null) {
+ corruptLastRecord = true;
+ lastRecord = nextFetchedRecord();
+ corruptLastRecord = false;
+ }
+
+ if (lastRecord == null)
+ break;
+
+ Optional<Integer> leaderEpoch =
maybeLeaderEpoch(currentBatch.partitionLeaderEpoch());
+ TimestampType timestampType = currentBatch.timestampType();
+ ConsumerRecord<K, V> record = parseRecord(partition,
leaderEpoch, timestampType, lastRecord);
+ records.add(record);
+ recordsRead++;
+ bytesRead += lastRecord.sizeInBytes();
+ nextFetchOffset = lastRecord.offset() + 1;
+ // In some cases, the deserialization may have thrown an
exception and the retry may succeed,
+ // we allow user to move forward in this case.
+ cachedRecordException = null;
+ }
+ } catch (SerializationException se) {
+ cachedRecordException = se;
+ if (records.isEmpty())
+ throw se;
+ } catch (KafkaException e) {
+ cachedRecordException = e;
+ if (records.isEmpty())
+ throw new KafkaException("Received exception when fetching the
next record from " + partition
+ + ". If needed, please seek past the record to "
+ + "continue consumption.", e);
+ }
+ return records;
+ }
+
+ /**
+ * Parse the record entry, deserializing the key / value fields if
necessary
+ */
+ ConsumerRecord<K, V> parseRecord(TopicPartition partition,
+ Optional<Integer> leaderEpoch,
+ TimestampType timestampType,
+ Record record) {
+ try {
+ long offset = record.offset();
+ long timestamp = record.timestamp();
+ Headers headers = new RecordHeaders(record.headers());
+ ByteBuffer keyBytes = record.key();
+ byte[] keyByteArray = keyBytes == null ? null :
org.apache.kafka.common.utils.Utils.toArray(keyBytes);
+ K key = keyBytes == null ? null :
this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
+ ByteBuffer valueBytes = record.value();
+ byte[] valueByteArray = valueBytes == null ? null :
Utils.toArray(valueBytes);
+ V value = valueBytes == null ? null :
this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
+ return new ConsumerRecord<>(partition.topic(),
partition.partition(), offset,
+ timestamp, timestampType,
+ keyByteArray == null ? ConsumerRecord.NULL_SIZE :
keyByteArray.length,
+ valueByteArray == null ? ConsumerRecord.NULL_SIZE :
valueByteArray.length,
+ key, value, headers, leaderEpoch);
+ } catch (RuntimeException e) {
+ throw new RecordDeserializationException(partition,
record.offset(),
+ "Error deserializing key/value for partition " + partition
+
+ " at offset " + record.offset() + ". If needed,
please seek past the record to continue consumption.", e);
+ }
+ }
+
+ private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
+ return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
Optional.empty() : Optional.of(leaderEpoch);
+ }
+
+ private void consumeAbortedTransactionsUpTo(long offset) {
+ if (abortedTransactions == null)
+ return;
+
+ while (!abortedTransactions.isEmpty() &&
abortedTransactions.peek().firstOffset() <= offset) {
+ FetchResponseData.AbortedTransaction abortedTransaction =
abortedTransactions.poll();
+ abortedProducerIds.add(abortedTransaction.producerId());
+ }
+ }
+
+ private boolean isBatchAborted(RecordBatch batch) {
+ return batch.isTransactional() &&
abortedProducerIds.contains(batch.producerId());
+ }
+
+ private PriorityQueue<FetchResponseData.AbortedTransaction>
abortedTransactions(FetchResponseData.PartitionData partition) {
+ if (partition.abortedTransactions() == null ||
partition.abortedTransactions().isEmpty())
+ return null;
+
+ PriorityQueue<FetchResponseData.AbortedTransaction>
abortedTransactions = new PriorityQueue<>(
+ partition.abortedTransactions().size(),
Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset)
+ );
+ abortedTransactions.addAll(partition.abortedTransactions());
+ return abortedTransactions;
+ }
+
+ private boolean containsAbortMarker(RecordBatch batch) {
+ if (!batch.isControlBatch())
+ return false;
+
+ Iterator<Record> batchIterator = batch.iterator();
+ if (!batchIterator.hasNext())
+ return false;
+
+ Record firstRecord = batchIterator.next();
+ return ControlRecordType.ABORT ==
ControlRecordType.parse(firstRecord.key());
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
index e58db82ee3c..e119153af01 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMetrics.java
@@ -26,10 +26,10 @@ import org.apache.kafka.common.metrics.Metrics;
 public class ConsumerMetrics {
-    public FetcherMetricsRegistry fetcherMetrics;
+    public FetchMetricsRegistry fetcherMetrics;
     public ConsumerMetrics(Set<String> metricsTags, String metricGrpPrefix) {
-        this.fetcherMetrics = new FetcherMetricsRegistry(metricsTags, metricGrpPrefix);
+        this.fetcherMetrics = new FetchMetricsRegistry(metricsTags, metricGrpPrefix);
     }
     public ConsumerMetrics(String metricGroupPrefix) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsAggregator.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsAggregator.java
new file mode 100644
index 00000000000..4fe8199734d
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsAggregator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Since we parse the message data for each partition from each fetch response
lazily, fetch-level
+ * metrics need to be aggregated as the messages from each partition are
parsed. This class is used
+ * to facilitate this incremental aggregation.
+ */
+class FetchMetricsAggregator {
+
+ private final FetchMetricsManager metricsManager;
+ private final Set<TopicPartition> unrecordedPartitions;
+ private final FetchMetrics fetchFetchMetrics = new FetchMetrics();
+ private final Map<String, FetchMetrics> perTopicFetchMetrics = new
HashMap<>();
+
+ FetchMetricsAggregator(FetchMetricsManager metricsManager,
Set<TopicPartition> partitions) {
+ this.metricsManager = metricsManager;
+ this.unrecordedPartitions = new HashSet<>(partitions);
+ }
+
+ /**
+ * After each partition is parsed, we update the current metric totals
with the total bytes
+ * and number of records parsed. After all partitions have reported, we
write the metric.
+ */
+ void record(TopicPartition partition, int bytes, int records) {
+ // Aggregate the metrics at the fetch level
+ fetchFetchMetrics.increment(bytes, records);
+
+ // Also aggregate the metrics on a per-topic basis.
+ perTopicFetchMetrics.computeIfAbsent(partition.topic(), t -> new
FetchMetrics())
+ .increment(bytes, records);
+
+ maybeRecordMetrics(partition);
+ }
+
+ /**
+ * Once we've detected that all of the {@link TopicPartition partitions}
for the fetch have been handled, we
+ * can then record the aggregated metrics values. This is done at the
fetch level and on a per-topic basis.
+ *
+ * @param partition {@link TopicPartition}
+ */
+ private void maybeRecordMetrics(TopicPartition partition) {
+ unrecordedPartitions.remove(partition);
+
+ if (!unrecordedPartitions.isEmpty())
+ return;
+
+ // Record the metrics aggregated at the fetch level.
+ metricsManager.recordBytesFetched(fetchFetchMetrics.bytes);
+ metricsManager.recordRecordsFetched(fetchFetchMetrics.records);
+
+ // Also record the metrics aggregated on a per-topic basis.
+ for (Map.Entry<String, FetchMetrics> entry:
perTopicFetchMetrics.entrySet()) {
+ String topic = entry.getKey();
+ FetchMetrics fetchMetrics = entry.getValue();
+ metricsManager.recordBytesFetched(topic, fetchMetrics.bytes);
+ metricsManager.recordRecordsFetched(topic, fetchMetrics.records);
+ }
+ }
+
+ private static class FetchMetrics {
+
+ private int bytes;
+ private int records;
+
+ private void increment(int bytes, int records) {
+ this.bytes += bytes;
+ this.records += records;
+ }
+
+ }
+
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
new file mode 100644
index 00000000000..63bd7650701
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManager.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Gauge;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.WindowedCount;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The {@link FetchMetricsManager} class provides wrapper methods to record
lag, lead, latency, and fetch metrics.
+ * It keeps an internal ID of the assigned set of partitions which is updated
to ensure the set of metrics it
+ * records matches up with the topic-partitions in use.
+ */
+class FetchMetricsManager {
+
+ private final Metrics metrics;
+ private final FetchMetricsRegistry metricsRegistry;
+ private final Sensor bytesFetched;
+ private final Sensor recordsFetched;
+ private final Sensor fetchLatency;
+ private final Sensor recordsLag;
+ private final Sensor recordsLead;
+
+ private int assignmentId = 0;
+ private Set<TopicPartition> assignedPartitions = Collections.emptySet();
+
+ FetchMetricsManager(Metrics metrics, FetchMetricsRegistry metricsRegistry)
{
+ this.metrics = metrics;
+ this.metricsRegistry = metricsRegistry;
+
+ this.bytesFetched = new SensorBuilder(metrics, "bytes-fetched")
+ .withAvg(metricsRegistry.fetchSizeAvg)
+ .withMax(metricsRegistry.fetchSizeMax)
+ .withMeter(metricsRegistry.bytesConsumedRate,
metricsRegistry.bytesConsumedTotal)
+ .build();
+ this.recordsFetched = new SensorBuilder(metrics, "records-fetched")
+ .withAvg(metricsRegistry.recordsPerRequestAvg)
+ .withMeter(metricsRegistry.recordsConsumedRate,
metricsRegistry.recordsConsumedTotal)
+ .build();
+ this.fetchLatency = new SensorBuilder(metrics, "fetch-latency")
+ .withAvg(metricsRegistry.fetchLatencyAvg)
+ .withMax(metricsRegistry.fetchLatencyMax)
+ .withMeter(new WindowedCount(),
metricsRegistry.fetchRequestRate, metricsRegistry.fetchRequestTotal)
+ .build();
+ this.recordsLag = new SensorBuilder(metrics, "records-lag")
+ .withMax(metricsRegistry.recordsLagMax)
+ .build();
+ this.recordsLead = new SensorBuilder(metrics, "records-lead")
+ .withMin(metricsRegistry.recordsLeadMin)
+ .build();
+ }
+
+ void recordLatency(long requestLatencyMs) {
+ fetchLatency.record(requestLatencyMs);
+ }
+
+ void recordBytesFetched(int bytes) {
+ bytesFetched.record(bytes);
+ }
+
+ void recordRecordsFetched(int records) {
+ recordsFetched.record(records);
+ }
+
+ void recordBytesFetched(String topic, int bytes) {
+ String name = topicBytesFetchedMetricName(topic);
+ Sensor bytesFetched = new SensorBuilder(metrics, name, () ->
topicTags(topic))
+ .withAvg(metricsRegistry.topicFetchSizeAvg)
+ .withMax(metricsRegistry.topicFetchSizeMax)
+ .withMeter(metricsRegistry.topicBytesConsumedRate,
metricsRegistry.topicBytesConsumedTotal)
+ .build();
+ bytesFetched.record(bytes);
+ }
+
+ void recordRecordsFetched(String topic, int records) {
+ String name = topicRecordsFetchedMetricName(topic);
+ Sensor recordsFetched = new SensorBuilder(metrics, name, () ->
topicTags(topic))
+ .withAvg(metricsRegistry.topicRecordsPerRequestAvg)
+ .withMeter(metricsRegistry.topicRecordsConsumedRate,
metricsRegistry.topicRecordsConsumedTotal)
+ .build();
+ recordsFetched.record(records);
+ }
+
+ void recordPartitionLag(TopicPartition tp, long lag) {
+ this.recordsLag.record(lag);
+
+ String name = partitionRecordsLagMetricName(tp);
+ Sensor recordsLag = new SensorBuilder(metrics, name, () ->
topicPartitionTags(tp))
+ .withValue(metricsRegistry.partitionRecordsLag)
+ .withMax(metricsRegistry.partitionRecordsLagMax)
+ .withAvg(metricsRegistry.partitionRecordsLagAvg)
+ .build();
+
+ recordsLag.record(lag);
+ }
+
+ void recordPartitionLead(TopicPartition tp, long lead) {
+ this.recordsLead.record(lead);
+
+ String name = partitionRecordsLeadMetricName(tp);
+ Sensor recordsLead = new SensorBuilder(metrics, name, () ->
topicPartitionTags(tp))
+ .withValue(metricsRegistry.partitionRecordsLead)
+ .withMin(metricsRegistry.partitionRecordsLeadMin)
+ .withAvg(metricsRegistry.partitionRecordsLeadAvg)
+ .build();
+
+ recordsLead.record(lead);
+ }
+
+ /**
+ * This method is called by the {@link Fetch fetch} logic before it
requests fetches in order to update the
+ * internal set of metrics that are tracked.
+ *
+ * @param subscription {@link SubscriptionState} that contains the set of
assigned partitions
+ * @see SubscriptionState#assignmentId()
+ */
+ void maybeUpdateAssignment(SubscriptionState subscription) {
+ int newAssignmentId = subscription.assignmentId();
+
+ if (this.assignmentId != newAssignmentId) {
+ Set<TopicPartition> newAssignedPartitions =
subscription.assignedPartitions();
+
+ for (TopicPartition tp : this.assignedPartitions) {
+ if (!newAssignedPartitions.contains(tp)) {
+ metrics.removeSensor(partitionRecordsLagMetricName(tp));
+ metrics.removeSensor(partitionRecordsLeadMetricName(tp));
+
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
+ }
+ }
+
+ for (TopicPartition tp : newAssignedPartitions) {
+ if (!this.assignedPartitions.contains(tp)) {
+ MetricName metricName =
partitionPreferredReadReplicaMetricName(tp);
+ metrics.addMetricIfAbsent(
+ metricName,
+ null,
+ (Gauge<Integer>) (config, now) ->
subscription.preferredReadReplica(tp, 0L).orElse(-1)
+ );
+ }
+ }
+
+ this.assignedPartitions = newAssignedPartitions;
+ this.assignmentId = newAssignmentId;
+ }
+ }
+
+ static String topicBytesFetchedMetricName(String topic) {
+ return "topic." + topic + ".bytes-fetched";
+ }
+
+ private static String topicRecordsFetchedMetricName(String topic) {
+ return "topic." + topic + ".records-fetched";
+ }
+
+ private static String partitionRecordsLeadMetricName(TopicPartition tp) {
+ return tp + ".records-lead";
+ }
+
+ private static String partitionRecordsLagMetricName(TopicPartition tp) {
+ return tp + ".records-lag";
+ }
+
+ private MetricName partitionPreferredReadReplicaMetricName(TopicPartition
tp) {
+ Map<String, String> metricTags = topicPartitionTags(tp);
+ return
this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica,
metricTags);
+ }
+
+ static Map<String, String> topicTags(String topic) {
+ Map<String, String> metricTags = new HashMap<>(1);
+ metricTags.put("topic", topic.replace('.', '_'));
+ return metricTags;
+ }
+
+ static Map<String, String> topicPartitionTags(TopicPartition tp) {
+ Map<String, String> metricTags = new HashMap<>(2);
+ metricTags.put("topic", tp.topic().replace('.', '_'));
+ metricTags.put("partition", String.valueOf(tp.partition()));
+ return metricTags;
+ }
+
+}
\ No newline at end of file
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
similarity index 97%
rename from
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
rename to
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
index f76a92462d5..fc4ac1d665e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetcherMetricsRegistry.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchMetricsRegistry.java
@@ -24,7 +24,7 @@ import java.util.Set;
import org.apache.kafka.common.MetricNameTemplate;
-public class FetcherMetricsRegistry {
+public class FetchMetricsRegistry {
public MetricNameTemplate fetchSizeAvg;
public MetricNameTemplate fetchSizeMax;
@@ -56,15 +56,15 @@ public class FetcherMetricsRegistry {
public MetricNameTemplate partitionRecordsLeadAvg;
public MetricNameTemplate partitionPreferredReadReplica;
- public FetcherMetricsRegistry() {
+ public FetchMetricsRegistry() {
this(new HashSet<String>(), "");
}
- public FetcherMetricsRegistry(String metricGrpPrefix) {
+ public FetchMetricsRegistry(String metricGrpPrefix) {
this(new HashSet<String>(), metricGrpPrefix);
}
- public FetcherMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
+ public FetchMetricsRegistry(Set<String> tags, String metricGrpPrefix) {
/***** Client level *****/
String groupName = metricGrpPrefix + "-fetch-manager-metrics";
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index ae7973aa808..29cd7972cc6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -25,38 +25,23 @@ import
org.apache.kafka.clients.consumer.internals.SubscriptionState.FetchPositi
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.RecordTooLargeException;
-import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.header.Headers;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.FetchResponseData;
-import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.metrics.stats.Meter;
-import org.apache.kafka.common.metrics.stats.Min;
-import org.apache.kafka.common.metrics.stats.Value;
-import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.BufferSupplier;
-import org.apache.kafka.common.record.ControlRecordType;
-import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
@@ -65,12 +50,10 @@ import org.slf4j.Logger;
import org.slf4j.helpers.MessageFormatter;
import java.io.Closeable;
-import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -78,7 +61,6 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -116,9 +98,9 @@ public class Fetcher<K, V> implements Closeable {
private final boolean checkCrcs;
private final String clientRackId;
private final ConsumerMetadata metadata;
- private final FetchManagerMetrics sensors;
+ private final FetchMetricsManager metricsManager;
private final SubscriptionState subscriptions;
- private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
+ private final ConcurrentLinkedQueue<CompletedFetch<K, V>> completedFetches;
private final BufferSupplier decompressionBufferSupplier =
BufferSupplier.create();
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
@@ -126,7 +108,7 @@ public class Fetcher<K, V> implements Closeable {
private final Map<Integer, FetchSessionHandler> sessionHandlers;
private final Set<Integer> nodesWithPendingFetchRequests;
private final AtomicBoolean isClosed = new AtomicBoolean(false);
- private CompletedFetch nextInLineFetch = null;
+ private CompletedFetch<K, V> nextInLineFetch = null;
public Fetcher(LogContext logContext,
ConsumerNetworkClient client,
@@ -142,7 +124,7 @@ public class Fetcher<K, V> implements Closeable {
ConsumerMetadata metadata,
SubscriptionState subscriptions,
Metrics metrics,
- FetcherMetricsRegistry metricsRegistry,
+ FetchMetricsRegistry metricsRegistry,
Time time,
IsolationLevel isolationLevel) {
this.log = logContext.logger(Fetcher.class);
@@ -161,7 +143,7 @@ public class Fetcher<K, V> implements Closeable {
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.completedFetches = new ConcurrentLinkedQueue<>();
- this.sensors = new FetchManagerMetrics(metrics, metricsRegistry);
+ this.metricsManager = new FetchMetricsManager(metrics,
metricsRegistry);
this.isolationLevel = isolationLevel;
this.sessionHandlers = new HashMap<>();
this.nodesWithPendingFetchRequests = new HashSet<>();
@@ -191,7 +173,7 @@ public class Fetcher<K, V> implements Closeable {
*/
public synchronized int sendFetches() {
// Update metrics in case there was an assignment change
- sensors.maybeUpdateAssignment(subscriptions);
+ metricsManager.maybeUpdateAssignment(subscriptions);
Map<Node, FetchSessionHandler.FetchRequestData> fetchRequestMap =
prepareFetchRequests();
for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry :
fetchRequestMap.entrySet()) {
@@ -224,7 +206,7 @@ public class Fetcher<K, V> implements Closeable {
Map<TopicPartition,
FetchResponseData.PartitionData> responseData =
response.responseData(handler.sessionTopicNames(),
resp.requestHeader().apiVersion());
Set<TopicPartition> partitions = new
HashSet<>(responseData.keySet());
- FetchResponseMetricAggregator metricAggregator =
new FetchResponseMetricAggregator(sensors, partitions);
+ FetchMetricsAggregator metricAggregator = new
FetchMetricsAggregator(metricsManager, partitions);
for (Map.Entry<TopicPartition,
FetchResponseData.PartitionData> entry : responseData.entrySet()) {
TopicPartition partition = entry.getKey();
@@ -245,20 +227,29 @@ public class Fetcher<K, V> implements Closeable {
throw new IllegalStateException(message);
} else {
long fetchOffset = requestData.fetchOffset;
+ short requestVersion =
resp.requestHeader().apiVersion();
FetchResponseData.PartitionData
partitionData = entry.getValue();
log.debug("Fetch {} at offset {} for
partition {} returned fetch data {}",
isolationLevel, fetchOffset,
partition, partitionData);
- Iterator<? extends RecordBatch> batches =
FetchResponse.recordsOrFail(partitionData).batches().iterator();
- short responseVersion =
resp.requestHeader().apiVersion();
-
- completedFetches.add(new
CompletedFetch(partition, partitionData,
- metricAggregator, batches,
fetchOffset, responseVersion));
+ CompletedFetch<K, V> completedFetch = new
CompletedFetch<>(logContext,
+ subscriptions,
+ checkCrcs,
+ decompressionBufferSupplier,
+ keyDeserializer,
+ valueDeserializer,
+ isolationLevel,
+ partition,
+ partitionData,
+ metricAggregator,
+ fetchOffset,
+ requestVersion);
+ completedFetches.add(completedFetch);
}
}
-
sensors.fetchLatency.record(resp.requestLatencyMs());
+
metricsManager.recordLatency(resp.requestLatencyMs());
} finally {
nodesWithPendingFetchRequests.remove(fetchTarget.id());
}
@@ -328,16 +319,16 @@ public class Fetcher<K, V> implements Closeable {
*/
public Fetch<K, V> collectFetch() {
Fetch<K, V> fetch = Fetch.empty();
- Queue<CompletedFetch> pausedCompletedFetches = new ArrayDeque<>();
+ Queue<CompletedFetch<K, V>> pausedCompletedFetches = new
ArrayDeque<>();
int recordsRemaining = maxPollRecords;
try {
while (recordsRemaining > 0) {
if (nextInLineFetch == null || nextInLineFetch.isConsumed) {
- CompletedFetch records = completedFetches.peek();
+ CompletedFetch<K, V> records = completedFetches.peek();
if (records == null) break;
- if (records.notInitialized()) {
+ if (!records.initialized) {
try {
nextInLineFetch =
initializeCompletedFetch(records);
} catch (Exception e) {
@@ -380,7 +371,7 @@ public class Fetcher<K, V> implements Closeable {
return fetch;
}
- private Fetch<K, V> fetchRecords(CompletedFetch completedFetch, int
maxRecords) {
+ private Fetch<K, V> fetchRecords(CompletedFetch<K, V> completedFetch, int
maxRecords) {
if (!subscriptions.isAssigned(completedFetch.partition)) {
// this can happen when a rebalance happened before fetched
records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition {} since it
is no longer assigned",
@@ -417,11 +408,11 @@ public class Fetcher<K, V> implements Closeable {
Long partitionLag =
subscriptions.partitionLag(completedFetch.partition, isolationLevel);
if (partitionLag != null)
- this.sensors.recordPartitionLag(completedFetch.partition,
partitionLag);
+
this.metricsManager.recordPartitionLag(completedFetch.partition, partitionLag);
Long lead =
subscriptions.partitionLead(completedFetch.partition);
if (lead != null) {
- this.sensors.recordPartitionLead(completedFetch.partition,
lead);
+
this.metricsManager.recordPartitionLead(completedFetch.partition, lead);
}
return Fetch.forPartition(completedFetch.partition,
partRecords, positionAdvanced);
@@ -444,7 +435,7 @@ public class Fetcher<K, V> implements Closeable {
if (nextInLineFetch != null && !nextInLineFetch.isConsumed) {
exclude.add(nextInLineFetch.partition);
}
- for (CompletedFetch completedFetch : completedFetches) {
+ for (CompletedFetch<K, V> completedFetch : completedFetches) {
exclude.add(completedFetch.partition);
}
return subscriptions.fetchablePartitions(tp -> !exclude.contains(tp));
@@ -537,11 +528,11 @@ public class Fetcher<K, V> implements Closeable {
/**
* Initialize a CompletedFetch object.
*/
- private CompletedFetch initializeCompletedFetch(CompletedFetch
nextCompletedFetch) {
+ private CompletedFetch<K, V> initializeCompletedFetch(CompletedFetch<K, V>
nextCompletedFetch) {
TopicPartition tp = nextCompletedFetch.partition;
FetchResponseData.PartitionData partition =
nextCompletedFetch.partitionData;
long fetchOffset = nextCompletedFetch.nextFetchOffset;
- CompletedFetch completedFetch = null;
+ CompletedFetch<K, V> completedFetch = null;
Errors error = Errors.forCode(partition.errorCode());
try {
@@ -564,7 +555,7 @@ public class Fetcher<K, V> implements Closeable {
completedFetch = nextCompletedFetch;
if (!batches.hasNext() && FetchResponse.recordsSize(partition)
> 0) {
- if (completedFetch.responseVersion < 3) {
+ if (completedFetch.requestVersion < 3) {
// Implement the pre KIP-74 behavior of throwing a
RecordTooLargeException.
Map<TopicPartition, Long> recordTooLargePartitions =
Collections.singletonMap(tp, fetchOffset);
throw new RecordTooLargeException("There are some
messages at [Partition=Offset]: " +
@@ -660,7 +651,7 @@ public class Fetcher<K, V> implements Closeable {
}
} finally {
if (completedFetch == null)
- nextCompletedFetch.metricAggregator.record(tp, 0, 0);
+ nextCompletedFetch.recordAggregatedMetrics(0, 0);
if (error != Errors.NONE)
// we move the partition to the end if there was an error.
This way, it's more likely that partitions for
@@ -683,49 +674,15 @@ public class Fetcher<K, V> implements Closeable {
}
}
- /**
- * Parse the record entry, deserializing the key / value fields if
necessary
- */
- private ConsumerRecord<K, V> parseRecord(TopicPartition partition,
- RecordBatch batch,
- Record record) {
- try {
- long offset = record.offset();
- long timestamp = record.timestamp();
- Optional<Integer> leaderEpoch =
maybeLeaderEpoch(batch.partitionLeaderEpoch());
- TimestampType timestampType = batch.timestampType();
- Headers headers = new RecordHeaders(record.headers());
- ByteBuffer keyBytes = record.key();
- byte[] keyByteArray = keyBytes == null ? null :
Utils.toArray(keyBytes);
- K key = keyBytes == null ? null :
this.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);
- ByteBuffer valueBytes = record.value();
- byte[] valueByteArray = valueBytes == null ? null :
Utils.toArray(valueBytes);
- V value = valueBytes == null ? null :
this.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);
- return new ConsumerRecord<>(partition.topic(),
partition.partition(), offset,
- timestamp, timestampType,
- keyByteArray == null ?
ConsumerRecord.NULL_SIZE : keyByteArray.length,
- valueByteArray == null ?
ConsumerRecord.NULL_SIZE : valueByteArray.length,
- key, value, headers, leaderEpoch);
- } catch (RuntimeException e) {
- throw new RecordDeserializationException(partition,
record.offset(),
- "Error deserializing key/value for partition " + partition +
- " at offset " + record.offset() + ". If needed, please
seek past the record to continue consumption.", e);
- }
- }
-
- private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
- return leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH ?
Optional.empty() : Optional.of(leaderEpoch);
- }
-
/**
* Clear the buffered data which are not a part of newly assigned
partitions
*
* @param assignedPartitions newly assigned {@link TopicPartition}
*/
public void
clearBufferedDataForUnassignedPartitions(Collection<TopicPartition>
assignedPartitions) {
- Iterator<CompletedFetch> completedFetchesItr =
completedFetches.iterator();
+ Iterator<CompletedFetch<K, V>> completedFetchesItr =
completedFetches.iterator();
while (completedFetchesItr.hasNext()) {
- CompletedFetch records = completedFetchesItr.next();
+ CompletedFetch<K, V> records = completedFetchesItr.next();
TopicPartition tp = records.partition;
if (!assignedPartitions.contains(tp)) {
records.drain();
@@ -759,7 +716,7 @@ public class Fetcher<K, V> implements Closeable {
return sessionHandlers.get(node);
}
- public static Sensor throttleTimeSensor(Metrics metrics,
FetcherMetricsRegistry metricsRegistry) {
+ public static Sensor throttleTimeSensor(Metrics metrics,
FetchMetricsRegistry metricsRegistry) {
Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
fetchThrottleTimeSensor.add(metrics.metricInstance(metricsRegistry.fetchThrottleTimeAvg),
new Avg());
@@ -768,448 +725,6 @@ public class Fetcher<K, V> implements Closeable {
return fetchThrottleTimeSensor;
}
- private class CompletedFetch {
- private final TopicPartition partition;
- private final Iterator<? extends RecordBatch> batches;
- private final Set<Long> abortedProducerIds;
- private final PriorityQueue<FetchResponseData.AbortedTransaction>
abortedTransactions;
- private final FetchResponseData.PartitionData partitionData;
- private final FetchResponseMetricAggregator metricAggregator;
- private final short responseVersion;
-
- private int recordsRead;
- private int bytesRead;
- private RecordBatch currentBatch;
- private Record lastRecord;
- private CloseableIterator<Record> records;
- private long nextFetchOffset;
- private Optional<Integer> lastEpoch;
- private boolean isConsumed = false;
- private Exception cachedRecordException = null;
- private boolean corruptLastRecord = false;
- private boolean initialized = false;
-
- private CompletedFetch(TopicPartition partition,
- FetchResponseData.PartitionData partitionData,
- FetchResponseMetricAggregator metricAggregator,
- Iterator<? extends RecordBatch> batches,
- Long fetchOffset,
- short responseVersion) {
- this.partition = partition;
- this.partitionData = partitionData;
- this.metricAggregator = metricAggregator;
- this.batches = batches;
- this.nextFetchOffset = fetchOffset;
- this.responseVersion = responseVersion;
- this.lastEpoch = Optional.empty();
- this.abortedProducerIds = new HashSet<>();
- this.abortedTransactions = abortedTransactions(partitionData);
- }
-
- private void drain() {
- if (!isConsumed) {
- maybeCloseRecordStream();
- cachedRecordException = null;
- this.isConsumed = true;
- this.metricAggregator.record(partition, bytesRead,
recordsRead);
-
- // we move the partition to the end if we received some bytes.
This way, it's more likely that partitions
- // for the same topic can remain together (allowing for more
efficient serialization).
- if (bytesRead > 0)
- subscriptions.movePartitionToEnd(partition);
- }
- }
-
- private void maybeEnsureValid(RecordBatch batch) {
- if (checkCrcs && currentBatch.magic() >=
RecordBatch.MAGIC_VALUE_V2) {
- try {
- batch.ensureValid();
- } catch (CorruptRecordException e) {
- throw new KafkaException("Record batch for partition " +
partition + " at offset " +
- batch.baseOffset() + " is invalid, cause: " +
e.getMessage());
- }
- }
- }
-
- private void maybeEnsureValid(Record record) {
- if (checkCrcs) {
- try {
- record.ensureValid();
- } catch (CorruptRecordException e) {
- throw new KafkaException("Record for partition " +
partition + " at offset " + record.offset()
- + " is invalid, cause: " + e.getMessage());
- }
- }
- }
-
- private void maybeCloseRecordStream() {
- if (records != null) {
- records.close();
- records = null;
- }
- }
-
- private Record nextFetchedRecord() {
- while (true) {
- if (records == null || !records.hasNext()) {
- maybeCloseRecordStream();
-
- if (!batches.hasNext()) {
- // Message format v2 preserves the last offset in a
batch even if the last record is removed
- // through compaction. By using the next offset
computed from the last offset in the batch,
- // we ensure that the offset of the next fetch will
point to the next batch, which avoids
- // unnecessary re-fetching of the same batch (in the
worst case, the consumer could get stuck
- // fetching the same batch repeatedly).
- if (currentBatch != null)
- nextFetchOffset = currentBatch.nextOffset();
- drain();
- return null;
- }
-
- currentBatch = batches.next();
- lastEpoch = currentBatch.partitionLeaderEpoch() ==
RecordBatch.NO_PARTITION_LEADER_EPOCH ?
- Optional.empty() :
Optional.of(currentBatch.partitionLeaderEpoch());
-
- maybeEnsureValid(currentBatch);
-
- if (isolationLevel == IsolationLevel.READ_COMMITTED &&
currentBatch.hasProducerId()) {
- // remove from the aborted transaction queue all
aborted transactions which have begun
- // before the current batch's last offset and add the
associated producerIds to the
- // aborted producer set
-
consumeAbortedTransactionsUpTo(currentBatch.lastOffset());
-
- long producerId = currentBatch.producerId();
- if (containsAbortMarker(currentBatch)) {
- abortedProducerIds.remove(producerId);
- } else if (isBatchAborted(currentBatch)) {
- log.debug("Skipping aborted record batch from
partition {} with producerId {} and " +
- "offsets {} to {}",
- partition, producerId,
currentBatch.baseOffset(), currentBatch.lastOffset());
- nextFetchOffset = currentBatch.nextOffset();
- continue;
- }
- }
-
- records =
currentBatch.streamingIterator(decompressionBufferSupplier);
- } else {
- Record record = records.next();
- // skip any records out of range
- if (record.offset() >= nextFetchOffset) {
- // we only do validation when the message should not
be skipped.
- maybeEnsureValid(record);
-
- // control records are not returned to the user
- if (!currentBatch.isControlBatch()) {
- return record;
- } else {
- // Increment the next fetch offset when we skip a
control batch.
- nextFetchOffset = record.offset() + 1;
- }
- }
- }
- }
- }
-
- private List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
- // Error when fetching the next record before deserialization.
- if (corruptLastRecord)
- throw new KafkaException("Received exception when fetching the
next record from " + partition
- + ". If needed, please seek past
the record to "
- + "continue consumption.",
cachedRecordException);
-
- if (isConsumed)
- return Collections.emptyList();
-
- List<ConsumerRecord<K, V>> records = new ArrayList<>();
- try {
- for (int i = 0; i < maxRecords; i++) {
- // Only move to next record if there was no exception in
the last fetch. Otherwise we should
- // use the last record to do deserialization again.
- if (cachedRecordException == null) {
- corruptLastRecord = true;
- lastRecord = nextFetchedRecord();
- corruptLastRecord = false;
- }
- if (lastRecord == null)
- break;
- records.add(parseRecord(partition, currentBatch,
lastRecord));
- recordsRead++;
- bytesRead += lastRecord.sizeInBytes();
- nextFetchOffset = lastRecord.offset() + 1;
- // In some cases, the deserialization may have thrown an
exception and the retry may succeed,
- // we allow user to move forward in this case.
- cachedRecordException = null;
- }
- } catch (SerializationException se) {
- cachedRecordException = se;
- if (records.isEmpty())
- throw se;
- } catch (KafkaException e) {
- cachedRecordException = e;
- if (records.isEmpty())
- throw new KafkaException("Received exception when fetching
the next record from " + partition
- + ". If needed, please seek
past the record to "
- + "continue consumption.", e);
- }
- return records;
- }
-
- private void consumeAbortedTransactionsUpTo(long offset) {
- if (abortedTransactions == null)
- return;
-
- while (!abortedTransactions.isEmpty() &&
abortedTransactions.peek().firstOffset() <= offset) {
- FetchResponseData.AbortedTransaction abortedTransaction =
abortedTransactions.poll();
- abortedProducerIds.add(abortedTransaction.producerId());
- }
- }
-
- private boolean isBatchAborted(RecordBatch batch) {
- return batch.isTransactional() &&
abortedProducerIds.contains(batch.producerId());
- }
-
- private PriorityQueue<FetchResponseData.AbortedTransaction>
abortedTransactions(FetchResponseData.PartitionData partition) {
- if (partition.abortedTransactions() == null ||
partition.abortedTransactions().isEmpty())
- return null;
-
- PriorityQueue<FetchResponseData.AbortedTransaction>
abortedTransactions = new PriorityQueue<>(
- partition.abortedTransactions().size(),
Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset)
- );
- abortedTransactions.addAll(partition.abortedTransactions());
- return abortedTransactions;
- }
-
- private boolean containsAbortMarker(RecordBatch batch) {
- if (!batch.isControlBatch())
- return false;
-
- Iterator<Record> batchIterator = batch.iterator();
- if (!batchIterator.hasNext())
- return false;
-
- Record firstRecord = batchIterator.next();
- return ControlRecordType.ABORT ==
ControlRecordType.parse(firstRecord.key());
- }
-
- private boolean notInitialized() {
- return !this.initialized;
- }
- }
-
- /**
- * Since we parse the message data for each partition from each fetch
response lazily, fetch-level
- * metrics need to be aggregated as the messages from each partition are
parsed. This class is used
- * to facilitate this incremental aggregation.
- */
- private static class FetchResponseMetricAggregator {
- private final FetchManagerMetrics sensors;
- private final Set<TopicPartition> unrecordedPartitions;
-
- private final FetchMetrics fetchMetrics = new FetchMetrics();
- private final Map<String, FetchMetrics> topicFetchMetrics = new
HashMap<>();
-
- private FetchResponseMetricAggregator(FetchManagerMetrics sensors,
- Set<TopicPartition> partitions) {
- this.sensors = sensors;
- this.unrecordedPartitions = partitions;
- }
-
- /**
- * After each partition is parsed, we update the current metric totals
with the total bytes
- * and number of records parsed. After all partitions have reported,
we write the metric.
- */
- public void record(TopicPartition partition, int bytes, int records) {
- this.unrecordedPartitions.remove(partition);
- this.fetchMetrics.increment(bytes, records);
-
- // collect and aggregate per-topic metrics
- String topic = partition.topic();
- FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic);
- if (topicFetchMetric == null) {
- topicFetchMetric = new FetchMetrics();
- this.topicFetchMetrics.put(topic, topicFetchMetric);
- }
- topicFetchMetric.increment(bytes, records);
-
- if (this.unrecordedPartitions.isEmpty()) {
- // once all expected partitions from the fetch have reported
in, record the metrics
- this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes);
-
this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords);
-
- // also record per-topic metrics
- for (Map.Entry<String, FetchMetrics> entry:
this.topicFetchMetrics.entrySet()) {
- FetchMetrics metric = entry.getValue();
- this.sensors.recordTopicFetchMetrics(entry.getKey(),
metric.fetchBytes, metric.fetchRecords);
- }
- }
- }
-
- private static class FetchMetrics {
- private int fetchBytes;
- private int fetchRecords;
-
- protected void increment(int bytes, int records) {
- this.fetchBytes += bytes;
- this.fetchRecords += records;
- }
- }
- }
-
- private static class FetchManagerMetrics {
- private final Metrics metrics;
- private final FetcherMetricsRegistry metricsRegistry;
- private final Sensor bytesFetched;
- private final Sensor recordsFetched;
- private final Sensor fetchLatency;
- private final Sensor recordsFetchLag;
- private final Sensor recordsFetchLead;
-
- private int assignmentId = 0;
- private Set<TopicPartition> assignedPartitions =
Collections.emptySet();
-
- private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry
metricsRegistry) {
- this.metrics = metrics;
- this.metricsRegistry = metricsRegistry;
-
- this.bytesFetched = metrics.sensor("bytes-fetched");
-
this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeAvg), new
Avg());
-
this.bytesFetched.add(metrics.metricInstance(metricsRegistry.fetchSizeMax), new
Max());
- this.bytesFetched.add(new
Meter(metrics.metricInstance(metricsRegistry.bytesConsumedRate),
-
metrics.metricInstance(metricsRegistry.bytesConsumedTotal)));
-
- this.recordsFetched = metrics.sensor("records-fetched");
-
this.recordsFetched.add(metrics.metricInstance(metricsRegistry.recordsPerRequestAvg),
new Avg());
- this.recordsFetched.add(new
Meter(metrics.metricInstance(metricsRegistry.recordsConsumedRate),
-
metrics.metricInstance(metricsRegistry.recordsConsumedTotal)));
-
- this.fetchLatency = metrics.sensor("fetch-latency");
-
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyAvg),
new Avg());
-
this.fetchLatency.add(metrics.metricInstance(metricsRegistry.fetchLatencyMax),
new Max());
- this.fetchLatency.add(new Meter(new WindowedCount(),
metrics.metricInstance(metricsRegistry.fetchRequestRate),
-
metrics.metricInstance(metricsRegistry.fetchRequestTotal)));
-
- this.recordsFetchLag = metrics.sensor("records-lag");
-
this.recordsFetchLag.add(metrics.metricInstance(metricsRegistry.recordsLagMax),
new Max());
-
- this.recordsFetchLead = metrics.sensor("records-lead");
-
this.recordsFetchLead.add(metrics.metricInstance(metricsRegistry.recordsLeadMin),
new Min());
- }
-
- private void recordTopicFetchMetrics(String topic, int bytes, int
records) {
- // record bytes fetched
- String name = "topic." + topic + ".bytes-fetched";
- Sensor bytesFetched = this.metrics.getSensor(name);
- if (bytesFetched == null) {
- Map<String, String> metricTags =
Collections.singletonMap("topic", topic.replace('.', '_'));
-
- bytesFetched = this.metrics.sensor(name);
-
bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeAvg,
- metricTags), new Avg());
-
bytesFetched.add(this.metrics.metricInstance(metricsRegistry.topicFetchSizeMax,
- metricTags), new Max());
- bytesFetched.add(new
Meter(this.metrics.metricInstance(metricsRegistry.topicBytesConsumedRate,
metricTags),
-
this.metrics.metricInstance(metricsRegistry.topicBytesConsumedTotal,
metricTags)));
- }
- bytesFetched.record(bytes);
-
- // record records fetched
- name = "topic." + topic + ".records-fetched";
- Sensor recordsFetched = this.metrics.getSensor(name);
- if (recordsFetched == null) {
- Map<String, String> metricTags = new HashMap<>(1);
- metricTags.put("topic", topic.replace('.', '_'));
-
- recordsFetched = this.metrics.sensor(name);
-
recordsFetched.add(this.metrics.metricInstance(metricsRegistry.topicRecordsPerRequestAvg,
- metricTags), new Avg());
- recordsFetched.add(new
Meter(this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedRate,
metricTags),
-
this.metrics.metricInstance(metricsRegistry.topicRecordsConsumedTotal,
metricTags)));
- }
- recordsFetched.record(records);
- }
-
- private void maybeUpdateAssignment(SubscriptionState subscription) {
- int newAssignmentId = subscription.assignmentId();
- if (this.assignmentId != newAssignmentId) {
- Set<TopicPartition> newAssignedPartitions =
subscription.assignedPartitions();
- for (TopicPartition tp : this.assignedPartitions) {
- if (!newAssignedPartitions.contains(tp)) {
- metrics.removeSensor(partitionLagMetricName(tp));
- metrics.removeSensor(partitionLeadMetricName(tp));
-
metrics.removeMetric(partitionPreferredReadReplicaMetricName(tp));
- }
- }
-
- for (TopicPartition tp : newAssignedPartitions) {
- if (!this.assignedPartitions.contains(tp)) {
- MetricName metricName =
partitionPreferredReadReplicaMetricName(tp);
- metrics.addMetricIfAbsent(
- metricName,
- null,
- (Gauge<Integer>) (config, now) ->
subscription.preferredReadReplica(tp, 0L).orElse(-1)
- );
- }
- }
-
- this.assignedPartitions = newAssignedPartitions;
- this.assignmentId = newAssignmentId;
- }
- }
-
- private void recordPartitionLead(TopicPartition tp, long lead) {
- this.recordsFetchLead.record(lead);
-
- String name = partitionLeadMetricName(tp);
- Sensor recordsLead = this.metrics.getSensor(name);
- if (recordsLead == null) {
- Map<String, String> metricTags = topicPartitionTags(tp);
-
- recordsLead = this.metrics.sensor(name);
-
-
recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLead,
metricTags), new Value());
-
recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadMin,
metricTags), new Min());
-
recordsLead.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLeadAvg,
metricTags), new Avg());
- }
- recordsLead.record(lead);
- }
-
- private void recordPartitionLag(TopicPartition tp, long lag) {
- this.recordsFetchLag.record(lag);
-
- String name = partitionLagMetricName(tp);
- Sensor recordsLag = this.metrics.getSensor(name);
- if (recordsLag == null) {
- Map<String, String> metricTags = topicPartitionTags(tp);
- recordsLag = this.metrics.sensor(name);
-
-
recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLag,
metricTags), new Value());
-
recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagMax,
metricTags), new Max());
-
recordsLag.add(this.metrics.metricInstance(metricsRegistry.partitionRecordsLagAvg,
metricTags), new Avg());
- }
- recordsLag.record(lag);
- }
-
- private static String partitionLagMetricName(TopicPartition tp) {
- return tp + ".records-lag";
- }
-
- private static String partitionLeadMetricName(TopicPartition tp) {
- return tp + ".records-lead";
- }
-
- private MetricName
partitionPreferredReadReplicaMetricName(TopicPartition tp) {
- Map<String, String> metricTags = topicPartitionTags(tp);
- return
this.metrics.metricInstance(metricsRegistry.partitionPreferredReadReplica,
metricTags);
- }
-
- private Map<String, String> topicPartitionTags(TopicPartition tp) {
- Map<String, String> metricTags = new HashMap<>(2);
- metricTags.put("topic", tp.topic().replace('.', '_'));
- metricTags.put("partition", String.valueOf(tp.partition()));
- return metricTags;
- }
- }
-
// Visible for testing
void maybeCloseFetchSessions(final Timer timer) {
final Cluster cluster = metadata.fetch();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
new file mode 100644
index 00000000000..2272ee5c0a3
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SensorBuilder.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.Metric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.Min;
+import org.apache.kafka.common.metrics.stats.SampledStat;
+import org.apache.kafka.common.metrics.stats.Value;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Supplier;
+
+/**
+ * {@code SensorBuilder} takes a bit of the boilerplate out of creating {@link
Sensor sensors} for recording
+ * {@link Metric metrics}.
+ */
+public class SensorBuilder {
+
+ private final Metrics metrics;
+
+ private final Sensor sensor;
+
+ private final boolean prexisting;
+
+ private final Map<String, String> tags;
+
+ public SensorBuilder(Metrics metrics, String name) {
+ this(metrics, name, Collections::emptyMap);
+ }
+
+ public SensorBuilder(Metrics metrics, String name, Supplier<Map<String,
String>> tagsSupplier) {
+ this.metrics = metrics;
+ Sensor s = metrics.getSensor(name);
+
+ if (s != null) {
+ sensor = s;
+ tags = Collections.emptyMap();
+ prexisting = true;
+ } else {
+ sensor = metrics.sensor(name);
+ tags = tagsSupplier.get();
+ prexisting = false;
+ }
+ }
+
+ SensorBuilder withAvg(MetricNameTemplate name) {
+ if (!prexisting)
+ sensor.add(metrics.metricInstance(name, tags), new Avg());
+
+ return this;
+ }
+
+ SensorBuilder withMin(MetricNameTemplate name) {
+ if (!prexisting)
+ sensor.add(metrics.metricInstance(name, tags), new Min());
+
+ return this;
+ }
+
+ SensorBuilder withMax(MetricNameTemplate name) {
+ if (!prexisting)
+ sensor.add(metrics.metricInstance(name, tags), new Max());
+
+ return this;
+ }
+
+ SensorBuilder withValue(MetricNameTemplate name) {
+ if (!prexisting)
+ sensor.add(metrics.metricInstance(name, tags), new Value());
+
+ return this;
+ }
+
+ SensorBuilder withMeter(MetricNameTemplate rateName, MetricNameTemplate
totalName) {
+ if (!prexisting) {
+ sensor.add(new Meter(metrics.metricInstance(rateName, tags),
metrics.metricInstance(totalName, tags)));
+ }
+
+ return this;
+ }
+
+ SensorBuilder withMeter(SampledStat sampledStat, MetricNameTemplate
rateName, MetricNameTemplate totalName) {
+ if (!prexisting) {
+ sensor.add(new Meter(sampledStat, metrics.metricInstance(rateName,
tags), metrics.metricInstance(totalName, tags)));
+ }
+
+ return this;
+ }
+
+ Sensor build() {
+ return sensor;
+ }
+
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
new file mode 100644
index 00000000000..b420852852a
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CompletedFetchTest.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.common.IsolationLevel;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordDeserializationException;
+import org.apache.kafka.common.message.FetchResponseData;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.EndTransactionMarker;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.UUIDDeserializer;
+import org.apache.kafka.common.serialization.UUIDSerializer;
+import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class CompletedFetchTest {
+
+ private final static String TOPIC_NAME = "test";
+ private final static TopicPartition TP = new TopicPartition(TOPIC_NAME, 0);
+ private final static long PRODUCER_ID = 1000L;
+ private final static short PRODUCER_EPOCH = 0;
+
+ private BufferSupplier bufferSupplier;
+
+ @BeforeEach
+ public void setup() {
+ bufferSupplier = BufferSupplier.create();
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (bufferSupplier != null)
+ bufferSupplier.close();
+ }
+
+ @Test
+ public void testSimple() {
+ long fetchOffset = 5;
+ int startingOffset = 10;
+ int numRecords = 11; // Records for 10-20
+ FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
+ .setRecords(newRecords(startingOffset, numRecords,
fetchOffset));
+
+ CompletedFetch<String, String> completedFetch =
newCompletedFetch(fetchOffset, partitionData);
+
+ List<ConsumerRecord<String, String>> records =
completedFetch.fetchRecords(10);
+ assertEquals(10, records.size());
+ ConsumerRecord<String, String> record = records.get(0);
+ assertEquals(10, record.offset());
+
+ records = completedFetch.fetchRecords(10);
+ assertEquals(1, records.size());
+ record = records.get(0);
+ assertEquals(20, record.offset());
+
+ records = completedFetch.fetchRecords(10);
+ assertEquals(0, records.size());
+ }
+
+ @Test
+ public void testAbortedTransactionRecordsRemoved() {
+ int numRecords = 10;
+ Records rawRecords = newTranscactionalRecords(ControlRecordType.ABORT,
numRecords);
+
+ FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
+ .setRecords(rawRecords)
+ .setAbortedTransactions(newAbortedTransactions());
+
+ CompletedFetch<String, String> completedFetch =
newCompletedFetch(IsolationLevel.READ_COMMITTED,
+ OffsetResetStrategy.NONE,
+ true,
+ 0,
+ partitionData);
+ List<ConsumerRecord<String, String>> records =
completedFetch.fetchRecords(10);
+ assertEquals(0, records.size());
+
+ completedFetch = newCompletedFetch(IsolationLevel.READ_UNCOMMITTED,
+ OffsetResetStrategy.NONE,
+ true,
+ 0,
+ partitionData);
+ records = completedFetch.fetchRecords(10);
+ assertEquals(numRecords, records.size());
+ }
+
+ @Test
+ public void testCommittedTransactionRecordsIncluded() {
+ int numRecords = 10;
+ Records rawRecords =
newTranscactionalRecords(ControlRecordType.COMMIT, numRecords);
+ FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
+ .setRecords(rawRecords);
+ CompletedFetch<String, String> completedFetch =
newCompletedFetch(IsolationLevel.READ_COMMITTED,
+ OffsetResetStrategy.NONE,
+ true,
+ 0,
+ partitionData);
+ List<ConsumerRecord<String, String>> records =
completedFetch.fetchRecords(10);
+ assertEquals(10, records.size());
+ }
+
+ @Test
+ public void testNegativeFetchCount() {
+ long fetchOffset = 0;
+ int startingOffset = 0;
+ int numRecords = 10;
+ FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
+ .setRecords(newRecords(startingOffset, numRecords,
fetchOffset));
+
+ CompletedFetch<String, String> completedFetch =
newCompletedFetch(fetchOffset, partitionData);
+
+ List<ConsumerRecord<String, String>> records =
completedFetch.fetchRecords(-10);
+ assertEquals(0, records.size());
+ }
+
+ @Test
+ public void testNoRecordsInFetch() {
+ FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
+ .setPartitionIndex(0)
+ .setHighWatermark(10)
+ .setLastStableOffset(20)
+ .setLogStartOffset(0);
+
+ CompletedFetch<String, String> completedFetch =
newCompletedFetch(IsolationLevel.READ_UNCOMMITTED,
+ OffsetResetStrategy.NONE,
+ false,
+ 1,
+ partitionData);
+
+ List<ConsumerRecord<String, String>> records =
completedFetch.fetchRecords(10);
+ assertEquals(0, records.size());
+ }
+
+ @Test
+ public void testCorruptedMessage() {
+ // Create one good record and then one "corrupted" record.
+ MemoryRecordsBuilder builder =
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, 0);
+ builder.append(new SimpleRecord(new
UUIDSerializer().serialize(TOPIC_NAME, UUID.randomUUID())));
+ builder.append(0L, "key".getBytes(), "value".getBytes());
+ Records records = builder.build();
+
+ FetchResponseData.PartitionData partitionData = new
FetchResponseData.PartitionData()
+ .setPartitionIndex(0)
+ .setHighWatermark(10)
+ .setLastStableOffset(20)
+ .setLogStartOffset(0)
+ .setRecords(records);
+
+ CompletedFetch<UUID, UUID> completedFetch = newCompletedFetch(new
UUIDDeserializer(),
+ new UUIDDeserializer(),
+ IsolationLevel.READ_COMMITTED,
+ OffsetResetStrategy.NONE,
+ false,
+ 0,
+ partitionData);
+
+ completedFetch.fetchRecords(10);
+
+ assertThrows(RecordDeserializationException.class, () ->
completedFetch.fetchRecords(10));
+ }
+
+ private CompletedFetch<String, String> newCompletedFetch(long fetchOffset,
+
FetchResponseData.PartitionData partitionData) {
+ return newCompletedFetch(
+ IsolationLevel.READ_UNCOMMITTED,
+ OffsetResetStrategy.NONE,
+ true,
+ fetchOffset,
+ partitionData);
+ }
+
+ private CompletedFetch<String, String> newCompletedFetch(IsolationLevel
isolationLevel,
+
OffsetResetStrategy offsetResetStrategy,
+ boolean checkCrcs,
+ long fetchOffset,
+
FetchResponseData.PartitionData partitionData) {
+ return newCompletedFetch(new StringDeserializer(),
+ new StringDeserializer(),
+ isolationLevel,
+ offsetResetStrategy,
+ checkCrcs,
+ fetchOffset,
+ partitionData);
+ }
+
+ private <K, V> CompletedFetch<K, V> newCompletedFetch(Deserializer<K>
keyDeserializer,
+ Deserializer<V>
valueDeserializer,
+ IsolationLevel
isolationLevel,
+ OffsetResetStrategy
offsetResetStrategy,
+ boolean checkCrcs,
+ long fetchOffset,
+
FetchResponseData.PartitionData partitionData) {
+ LogContext logContext = new LogContext();
+ SubscriptionState subscriptions = new SubscriptionState(logContext,
offsetResetStrategy);
+ FetchMetricsRegistry metricsRegistry = new FetchMetricsRegistry();
+ FetchMetricsManager metrics = new FetchMetricsManager(new Metrics(),
metricsRegistry);
+ FetchMetricsAggregator metricAggregator = new
FetchMetricsAggregator(metrics, Collections.singleton(TP));
+
+ return new CompletedFetch<>(logContext,
+ subscriptions,
+ checkCrcs,
+ bufferSupplier,
+ keyDeserializer,
+ valueDeserializer,
+ isolationLevel,
+ TP,
+ partitionData,
+ metricAggregator,
+ fetchOffset,
+ ApiKeys.FETCH.latestVersion());
+ }
+
+ private Records newRecords(long baseOffset, int count, long
firstMessageId) {
+ MemoryRecordsBuilder builder =
MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
TimestampType.CREATE_TIME, baseOffset);
+ for (int i = 0; i < count; i++)
+ builder.append(0L, "key".getBytes(), ("value-" + (firstMessageId +
i)).getBytes());
+ return builder.build();
+ }
+
+ private Records newTranscactionalRecords(ControlRecordType
controlRecordType, int numRecords) {
+ Time time = new MockTime();
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ CompressionType.NONE,
+ TimestampType.CREATE_TIME,
+ 0,
+ time.milliseconds(),
+ PRODUCER_ID,
+ PRODUCER_EPOCH,
+ 0,
+ true,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH);
+
+ for (int i = 0; i < numRecords; i++)
+ builder.append(new SimpleRecord(time.milliseconds(),
"key".getBytes(), "value".getBytes()));
+
+ builder.build();
+ writeTransactionMarker(buffer, controlRecordType, numRecords, time);
+ buffer.flip();
+
+ return MemoryRecords.readableRecords(buffer);
+ }
+
+ private void writeTransactionMarker(ByteBuffer buffer,
+ ControlRecordType controlRecordType,
+ int offset,
+ Time time) {
+ MemoryRecords.writeEndTransactionalMarker(buffer,
+ offset,
+ time.milliseconds(),
+ 0,
+ PRODUCER_ID,
+ PRODUCER_EPOCH,
+ new EndTransactionMarker(controlRecordType, 0));
+ }
+
+ private List<FetchResponseData.AbortedTransaction>
newAbortedTransactions() {
+ FetchResponseData.AbortedTransaction abortedTransaction = new
FetchResponseData.AbortedTransaction();
+ abortedTransaction.setFirstOffset(0);
+ abortedTransaction.setProducerId(PRODUCER_ID);
+ return Collections.singletonList(abortedTransaction);
+ }
+
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java
new file mode 100644
index 00000000000..dc00ce33366
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchMetricsManagerTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static
org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicPartitionTags;
+import static
org.apache.kafka.clients.consumer.internals.FetchMetricsManager.topicTags;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
/**
 * Unit tests for {@code FetchMetricsManager}: verifies that latency, bytes-fetched,
 * records-fetched, lag, and lead recordings surface as the expected avg/min/max/value
 * metrics, both aggregated and per-topic / per-partition.
 */
public class FetchMetricsManagerTest {

    // Tolerance for floating-point metric comparisons.
    private static final double EPSILON = 0.0001;

    private final Time time = new MockTime(1, 0, 0);
    private final static String TOPIC_NAME = "test";
    private final static TopicPartition TP = new TopicPartition(TOPIC_NAME, 0);

    private Metrics metrics;
    private FetchMetricsRegistry metricsRegistry;
    private FetchMetricsManager metricsManager;

    @BeforeEach
    public void setup() {
        metrics = new Metrics(time);
        metricsRegistry = new FetchMetricsRegistry(metrics.config().tags().keySet(), "test");
        metricsManager = new FetchMetricsManager(metrics, metricsRegistry);
    }

    @AfterEach
    public void tearDown() {
        if (metrics != null) {
            metrics.close();
            metrics = null;
        }

        metricsManager = null;
    }

    @Test
    public void testLatency() {
        metricsManager.recordLatency(123);
        // Advance past the sample window so both recordings contribute to avg/max.
        time.sleep(metrics.config().timeWindowMs() + 1);
        metricsManager.recordLatency(456);

        assertEquals(289.5, metricValue(metricsRegistry.fetchLatencyAvg), EPSILON);
        assertEquals(456, metricValue(metricsRegistry.fetchLatencyMax), EPSILON);
    }

    @Test
    public void testBytesFetched() {
        metricsManager.recordBytesFetched(2);
        time.sleep(metrics.config().timeWindowMs() + 1);
        metricsManager.recordBytesFetched(10);

        assertEquals(6, metricValue(metricsRegistry.fetchSizeAvg), EPSILON);
        assertEquals(10, metricValue(metricsRegistry.fetchSizeMax), EPSILON);
    }

    @Test
    public void testBytesFetchedTopic() {
        // Two topics recorded interleaved must keep independent per-topic sensors.
        String topicName1 = TOPIC_NAME;
        String topicName2 = "another-topic";
        Map<String, String> tags1 = topicTags(topicName1);
        Map<String, String> tags2 = topicTags(topicName2);

        metricsManager.recordBytesFetched(topicName1, 2);
        metricsManager.recordBytesFetched(topicName2, 1);
        time.sleep(metrics.config().timeWindowMs() + 1);
        metricsManager.recordBytesFetched(topicName1, 10);
        metricsManager.recordBytesFetched(topicName2, 5);

        assertEquals(6, metricValue(metricsRegistry.topicFetchSizeAvg, tags1), EPSILON);
        assertEquals(10, metricValue(metricsRegistry.topicFetchSizeMax, tags1), EPSILON);
        assertEquals(3, metricValue(metricsRegistry.topicFetchSizeAvg, tags2), EPSILON);
        assertEquals(5, metricValue(metricsRegistry.topicFetchSizeMax, tags2), EPSILON);
    }

    @Test
    public void testRecordsFetched() {
        metricsManager.recordRecordsFetched(3);
        time.sleep(metrics.config().timeWindowMs() + 1);
        metricsManager.recordRecordsFetched(15);

        assertEquals(9, metricValue(metricsRegistry.recordsPerRequestAvg), EPSILON);
    }

    @Test
    public void testRecordsFetchedTopic() {
        // Two topics recorded interleaved must keep independent per-topic sensors.
        String topicName1 = TOPIC_NAME;
        String topicName2 = "another-topic";
        Map<String, String> tags1 = topicTags(topicName1);
        Map<String, String> tags2 = topicTags(topicName2);

        metricsManager.recordRecordsFetched(topicName1, 2);
        metricsManager.recordRecordsFetched(topicName2, 1);
        time.sleep(metrics.config().timeWindowMs() + 1);
        metricsManager.recordRecordsFetched(topicName1, 10);
        metricsManager.recordRecordsFetched(topicName2, 5);

        assertEquals(6, metricValue(metricsRegistry.topicRecordsPerRequestAvg, tags1), EPSILON);
        assertEquals(3, metricValue(metricsRegistry.topicRecordsPerRequestAvg, tags2), EPSILON);
    }

    @Test
    public void testPartitionLag() {
        Map<String, String> tags = topicPartitionTags(TP);
        metricsManager.recordPartitionLag(TP, 14);
        metricsManager.recordPartitionLag(TP, 8);
        time.sleep(metrics.config().timeWindowMs() + 1);
        metricsManager.recordPartitionLag(TP, 5);

        // Value metric reflects the latest recording; max/avg span all three.
        assertEquals(14, metricValue(metricsRegistry.recordsLagMax), EPSILON);
        assertEquals(5, metricValue(metricsRegistry.partitionRecordsLag, tags), EPSILON);
        assertEquals(14, metricValue(metricsRegistry.partitionRecordsLagMax, tags), EPSILON);
        assertEquals(9, metricValue(metricsRegistry.partitionRecordsLagAvg, tags), EPSILON);
    }

    @Test
    public void testPartitionLead() {
        Map<String, String> tags = topicPartitionTags(TP);
        metricsManager.recordPartitionLead(TP, 15);
        metricsManager.recordPartitionLead(TP, 11);
        time.sleep(metrics.config().timeWindowMs() + 1);
        metricsManager.recordPartitionLead(TP, 13);

        // Value metric reflects the latest recording; min/avg span all three.
        assertEquals(11, metricValue(metricsRegistry.recordsLeadMin), EPSILON);
        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLead, tags), EPSILON);
        assertEquals(11, metricValue(metricsRegistry.partitionRecordsLeadMin, tags), EPSILON);
        assertEquals(13, metricValue(metricsRegistry.partitionRecordsLeadAvg, tags), EPSILON);
    }

    // Resolves an untagged metric template to its current double value.
    private double metricValue(MetricNameTemplate name) {
        MetricName metricName = metrics.metricInstance(name);
        KafkaMetric metric = metrics.metric(metricName);
        return (Double) metric.metricValue();
    }

    // Resolves a tagged (per-topic / per-partition) metric template to its current double value.
    private double metricValue(MetricNameTemplate name, Map<String, String> tags) {
        MetricName metricName = metrics.metricInstance(name, tags);
        KafkaMetric metric = metrics.metric(metricName);
        return (Double) metric.metricValue();
    }

}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index a22739e1413..e60edbfb6c1 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -175,7 +175,7 @@ public class FetcherTest {
private MockTime time = new MockTime(1);
private SubscriptionState subscriptions;
private ConsumerMetadata metadata;
- private FetcherMetricsRegistry metricsRegistry;
+ private FetchMetricsRegistry metricsRegistry;
private MockClient client;
private Metrics metrics;
private ApiVersions apiVersions = new ApiVersions();
@@ -3686,7 +3686,7 @@ public class FetcherTest {
metrics = new Metrics(metricConfig, time);
consumerClient = spy(new ConsumerNetworkClient(logContext, client,
metadata, time,
100, 1000, Integer.MAX_VALUE));
- metricsRegistry = new
FetcherMetricsRegistry(metricConfig.tags().keySet(), "consumer" + groupId);
+ metricsRegistry = new
FetchMetricsRegistry(metricConfig.tags().keySet(), "consumer" + groupId);
}
private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>>
records) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
index d57acc243e1..ffaeab4709d 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/OffsetFetcherTest.java
@@ -1244,7 +1244,7 @@ public class OffsetFetcherTest {
buildFetcher(metricConfig, isolationLevel, metadataExpireMs,
subscriptionState, logContext);
- FetcherMetricsRegistry metricsRegistry = new
FetcherMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group");
+ FetchMetricsRegistry metricsRegistry = new
FetchMetricsRegistry(metricConfig.tags().keySet(), "consumertest-group");
Fetcher<byte[], byte[]> fetcher = new Fetcher<>(
logContext,
consumerClient,