Updated Branches: refs/heads/master 79a4376d0 -> ef95569f8
SAMZA-28; fixing bug in blocking envelope map metrics. Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ef95569f Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ef95569f Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ef95569f Branch: refs/heads/master Commit: ef95569f8cec240cb0214012e5f6b15b8f4d4f6f Parents: 79a4376 Author: Chris Riccomini <[email protected]> Authored: Mon Oct 7 12:49:12 2013 -0700 Committer: Chris Riccomini <[email protected]> Committed: Mon Oct 7 12:49:12 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/samza/metrics/Gauge.java | 4 ++ .../apache/samza/util/BlockingEnvelopeMap.java | 60 +++++++++++--------- 2 files changed, 37 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ef95569f/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java index 14db2d3..3335c15 100644 --- a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java +++ b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java @@ -30,6 +30,10 @@ public class Gauge<T> implements Metric { this.ref = new AtomicReference<T>(value); } + public boolean compareAndSet(T expected, T n) { + return ref.compareAndSet(expected, n); + } + public T set(T n) { return ref.getAndSet(n); } http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ef95569f/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java index c1dd279..d4db126 100644 --- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java +++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java @@ -22,6 +22,7 @@ package org.apache.samza.util; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; @@ -75,7 +76,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { } public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) { - this.metrics = new BlockingEnvelopeMapMetrics(metricsRegistry); + this.metrics = new BlockingEnvelopeMapMetrics(this.getClass().getName(), metricsRegistry); this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>(); this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>(); this.clock = clock; @@ -116,8 +117,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { } } - metrics.decBufferedMessageCount(systemStreamPartition, systemStreamPartitionMessages.size()); - // Now block if blocking is allowed and we have no messages. if (systemStreamPartitionMessages.size() == 0) { // How long we can legally block (if timeout > 0) @@ -130,7 +129,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { if (envelope != null) { systemStreamPartitionMessages.add(envelope); - metrics.decBufferedMessageCount(systemStreamPartition, 1); } } } else if (timeout > 0 && timeRemaining > 0) { @@ -139,7 +137,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { if (envelope != null) { systemStreamPartitionMessages.add(envelope); - metrics.decBufferedMessageCount(systemStreamPartition, 1); } } } @@ -152,7 +149,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) { bufferedMessages.get(systemStreamPartition).add(envelope); - metrics.incBufferedMessageCount(systemStreamPartition, 1); } protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) { @@ -161,8 +157,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { for (IncomingMessageEnvelope envelope : envelopes) { queue.add(envelope); } - - metrics.incBufferedMessageCount(systemStreamPartition, envelopes.size()); } public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) { @@ -186,41 +180,32 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { return getNumMessagesInQueue(systemStreamPartition) == 0 && isAtHead != null && isAtHead.equals(true); } - public static final class BlockingEnvelopeMapMetrics { - private static final String GROUP = "samza.consumers"; - + public class BlockingEnvelopeMapMetrics { + private final String group; private final MetricsRegistry metricsRegistry; - private final ConcurrentHashMap<SystemStreamPartition, Counter> bufferedMessageCountMap; private final ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>> noMoreMessageGaugeMap; private final ConcurrentHashMap<SystemStreamPartition, Counter> pollCountMap; private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap; private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap; private final Counter pollCount; - public BlockingEnvelopeMapMetrics(MetricsRegistry metricsRegistry) { + public BlockingEnvelopeMapMetrics(String group, MetricsRegistry metricsRegistry) { + this.group = group; this.metricsRegistry = metricsRegistry; - this.bufferedMessageCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>(); this.noMoreMessageGaugeMap = new ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>(); this.pollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>(); this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>(); this.blockingPollTimeoutCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>(); - this.pollCount = metricsRegistry.newCounter(GROUP, "PollCount"); + this.pollCount = metricsRegistry.newCounter(group, "poll-count"); } public void initMetrics(SystemStreamPartition systemStreamPartition) { - this.bufferedMessageCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "BufferedMessageCount-" + systemStreamPartition)); - this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, metricsRegistry.<Boolean> newGauge(GROUP, "NoMoreMessages-" + systemStreamPartition, false)); - this.pollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "PollCount-" + systemStreamPartition)); - this.blockingPollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "BlockingPollCount-" + systemStreamPartition)); - this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "BlockingPollTimeoutCount-" + systemStreamPartition)); - } + this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, metricsRegistry.<Boolean> newGauge(group, "no-more-messages-" + systemStreamPartition, false)); + this.pollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "poll-count-" + systemStreamPartition)); + this.blockingPollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-count-" + systemStreamPartition)); + this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition)); - public void incBufferedMessageCount(SystemStreamPartition systemStreamPartition, int count) { - this.bufferedMessageCountMap.get(systemStreamPartition).inc(count); - } - - public void decBufferedMessageCount(SystemStreamPartition systemStreamPartition, int count) { - this.bufferedMessageCountMap.get(systemStreamPartition).dec(count); + metricsRegistry.<Integer> newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition)); } public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) { @@ -243,4 +228,25 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer { this.pollCount.inc(); } } + + public class BufferGauge extends Gauge<Integer> { + private final SystemStreamPartition systemStreamPartition; + + public BufferGauge(SystemStreamPartition systemStreamPartition, String name) { + super(name, 0); + + this.systemStreamPartition = systemStreamPartition; + } + + @Override + public Integer getValue() { + Queue<IncomingMessageEnvelope> envelopes = bufferedMessages.get(systemStreamPartition); + + if (envelopes == null) { + return 0; + } + + return envelopes.size(); + } + } }
