Cherry-picked this to flume-1.3.0. Thanks, Hari
-- Hari Shreedharan On Wednesday, September 12, 2012 at 5:34 PM, [email protected] wrote: > Updated Branches: > refs/heads/trunk 960d03d8a -> 46659c715 > > > FLUME-1572. Add batching support to FILE_ROLL sink. > > (Hari Shreedharan via Mike Percy) > > > Project: http://git-wip-us.apache.org/repos/asf/flume/repo > Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/46659c71 > Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/46659c71 > Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/46659c71 > > Branch: refs/heads/trunk > Commit: 46659c7156565778068c012f07301aab426d0728 > Parents: 960d03d > Author: Mike Percy <[email protected] (mailto:[email protected])> > Authored: Wed Sep 12 17:33:16 2012 -0700 > Committer: Mike Percy <[email protected] (mailto:[email protected])> > Committed: Wed Sep 12 17:33:16 2012 -0700 > > ---------------------------------------------------------------------- > .../org/apache/flume/sink/RollingFileSink.java | 47 ++++++++------ > .../org/apache/flume/sink/TestRollingFileSink.java | 3 + > 2 files changed, 30 insertions(+), 20 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/flume/blob/46659c71/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java > ---------------------------------------------------------------------- > diff --git > a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java > b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java > index e5e97ff..a94eea1 100644 > --- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java > +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java > @@ -47,6 +47,9 @@ public class RollingFileSink extends AbstractSink > implements Configurable { > private static final Logger logger = LoggerFactory > .getLogger(RollingFileSink.class); > private static final long defaultRollInterval = 30; > + private static 
final int defaultBatchSize = 100; > + > + private int batchSize = defaultBatchSize; > > private File directory; > private long rollInterval; > @@ -90,6 +93,8 @@ public class RollingFileSink extends AbstractSink > implements Configurable { > this.rollInterval = Long.parseLong(rollInterval); > } > > + batchSize = context.getInteger("sink.batchSize", defaultBatchSize); > + > this.directory = new File(directory); > } > > @@ -175,30 +180,32 @@ public class RollingFileSink extends AbstractSink > implements Configurable { > > try { > transaction.begin(); > - event = channel.take(); > - > - if (event != null) { > - serializer.write(event); > - > - /* > - * FIXME: Feature: Rotate on size and time by checking bytes written and > - * setting shouldRotate = true if we're past a threshold. > - */ > - > - /* > - * FIXME: Feature: Control flush interval based on time or number of > - * events. For now, we're super-conservative and flush on each write. > - */ > - serializer.flush(); > - outputStream.flush(); > - } else { > - // No events found, request back-off semantics from runner > - result = Status.BACKOFF; > + for (int i = 0; i < batchSize; i++) { > + event = channel.take(); > + if (event != null) { > + serializer.write(event); > + > + /* > + * FIXME: Feature: Rotate on size and time by checking bytes written and > + * setting shouldRotate = true if we're past a threshold. > + */ > + > + /* > + * FIXME: Feature: Control flush interval based on time or number of > + * events. For now, we're super-conservative and flush on each write. 
> + */ > + } else { > + // No events found, request back-off semantics from runner > + result = Status.BACKOFF; > + break; > + } > } > + serializer.flush(); > + outputStream.flush(); > transaction.commit(); > } catch (Exception ex) { > transaction.rollback(); > - throw new EventDeliveryException("Failed to process event: " + event, ex); > + throw new EventDeliveryException("Failed to process transaction", ex); > } finally { > transaction.close(); > } > > http://git-wip-us.apache.org/repos/asf/flume/blob/46659c71/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java > ---------------------------------------------------------------------- > diff --git > a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java > b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java > index 10c9b82..07fa644 100644 > --- > a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java > +++ > b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java > @@ -84,6 +84,7 @@ public class TestRollingFileSink { > > context.put("sink.directory", tmpDir.getPath()); > context.put("sink.rollInterval", "1"); > + context.put("sink.batchSize", "1"); > > Configurables.configure(sink, context); > > @@ -131,6 +132,8 @@ public class TestRollingFileSink { > > context.put("sink.directory", tmpDir.getPath()); > context.put("sink.rollInterval", "0"); > + context.put("sink.batchSize", "1"); > + > > Configurables.configure(sink, context); > > >
