This is an automated email from the ASF dual-hosted git repository.
rabreu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 1a6ff411d [STORM-8019] Fixing kafka topic level metrics computation
(#8047)
1a6ff411d is described below
commit 1a6ff411d61e0a5358a2ee159444339041de6ac7
Author: reiabreu <[email protected]>
AuthorDate: Mon Jun 16 10:21:16 2025 +0100
[STORM-8019] Fixing kafka topic level metrics computation (#8047)
* Fixing the way Kafka topic level metrics are computed
---
.../org/apache/storm/kafka/spout/KafkaSpout.java | 59 ++---
.../spout/metrics2/KafkaOffsetMetricManager.java | 13 +-
.../metrics2/KafkaOffsetPartitionMetrics.java | 185 ++++------------
.../spout/metrics2/KafkaOffsetTopicMetrics.java | 182 +++++++++------
.../kafka/spout/metrics2/KafkaOffsetUtil.java | 92 ++++++++
.../kafka/spout/KafkaSpoutReactivationTest.java | 27 +++
.../metric2/KafkaOffsetPartitionMetricsTest.java | 163 ++++++++++++++
.../spout/metric2/KafkaOffsetTopicMetricsTest.java | 243 +++++++++++++++++++++
8 files changed, 729 insertions(+), 235 deletions(-)
diff --git
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index a328aec5d..0fcb22a04 100644
---
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -39,6 +39,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+
import org.apache.commons.lang.Validate;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
@@ -154,7 +155,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
tupleListener.open(conf, context);
this.kafkaOffsetMetricManager
- = new KafkaOffsetMetricManager<>(() ->
Collections.unmodifiableMap(offsetManagers), () -> admin, context);
+ = new KafkaOffsetMetricManager<>(() ->
Collections.unmodifiableMap(offsetManagers), () -> admin, context);
LOG.info("Kafka Spout opened with the following configuration: {}",
kafkaSpoutConfig);
}
@@ -183,7 +184,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
previousAssignment = partitions;
LOG.info("Partitions revoked. [consumer-group={}, consumer={},
topic-partitions={}]",
- kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
+ kafkaSpoutConfig.getConsumerGroupId(), consumer,
partitions);
if (isAtLeastOnceProcessing()) {
commitOffsetsForAckedTuples();
@@ -193,7 +194,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
LOG.info("Partitions reassignment. [task-ID={}, consumer-group={},
consumer={}, topic-partitions={}]",
- context.getThisTaskId(),
kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
+ context.getThisTaskId(),
kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
initialize(partitions);
tupleListener.onPartitionsReassigned(partitions);
@@ -221,7 +222,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
final OffsetAndMetadata committedOffset =
consumer.committed(newTp);
final long fetchOffset = doSeek(newTp, committedOffset);
LOG.debug("Set consumer position to [{}] for topic-partition
[{}] with [{}] and committed offset [{}]",
- fetchOffset, newTp, firstPollOffsetStrategy,
committedOffset);
+ fetchOffset, newTp, firstPollOffsetStrategy,
committedOffset);
if (isAtLeastOnceProcessing() &&
!offsetManagers.containsKey(newTp)) {
offsetManagers.put(newTp, new OffsetManager(newTp,
fetchOffset));
}
@@ -234,13 +235,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
*/
private long doSeek(TopicPartition newTp, OffsetAndMetadata
committedOffset) {
LOG.trace("Seeking offset for topic-partition [{}] with [{}] and
committed offset [{}]",
- newTp, firstPollOffsetStrategy, committedOffset);
+ newTp, firstPollOffsetStrategy, committedOffset);
if (committedOffset != null) {
// offset was previously committed for this consumer group and
topic-partition, either by this or another topology.
if
(commitMetadataManager.isOffsetCommittedByThisTopology(newTp,
- committedOffset,
- Collections.unmodifiableMap(offsetManagers))) {
+ committedOffset,
+ Collections.unmodifiableMap(offsetManagers))) {
// Another KafkaSpout instance (of this topology) already
committed, therefore FirstPollOffsetStrategy does not apply.
consumer.seek(newTp, committedOffset.offset());
} else {
@@ -281,7 +282,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
commitOffsetsForAckedTuples();
} else if (kafkaSpoutConfig.getProcessingGuarantee() ==
ProcessingGuarantee.NO_GUARANTEE) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
- createFetchedOffsetsMetadata(consumer.assignment());
+
createFetchedOffsetsMetadata(consumer.assignment());
consumer.commitAsync(offsetsToCommit, null);
LOG.debug("Committed offsets {} to Kafka",
offsetsToCommit);
}
@@ -336,7 +337,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
pollablePartitions.add(tp);
} else {
LOG.debug("Not polling on partition [{}]. It has [{}]
uncommitted offsets, which exceeds the limit of [{}]. ", tp,
- numUncommittedOffsets, maxUncommittedOffsets);
+ numUncommittedOffsets, maxUncommittedOffsets);
}
}
}
@@ -345,7 +346,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private boolean isWaitingToEmit() {
return waitingToEmit.values().stream()
- .anyMatch(list -> !list.isEmpty());
+ .anyMatch(list -> !list.isEmpty());
}
private void setWaitingToEmit(ConsumerRecords<K, V> consumerRecords) {
@@ -365,11 +366,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets,
consumerRecords);
final int numPolledRecords = consumerRecords.count();
LOG.debug("Polled [{}] records from Kafka",
- numPolledRecords);
+ numPolledRecords);
if (kafkaSpoutConfig.getProcessingGuarantee() ==
KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
//Commit polled records immediately to ensure delivery is
at-most-once.
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
- createFetchedOffsetsMetadata(consumer.assignment());
+ createFetchedOffsetsMetadata(consumer.assignment());
consumer.commitSync(offsetsToCommit);
LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
}
@@ -387,7 +388,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
private void ackRetriableOffsetsIfCompactedAway(Map<TopicPartition, Long>
earliestRetriableOffsets,
- ConsumerRecords<K, V> consumerRecords) {
+ ConsumerRecords<K, V>
consumerRecords) {
for (Entry<TopicPartition, Long> entry :
earliestRetriableOffsets.entrySet()) {
TopicPartition tp = entry.getKey();
List<ConsumerRecord<K, V>> records = consumerRecords.records(tp);
@@ -529,7 +530,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
* to the committed offset.
*/
LOG.debug("Consumer fell behind committed offset. Catching
up. Position was [{}], skipping to [{}]",
- position, committedOffset);
+ position, committedOffset);
consumer.seek(tp, committedOffset);
}
/**
@@ -539,8 +540,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (waitingToEmitForTp != null) {
//Discard the pending records that are already committed
waitingToEmit.put(tp, waitingToEmitForTp.stream()
- .filter(record -> record.offset() >= committedOffset)
- .collect(Collectors.toCollection(LinkedList::new)));
+ .filter(record -> record.offset() >=
committedOffset)
+
.collect(Collectors.toCollection(LinkedList::new)));
}
final OffsetManager offsetManager = offsetManagers.get(tp);
@@ -573,11 +574,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (!emitted.contains(msgId)) {
LOG.debug("Received ack for message [{}], associated with tuple
emitted for a ConsumerRecord that "
- + "came from a topic-partition that this consumer group
instance is no longer tracking "
- + "due to rebalance/partition reassignment. No action taken.",
msgId);
+ + "came from a topic-partition that this consumer group
instance is no longer tracking "
+ + "due to rebalance/partition reassignment. No action
taken.", msgId);
} else {
Validate.isTrue(!retryService.isScheduled(msgId), "The message id
" + msgId + " is queued for retry while being acked."
- + " This should never occur barring errors in the RetryService
implementation or the spout code.");
+ + " This should never occur barring errors in the
RetryService implementation or the spout code.");
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
emitted.remove(msgId);
}
@@ -595,11 +596,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
if (!emitted.contains(msgId)) {
LOG.debug("Received fail for tuple this spout is no longer
tracking."
- + " Partitions may have been reassigned. Ignoring message
[{}]", msgId);
+ + " Partitions may have been reassigned. Ignoring message
[{}]", msgId);
return;
}
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " +
msgId + " is queued for retry while being failed."
- + " This should never occur barring errors in the RetryService
implementation or the spout code.");
+ + " This should never occur barring errors in the RetryService
implementation or the spout code.");
msgId.incrementNumFails();
@@ -630,7 +631,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
List<TopicPartition> allPartitionsSorted = new
ArrayList<>(allPartitions);
Collections.sort(allPartitionsSorted,
TopicPartitionComparator.INSTANCE);
Set<TopicPartition> assignedPartitions =
kafkaSpoutConfig.getTopicPartitioner()
- .getPartitionsForThisTask(allPartitionsSorted, context);
+ .getPartitionsForThisTask(allPartitionsSorted, context);
boolean partitionChanged = topicAssigner.assignPartitions(consumer,
assignedPartitions, rebalanceListener);
if (partitionChanged && canRegisterMetrics()) {
LOG.info("Partitions assignments has changed, updating metrics.");
@@ -683,9 +684,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public String toString() {
return "KafkaSpout{"
- + "offsetManagers =" + offsetManagers
- + ", emitted=" + emitted
- + "}";
+ + "offsetManagers =" + offsetManagers
+ + ", emitted=" + emitted
+ + "}";
}
@Override
@@ -718,8 +719,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private boolean isWrapper(Class<?> type) {
return type == Double.class || type == Float.class || type ==
Long.class
- || type == Integer.class || type == Short.class || type ==
Character.class
- || type == Byte.class || type == Boolean.class || type ==
String.class;
+ || type == Integer.class || type == Short.class || type ==
Character.class
+ || type == Byte.class || type == Boolean.class || type ==
String.class;
}
private String getTopicsString() {
@@ -735,8 +736,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
PollablePartitionsInfo(Set<TopicPartition> pollablePartitions,
Map<TopicPartition, Long> earliestRetriableOffsets) {
this.pollablePartitions = pollablePartitions;
this.pollableEarliestRetriableOffsets =
earliestRetriableOffsets.entrySet().stream()
- .filter(entry -> pollablePartitions.contains(entry.getKey()))
- .collect(Collectors.toMap(entry -> entry.getKey(), entry ->
entry.getValue()));
+ .filter(entry ->
pollablePartitions.contains(entry.getKey()))
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry
-> entry.getValue()));
}
public boolean shouldPoll() {
diff --git
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
index 2b150797f..64c42832c 100644
---
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
+++
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetMetricManager.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -22,7 +22,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
-
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.internal.OffsetManager;
@@ -30,6 +29,8 @@ import org.apache.storm.task.TopologyContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+
/**
* This class is used to manage both the partition and topic level offset
metrics.
*/
@@ -55,6 +56,7 @@ public class KafkaOffsetMetricManager<K, V> {
}
public void registerMetricsForNewTopicPartitions(Set<TopicPartition>
newAssignment) {
+
for (TopicPartition topicPartition : newAssignment) {
if (!topicPartitionMetricsMap.containsKey(topicPartition)) {
LOG.info("Registering metric for topicPartition: {}",
topicPartition);
@@ -62,13 +64,14 @@ public class KafkaOffsetMetricManager<K, V> {
String topic = topicPartition.topic();
KafkaOffsetTopicMetrics topicMetrics =
topicMetricsMap.get(topic);
if (topicMetrics == null) {
- topicMetrics = new KafkaOffsetTopicMetrics(topic);
+ LOG.info("Registering metric for topic: {}", topic);
+ topicMetrics = new KafkaOffsetTopicMetrics(topic,
offsetManagerSupplier, adminSupplier, newAssignment);
topicMetricsMap.put(topic, topicMetrics);
topologyContext.registerMetricSet("kafkaOffset",
topicMetrics);
}
KafkaOffsetPartitionMetrics topicPartitionMetricSet
- = new KafkaOffsetPartitionMetrics<>(offsetManagerSupplier,
adminSupplier, topicPartition, topicMetrics);
+ = new
KafkaOffsetPartitionMetrics<>(offsetManagerSupplier, adminSupplier,
topicPartition);
topicPartitionMetricsMap.put(topicPartition,
topicPartitionMetricSet);
topologyContext.registerMetricSet("kafkaOffset",
topicPartitionMetricSet);
}
diff --git
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java
index 6b29eefc9..df5b0c869 100644
---
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java
+++
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetPartitionMetrics.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,27 +18,25 @@
package org.apache.storm.kafka.spout.metrics2;
+import static
org.apache.storm.kafka.spout.metrics2.KafkaOffsetUtil.getBeginningOffsets;
+import static
org.apache.storm.kafka.spout.metrics2.KafkaOffsetUtil.getEndOffsets;
+
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
-
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
-
import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.OffsetSpec;
-import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+
+
/**
* Partition level metrics.
* <p>
@@ -56,16 +54,13 @@ public class KafkaOffsetPartitionMetrics<K, V> implements
MetricSet {
private final Supplier<Admin> adminSupplier;
private TopicPartition topicPartition;
- private KafkaOffsetTopicMetrics topicMetrics;
public KafkaOffsetPartitionMetrics(Supplier<Map<TopicPartition,
OffsetManager>> offsetManagerSupplier,
Supplier<Admin> adminSupplier,
- TopicPartition topicPartition,
- KafkaOffsetTopicMetrics topicMetrics) {
+ TopicPartition topicPartition) {
this.offsetManagerSupplier = offsetManagerSupplier;
this.adminSupplier = adminSupplier;
this.topicPartition = topicPartition;
- this.topicMetrics = topicMetrics;
LOG.info("Running KafkaOffsetMetricSet");
}
@@ -74,93 +69,57 @@ public class KafkaOffsetPartitionMetrics<K, V> implements
MetricSet {
public Map<String, Metric> getMetrics() {
Map<String, Metric> metrics = new HashMap();
- String metricPath = topicPartition.topic() + "/partition_" +
topicPartition.partition();
- Gauge<Long> spoutLagGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- Map<TopicPartition, Long> endOffsets =
getEndOffsets(Collections.singleton(topicPartition));
- if (endOffsets == null || endOffsets.isEmpty()) {
- LOG.error("Failed to get endOffsets from Kafka for topic
partitions: {}.", topicPartition);
- return 0L;
- }
- // add value to topic level metric
- OffsetManager offsetManager =
offsetManagerSupplier.get().get(topicPartition);
- Long ret = endOffsets.get(topicPartition) -
offsetManager.getCommittedOffset();
- topicMetrics.totalSpoutLag += ret;
- return ret;
+ String metricPath = topicPartition.topic() + "/partition_" +
topicPartition.partition();
+ Gauge<Long> spoutLagGauge = () -> {
+ Map<TopicPartition, Long> endOffsets =
getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ LOG.error("Failed to get endOffsets from Kafka for topic
partitions: {}.", topicPartition);
+ return 0L;
}
+ OffsetManager offsetManager =
offsetManagerSupplier.get().get(topicPartition);
+ return endOffsets.get(topicPartition) -
offsetManager.getCommittedOffset();
};
- Gauge<Long> earliestTimeOffsetGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- Map<TopicPartition, Long> beginningOffsets =
getBeginningOffsets(Collections.singleton(topicPartition));
- if (beginningOffsets == null || beginningOffsets.isEmpty()) {
- LOG.error("Failed to get beginningOffsets from Kafka for
topic partitions: {}.", topicPartition);
- return 0L;
- }
- // add value to topic level metric
- Long ret = beginningOffsets.get(topicPartition);
- topicMetrics.totalEarliestTimeOffset +=
beginningOffsets.get(topicPartition);
- return ret;
+ Gauge<Long> earliestTimeOffsetGauge = () -> {
+ Map<TopicPartition, Long> beginningOffsets =
getBeginningOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (beginningOffsets == null || beginningOffsets.isEmpty()) {
+ LOG.error("Failed to get beginningOffsets from Kafka for topic
partitions: {}.", topicPartition);
+ return 0L;
}
+ return beginningOffsets.get(topicPartition);
};
- Gauge<Long> latestTimeOffsetGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- Map<TopicPartition, Long> endOffsets =
getEndOffsets(Collections.singleton(topicPartition));
- if (endOffsets == null || endOffsets.isEmpty()) {
- LOG.error("Failed to get endOffsets from Kafka for topic
partitions: {}.", topicPartition);
- return 0L;
- }
- // add value to topic level metric
- Long ret = endOffsets.get(topicPartition);
- topicMetrics.totalLatestTimeOffset += ret;
- return ret;
+ Gauge<Long> latestTimeOffsetGauge = () -> {
+ Map<TopicPartition, Long> endOffsets =
getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ LOG.error("Failed to get endOffsets from Kafka for topic
partitions: {}.", topicPartition);
+ return 0L;
}
+ return endOffsets.get(topicPartition);
};
- Gauge<Long> latestEmittedOffsetGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- // add value to topic level metric
- OffsetManager offsetManager =
offsetManagerSupplier.get().get(topicPartition);
- Long ret = offsetManager.getLatestEmittedOffset();
- topicMetrics.totalLatestEmittedOffset += ret;
- return ret;
- }
+ Gauge<Long> latestEmittedOffsetGauge = () -> {
+ OffsetManager offsetManager =
offsetManagerSupplier.get().get(topicPartition);
+ return offsetManager.getLatestEmittedOffset();
};
- Gauge<Long> latestCompletedOffsetGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- // add value to topic level metric
- OffsetManager offsetManager =
offsetManagerSupplier.get().get(topicPartition);
- Long ret = offsetManager.getCommittedOffset();
- topicMetrics.totalLatestCompletedOffset += ret;
- return ret;
- }
+ Gauge<Long> latestCompletedOffsetGauge = () -> {
+ OffsetManager offsetManager =
offsetManagerSupplier.get().get(topicPartition);
+ return offsetManager.getCommittedOffset();
};
- Gauge<Long> recordsInPartitionGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- Map<TopicPartition, Long> endOffsets =
getEndOffsets(Collections.singleton(topicPartition));
- if (endOffsets == null || endOffsets.isEmpty()) {
- LOG.error("Failed to get endOffsets from Kafka for topic
partitions: {}.", topicPartition);
- return 0L;
- }
- Map<TopicPartition, Long> beginningOffsets =
getBeginningOffsets(Collections.singleton(topicPartition));
- if (beginningOffsets == null || beginningOffsets.isEmpty()) {
- LOG.error("Failed to get beginningOffsets from Kafka for
topic partitions: {}.", topicPartition);
- return 0L;
- }
- // add value to topic level metric
- Long ret = endOffsets.get(topicPartition) -
beginningOffsets.get(topicPartition);
- topicMetrics.totalRecordsInPartitions += ret;
- return ret;
+ Gauge<Long> recordsInPartitionGauge = () -> {
+ Map<TopicPartition, Long> endOffsets =
getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ LOG.error("Failed to get endOffsets from Kafka for topic
partitions: {}.", topicPartition);
+ return 0L;
+ }
+ Map<TopicPartition, Long> beginningOffsets =
getBeginningOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (beginningOffsets == null || beginningOffsets.isEmpty()) {
+ LOG.error("Failed to get beginningOffsets from Kafka for topic
partitions: {}.", topicPartition);
+ return 0L;
}
+ return endOffsets.get(topicPartition) -
beginningOffsets.get(topicPartition);
};
metrics.put(metricPath + "/" + "spoutLag", spoutLagGauge);
@@ -172,56 +131,4 @@ public class KafkaOffsetPartitionMetrics<K, V> implements
MetricSet {
return metrics;
}
-
- private Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition>
topicPartitions) {
- Admin admin = adminSupplier.get();
- if (admin == null) {
- LOG.error("Kafka admin object is null, returning 0.");
- return Collections.EMPTY_MAP;
- }
-
- Map<TopicPartition, Long> beginningOffsets;
- try {
- beginningOffsets = getOffsets(admin, topicPartitions,
OffsetSpec.earliest());
- } catch (RetriableException | ExecutionException |
InterruptedException e) {
- LOG.error("Failed to get offset from Kafka for topic partitions:
{}.", topicPartition, e);
- return Collections.EMPTY_MAP;
- }
- return beginningOffsets;
- }
-
- private Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition>
topicPartitions) {
- Admin admin = adminSupplier.get();
- if (admin == null) {
- LOG.error("Kafka admin object is null, returning 0.");
- return Collections.EMPTY_MAP;
- }
-
- Map<TopicPartition, Long> endOffsets;
- try {
- endOffsets = getOffsets(admin, topicPartitions,
OffsetSpec.latest());
- } catch (RetriableException | ExecutionException |
InterruptedException e) {
- LOG.error("Failed to get offset from Kafka for topic partitions:
{}.", topicPartition, e);
- return Collections.EMPTY_MAP;
- }
- return endOffsets;
- }
-
- private static Map<TopicPartition, Long> getOffsets(Admin admin,
Set<TopicPartition> topicPartitions, OffsetSpec offsetSpec)
- throws InterruptedException, ExecutionException {
-
- Map<TopicPartition, OffsetSpec> offsetSpecMap = new HashMap<>();
- for (TopicPartition topicPartition : topicPartitions) {
- offsetSpecMap.put(topicPartition, offsetSpec);
- }
- Map<TopicPartition, Long> ret = new HashMap<>();
- ListOffsetsResult listOffsetsResult = admin.listOffsets(offsetSpecMap);
- KafkaFuture<Map<TopicPartition,
ListOffsetsResult.ListOffsetsResultInfo>> all = listOffsetsResult.all();
- Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
topicPartitionListOffsetsResultInfoMap = all.get();
- for (Map.Entry<TopicPartition,
ListOffsetsResult.ListOffsetsResultInfo> entry :
- topicPartitionListOffsetsResultInfoMap.entrySet()) {
- ret.put(entry.getKey(), entry.getValue().offset());
- }
- return ret;
- }
}
diff --git
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
index cd0fbfc6b..8bdcef374 100644
---
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
+++
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetTopicMetrics.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ * <p>
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,14 +18,26 @@
package org.apache.storm.kafka.spout.metrics2;
+import static
org.apache.storm.kafka.spout.metrics2.KafkaOffsetUtil.getBeginningOffsets;
+import static
org.apache.storm.kafka.spout.metrics2.KafkaOffsetUtil.getEndOffsets;
+
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
+
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* Topic level metrics.
* <p>
@@ -42,22 +54,18 @@ public class KafkaOffsetTopicMetrics implements MetricSet {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaOffsetTopicMetrics.class);
private String topic;
- long totalSpoutLag;
- long totalEarliestTimeOffset;
- long totalLatestTimeOffset;
- long totalLatestEmittedOffset;
- long totalLatestCompletedOffset;
- long totalRecordsInPartitions;
-
-
- public KafkaOffsetTopicMetrics(String topic) {
+ Set<TopicPartition> assignment;
+ Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+ Supplier<Admin> adminSupplier;
+
+ public KafkaOffsetTopicMetrics(String topic, Supplier<Map<TopicPartition,
+ OffsetManager>>
offsetManagerSupplier,
+ Supplier<Admin> adminSupplier,
+ Set<TopicPartition> newAssignment) {
this.topic = topic;
- this.totalSpoutLag = 0L;
- this.totalEarliestTimeOffset = 0L;
- this.totalLatestTimeOffset = 0L;
- this.totalLatestEmittedOffset = 0L;
- this.totalLatestCompletedOffset = 0L;
- this.totalRecordsInPartitions = 0L;
+ this.assignment = newAssignment;
+ this.offsetManagerSupplier = offsetManagerSupplier;
+ this.adminSupplier = adminSupplier;
LOG.info("Create KafkaOffsetTopicMetrics for topic: {}", topic);
}
@@ -65,46 +73,117 @@ public class KafkaOffsetTopicMetrics implements MetricSet {
public Map<String, Metric> getMetrics() {
Map<String, Metric> metrics = new HashMap();
- Gauge<Long> totalSpoutLagGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- return totalSpoutLag;
+ Gauge<Long> totalSpoutLagGauge = () -> {
+ Long totalSpoutLag = 0L;
+ for (TopicPartition topicPartition : assignment) {
+ String topicOfPartition = topicPartition.topic();
+ if (topicOfPartition.equals(topic)) {
+ Map<TopicPartition, Long> endOffsets =
getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ LOG.error("Failed to get endOffsets from Kafka for
topic partitions: {}.", topicPartition);
+ return 0L;
+ }
+ // add value to topic level metric
+ OffsetManager offsetManager =
offsetManagerSupplier.get().get(topicPartition);
+ Long ret = endOffsets.get(topicPartition) -
offsetManager.getCommittedOffset();
+ totalSpoutLag += ret;
+ }
}
+ return totalSpoutLag;
};
- Gauge<Long> totalEarliestTimeOffsetGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- return totalEarliestTimeOffset;
+ Gauge<Long> totalEarliestTimeOffsetGauge = () -> {
+
+ Long totalEarliestTimeOffset = 0L;
+
+ for (TopicPartition topicPartition : assignment) {
+ String topicOfPartition = topicPartition.topic();
+ if (topicOfPartition.equals(topic)) {
+ Map<TopicPartition, Long> beginningOffsets =
getBeginningOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (beginningOffsets == null ||
beginningOffsets.isEmpty()) {
+ LOG.error("Failed to get beginningOffsets from Kafka
for topic partitions: {}.", topicPartition);
+ return 0L;
+ }
+ // add value to topic level metric
+ Long ret = beginningOffsets.get(topicPartition);
+ totalEarliestTimeOffset += ret;
+ }
+
}
+ return totalEarliestTimeOffset;
};
- Gauge<Long> totalLatestTimeOffsetGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- return totalLatestTimeOffset;
+ Gauge<Long> totalLatestTimeOffsetGauge = () -> {
+
+ Long totalLatestTimeOffset = 0L;
+ for (TopicPartition topicPartition : assignment) {
+ String topicOfPartition = topicPartition.topic();
+ if (topicOfPartition.equals(topic)) {
+ Map<TopicPartition, Long> endOffsets =
getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ LOG.error("Failed to get endOffsets from Kafka for
topic partitions: {}.", topicPartition);
+ return 0L;
+ }
+ // add value to topic level metric
+ Long ret = endOffsets.get(topicPartition);
+ totalLatestTimeOffset += ret;
+ }
}
+ return totalLatestTimeOffset;
};
- Gauge<Long> totalLatestEmittedOffsetGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- return totalLatestEmittedOffset;
+ Gauge<Long> totalLatestEmittedOffsetGauge = () -> {
+
+ Long totalLatestEmittedOffset = 0L;
+ for (TopicPartition topicPartition : assignment) {
+ String topicOfPartition = topicPartition.topic();
+ if (topicOfPartition.equals(topic)) {
+ OffsetManager offsetManager =
offsetManagerSupplier.get().get(topicPartition);
+ Long ret = offsetManager.getLatestEmittedOffset();
+ totalLatestEmittedOffset += ret;
+ }
+
}
+ return totalLatestEmittedOffset;
};
- Gauge<Long> totalLatestCompletedOffsetGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- return totalLatestCompletedOffset;
+ Gauge<Long> totalLatestCompletedOffsetGauge = () -> {
+
+ Long totalLatestCompletedOffset = 0L;
+ for (TopicPartition topicPartition : assignment) {
+ String topicOfPartition = topicPartition.topic();
+ if (topicOfPartition.equals(topic)) {
+ // add value to topic level metric
+ OffsetManager offsetManager =
offsetManagerSupplier.get().get(topicPartition);
+ Long ret = offsetManager.getCommittedOffset();
+ totalLatestCompletedOffset += ret;
+ }
}
+
+ return totalLatestCompletedOffset;
};
- Gauge<Long> totalRecordsInPartitionsGauge = new Gauge<Long>() {
- @Override
- public Long getValue() {
- return totalRecordsInPartitions;
+ Gauge<Long> totalRecordsInPartitionsGauge = () -> {
+ Long totalRecordsInPartitions = 0L;
+ for (TopicPartition topicPartition : assignment) {
+ String topicOfPartition = topicPartition.topic();
+ if (topicOfPartition.equals(topic)) {
+ Map<TopicPartition, Long> endOffsets =
getEndOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (endOffsets == null || endOffsets.isEmpty()) {
+ LOG.error("Failed to get endOffsets from Kafka for
topic partitions: {}.", topicPartition);
+ return 0L;
+ }
+ Map<TopicPartition, Long> beginningOffsets =
getBeginningOffsets(Collections.singleton(topicPartition), adminSupplier);
+ if (beginningOffsets == null ||
beginningOffsets.isEmpty()) {
+ LOG.error("Failed to get beginningOffsets from Kafka
for topic partitions: {}.", topicPartition);
+ return 0L;
+ }
+ // add value to topic level metric
+ Long ret = endOffsets.get(topicPartition) -
beginningOffsets.get(topicPartition);
+ totalRecordsInPartitions += ret;
+ }
}
+ return totalRecordsInPartitions;
};
metrics.put(topic + "/" + "totalSpoutLag", totalSpoutLagGauge);
@@ -115,25 +194,4 @@ public class KafkaOffsetTopicMetrics implements MetricSet {
metrics.put(topic + "/" + "totalRecordsInPartitions",
totalRecordsInPartitionsGauge);
return metrics;
}
-
- private class TopicMetrics {
- long totalSpoutLag = 0L;
- long totalEarliestTimeOffset = 0L;
- long totalLatestTimeOffset = 0L;
- long totalLatestEmittedOffset = 0L;
- long totalLatestCompletedOffset = 0L;
- long totalRecordsInPartitions = 0L;
-
- public void incrementTotalSpoutLag(long offset) {
- totalSpoutLag += offset;
- }
-
- public void incrementTotalEarliestTimeOffset(long offset) {
- totalEarliestTimeOffset += offset;
- }
-
- public void incrementTotalLatestTimeOffset(long offset) {
- totalLatestTimeOffset += offset;
- }
- }
}
diff --git
a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetUtil.java
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetUtil.java
new file mode 100644
index 000000000..c7f9fabc4
--- /dev/null
+++
b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics2/KafkaOffsetUtil.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metrics2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Static helpers that look up partition offsets through the Kafka {@link Admin} client.
+ *
+ * <p>{@link #getBeginningOffsets} and {@link #getEndOffsets} never propagate a lookup failure: the problem is
+ * logged and an empty map is returned, so callers must treat an empty result as "offsets unavailable".
+ */
+public class KafkaOffsetUtil {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetUtil.class);
+
+    /**
+     * Looks up the earliest offset of each given partition.
+     *
+     * @param topicPartitions partitions to query
+     * @param adminSupplier supplies the Kafka admin client; the supplied value may be {@code null}
+     * @return offsets keyed by partition, or an empty map if the admin client is {@code null} or the lookup fails
+     */
+    public static Map<TopicPartition, Long> getBeginningOffsets(Set<TopicPartition> topicPartitions,
+                                                                Supplier<Admin> adminSupplier) {
+        return getOffsetsOrEmpty(topicPartitions, adminSupplier, OffsetSpec.earliest());
+    }
+
+    /**
+     * Looks up the latest (end) offset of each given partition.
+     *
+     * @param topicPartitions partitions to query
+     * @param adminSupplier supplies the Kafka admin client; the supplied value may be {@code null}
+     * @return offsets keyed by partition, or an empty map if the admin client is {@code null} or the lookup fails
+     */
+    public static Map<TopicPartition, Long> getEndOffsets(Set<TopicPartition> topicPartitions,
+                                                          Supplier<Admin> adminSupplier) {
+        return getOffsetsOrEmpty(topicPartitions, adminSupplier, OffsetSpec.latest());
+    }
+
+    /**
+     * Issues a single {@code listOffsets} request for all given partitions and waits for the result.
+     *
+     * @param admin Kafka admin client used for the request
+     * @param topicPartitions partitions to query
+     * @param offsetSpec which offset to resolve for every partition (for example earliest or latest)
+     * @return a new mutable map from partition to the resolved offset
+     * @throws InterruptedException if the calling thread is interrupted while waiting for the result
+     * @throws ExecutionException if the request completes exceptionally
+     */
+    public static Map<TopicPartition, Long> getOffsets(Admin admin, Set<TopicPartition> topicPartitions,
+                                                       OffsetSpec offsetSpec)
+        throws InterruptedException, ExecutionException {
+
+        Map<TopicPartition, OffsetSpec> offsetSpecMap = new HashMap<>();
+        for (TopicPartition topicPartition : topicPartitions) {
+            offsetSpecMap.put(topicPartition, offsetSpec);
+        }
+
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> resultInfoByPartition =
+            admin.listOffsets(offsetSpecMap).all().get();
+
+        Map<TopicPartition, Long> ret = new HashMap<>();
+        for (Map.Entry<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> entry
+            : resultInfoByPartition.entrySet()) {
+            ret.put(entry.getKey(), entry.getValue().offset());
+        }
+        return ret;
+    }
+
+    /**
+     * Shared implementation of the supplier-based lookups: resolves the admin client, delegates to
+     * {@link #getOffsets} and converts every failure into a logged error plus an empty result.
+     */
+    private static Map<TopicPartition, Long> getOffsetsOrEmpty(Set<TopicPartition> topicPartitions,
+                                                               Supplier<Admin> adminSupplier,
+                                                               OffsetSpec offsetSpec) {
+        Admin admin = adminSupplier.get();
+        if (admin == null) {
+            LOG.error("Kafka admin object is null, returning an empty map.");
+            return Collections.emptyMap();
+        }
+
+        try {
+            return getOffsets(admin, topicPartitions, offsetSpec);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so the owning thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartitions, e);
+            return Collections.emptyMap();
+        } catch (RetriableException | ExecutionException e) {
+            LOG.error("Failed to get offset from Kafka for topic partitions: {}.", topicPartitions, e);
+            return Collections.emptyMap();
+        }
+    }
+}
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index b4cab2ace..b7ae2824b 100644
---
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -29,9 +29,12 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
import java.util.HashMap;
import java.util.Map;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -65,6 +68,7 @@ public class KafkaSpoutReactivationTest {
private final SpoutOutputCollector collector =
mock(SpoutOutputCollector.class);
private final long commitOffsetPeriodMs = 2_000;
private Consumer<String, String> consumerSpy;
+ private Admin adminSpy;
private KafkaSpout<String, String> spout;
private final int maxPollRecords = 10;
@@ -78,9 +82,12 @@ public class KafkaSpoutReactivationTest {
.build();
ClientFactory<String, String> clientFactory = new
ClientFactoryDefault<>();
this.consumerSpy =
spy(clientFactory.createConsumer(spoutConfig.getKafkaProps()));
+ this.adminSpy =
spy(clientFactory.createAdmin(spoutConfig.getKafkaProps()));
ClientFactory<String, String> clientFactoryMock =
mock(ClientFactory.class);
when(clientFactoryMock.createConsumer(any()))
.thenReturn(consumerSpy);
+ when(clientFactoryMock.createAdmin(any()))
+ .thenReturn(adminSpy);
this.spout = new KafkaSpout<>(spoutConfig, clientFactoryMock, new
TopicAssigner());
SingleTopicKafkaUnitSetupHelper.populateTopicData(kafkaUnitExtension.getKafkaUnit(),
SingleTopicKafkaSpoutConfiguration.TOPIC, messageCount);
SingleTopicKafkaUnitSetupHelper.initializeSpout(spout, conf,
topologyContext, collector);
@@ -147,4 +154,24 @@ public class KafkaSpoutReactivationTest {
//With earliest, the spout should also resume where it left off,
rather than restart at the earliest offset.
doReactivationTest(FirstPollOffsetStrategy.EARLIEST);
}
+
+    @Test
+    public void testSpoutMustHandleGettingMetricsWhileDeactivated() throws Exception {
+        // Metrics may be requested from the spout while it is deactivated; reading the gauges must still work.
+        final String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+        prepareSpout(10, FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST);
+
+        // Emit and ack five tuples, then deactivate before reading any gauge.
+        int acked = 0;
+        while (acked < 5) {
+            KafkaSpoutMessageId emitted = emitOne();
+            spout.ack(emitted);
+            acked++;
+        }
+        spout.deactivate();
+
+        // Partition-level lag gauge.
+        Map<String, Metric> partitionMetrics = spout.getKafkaOffsetMetricManager()
+            .getTopicPartitionMetricsMap()
+            .get(new TopicPartition(topic, 0))
+            .getMetrics();
+        Gauge partitionLagGauge = (Gauge) partitionMetrics.get(topic + "/partition_0/spoutLag");
+        Long partitionLag = (Long) partitionLagGauge.getValue();
+        assertThat(partitionLag, is(5L));
+
+        // Topic-level lag gauge.
+        Map<String, Metric> topicMetrics = spout.getKafkaOffsetMetricManager()
+            .getTopicMetricsMap()
+            .get(topic)
+            .getMetrics();
+        Gauge totalLagGauge = (Gauge) topicMetrics.get(topic + "/totalSpoutLag");
+        Long totalSpoutLag = (Long) totalLagGauge.getValue();
+        assertThat(totalSpoutLag, is(5L));
+    }
}
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionMetricsTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionMetricsTest.java
new file mode 100644
index 000000000..ee085858e
--- /dev/null
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetPartitionMetricsTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metric2;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.*;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.kafka.spout.metrics2.KafkaOffsetPartitionMetrics;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Tests the gauges returned by {@link KafkaOffsetPartitionMetrics#getMetrics()} for a single partition, with the
+ * Kafka {@link Admin} client and the {@link OffsetManager} replaced by Mockito mocks.
+ */
+@ExtendWith(MockitoExtension.class)
+public class KafkaOffsetPartitionMetricsTest {
+
+    private static final TopicPartition TOPIC_A_PARTITION_1 = new TopicPartition("topicA", 1);
+
+    private Admin admin = mock(Admin.class);
+    private HashMap<TopicPartition, OffsetManager> offsetManagers;
+    private KafkaFuture kafkaFuture = mock(KafkaFuture.class);
+
+    @BeforeEach
+    public void initializeTests() {
+        reset(admin, kafkaFuture);
+    }
+
+    @Test
+    public void registerMetricsGetSpoutLagAndPartitionRecords() throws ExecutionException, InterruptedException {
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets = offsetsResult(100);
+        when(kafkaFuture.get()).thenReturn(latestOffsets);
+        stubAdminListOffsets();
+
+        OffsetManager offsetManager = mock(OffsetManager.class);
+        when(offsetManager.getCommittedOffset()).thenReturn(90L);
+        offsetManagers = new HashMap<>();
+        offsetManagers.put(TOPIC_A_PARTITION_1, offsetManager);
+
+        KafkaOffsetPartitionMetrics partitionMetrics = new KafkaOffsetPartitionMetrics(
+            () -> Collections.unmodifiableMap(offsetManagers), () -> admin, TOPIC_A_PARTITION_1);
+        Map<String, Metric> result = partitionMetrics.getMetrics();
+        Gauge gauge = (Gauge) result.get("topicA/partition_1/spoutLag");
+        assertEquals(10L, gauge.getValue());
+
+        // Get partition records. Each read of the recordsInPartition gauge calls kafkaFuture.get() twice,
+        // so the consecutive answers are the latest offsets followed by the earliest offsets.
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets = offsetsResult(1);
+        when(kafkaFuture.get()).thenReturn(latestOffsets, earliestOffsets);
+
+        result = partitionMetrics.getMetrics();
+        gauge = (Gauge) result.get("topicA/partition_1/recordsInPartition");
+        assertEquals(99L, gauge.getValue());
+    }
+
+    @Test
+    public void registerMetricsGetEarliestAndLatest() throws ExecutionException, InterruptedException {
+        when(kafkaFuture.get()).thenReturn(offsetsResult(1));
+        stubAdminListOffsets();
+
+        OffsetManager offsetManager = mock(OffsetManager.class);
+        when(offsetManager.getLatestEmittedOffset()).thenReturn(50L);
+        when(offsetManager.getCommittedOffset()).thenReturn(40L);
+        offsetManagers = new HashMap<>();
+        offsetManagers.put(TOPIC_A_PARTITION_1, offsetManager);
+
+        KafkaOffsetPartitionMetrics partitionMetrics = new KafkaOffsetPartitionMetrics(
+            () -> Collections.unmodifiableMap(offsetManagers), () -> admin, TOPIC_A_PARTITION_1);
+        Map<String, Metric> result = partitionMetrics.getMetrics();
+        Gauge gauge = (Gauge) result.get("topicA/partition_1/earliestTimeOffset");
+        // JUnit 5 takes the expected value first; the original call had the arguments swapped.
+        assertEquals(1L, gauge.getValue());
+
+        // Get the latest offsets.
+        when(kafkaFuture.get()).thenReturn(offsetsResult(100));
+
+        result = partitionMetrics.getMetrics();
+
+        gauge = (Gauge) result.get("topicA/partition_1/latestTimeOffset");
+        assertEquals(100L, gauge.getValue());
+
+        gauge = (Gauge) result.get("topicA/partition_1/latestEmittedOffset");
+        assertEquals(50L, gauge.getValue());
+
+        gauge = (Gauge) result.get("topicA/partition_1/latestCompletedOffset");
+        assertEquals(40L, gauge.getValue());
+    }
+
+    /** Builds the map the mocked future returns: the test partition mapped to the given offset. */
+    private static Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsetsResult(long offset) {
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets = new HashMap<>();
+        offsets.put(TOPIC_A_PARTITION_1,
+            new ListOffsetsResult.ListOffsetsResultInfo(offset, System.currentTimeMillis(), Optional.empty()));
+        return offsets;
+    }
+
+    /** Replaces {@code admin} with a fresh mock whose listOffsets(...) result resolves through {@code kafkaFuture}. */
+    private void stubAdminListOffsets() {
+        ListOffsetsResult listOffsetsResult = mock(ListOffsetsResult.class);
+        when(listOffsetsResult.all()).thenReturn(kafkaFuture);
+
+        admin = mock(Admin.class);
+        when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult);
+    }
+}
diff --git
a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetTopicMetricsTest.java
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetTopicMetricsTest.java
new file mode 100644
index 000000000..264625033
--- /dev/null
+++
b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/metric2/KafkaOffsetTopicMetricsTest.java
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metric2;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.Metric;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.kafka.spout.metrics2.KafkaOffsetTopicMetrics;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests the topic-level gauges returned by {@link KafkaOffsetTopicMetrics#getMetrics()}, with the Kafka
+ * {@link Admin} client and the {@link OffsetManager}s replaced by Mockito mocks.
+ *
+ * <p>The assignment always contains two partitions of topicA and two of topicB, while the metric set under test
+ * is created for topicA only; every test also checks that no topicB metric is registered in it.
+ */
+@ExtendWith(MockitoExtension.class)
+public class KafkaOffsetTopicMetricsTest {
+
+    private static final TopicPartition TOPIC_A_PARTITION_1 = new TopicPartition("topicA", 1);
+    private static final TopicPartition TOPIC_A_PARTITION_2 = new TopicPartition("topicA", 2);
+    private static final TopicPartition TOPIC_B_PARTITION_1 = new TopicPartition("topicB", 1);
+    private static final TopicPartition TOPIC_B_PARTITION_2 = new TopicPartition("topicB", 2);
+
+    private Set<TopicPartition> assignment;
+    private Admin admin = mock(Admin.class);
+    private HashMap<TopicPartition, OffsetManager> offsetManagers;
+    private KafkaFuture kafkaFuture = mock(KafkaFuture.class);
+
+    @BeforeEach
+    public void initializeTests() {
+        reset(admin, kafkaFuture);
+    }
+
+    @Test
+    public void registerMetricsGetSpoutLagAndPartitionRecords() throws ExecutionException, InterruptedException {
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
+            offsetsResult(100, 200, 300, 400);
+        when(kafkaFuture.get()).thenReturn(latestOffsets);
+        stubAdminListOffsets();
+
+        OffsetManager offsetManagerA1 = mock(OffsetManager.class);
+        when(offsetManagerA1.getCommittedOffset()).thenReturn(90L);
+        OffsetManager offsetManagerA2 = mock(OffsetManager.class);
+        when(offsetManagerA2.getCommittedOffset()).thenReturn(170L);
+        registerTopicAOffsetManagers(offsetManagerA1, offsetManagerA2);
+        assignAllPartitions();
+
+        KafkaOffsetTopicMetrics topicAMetrics = new KafkaOffsetTopicMetrics(
+            "topicA", () -> Collections.unmodifiableMap(offsetManagers), () -> admin, assignment);
+        Map<String, Metric> result = topicAMetrics.getMetrics();
+
+        // Read the gauge twice: the value must not change between reads.
+        for (int read = 0; read < 2; read++) {
+            Gauge totalSpoutLag = (Gauge) result.get("topicA/totalSpoutLag");
+            assertEquals(40L, totalSpoutLag.getValue());
+        }
+        assertNull(result.get("topicB/totalSpoutLag"));
+
+        // Get topic records.
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets = offsetsResult(1, 2, 3, 4);
+
+        // Mock consecutive calls: the gauge calls kafkaFuture.get() twice per topicA partition (latest, then
+        // earliest), so two partitions read twice consume eight answers.
+        when(kafkaFuture.get()).thenReturn(
+            latestOffsets, earliestOffsets,
+            latestOffsets, earliestOffsets,
+            latestOffsets, earliestOffsets,
+            latestOffsets, earliestOffsets);
+
+        // Read the gauge twice: the value must not change between reads.
+        for (int read = 0; read < 2; read++) {
+            Gauge totalRecords = (Gauge) result.get("topicA/totalRecordsInPartitions");
+            assertEquals(297L, totalRecords.getValue());
+        }
+        assertNull(result.get("topicB/totalRecordsInPartitions"));
+    }
+
+    @Test
+    public void registerMetricsGetEarliestAndLatest() throws ExecutionException, InterruptedException {
+        when(kafkaFuture.get()).thenReturn(offsetsResult(1, 1, 1, 1));
+        stubAdminListOffsets();
+
+        OffsetManager offsetManagerA1 = mock(OffsetManager.class);
+        when(offsetManagerA1.getLatestEmittedOffset()).thenReturn(50L);
+        when(offsetManagerA1.getCommittedOffset()).thenReturn(40L);
+        OffsetManager offsetManagerA2 = mock(OffsetManager.class);
+        when(offsetManagerA2.getLatestEmittedOffset()).thenReturn(100L);
+        when(offsetManagerA2.getCommittedOffset()).thenReturn(90L);
+        registerTopicAOffsetManagers(offsetManagerA1, offsetManagerA2);
+        assignAllPartitions();
+
+        KafkaOffsetTopicMetrics topicAMetrics = new KafkaOffsetTopicMetrics(
+            "topicA", () -> Collections.unmodifiableMap(offsetManagers), () -> admin, assignment);
+        Map<String, Metric> result = topicAMetrics.getMetrics();
+
+        // Read the gauge twice: the value must be the same on the second read.
+        for (int read = 0; read < 2; read++) {
+            Gauge totalEarliest = (Gauge) result.get("topicA/totalEarliestTimeOffset");
+            assertEquals(2L, totalEarliest.getValue());
+            assertNull(result.get("topicB/totalEarliestTimeOffset"));
+        }
+
+        // Get the latest offsets.
+        when(kafkaFuture.get()).thenReturn(offsetsResult(100, 200, 300, 400));
+
+        // Read every gauge twice: the values must be the same on the second read.
+        for (int read = 0; read < 2; read++) {
+            Gauge gauge = (Gauge) result.get("topicA/totalLatestTimeOffset");
+            assertEquals(300L, gauge.getValue());
+            assertNull(result.get("topicB/totalLatestTimeOffset"));
+
+            gauge = (Gauge) result.get("topicA/totalLatestEmittedOffset");
+            assertEquals(150L, gauge.getValue());
+            assertNull(result.get("topicB/totalLatestEmittedOffset"));
+
+            gauge = (Gauge) result.get("topicA/totalLatestCompletedOffset");
+            assertEquals(130L, gauge.getValue());
+            // The original asserted on the misspelled key "topiBA/...", which can never be present.
+            assertNull(result.get("topicB/totalLatestCompletedOffset"));
+        }
+    }
+
+    /** Builds the map the mocked future returns, holding one offset for each of the four test partitions. */
+    private static Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsetsResult(
+        long topicAPartition1, long topicAPartition2, long topicBPartition1, long topicBPartition2) {
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets = new HashMap<>();
+        offsets.put(TOPIC_A_PARTITION_1, resultInfo(topicAPartition1));
+        offsets.put(TOPIC_A_PARTITION_2, resultInfo(topicAPartition2));
+        offsets.put(TOPIC_B_PARTITION_1, resultInfo(topicBPartition1));
+        offsets.put(TOPIC_B_PARTITION_2, resultInfo(topicBPartition2));
+        return offsets;
+    }
+
+    /** Wraps an offset in a result info stamped with the current time and no leader epoch. */
+    private static ListOffsetsResult.ListOffsetsResultInfo resultInfo(long offset) {
+        return new ListOffsetsResult.ListOffsetsResultInfo(offset, System.currentTimeMillis(), Optional.empty());
+    }
+
+    /** Replaces {@code admin} with a fresh mock whose listOffsets(...) result resolves through {@code kafkaFuture}. */
+    private void stubAdminListOffsets() {
+        ListOffsetsResult listOffsetsResult = mock(ListOffsetsResult.class);
+        when(listOffsetsResult.all()).thenReturn(kafkaFuture);
+
+        admin = mock(Admin.class);
+        when(admin.listOffsets(anyMap())).thenReturn(listOffsetsResult);
+    }
+
+    /** Registers offset managers for the two topicA partitions only; topicB has none. */
+    private void registerTopicAOffsetManagers(OffsetManager partition1, OffsetManager partition2) {
+        offsetManagers = new HashMap<>();
+        offsetManagers.put(TOPIC_A_PARTITION_1, partition1);
+        offsetManagers.put(TOPIC_A_PARTITION_2, partition2);
+    }
+
+    /** Assigns all four partitions (both topics) to the spout under test. */
+    private void assignAllPartitions() {
+        assignment = new HashSet<>();
+        assignment.add(TOPIC_A_PARTITION_1);
+        assignment.add(TOPIC_A_PARTITION_2);
+        assignment.add(TOPIC_B_PARTITION_1);
+        assignment.add(TOPIC_B_PARTITION_2);
+    }
+}