Repository: kafka Updated Branches: refs/heads/trunk 055ca9b7a -> 7f8edbc8e
KAFKA-4000; Collect and record per-topic consumer metrics Improve consumer metric collection by collecting and recording metrics per topic. Author: Vahid Hashemian <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #1684 from vahidhashemian/KAFKA-4000 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7f8edbc8 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7f8edbc8 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7f8edbc8 Branch: refs/heads/trunk Commit: 7f8edbc8e848d22b1cb016a7208bee1e52a65b73 Parents: 055ca9b Author: Vahid Hashemian <[email protected]> Authored: Fri Dec 9 14:54:30 2016 -0800 Committer: Jason Gustafson <[email protected]> Committed: Fri Dec 9 14:55:42 2016 -0800 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 45 +++++++++++++++----- 1 file changed, 34 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7f8edbc8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index e414fcb..4bfe466 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -688,7 +688,6 @@ public class Fetcher<K, V> { } recordsCount = parsed.size(); - this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount); if (!parsed.isEmpty()) { log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position); @@ -833,11 +832,11 @@ public class Fetcher<K, V> { private final FetchManagerMetrics sensors; private final Set<TopicPartition> unrecordedPartitions; - private int totalBytes; - private int totalRecords; + private final FetchMetrics fetchMetrics = new FetchMetrics(); + private final Map<String, FetchMetrics> topicFetchMetrics = new HashMap<>(); - public FetchResponseMetricAggregator(FetchManagerMetrics sensors, - Set<TopicPartition> partitions) { + private FetchResponseMetricAggregator(FetchManagerMetrics sensors, + Set<TopicPartition> partitions) { this.sensors = sensors; this.unrecordedPartitions = partitions; } @@ -847,14 +846,38 @@ public class Fetcher<K, V> { * and number of records parsed. After all partitions have reported, we write the metric. */ public void record(TopicPartition partition, int bytes, int records) { - unrecordedPartitions.remove(partition); - totalBytes += bytes; - totalRecords += records; + this.unrecordedPartitions.remove(partition); + this.fetchMetrics.increment(bytes, records); + + // collect and aggregate per-topic metrics + String topic = partition.topic(); + FetchMetrics topicFetchMetric = this.topicFetchMetrics.get(topic); + if (topicFetchMetric == null) { + topicFetchMetric = new FetchMetrics(); + this.topicFetchMetrics.put(topic, topicFetchMetric); + } + topicFetchMetric.increment(bytes, records); - if (unrecordedPartitions.isEmpty()) { + if (this.unrecordedPartitions.isEmpty()) { // once all expected partitions from the fetch have reported in, record the metrics - sensors.bytesFetched.record(totalBytes); - sensors.recordsFetched.record(totalRecords); + this.sensors.bytesFetched.record(topicFetchMetric.fetchBytes); + this.sensors.recordsFetched.record(topicFetchMetric.fetchRecords); + + // also record per-topic metrics + for (Map.Entry<String, FetchMetrics> entry: this.topicFetchMetrics.entrySet()) { + FetchMetrics metric = entry.getValue(); + this.sensors.recordTopicFetchMetrics(entry.getKey(), metric.fetchBytes, metric.fetchRecords); + } + } + } + + private static class FetchMetrics { + private int fetchBytes; + private int fetchRecords; + + protected void increment(int bytes, int records) { + this.fetchBytes += bytes; + this.fetchRecords += records; } } }
