Repository: storm
Updated Branches:
  refs/heads/master 20084ba34 -> 2aaa71809


STORM-713: Include topic information with Kafka metrics.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1e3bf8c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1e3bf8c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1e3bf8c0

Branch: refs/heads/master
Commit: 1e3bf8c09cb1be6b60ca57fce2b6d859a57e173a
Parents: bb8d48d
Author: Craig Hawco <[email protected]>
Authored: Thu Mar 19 11:35:59 2015 -0400
Committer: Craig Hawco <[email protected]>
Committed: Thu Mar 19 14:02:26 2015 -0400

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1e3bf8c0/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99..50241f7 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -121,19 +121,19 @@ public class KafkaUtils {
                         }
                         long latestEmittedOffset = e.getValue();
                         long spoutLag = latestTimeOffset - latestEmittedOffset;
-                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
-                        ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
-                        ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
-                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "spoutLag", spoutLag);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                         totalSpoutLag += spoutLag;
                         totalEarliestTimeOffset += earliestTimeOffset;
                         totalLatestTimeOffset += latestTimeOffset;
                         totalLatestEmittedOffset += latestEmittedOffset;
                     }
-                    ret.put("totalSpoutLag", totalSpoutLag);
-                    ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
-                    ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
-                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
+                    ret.put(_topic + "/" + "totalSpoutLag", totalSpoutLag);
+                    ret.put(_topic + "/" + "totalEarliestTimeOffset", totalEarliestTimeOffset);
+                    ret.put(_topic + "/" + "totalLatestTimeOffset", totalLatestTimeOffset);
+                    ret.put(_topic + "/" + "totalLatestEmittedOffset", totalLatestEmittedOffset);
                     return ret;
                 } else {
                     LOG.info("Metrics Tick: Not enough data to calculate spout lag.");

Reply via email to