Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2480#discussion_r159063226
--- Diff:
external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
---
@@ -343,4 +344,48 @@ public void
testSpoutMustRefreshPartitionsEvenIfNotPolling() throws Exception {
spout.nextTuple();
verify(collectorMock).emit(anyString(), anyList(),
any(KafkaSpoutMessageId.class));
}
+
+ @Test
+ public void testOffsetMetrics() throws Exception {
+ final int messageCount = 10;
+ prepareSpout(messageCount);
+
+ Map<String, Long> offsetMetric = (Map<String, Long>)
spout.getKafkaOffsetMetric().getValueAndReset();
+ assertEquals(0,
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+ // the offset of the last available message + 1.
+ assertEquals(10,
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+ assertEquals(0,
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+ assertEquals(0,
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+ //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
+ assertEquals(10,
offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+
+ //Emit and Ack all messages
+ for(int i = 0; i < messageCount; i++) {
--- End diff --
I think you can use nextTuple_verifyEmitted_ack_resetCollector(int offset)
from the superclass to do everything in this block.
---