Repository: storm
Updated Branches:
  refs/heads/master 20084ba34 -> 2aaa71809

STORM-713: Include topic information with Kafka metrics.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1e3bf8c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1e3bf8c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1e3bf8c0

Branch: refs/heads/master
Commit: 1e3bf8c09cb1be6b60ca57fce2b6d859a57e173a
Parents: bb8d48d
Author: Craig Hawco <[email protected]>
Authored: Thu Mar 19 11:35:59 2015 -0400
Committer: Craig Hawco <[email protected]>
Committed: Thu Mar 19 14:02:26 2015 -0400

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/1e3bf8c0/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99..50241f7 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -121,19 +121,19 @@ public class KafkaUtils {
                         }
                         long latestEmittedOffset = e.getValue();
                         long spoutLag = latestTimeOffset - latestEmittedOffset;
-                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
-                        ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
-                        ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
-                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "spoutLag", spoutLag);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                         totalSpoutLag += spoutLag;
                         totalEarliestTimeOffset += earliestTimeOffset;
                         totalLatestTimeOffset += latestTimeOffset;
                         totalLatestEmittedOffset += latestEmittedOffset;
                     }
-                    ret.put("totalSpoutLag", totalSpoutLag);
-                    ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
-                    ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
-                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
+                    ret.put(_topic + "/" + "totalSpoutLag", totalSpoutLag);
+                    ret.put(_topic + "/" + "totalEarliestTimeOffset", totalEarliestTimeOffset);
+                    ret.put(_topic + "/" + "totalLatestTimeOffset", totalLatestTimeOffset);
+                    ret.put(_topic + "/" + "totalLatestEmittedOffset", totalLatestEmittedOffset);
                     return ret;
                 } else {
                     LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
