Updated Branches: refs/heads/flume-1.4 3e137a641 -> f098440a8
FLUME-1855. Sequence gen source should be able to stop after a fixed number of events. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f098440a Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f098440a Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f098440a Branch: refs/heads/flume-1.4 Commit: f098440a8d2ccc3b56767e0192ba422ec89bab4c Parents: 3e137a6 Author: Mike Percy <[email protected]> Authored: Thu Jan 17 00:50:15 2013 -0800 Committer: Mike Percy <[email protected]> Committed: Thu Jan 17 00:51:38 2013 -0800 ---------------------------------------------------------------------- .../flume/source/SequenceGeneratorSource.java | 29 ++++++++++++--- 1 files changed, 24 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/f098440a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java index 1fbcf42..3cb1ccf 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SequenceGeneratorSource.java @@ -42,6 +42,8 @@ public class SequenceGeneratorSource extends AbstractSource implements private int batchSize; private CounterGroup counterGroup; private List<Event> batchArrayList; + private long totalEvents; + private long eventsSent = 0; public SequenceGeneratorSource() { sequence = 0; @@ -58,28 +60,45 @@ public class SequenceGeneratorSource extends AbstractSource implements if (batchSize > 1) { batchArrayList = new ArrayList<Event>(batchSize); } + totalEvents = context.getLong("totalEvents", Long.MAX_VALUE); } @Override public Status process() throws EventDeliveryException { + Status status = Status.READY; + int i = 0; try { if (batchSize <= 1) { - getChannelProcessor().processEvent( + if(eventsSent < totalEvents) { + getChannelProcessor().processEvent( EventBuilder.withBody(String.valueOf(sequence++).getBytes())); + eventsSent++; + } else { + status = Status.BACKOFF; + } } else { batchArrayList.clear(); - for (int i = 0; i < batchSize; i++) { - batchArrayList.add(i, EventBuilder.withBody(String.valueOf(sequence++).getBytes())); + for (i = 0; i < batchSize; i++) { + if(eventsSent < totalEvents){ + batchArrayList.add(i, EventBuilder.withBody(String + .valueOf(sequence++).getBytes())); + eventsSent++; + } else { + status = Status.BACKOFF; + } + } + if(!batchArrayList.isEmpty()) { + getChannelProcessor().processEventBatch(batchArrayList); } - getChannelProcessor().processEventBatch(batchArrayList); } counterGroup.incrementAndGet("events.successful"); } catch (ChannelException ex) { counterGroup.incrementAndGet("events.failed"); + eventsSent -= i; } - return Status.READY; + return status; } @Override
