Update time interval counting on TridentKafkaEmitter

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d89f7027
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d89f7027
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d89f7027

Branch: refs/heads/master
Commit: d89f7027fcaf5576b5b4a14488f42c71094617ad
Parents: 27373ba
Author: darionyaphet <[email protected]>
Authored: Wed Feb 24 12:34:54 2016 +0800
Committer: darionyaphet <[email protected]>
Committed: Wed Feb 24 12:34:54 2016 +0800

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d89f7027/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 9732c8c..512363c 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -136,11 +136,10 @@ public class TridentKafkaEmitter {
     }
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
-        long start = System.nanoTime();
+        long start = System.currentTimeMillis();
         ByteBufferMessageSet msgs = null;
         msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        long end = System.nanoTime();
-        long millis = (end - start) / 1000000;
+        long millis = System.currentTimeMillis() - start;
         _kafkaMeanFetchLatencyMetric.update(millis);
         _kafkaMaxFetchLatencyMetric.update(millis);
         return msgs;

Reply via email to