Update time interval counting on TridentKafkaEmitter
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d89f7027
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d89f7027
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d89f7027

Branch: refs/heads/master
Commit: d89f7027fcaf5576b5b4a14488f42c71094617ad
Parents: 27373ba
Author: darionyaphet <[email protected]>
Authored: Wed Feb 24 12:34:54 2016 +0800
Committer: darionyaphet <[email protected]>
Committed: Wed Feb 24 12:34:54 2016 +0800

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/d89f7027/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 9732c8c..512363c 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -136,11 +136,10 @@ public class TridentKafkaEmitter {
     }
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
-        long start = System.nanoTime();
+        long start = System.currentTimeMillis();
         ByteBufferMessageSet msgs = null;
         msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        long end = System.nanoTime();
-        long millis = (end - start) / 1000000;
+        long millis = System.currentTimeMillis() - start;
         _kafkaMeanFetchLatencyMetric.update(millis);
         _kafkaMaxFetchLatencyMetric.update(millis);
         return msgs;
