Updated Branches: refs/heads/trunk 960d03d8a -> 46659c715
FLUME-1572. Add batching support to FILE_ROLL sink. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/46659c71 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/46659c71 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/46659c71 Branch: refs/heads/trunk Commit: 46659c7156565778068c012f07301aab426d0728 Parents: 960d03d Author: Mike Percy <[email protected]> Authored: Wed Sep 12 17:33:16 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Wed Sep 12 17:33:16 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/sink/RollingFileSink.java | 47 ++++++++------ .../org/apache/flume/sink/TestRollingFileSink.java | 3 + 2 files changed, 30 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/46659c71/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java index e5e97ff..a94eea1 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java +++ b/flume-ng-core/src/main/java/org/apache/flume/sink/RollingFileSink.java @@ -47,6 +47,9 @@ public class RollingFileSink extends AbstractSink implements Configurable { private static final Logger logger = LoggerFactory .getLogger(RollingFileSink.class); private static final long defaultRollInterval = 30; + private static final int defaultBatchSize = 100; + + private int batchSize = defaultBatchSize; private File directory; private long rollInterval; @@ -90,6 +93,8 @@ public class RollingFileSink extends AbstractSink implements Configurable { this.rollInterval = Long.parseLong(rollInterval); } + batchSize = context.getInteger("sink.batchSize", defaultBatchSize); + this.directory = new File(directory); } @@ -175,30 +180,32 @@ public class RollingFileSink extends AbstractSink implements Configurable { try { transaction.begin(); - event = channel.take(); - - if (event != null) { - serializer.write(event); - - /* - * FIXME: Feature: Rotate on size and time by checking bytes written and - * setting shouldRotate = true if we're past a threshold. - */ - - /* - * FIXME: Feature: Control flush interval based on time or number of - * events. For now, we're super-conservative and flush on each write. - */ - serializer.flush(); - outputStream.flush(); - } else { - // No events found, request back-off semantics from runner - result = Status.BACKOFF; + for (int i = 0; i < batchSize; i++) { + event = channel.take(); + if (event != null) { + serializer.write(event); + + /* + * FIXME: Feature: Rotate on size and time by checking bytes written and + * setting shouldRotate = true if we're past a threshold. + */ + + /* + * FIXME: Feature: Control flush interval based on time or number of + * events. For now, we're super-conservative and flush on each write. + */ + } else { + // No events found, request back-off semantics from runner + result = Status.BACKOFF; + break; + } } + serializer.flush(); + outputStream.flush(); transaction.commit(); } catch (Exception ex) { transaction.rollback(); - throw new EventDeliveryException("Failed to process event: " + event, ex); + throw new EventDeliveryException("Failed to process transaction", ex); } finally { transaction.close(); } http://git-wip-us.apache.org/repos/asf/flume/blob/46659c71/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java index 10c9b82..07fa644 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java +++ b/flume-ng-core/src/test/java/org/apache/flume/sink/TestRollingFileSink.java @@ -84,6 +84,7 @@ public class TestRollingFileSink { context.put("sink.directory", tmpDir.getPath()); context.put("sink.rollInterval", "1"); + context.put("sink.batchSize", "1"); Configurables.configure(sink, context); @@ -131,6 +132,8 @@ public class TestRollingFileSink { context.put("sink.directory", tmpDir.getPath()); context.put("sink.rollInterval", "0"); + context.put("sink.batchSize", "1"); + Configurables.configure(sink, context);
