Updated Branches: refs/heads/flume-1.4 c954dfc76 -> e635e19b4
FLUME-2082. JMX support for Seq Generator Source.

(Sravya Tirukkovalur via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e635e19b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e635e19b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e635e19b

Branch: refs/heads/flume-1.4
Commit: e635e19b4ead2a7a756346912c307f78b76dd9fc
Parents: c954dfc
Author: Mike Percy <[email protected]>
Authored: Fri Jun 21 12:21:27 2013 -0700
Committer: Mike Percy <[email protected]>
Committed: Fri Jun 21 12:21:56 2013 -0700

----------------------------------------------------------------------
 .../flume/source/SequenceGeneratorSource.java | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/e635e19b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
index 0f85e87..51e021a 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java
@@ -23,12 +23,12 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flume.ChannelException;
 import org.apache.flume.Context;
-import org.apache.flume.CounterGroup;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.PollableSource;
 import org.apache.flume.conf.Configurable;
 import org.apache.flume.event.EventBuilder;
+import org.apache.flume.instrumentation.SourceCounter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,14 +40,13 @@ public class SequenceGeneratorSource extends AbstractSource implements
 
   private long sequence;
   private int batchSize;
-  private CounterGroup counterGroup;
+  private SourceCounter sourceCounter;
   private List<Event> batchArrayList;
   private long totalEvents;
   private long eventsSent = 0;
 
   public SequenceGeneratorSource() {
     sequence = 0;
-    counterGroup = new CounterGroup();
   }
 
   /**
@@ -61,6 +60,9 @@ public class SequenceGeneratorSource extends AbstractSource implements
       batchArrayList = new ArrayList<Event>(batchSize);
     }
     totalEvents = context.getLong("totalEvents", Long.MAX_VALUE);
+    if (sourceCounter == null) {
+      sourceCounter = new SourceCounter(getName());
+    }
   }
 
   @Override
@@ -73,6 +75,7 @@ public class SequenceGeneratorSource extends AbstractSource implements
         if(eventsSent < totalEvents) {
           getChannelProcessor().processEvent(
             EventBuilder.withBody(String.valueOf(sequence++).getBytes()));
+          sourceCounter.incrementEventAcceptedCount();
           eventsSent++;
         } else {
           status = Status.BACKOFF;
@@ -90,11 +93,12 @@ public class SequenceGeneratorSource extends AbstractSource implements
         }
         if(!batchArrayList.isEmpty()) {
           getChannelProcessor().processEventBatch(batchArrayList);
+          sourceCounter.incrementAppendBatchAcceptedCount();
+          sourceCounter.addToEventAcceptedCount(batchArrayList.size());
         }
       }
-      counterGroup.incrementAndGet("events.successful");
+
     } catch (ChannelException ex) {
-      counterGroup.incrementAndGet("events.failed");
       eventsSent -= i;
       logger.error( getName() + " source could not write to channel.", ex);
     }
@@ -107,7 +111,7 @@ public class SequenceGeneratorSource extends AbstractSource implements
     logger.info("Sequence generator source starting");
 
     super.start();
-
+    sourceCounter.start();
     logger.debug("Sequence generator source started");
   }
 
@@ -116,8 +120,9 @@ public class SequenceGeneratorSource extends AbstractSource implements
     logger.info("Sequence generator source stopping");
 
     super.stop();
+    sourceCounter.stop();
 
-    logger.info("Sequence generator source stopped. Metrics:{}", counterGroup);
+    logger.info("Sequence generator source stopped. Metrics:{}",getName(), sourceCounter);
   }
 
 }
