This is an automated email from the ASF dual-hosted git repository.

rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a6ff411d [STORM-8019] Fixing kafka topic level metrics computation (#8047)
1a6ff411d is described below

commit 1a6ff411d61e0a5358a2ee159444339041de6ac7
Author: reiabreu <[email protected]>
AuthorDate: Mon Jun 16 10:21:16 2025 +0100

    [STORM-8019] Fixing kafka topic level metrics computation (#8047)
    
    * Fixing the way Kafka topic level metrics are computed
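
    * Topic level gauges now compute their totals on demand by iterating
      the partitions assigned to the topic, instead of accumulating values
      as a side effect of the partition level gauges. A minimal sketch of
      the approach, reusing names from the diff below (illustrative, not
      the exact method body):

          Gauge<Long> totalSpoutLagGauge = () -> {
              long total = 0L;
              for (TopicPartition tp : assignment) {
                  if (tp.topic().equals(topic)) {
                      // per-partition lag = end offset - committed offset
                      Map<TopicPartition, Long> endOffsets =
                              getEndOffsets(Collections.singleton(tp), adminSupplier);
                      total += endOffsets.get(tp)
                              - offsetManagerSupplier.get().get(tp).getCommittedOffset();
                  }
              }
              return total;
          };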
---
 .../org/apache/storm/kafka/spout/KafkaSpout.java   |  59 ++---
 .../spout/metrics2/KafkaOffsetMetricManager.java   |  13 +-
 .../metrics2/KafkaOffsetPartitionMetrics.java      | 185 ++++------------
 .../spout/metrics2/KafkaOffsetTopicMetrics.java    | 182 +++++++++------
 .../kafka/spout/metrics2/KafkaOffsetUtil.java      |  92 ++++++++
 .../kafka/spout/KafkaSpoutReactivationTest.java    |  27 +++
 .../metric2/KafkaOffsetPartitionMetricsTest.java   | 163 ++++++++++++++
 .../spout/metric2/KafkaOffsetTopicMetricsTest.java | 243 +++++++++++++++++++++
 8 files changed, 729 insertions(+), 235 deletions(-)

diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index a328aec5d..0fcb22a04 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -39,6 +39,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+
 import org.apache.commons.lang.Validate;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
@@ -154,7 +155,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         tupleListener.open(conf, context);
         this.kafkaOffsetMetricManager
-            = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, context);
+                = new KafkaOffsetMetricManager<>(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, context);
 
         LOG.info("Kafka Spout opened with the following configuration: {}", 
kafkaSpoutConfig);
     }
@@ -183,7 +184,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             previousAssignment = partitions;
 
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, 
topic-partitions={}]",
-                kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
+                    kafkaSpoutConfig.getConsumerGroupId(), consumer, 
partitions);
 
             if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples();
@@ -193,7 +194,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
-                context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
+                    context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
 
             initialize(partitions);
             tupleListener.onPartitionsReassigned(partitions);
@@ -221,7 +222,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 final OffsetAndMetadata committedOffset = consumer.committed(newTp);
                 final long fetchOffset = doSeek(newTp, committedOffset);
                 LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
-                    fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
+                        fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
                 if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) {
                     offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset));
                 }
@@ -234,13 +235,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
          */
         private long doSeek(TopicPartition newTp, OffsetAndMetadata committedOffset) {
             LOG.trace("Seeking offset for topic-partition [{}] with [{}] and committed offset [{}]",
-                newTp, firstPollOffsetStrategy, committedOffset);
+                    newTp, firstPollOffsetStrategy, committedOffset);
 
             if (committedOffset != null) {
                 // offset was previously committed for this consumer group and topic-partition, either by this or another topology.
                 if (commitMetadataManager.isOffsetCommittedByThisTopology(newTp,
-                    committedOffset,
-                    Collections.unmodifiableMap(offsetManagers))) {
+                        committedOffset,
+                        Collections.unmodifiableMap(offsetManagers))) {
                     // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
                     consumer.seek(newTp, committedOffset.offset());
                 } else {
@@ -281,7 +282,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     commitOffsetsForAckedTuples();
                 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
                     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-                        createFetchedOffsetsMetadata(consumer.assignment());
+                            createFetchedOffsetsMetadata(consumer.assignment());
                     consumer.commitAsync(offsetsToCommit, null);
                     LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
                 }
@@ -336,7 +337,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     pollablePartitions.add(tp);
                 } else {
                     LOG.debug("Not polling on partition [{}]. It has [{}] 
uncommitted offsets, which exceeds the limit of [{}]. ", tp,
-                        numUncommittedOffsets, maxUncommittedOffsets);
+                            numUncommittedOffsets, maxUncommittedOffsets);
                 }
             }
         }
@@ -345,7 +346,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private boolean isWaitingToEmit() {
         return waitingToEmit.values().stream()
-            .anyMatch(list -> !list.isEmpty());
+                .anyMatch(list -> !list.isEmpty());
     }
 
     private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
@@ -365,11 +366,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
             final int numPolledRecords = consumerRecords.count();
             LOG.debug("Polled [{}] records from Kafka",
-                numPolledRecords);
+                    numPolledRecords);
             if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                 //Commit polled records immediately to ensure delivery is at-most-once.
                 Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
-                    createFetchedOffsetsMetadata(consumer.assignment());
+                        createFetchedOffsetsMetadata(consumer.assignment());
                 consumer.commitSync(offsetsToCommit);
                 LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
             }
@@ -387,7 +388,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long> earliestRetriableOffsets,
-        ConsumerRecords<K, V> consumerRecords) {
+                                                    ConsumerRecords<K, V> consumerRecords) {
         for (Entry<TopicPartition, Long> entry : earliestRetriableOffsets.entrySet()) {
             TopicPartition tp = entry.getKey();
             List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
@@ -529,7 +530,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                      * to the committed offset.
                      */
                     LOG.debug("Consumer fell behind committed offset. Catching 
up. Position was [{}], skipping to [{}]",
-                        position, committedOffset);
+                            position, committedOffset);
                     consumer.seek(tp, committedOffset);
                 }
                 /**
@@ -539,8 +540,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 if (waitingToEmitForTp != null) {
                     //Discard the pending records that are already committed
                     waitingToEmit.put(tp, waitingToEmitForTp.stream()
-                        .filter(record -> record.offset() >= committedOffset)
-                        .collect(Collectors.toCollection(LinkedList::new)));
+                            .filter(record -> record.offset() >= committedOffset)
+                            .collect(Collectors.toCollection(LinkedList::new)));
                 }
 
                 final OffsetManager offsetManager = offsetManagers.get(tp);
@@ -573,11 +574,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         if (!emitted.contains(msgId)) {
             LOG.debug("Received ack for message [{}], associated with tuple 
emitted for a ConsumerRecord that "
-                + "came from a topic-partition that this consumer group 
instance is no longer tracking "
-                + "due to rebalance/partition reassignment. No action taken.", 
msgId);
+                    + "came from a topic-partition that this consumer group 
instance is no longer tracking "
+                    + "due to rebalance/partition reassignment. No action 
taken.", msgId);
         } else {
             Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
-                + " This should never occur barring errors in the RetryService implementation or the spout code.");
+                    + " This should never occur barring errors in the RetryService implementation or the spout code.");
             offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
             emitted.remove(msgId);
         }
@@ -595,11 +596,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!emitted.contains(msgId)) {
             LOG.debug("Received fail for tuple this spout is no longer 
tracking."
-                + " Partitions may have been reassigned. Ignoring message 
[{}]", msgId);
+                    + " Partitions may have been reassigned. Ignoring message 
[{}]", msgId);
             return;
         }
         Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being failed."
-            + " This should never occur barring errors in the RetryService implementation or the spout code.");
+                + " This should never occur barring errors in the RetryService implementation or the spout code.");
 
         msgId.incrementNumFails();
 
@@ -630,7 +631,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         List<TopicPartition> allPartitionsSorted = new ArrayList<>(allPartitions);
         Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE);
         Set<TopicPartition> assignedPartitions = kafkaSpoutConfig.getTopicPartitioner()
-            .getPartitionsForThisTask(allPartitionsSorted, context);
+                .getPartitionsForThisTask(allPartitionsSorted, context);
         boolean partitionChanged = topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener);
         if (partitionChanged && canRegisterMetrics()) {
             LOG.info("Partitions assignments has changed, updating metrics.");
@@ -683,9 +684,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public String toString() {
         return "KafkaSpout{"
-            + "offsetManagers =" + offsetManagers
-            + ", emitted=" + emitted
-            + "}";
+                + "offsetManagers =" + offsetManagers
+                + ", emitted=" + emitted
+                + "}";
     }
 
     @Override
@@ -718,8 +719,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private boolean isWrapper(Class<?> type) {
         return type == Double.class || type == Float.class || type == Long.class
-            || type == Integer.class || type == Short.class || type == Character.class
-            || type == Byte.class || type == Boolean.class || type == String.class;
+                || type == Integer.class || type == Short.class || type == Character.class
+                || type == Byte.class || type == Boolean.class || type == String.class;
     }
 
     private String getTopicsString() {
@@ -735,8 +736,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         PollablePartitionsInfo(Set<TopicPartition> pollablePartitions, Map<TopicPartition, Long> earliestRetriableOffsets) {
             this.pollablePartitions = pollablePartitions;
             this.pollableEarliestRetriableOffsets = earliestRetriableOffsets.entrySet().stream()
-                .filter(entry -> pollablePartitions.contains(entry.getKey()))
-                .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
+                    .filter(entry -> pollablePartitions.contains(entry.getKey()))
+                    .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
         }
 
         public boolean shouldPoll() {
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
index 2b150797f..64c42832c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
-
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -30,6 +29,8 @@ import org.apache.storm.task.TopologyContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+
 /**
  * This class is used to manage both the partition and topic level offset metrics.
  */
@@ -55,6 +56,7 @@ public class KafkaOffsetMetricManager<K, V> {
     }
 
     public void registerMetricsForNewTopicPartitions(Set<TopicPartition> newAssignment) {
+
         for (TopicPartition topicPartition : newAssignment) {
             if (!topicPartitionMetricsMap.containsKey(topicPartition)) {
                 LOG.info("Registering metric for topicPartition: {}", 
topicPartition);
@@ -62,13 +64,14 @@ public class KafkaOffsetMetricManager<K, V> {
                 String topic = topicPartition.topic();
                 KafkaOffsetTopicMetrics topicMetrics = topicMetricsMap.get(topic);
                 if (topicMetrics == null) {
-                    topicMetrics = new KafkaOffsetTopicMetrics(topic);
+                    LOG.info("Registering metric for topic: {}", topic);
+                    topicMetrics = new KafkaOffsetTopicMetrics(topic, offsetManagerSupplier, adminSupplier, newAssignment);
                     topicMetricsMap.put(topic, topicMetrics);
                     topologyContext.registerMetricSet("kafkaOffset", topicMetrics);
                 }
 
                 KafkaOffsetPartitionMetrics topicPartitionMetricSet
-                    = new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, adminSupplier, topicPartition, topicMetrics);
+                        = new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, adminSupplier, topicPartition);
                 topicPartitionMetricsMap.put(topicPartition, topicPartitionMetricSet);
                 topologyContext.registerMetricSet("kafkaOffset", topicPartitionMetricSet);
             }
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java
index 6b29eefc9..df5b0c869 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,27 +18,25 @@
 
 package org.apache.storm.kafka.spout.metrics2;
 
+import static org.apache.storm.kafka.spout.metrics2.KafkaOffsetUtil.getBeginningOffsets;
+import static org.apache.storm.kafka.spout.metrics2.KafkaOffsetUtil.getEndOffsets;
+
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricSet;
-
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Supplier;
-
 import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.OffsetSpec;
-import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+
+
 /**
  * Partition level metrics.
  * <p>
@@ -56,16 +54,13 @@ public class KafkaOffsetPartitionMetrics<K, V> implements MetricSet {
     private final Supplier<Admin> adminSupplier;
 
     private TopicPartition topicPartition;
-    private KafkaOffsetTopicMetrics topicMetrics;
 
     public KafkaOffsetPartitionMetrics(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
                                        Supplier<Admin> adminSupplier,
-                                       TopicPartition topicPartition,
-                                       KafkaOffsetTopicMetrics topicMetrics) {
+                                       TopicPartition topicPartition) {
         this.offsetManagerSupplier = offsetManagerSupplier;
         this.adminSupplier = adminSupplier;
         this.topicPartition = topicPartition;
-        this.topicMetrics = topicMetrics;
 
         LOG.info("Running KafkaOffsetMetricSet");
     }
@@ -74,93 +69,57 @@ public class KafkaOffsetPartitionMetrics<K, V> implements MetricSet {
     public Map<String, Metric> getMetrics() {
         Map<String, Metric> metrics = new HashMap();
 
-        String metricPath = topicPartition.topic()  + "/partition_" + topicPartition.partition();
-        Gauge<Long> spoutLagGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition));
-                if (endOffsets == null || endOffsets.isEmpty()) {
-                    LOG.error("Failed to get endOffsets from Kafka for topic 
partitions: {}.", topicPartition);
-                    return 0L;
-                }
-                // add value to topic level metric
-                OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
-                Long ret = endOffsets.get(topicPartition) - offsetManager.getCommittedOffset();
-                topicMetrics.totalSpoutLag += ret;
-                return ret;
+        String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
+        Gauge<Long> spoutLagGauge = () -> {
+            Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+            if (endOffsets == null || endOffsets.isEmpty()) {
+                LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
+                return 0L;
             }
+            OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
+            return endOffsets.get(topicPartition) - offsetManager.getCommittedOffset();
         };
 
-        Gauge<Long> earliestTimeOffsetGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                Map<TopicPartition, Long> beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition));
-                if (beginningOffsets == null || beginningOffsets.isEmpty()) {
-                    LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition);
-                    return 0L;
-                }
-                // add value to topic level metric
-                Long ret = beginningOffsets.get(topicPartition);
-                topicMetrics.totalEarliestTimeOffset += beginningOffsets.get(topicPartition);
-                return ret;
+        Gauge<Long> earliestTimeOffsetGauge = () -> {
+            Map<TopicPartition, Long> beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition), adminSupplier);
+            if (beginningOffsets == null || beginningOffsets.isEmpty()) {
+                LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition);
+                return 0L;
             }
+            return beginningOffsets.get(topicPartition);
         };
 
-        Gauge<Long> latestTimeOffsetGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition));
-                if (endOffsets == null || endOffsets.isEmpty()) {
-                    LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
-                    return 0L;
-                }
-                // add value to topic level metric
-                Long ret = endOffsets.get(topicPartition);
-                topicMetrics.totalLatestTimeOffset += ret;
-                return ret;
+        Gauge<Long> latestTimeOffsetGauge = () -> {
+            Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+            if (endOffsets == null || endOffsets.isEmpty()) {
+                LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
+                return 0L;
             }
+            return endOffsets.get(topicPartition);
         };
 
-        Gauge<Long> latestEmittedOffsetGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                // add value to topic level metric
-                OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
-                Long ret = offsetManager.getLatestEmittedOffset();
-                topicMetrics.totalLatestEmittedOffset += ret;
-                return ret;
-            }
+        Gauge<Long> latestEmittedOffsetGauge = () -> {
+            OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
+            return offsetManager.getLatestEmittedOffset();
         };
 
-        Gauge<Long> latestCompletedOffsetGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                // add value to topic level metric
-                OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
-                Long ret = offsetManager.getCommittedOffset();
-                topicMetrics.totalLatestCompletedOffset += ret;
-                return ret;
-            }
+        Gauge<Long> latestCompletedOffsetGauge = () -> {
+            OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
+            return offsetManager.getCommittedOffset();
         };
 
-        Gauge<Long> recordsInPartitionGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition));
-                if (endOffsets == null || endOffsets.isEmpty()) {
-                    LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
-                    return 0L;
-                }
-                Map<TopicPartition, Long> beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition));
-                if (beginningOffsets == null || beginningOffsets.isEmpty()) {
-                    LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition);
-                    return 0L;
-                }
-                // add value to topic level metric
-                Long ret = endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition);
-                topicMetrics.totalRecordsInPartitions += ret;
-                return ret;
+        Gauge<Long> recordsInPartitionGauge = () -> {
+            Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+            if (endOffsets == null || endOffsets.isEmpty()) {
+                LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
+                return 0L;
+            }
+            Map<TopicPartition, Long> beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition), adminSupplier);
+            if (beginningOffsets == null || beginningOffsets.isEmpty()) {
+                LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition);
+                return 0L;
             }
+            return endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition);
         };
 
         metrics.put(metricPath + "/" + "spoutLag", spoutLagGauge);
@@ -172,56 +131,4 @@ public class KafkaOffsetPartitionMetrics<K, V> implements MetricSet {
 
         return metrics;
     }
-
-    private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions) {
-        Admin admin = adminSupplier.get();
-        if (admin == null) {
-            LOG.error("Kafka admin object is null, returning 0.");
-            return Collections.EMPTY_MAP;
-        }
-
-        Map<TopicPartition, Long> beginningOffsets;
-        try {
-            beginningOffsets = getOffsets(admin, topicPartitions, OffsetSpec.earliest());
-        } catch (RetriableException | ExecutionException | InterruptedException e) {
-            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartition, e);
-            return Collections.EMPTY_MAP;
-        }
-        return beginningOffsets;
-    }
-
-    private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions) {
-        Admin admin = adminSupplier.get();
-        if (admin == null) {
-            LOG.error("Kafka admin object is null, returning 0.");
-            return Collections.EMPTY_MAP;
-        }
-
-        Map<TopicPartition, Long> endOffsets;
-        try {
-            endOffsets = getOffsets(admin, topicPartitions, OffsetSpec.latest());
-        } catch (RetriableException | ExecutionException | InterruptedException e) {
-            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartition, e);
-            return Collections.EMPTY_MAP;
-        }
-        return endOffsets;
-    }
-
-    private static Map<TopicPartition, Long> getOffsets(Admin admin, Set<TopicPartition> topicPartitions, OffsetSpec offsetSpec)
-        throws InterruptedException, ExecutionException {
-
-        Map<TopicPartition, OffsetSpec> offsetSpecMap = new HashMap<>();
-        for (TopicPartition topicPartition : topicPartitions) {
-            offsetSpecMap.put(topicPartition, offsetSpec);
-        }
-        Map<TopicPartition, Long> ret = new HashMap<>();
-        ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap);
-        KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> all = listOffsetsResult.all();
-        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap = all.get();
-        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry :
-                topicPartitionListOffsetsResultInfoMap.entrySet()) {
-            ret.put(entry.getKey(), entry.getValue().offset());
-        }
-        return ret;
-    }
 }
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
index cd0fbfc6b..8bdcef374 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,14 +18,26 @@
 
 package org.apache.storm.kafka.spout.metrics2;
 
+import static org.apache.storm.kafka.spout.metrics2.KafkaOffsetUtil.getBeginningOffsets;
+import static org.apache.storm.kafka.spout.metrics2.KafkaOffsetUtil.getEndOffsets;
+
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricSet;
+
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * Topic level metrics.
  * <p>
@@ -42,22 +54,18 @@ public class KafkaOffsetTopicMetrics implements MetricSet {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetTopicMetrics.class);
 
     private String topic;
-    long totalSpoutLag;
-    long totalEarliestTimeOffset;
-    long totalLatestTimeOffset;
-    long totalLatestEmittedOffset;
-    long totalLatestCompletedOffset;
-    long totalRecordsInPartitions;
-
-
-    public KafkaOffsetTopicMetrics(String topic) {
+    Set<TopicPartition> assignment;
+    Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+    Supplier<Admin> adminSupplier;
+
+    public KafkaOffsetTopicMetrics(String topic, Supplier<Map<TopicPartition,
+                                           OffsetManager>> offsetManagerSupplier,
+                                   Supplier<Admin> adminSupplier,
+                                   Set<TopicPartition> newAssignment) {
         this.topic = topic;
-        this.totalSpoutLag = 0L;
-        this.totalEarliestTimeOffset = 0L;
-        this.totalLatestTimeOffset = 0L;
-        this.totalLatestEmittedOffset = 0L;
-        this.totalLatestCompletedOffset = 0L;
-        this.totalRecordsInPartitions = 0L;
+        this.assignment = newAssignment;
+        this.offsetManagerSupplier = offsetManagerSupplier;
+        this.adminSupplier = adminSupplier;
         LOG.info("Create KafkaOffsetTopicMetrics for topic: {}", topic);
     }
 
@@ -65,46 +73,117 @@ public class KafkaOffsetTopicMetrics implements MetricSet {
     public Map<String, Metric> getMetrics() {
         Map<String, Metric> metrics = new HashMap();
 
-        Gauge<Long> totalSpoutLagGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return totalSpoutLag;
+        Gauge<Long> totalSpoutLagGauge = () -> {
+            Long totalSpoutLag = 0L;
+            for (TopicPartition topicPartition : assignment) {
+                String topicOfPartition = topicPartition.topic();
+                if (topicOfPartition.equals(topic)) {
+                    Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+                    if (endOffsets == null || endOffsets.isEmpty()) {
+                        LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
+                        return 0L;
+                    }
+                    // add value to topic level metric
+                    OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
+                    Long ret = endOffsets.get(topicPartition) - offsetManager.getCommittedOffset();
+                    totalSpoutLag += ret;
+                }
             }
+            return totalSpoutLag;
         };
 
-        Gauge<Long> totalEarliestTimeOffsetGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return totalEarliestTimeOffset;
+        Gauge<Long> totalEarliestTimeOffsetGauge = () -> {
+
+            Long totalEarliestTimeOffset = 0L;
+
+            for (TopicPartition topicPartition : assignment) {
+                String topicOfPartition = topicPartition.topic();
+                if (topicOfPartition.equals(topic)) {
+                    Map<TopicPartition, Long> beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition), adminSupplier);
+                    if (beginningOffsets == null || beginningOffsets.isEmpty()) {
+                        LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition);
+                        return 0L;
+                    }
+                    // add value to topic level metric
+                    Long ret = beginningOffsets.get(topicPartition);
+                    totalEarliestTimeOffset += ret;
+                }
+
             }
+            return totalEarliestTimeOffset;
         };
 
-        Gauge<Long> totalLatestTimeOffsetGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return totalLatestTimeOffset;
+        Gauge<Long> totalLatestTimeOffsetGauge = () -> {
+
+            Long totalLatestTimeOffset = 0L;
+            for (TopicPartition topicPartition : assignment) {
+                String topicOfPartition = topicPartition.topic();
+                if (topicOfPartition.equals(topic)) {
+                    Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+                    if (endOffsets == null || endOffsets.isEmpty()) {
+                        LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
+                        return 0L;
+                    }
+                    // add value to topic level metric
+                    Long ret = endOffsets.get(topicPartition);
+                    totalLatestTimeOffset += ret;
+                }
             }
+            return totalLatestTimeOffset;
         };
 
-        Gauge<Long> totalLatestEmittedOffsetGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return totalLatestEmittedOffset;
+        Gauge<Long> totalLatestEmittedOffsetGauge = () -> {
+
+            Long totalLatestEmittedOffset = 0L;
+            for (TopicPartition topicPartition : assignment) {
+                String topicOfPartition = topicPartition.topic();
+                if (topicOfPartition.equals(topic)) {
+                    OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
+                    Long ret = offsetManager.getLatestEmittedOffset();
+                    totalLatestEmittedOffset += ret;
+                }
+
             }
+            return totalLatestEmittedOffset;
         };
 
-        Gauge<Long> totalLatestCompletedOffsetGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return totalLatestCompletedOffset;
+        Gauge<Long> totalLatestCompletedOffsetGauge = () -> {
+
+            Long totalLatestCompletedOffset = 0L;
+            for (TopicPartition topicPartition : assignment) {
+                String topicOfPartition = topicPartition.topic();
+                if (topicOfPartition.equals(topic)) {
+                    // add value to topic level metric
+                    OffsetManager offsetManager = offsetManagerSupplier.get().get(topicPartition);
+                    Long ret = offsetManager.getCommittedOffset();
+                    totalLatestCompletedOffset += ret;
+                }
             }
+
+            return totalLatestCompletedOffset;
         };
 
-        Gauge<Long> totalRecordsInPartitionsGauge = new Gauge<Long>() {
-            @Override
-            public Long getValue() {
-                return totalRecordsInPartitions;
+        Gauge<Long> totalRecordsInPartitionsGauge = () -> {
+            Long totalRecordsInPartitions = 0L;
+            for (TopicPartition topicPartition : assignment) {
+                String topicOfPartition = topicPartition.topic();
+                if (topicOfPartition.equals(topic)) {
+                    Map<TopicPartition, Long> endOffsets = getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+                    if (endOffsets == null || endOffsets.isEmpty()) {
+                        LOG.error("Failed to get endOffsets from Kafka for topic partitions: {}.", topicPartition);
+                        return 0L;
+                    }
+                    Map<TopicPartition, Long> beginningOffsets = getBeginningOffsets(Collections.singleton(topicPartition), adminSupplier);
+                    if (beginningOffsets == null || beginningOffsets.isEmpty()) {
+                        LOG.error("Failed to get beginningOffsets from Kafka for topic partitions: {}.", topicPartition);
+                        return 0L;
+                    }
+                    // add value to topic level metric
+                    Long ret = endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition);
+                    totalRecordsInPartitions += ret;
+                }
             }
+            return totalRecordsInPartitions;
         };
 
         metrics.put(topic + "/" + "totalSpoutLag", totalSpoutLagGauge);
@@ -115,25 +194,4 @@ public class KafkaOffsetTopicMetrics implements MetricSet {
         metrics.put(topic + "/" + "totalRecordsInPartitions", 
totalRecordsInPartitionsGauge);
         return metrics;
     }
-
-    private class TopicMetrics {
-        long totalSpoutLag = 0L;
-        long totalEarliestTimeOffset = 0L;
-        long totalLatestTimeOffset = 0L;
-        long totalLatestEmittedOffset = 0L;
-        long totalLatestCompletedOffset = 0L;
-        long totalRecordsInPartitions = 0L;
-
-        public void incrementTotalSpoutLag(long offset) {
-            totalSpoutLag += offset;
-        }
-
-        public void incrementTotalEarliestTimeOffset(long offset) {
-            totalEarliestTimeOffset += offset;
-        }
-
-        public void incrementTotalLatestTimeOffset(long offset) {
-            totalLatestTimeOffset += offset;
-        }
-    }
 }
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetUtil.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetUtil.java
new file mode 100644
index 000000000..c7f9fabc4
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetUtil.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metrics2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class KafkaOffsetUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetUtil.class);
+
+    public static Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions, Supplier<Admin> adminSupplier) {
+        Admin admin = adminSupplier.get();
+        if (admin == null) {
+            LOG.error("Kafka admin object is null, returning 0.");
+            return Collections.EMPTY_MAP;
+        }
+
+        Map<TopicPartition, Long> beginningOffsets;
+        try {
+            beginningOffsets = getOffsets(admin, topicPartitions, OffsetSpec.earliest());
+        } catch (RetriableException | ExecutionException | InterruptedException e) {
+            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartitions, e);
+            return Collections.EMPTY_MAP;
+        }
+        return beginningOffsets;
+    }
+
+    public static Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions, Supplier<Admin> adminSupplier) {
+        Admin admin = adminSupplier.get();
+        if (admin == null) {
+            LOG.error("Kafka admin object is null, returning 0.");
+            return Collections.EMPTY_MAP;
+        }
+
+        Map<TopicPartition, Long> endOffsets;
+        try {
+            endOffsets = getOffsets(admin, topicPartitions, OffsetSpec.latest());
+        } catch (RetriableException | ExecutionException | InterruptedException e) {
+            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartitions, e);
+            return Collections.EMPTY_MAP;
+        }
+        return endOffsets;
+    }
+
+    public static Map<TopicPartition, Long> getOffsets(Admin admin, Set<TopicPartition> topicPartitions, OffsetSpec offsetSpec)
+            throws InterruptedException, ExecutionException {
+
+        Map<TopicPartition, OffsetSpec> offsetSpecMap = new HashMap<>();
+        for (TopicPartition topicPartition : topicPartitions) {
+            offsetSpecMap.put(topicPartition, offsetSpec);
+        }
+        Map<TopicPartition, Long> ret = new HashMap<>();
+        ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap);
+        KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> all = listOffsetsResult.all();
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionListOffsetsResultInfoMap = all.get();
+        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry :
+                topicPartitionListOffsetsResultInfoMap.entrySet()) {
+            ret.put(entry.getKey(), entry.getValue().offset());
+        }
+        return ret;
+    }
+}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index b4cab2ace..b7ae2824b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -29,9 +29,12 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -65,6 +68,7 @@ public class KafkaSpoutReactivationTest {
     private final SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
     private final long commitOffsetPeriodMs = 2_000;
     private Consumer<String, String> consumerSpy;
+    private Admin adminSpy;
     private KafkaSpout<String, String> spout;
     private final int maxPollRecords = 10;
 
@@ -78,9 +82,12 @@ public class KafkaSpoutReactivationTest {
                 .build();
         ClientFactory<String, String> clientFactory = new ClientFactoryDefault<>();
         this.consumerSpy = spy(clientFactory.createConsumer(spoutConfig.getKafkaProps()));
+        this.adminSpy = spy(clientFactory.createAdmin(spoutConfig.getKafkaProps()));
         ClientFactory<String, String> clientFactoryMock = mock(ClientFactory.class);
         when(clientFactoryMock.createConsumer(any()))
             .thenReturn(consumerSpy);
+        when(clientFactoryMock.createAdmin(any()))
+                .thenReturn(adminSpy);
         this.spout = new KafkaSpout<>(spoutConfig, clientFactoryMock, new TopicAssigner());
         SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitExtension.getKafkaUnit(), SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
         SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf, topologyContext, collector);
@@ -147,4 +154,24 @@ public class KafkaSpoutReactivationTest {
         //With earliest, the spout should also resume where it left off, rather than restart at the earliest offset.
         doReactivationTest(FirstPollOffsetStrategy.EARLIEST);
     }
+
+    @Test
+    public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception {
+        //Storm will try to get metrics from the spout even while deactivated, the spout must be able to handle this
+        prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+
+        for (int i = 0; i < 5; i++) {
+            KafkaSpoutMessageId msgId = emitOne();
+            spout.ack(msgId);
+        }
+        spout.deactivate();
+
+        Map<String, Metric> partitionsOffsetMetric = spout.getKafkaOffsetMetricManager().getTopicPartitionMetricsMap().get(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC ,0)).getMetrics();
+        Long partitionLag = (Long) ((Gauge) partitionsOffsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/partition_0/spoutLag")).getValue();
+        assertThat(partitionLag, is(5L));
+
+        Map<String, Metric> topicOffsetMetric = spout.getKafkaOffsetMetricManager().getTopicMetricsMap().get(SingleTopicKafkaSpoutConfiguration.TOPIC).getMetrics();
+        Long totalSpoutLag = (Long) ((Gauge) topicOffsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC + "/totalSpoutLag")).getValue();
+        assertThat(totalSpoutLag, is(5L));
+    }
 }
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionMetricsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionMetricsTest.java
new file mode 100644
index 000000000..ee085858e
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionMetricsTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metric2;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.*;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaOffsetPartitionMetricsTest {
+
+    private Set<TopicPartition> assignment;
+    private Admin admin = mock(Admin.class);
+    private HashMap<TopicPartition, OffsetManager> offsetManagers;
+    private KafkaFuture kafkaFuture = mock(KafkaFuture.class);
+
+    @BeforeEach
+    public void initializeTests() {
+        reset(admin, kafkaFuture);
+
+    }
+
+    @Test
+    public void registerMetricsGetSpoutLagAndPartitionRecords() throws ExecutionException, InterruptedException {
+
+        TopicPartition topicAPartition1 = new TopicPartition("topicA", 1);
+
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty());
+
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>();
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1LatestListOffsetsResultInfo);
+
+        when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap);
+
+        ListOffsetsResult listOffsetsResult = mock(ListOffsetsResult.class);
+        when(listOffsetsResult.all()).thenReturn(kafkaFuture);
+
+        admin = mock(Admin.class);
+        when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult);
+
+        OffsetManager offsetManagertopicAPartition1 = mock(OffsetManager.class);
+        when(offsetManagertopicAPartition1.getCommittedOffset()).thenReturn(90L);
+
+
+        offsetManagers = new HashMap<>();
+        offsetManagers.put(topicAPartition1, offsetManagertopicAPartition1);
+
+        KafkaOffsetPartitionMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionMetrics(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, topicAPartition1);
+        Map<String, Metric> result = kafkaOffsetPartitionAndTopicMetrics.getMetrics();
+        Gauge g1 = (Gauge) result.get("topicA/partition_1/spoutLag");
+        assertEquals(10L, g1.getValue());
+
+
+        //get partition records
+
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty());
+
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>();
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1EarliestListOffsetsResultInfo);
+
+        //mock consecutive calls. Each call to the recordsInPartition gauge will call kafkaFuture.get() twice
+        when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap);
+
+        result = kafkaOffsetPartitionAndTopicMetrics.getMetrics();
+        g1 = (Gauge) result.get("topicA/partition_1/recordsInPartition");
+        assertEquals(99L, g1.getValue());
+    }
+
+    @Test
+    public void registerMetricsGetEarliestAndLatest() throws ExecutionException, InterruptedException {
+
+        TopicPartition topicAPartition1 = new TopicPartition("topicA", 1);
+
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty());
+
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>();
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1EarliestListOffsetsResultInfo);
+
+        when(kafkaFuture.get()).thenReturn(topicPartitionEarliestListOffsetsResultInfoMap);
+
+        ListOffsetsResult listOffsetsResult = mock(ListOffsetsResult.class);
+        when(listOffsetsResult.all()).thenReturn(kafkaFuture);
+
+        admin = mock(Admin.class);
+        when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult);
+
+        OffsetManager offsetManagertopicAPartition1 = mock(OffsetManager.class);
+        when(offsetManagertopicAPartition1.getLatestEmittedOffset()).thenReturn(50L);
+        when(offsetManagertopicAPartition1.getCommittedOffset()).thenReturn(40L);
+
+
+        offsetManagers = new HashMap<>();
+        offsetManagers.put(topicAPartition1, offsetManagertopicAPartition1);
+
+        assignment = new HashSet<>();
+        assignment.add(topicAPartition1);
+
+        KafkaOffsetPartitionMetrics kafkaOffsetPartitionAndTopicMetrics = new KafkaOffsetPartitionMetrics(() -> Collections.unmodifiableMap(offsetManagers), () -> admin, topicAPartition1);
+        Map<String, Metric> result = kafkaOffsetPartitionAndTopicMetrics.getMetrics();
+        Gauge g1 = (Gauge) result.get("topicA/partition_1/earliestTimeOffset");
+        assertEquals(1L, g1.getValue());
+
+        // get the latest offsets
+
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty());
+
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>();
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1LatestListOffsetsResultInfo);
+
+        when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap);
+
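+        // latestTimeOffset (100), latestEmittedOffset (50) and latestCompletedOffset (40) come straight from the mocks above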
+        result = kafkaOffsetPartitionMetrics.getMetrics();
+
+        g1 = (Gauge) result.get("topicA/partition_1/latestTimeOffset");
+        assertEquals(100L, g1.getValue());
+
+        g1 = (Gauge) result.get("topicA/partition_1/latestEmittedOffset");
+        assertEquals(50L, g1.getValue());
+
+        g1 = (Gauge) result.get("topicA/partition_1/latestCompletedOffset");
+        assertEquals(40L, g1.getValue());
+    }
+}
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetTopicMetricsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetTopicMetricsTest.java
new file mode 100644
index 000000000..264625033
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetTopicMetricsTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metric2;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.kafka.spout.metrics2.KafkaOffsetTopicMetrics;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaOffsetTopicMetricsTest {
+
+    private Set<TopicPartition> assignment;
+    private Admin admin = mock(Admin.class);
+    private HashMap<TopicPartition, OffsetManager> offsetManagers;
+    private KafkaFuture kafkaFuture = mock(KafkaFuture.class);
+
+    @BeforeEach
+    public void initializeTests() {
+        reset(admin, kafkaFuture);
+    }
+
+    @Test
+    public void registerMetricsGetSpoutLagAndPartitionRecords() throws ExecutionException, InterruptedException {
+
+        TopicPartition topicAPartition1 = new TopicPartition("topicA", 1);
+        TopicPartition topicAPartition2 = new TopicPartition("topicA", 2);
+        TopicPartition topicBPartition1 = new TopicPartition("topicB", 1);
+        TopicPartition topicBPartition2 = new TopicPartition("topicB", 2);
+
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicBPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicBPartition2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400, System.currentTimeMillis(), Optional.empty());
+
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>();
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1LatestListOffsetsResultInfo);
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition2, topicAPartition2LatestListOffsetsResultInfo);
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicBPartition1, topicBPartition1LatestListOffsetsResultInfo);
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicBPartition2, topicBPartition2LatestListOffsetsResultInfo);
+
+        when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap);
+
+        ListOffsetsResult listOffsetsResult = mock(ListOffsetsResult.class);
+        when(listOffsetsResult.all()).thenReturn(kafkaFuture);
+
+        admin = mock(Admin.class);
+        when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult);
+
+        OffsetManager offsetManagerTopicAPartition1 = mock(OffsetManager.class);
+        when(offsetManagerTopicAPartition1.getCommittedOffset()).thenReturn(90L);
+
+        OffsetManager offsetManagerTopicAPartition2 = mock(OffsetManager.class);
+        when(offsetManagerTopicAPartition2.getCommittedOffset()).thenReturn(170L);
+
+        offsetManagers = new HashMap<>();
+        offsetManagers.put(topicAPartition1, offsetManagerTopicAPartition1);
+        offsetManagers.put(topicAPartition2, offsetManagerTopicAPartition2);
+
+        assignment = new HashSet<>();
+        assignment.add(topicAPartition1);
+        assignment.add(topicAPartition2);
+        assignment.add(topicBPartition1);
+        assignment.add(topicBPartition2);
+
+        KafkaOffsetTopicMetrics kafkaOffsetTopicMetricsA = new KafkaOffsetTopicMetrics("topicA", () -> Collections.unmodifiableMap(offsetManagers), () -> admin, assignment);
+        Map<String, Metric> result = kafkaOffsetTopicMetricsA.getMetrics();
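+        // expected totalSpoutLag for topicA = (100 - 90) + (200 - 170) = 40; topicB registers no gauges on this topicA metric set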
+        Gauge g1 = (Gauge) result.get("topicA/totalSpoutLag");
+        assertEquals(40L, g1.getValue());
+
+        // read the gauge again; the value should not change
+        g1 = (Gauge) result.get("topicA/totalSpoutLag");
+        assertEquals(40L, g1.getValue());
+
+        assertNull(result.get("topicB/totalSpoutLag"));
+
+        // get topic records
+
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(2, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicBPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(3, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicBPartition2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(4, System.currentTimeMillis(), Optional.empty());
+
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>();
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1EarliestListOffsetsResultInfo);
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition2, topicAPartition2EarliestListOffsetsResultInfo);
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicBPartition1, topicBPartition1EarliestListOffsetsResultInfo);
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicBPartition2, topicBPartition2EarliestListOffsetsResultInfo);
+
+        // Mock consecutive calls: each read of the totalRecordsInPartitions gauge calls kafkaFuture.get() twice (latest offsets, then earliest).
+        when(kafkaFuture.get()).thenReturn(
+                topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap,
+                topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap,
+                topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap,
+                topicPartitionLatestListOffsetsResultInfoMap, topicPartitionEarliestListOffsetsResultInfoMap);
+
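+        // expected totalRecordsInPartitions for topicA = (100 - 1) + (200 - 2) = 297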
+        Gauge gATotal = (Gauge) result.get("topicA/totalRecordsInPartitions");
+        assertEquals(297L, gATotal.getValue());
+
+        // read the gauge again; the value should not change
+        gATotal = (Gauge) result.get("topicA/totalRecordsInPartitions");
+        assertEquals(297L, gATotal.getValue());
+
+        assertNull(result.get("topicB/totalRecordsInPartitions"));
+    }
+
+    @Test
+    public void registerMetricsGetEarliestAndLatest() throws ExecutionException, InterruptedException {
+
+        TopicPartition topicAPartition1 = new TopicPartition("topicA", 1);
+        TopicPartition topicAPartition2 = new TopicPartition("topicA", 2);
+        TopicPartition topicBPartition1 = new TopicPartition("topicB", 1);
+        TopicPartition topicBPartition2 = new TopicPartition("topicB", 2);
+
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicBPartition1EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicBPartition2EarliestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(1, System.currentTimeMillis(), Optional.empty());
+
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionEarliestListOffsetsResultInfoMap = new HashMap<>();
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1EarliestListOffsetsResultInfo);
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicAPartition2, topicAPartition2EarliestListOffsetsResultInfo);
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicBPartition1, topicBPartition1EarliestListOffsetsResultInfo);
+        topicPartitionEarliestListOffsetsResultInfoMap.put(topicBPartition2, topicBPartition2EarliestListOffsetsResultInfo);
+
+        when(kafkaFuture.get()).thenReturn(topicPartitionEarliestListOffsetsResultInfoMap);
+
+        ListOffsetsResult listOffsetsResult = mock(ListOffsetsResult.class);
+        when(listOffsetsResult.all()).thenReturn(kafkaFuture);
+
+        admin = mock(Admin.class);
+        when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult);
+
+        OffsetManager offsetManagerTopicAPartition1 = mock(OffsetManager.class);
+        when(offsetManagerTopicAPartition1.getLatestEmittedOffset()).thenReturn(50L);
+        when(offsetManagerTopicAPartition1.getCommittedOffset()).thenReturn(40L);
+
+        OffsetManager offsetManagerTopicAPartition2 = mock(OffsetManager.class);
+        when(offsetManagerTopicAPartition2.getLatestEmittedOffset()).thenReturn(100L);
+        when(offsetManagerTopicAPartition2.getCommittedOffset()).thenReturn(90L);
+
+        offsetManagers = new HashMap<>();
+        offsetManagers.put(topicAPartition1, offsetManagerTopicAPartition1);
+        offsetManagers.put(topicAPartition2, offsetManagerTopicAPartition2);
+
+        assignment = new HashSet<>();
+        assignment.add(topicAPartition1);
+        assignment.add(topicAPartition2);
+        assignment.add(topicBPartition1);
+        assignment.add(topicBPartition2);
+
+        KafkaOffsetTopicMetrics kafkaOffsetTopicMetricsA = new KafkaOffsetTopicMetrics("topicA", () -> Collections.unmodifiableMap(offsetManagers), () -> admin, assignment);
+        Map<String, Metric> result = kafkaOffsetTopicMetricsA.getMetrics();
+
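+        // expected totalEarliestTimeOffset for topicA = 1 + 1 = 2 (sum of the mocked earliest offsets of its two partitions)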
+        Gauge gATotal = (Gauge) result.get("topicA/totalEarliestTimeOffset");
+        assertEquals(2L, gATotal.getValue());
+        assertNull(result.get("topicB/totalEarliestTimeOffset"));
+
+        // read the gauges a second time; the values should be the same
+
+        gATotal = (Gauge) result.get("topicA/totalEarliestTimeOffset");
+        assertEquals(2L, gATotal.getValue());
+        assertNull(result.get("topicB/totalEarliestTimeOffset"));
+
+        // get the latest offsets
+
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(100, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicAPartition2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(200, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicBPartition1LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(300, System.currentTimeMillis(), Optional.empty());
+        ListOffsetsResult.ListOffsetsResultInfo topicBPartition2LatestListOffsetsResultInfo = new ListOffsetsResult.ListOffsetsResultInfo(400, System.currentTimeMillis(), Optional.empty());
+
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> topicPartitionLatestListOffsetsResultInfoMap = new HashMap<>();
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition1, topicAPartition1LatestListOffsetsResultInfo);
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicAPartition2, topicAPartition2LatestListOffsetsResultInfo);
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicBPartition1, topicBPartition1LatestListOffsetsResultInfo);
+        topicPartitionLatestListOffsetsResultInfoMap.put(topicBPartition2, topicBPartition2LatestListOffsetsResultInfo);
+
+        when(kafkaFuture.get()).thenReturn(topicPartitionLatestListOffsetsResultInfoMap);
+
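+        // expected topicA totals: latestTimeOffset = 100 + 200 = 300, latestEmittedOffset = 50 + 100 = 150, latestCompletedOffset = 40 + 90 = 130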
+        gATotal = (Gauge) result.get("topicA/totalLatestTimeOffset");
+        assertEquals(300L, gATotal.getValue());
+        assertNull(result.get("topicB/totalLatestTimeOffset"));
+
+        gATotal = (Gauge) result.get("topicA/totalLatestEmittedOffset");
+        assertEquals(150L, gATotal.getValue());
+        assertNull(result.get("topicB/totalLatestEmittedOffset"));
+
+        gATotal = (Gauge) result.get("topicA/totalLatestCompletedOffset");
+        assertEquals(130L, gATotal.getValue());
+        assertNull(result.get("topicB/totalLatestCompletedOffset"));
+
+        // read the gauges a second time; the values should be the same
+
+        gATotal = (Gauge) result.get("topicA/totalLatestTimeOffset");
+        assertEquals(300L, gATotal.getValue());
+        assertNull(result.get("topicB/totalLatestTimeOffset"));
+
+        gATotal = (Gauge) result.get("topicA/totalLatestEmittedOffset");
+        assertEquals(150L, gATotal.getValue());
+        assertNull(result.get("topicB/totalLatestEmittedOffset"));
+
+        gATotal = (Gauge) result.get("topicA/totalLatestCompletedOffset");
+        assertEquals(130L, gATotal.getValue());
+        assertNull(result.get("topicB/totalLatestCompletedOffset"));
+    }
+}
+}
