Repository: storm Updated Branches: refs/heads/1.x-branch 80213bae7 -> f0abfff92
STORM-971: Metric for messages lost due to kafka retention Signed-off-by: P. Taylor Goetz <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e742210 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e742210 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e742210 Branch: refs/heads/1.x-branch Commit: 6e742210a5aef41f385d74c8be5580a507382ef1 Parents: 80213ba Author: Abhishek Agarwal <[email protected]> Authored: Sun Mar 13 00:37:14 2016 +0530 Committer: P. Taylor Goetz <[email protected]> Committed: Tue Mar 15 15:38:58 2016 -0400 ---------------------------------------------------------------------- .../jvm/org/apache/storm/kafka/PartitionManager.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/6e742210/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index dbf70a0..4db8af6 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -44,6 +44,8 @@ public class PartitionManager { private final ReducedMetric _fetchAPILatencyMean; private final CountMetric _fetchAPICallCount; private final CountMetric _fetchAPIMessageCount; + // Count of messages which could not be emitted or retried because they were deleted from kafka + private final CountMetric _lostMessageCount; Long _emittedToOffset; // _pending key = Kafka offset, value = time at which the message was first submitted to the topology private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>(); @@ -117,6 +119,7 @@ public class PartitionManager { _fetchAPILatencyMean = new ReducedMetric(new MeanReducer()); _fetchAPICallCount = new CountMetric(); _fetchAPIMessageCount = new CountMetric(); + _lostMessageCount = new CountMetric(); } public Map getMetricsDataMap() { @@ -125,6 +128,7 @@ public class PartitionManager { ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset()); ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset()); ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset()); + ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset()); return ret; } @@ -185,7 +189,7 @@ public class PartitionManager { msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset); } catch (TopicOffsetOutOfRangeException e) { offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime()); - // fetch failed, so don't update the metrics + // fetch failed, so don't update the fetch metrics //fix bug [STORM-643] : remove outdated failed offsets if (!processingNewTuples) { @@ -194,11 +198,17 @@ public class PartitionManager { // offset, since they are anyway not there. // These calls to broker API will be then saved. Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset); + + // Omitted messages have not been acked and may be lost + if (null != omitted) { + _lostMessageCount.incrBy(omitted.size()); + } LOG.warn("Removing the failed offsets that are out of range: {}", omitted); } if (offset > _emittedToOffset) { + _lostMessageCount.incrBy(offset - _emittedToOffset); _emittedToOffset = offset; LOG.warn("{} Using new offset: {}", _partition.partition, _emittedToOffset); }
