Updated Branches: refs/heads/flume-1.5 ad612c28b -> d2d7d7f96
FLUME-2255. Correctly handle ChannelExceptions in SpoolingDirectorySource (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d2d7d7f9 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d2d7d7f9 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d2d7d7f9 Branch: refs/heads/flume-1.5 Commit: d2d7d7f968c2eb78eda46822fd503d8772d69d7a Parents: ad612c2 Author: Mike Percy <[email protected]> Authored: Thu Dec 5 12:58:03 2013 -0800 Committer: Mike Percy <[email protected]> Committed: Thu Dec 5 13:00:15 2013 -0800 ---------------------------------------------------------------------- .../flume/source/SpoolDirectorySource.java | 47 +++++++++++++++- ...olDirectorySourceConfigurationConstants.java | 4 ++ .../flume/source/TestSpoolDirectorySource.java | 58 ++++++++++++++++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 + 4 files changed, 108 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d2d7d7f9/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index 72c4059..0160215 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -67,6 +67,9 @@ Configurable, EventDrivenSource { private SourceCounter sourceCounter; ReliableSpoolingFileEventReader reader; private ScheduledExecutorService executor; + private boolean backoff = true; + private boolean hitChannelException = false; + private int maxBackoff; @Override public synchronized void start() { @@ -161,6 +164,8 @@ Configurable, 
EventDrivenSource { deserializerContext.put(LineDeserializer.MAXLINE_KEY, bufferMaxLineLength.toString()); } + + maxBackoff = context.getInteger(MAX_BACKOFF, DEFAULT_MAX_BACKOFF); if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); } @@ -171,6 +176,28 @@ Configurable, EventDrivenSource { return hasFatalError; } + + + /** + * The class always backs off, this exists only so that we can test without + * taking a really long time. + * @param backoff - whether the source should backoff if the channel is full + */ + @VisibleForTesting + protected void setBackOff(boolean backoff) { + this.backoff = backoff; + } + + @VisibleForTesting + protected boolean hitChannelException() { + return hitChannelException; + } + + @VisibleForTesting + protected SourceCounter getSourceCounter() { + return sourceCounter; + } + private class SpoolDirectoryRunnable implements Runnable { private ReliableSpoolingFileEventReader reader; private SourceCounter sourceCounter; @@ -183,6 +210,7 @@ Configurable, EventDrivenSource { @Override public void run() { + int backoffInterval = 250; try { while (true) { List<Event> events = reader.readEvents(batchSize); @@ -192,8 +220,23 @@ Configurable, EventDrivenSource { sourceCounter.addToEventReceivedCount(events.size()); sourceCounter.incrementAppendBatchReceivedCount(); - getChannelProcessor().processEventBatch(events); - reader.commit(); + try { + getChannelProcessor().processEventBatch(events); + reader.commit(); + } catch (ChannelException ex) { + logger.warn("The channel is full, and cannot write data now. The " + + "source will try again after " + String.valueOf(backoffInterval) + + " milliseconds"); + hitChannelException = true; + if (backoff) { + TimeUnit.MILLISECONDS.sleep(backoffInterval); + backoffInterval = backoffInterval << 1; + backoffInterval = backoffInterval >= maxBackoff ? 
maxBackoff : + backoffInterval; + } + continue; + } + backoffInterval = 250; sourceCounter.addToEventAcceptedCount(events.size()); sourceCounter.incrementAppendBatchAcceptedCount(); } http://git-wip-us.apache.org/repos/asf/flume/blob/d2d7d7f9/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java index 7bfb0ee..a2befe8 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java @@ -74,4 +74,8 @@ public class SpoolDirectorySourceConfigurationConstants { public static final String DECODE_ERROR_POLICY = "decodeErrorPolicy"; public static final String DEFAULT_DECODE_ERROR_POLICY = DecodeErrorPolicy.FAIL.name(); + + public static final String MAX_BACKOFF = "maxBackoff"; + + public static final Integer DEFAULT_MAX_BACKOFF = 4000; } http://git-wip-us.apache.org/repos/asf/flume/blob/d2d7d7f9/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java index 837cf15..9a546a5 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; import 
org.apache.flume.Channel; import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; @@ -33,6 +34,7 @@ import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.channel.ReplicatingChannelSelector; import org.apache.flume.conf.Configurables; +import org.apache.flume.instrumentation.SourceCounter; import org.apache.flume.lifecycle.LifecycleController; import org.apache.flume.lifecycle.LifecycleState; import org.junit.After; @@ -163,4 +165,60 @@ public class TestSpoolDirectorySource { Assert.assertFalse("Fatal error on iteration " + i, source.hasFatalError()); } } + + @Test + public void testSourceDoesNotDieOnFullChannel() throws Exception { + + Context chContext = new Context(); + chContext.put("capacity", "2"); + chContext.put("transactionCapacity", "2"); + chContext.put("keep-alive", "0"); + channel.stop(); + Configurables.configure(channel, chContext); + + channel.start(); + Context context = new Context(); + File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); + + Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); + + + context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, + tmpDir.getAbsolutePath()); + + context.put(SpoolDirectorySourceConfigurationConstants.BATCH_SIZE, "2"); + Configurables.configure(source, context); + source.setBackOff(false); + source.start(); + + // Wait for the source to read enough events to fill up the channel. 
+ while(!source.hitChannelException()) { + Thread.sleep(50); + } + + List<String> dataOut = Lists.newArrayList(); + + for (int i = 0; i < 8; ) { + Transaction tx = channel.getTransaction(); + tx.begin(); + Event e = channel.take(); + if (e != null) { + dataOut.add(new String(e.getBody(), "UTF-8")); + i++; + } + e = channel.take(); + if (e != null) { + dataOut.add(new String(e.getBody(), "UTF-8")); + i++; + } + tx.commit(); + tx.close(); + } + Assert.assertTrue("Expected to hit ChannelException, but did not!", + source.hitChannelException()); + Assert.assertEquals(8, dataOut.size()); + source.stop(); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/d2d7d7f9/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 0f12427..8687cb7 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -951,6 +951,7 @@ fileHeaderKey file Header key to use when appending filename ignorePattern ^$ Regular expression specifying which files to ignore (skip) trackerDir .flumespool Directory to store metadata related to processing of files. If this path is not an absolute path, then it is interpreted as relative to the spoolDir. +maxBackoff 4000 The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter. batchSize 100 Granularity at which to batch transfer to the channel inputCharset UTF-8 Character set used by deserializers that treat the input file as text. decodeErrorPolicy ``FAIL`` What to do when we see a non-decodable character in the input file.
