Repository: flume
Updated Branches:
  refs/heads/trunk 4b74aa286 -> 1422f7330
FLUME-2619. Spooldir source should log channel exceptions Reviewed by Denes Arvay and Mike Percy. (Bessenyei Balázs Donát via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/1422f733 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/1422f733 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/1422f733 Branch: refs/heads/trunk Commit: 1422f733007dbb78caae7e5135bc33470e88502a Parents: 4b74aa2 Author: Bessenyei Balázs Donát <[email protected]> Authored: Mon Aug 8 18:09:44 2016 -0700 Committer: Mike Percy <[email protected]> Committed: Mon Aug 8 18:11:58 2016 -0700 ---------------------------------------------------------------------- .../apache/flume/channel/ChannelProcessor.java | 24 +++++----- .../flume/source/SpoolDirectorySource.java | 45 +++++++++++++------ .../flume/source/TestSpoolDirectorySource.java | 46 +++++++++++--------- 3 files changed, 71 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/1422f733/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java index 1cce137..6987860 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java +++ b/flume-ng-core/src/main/java/org/apache/flume/channel/ChannelProcessor.java @@ -20,6 +20,7 @@ package org.apache.flume.channel; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; + import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; @@ -43,7 +44,7 @@ import org.slf4j.LoggerFactory; * A channel processor exposes operations to put {@link Event}s into * {@link Channel}s. 
These operations will propagate a {@link ChannelException} * if any errors occur while attempting to write to {@code required} channels. - * + * <p> * Each channel processor instance is configured with a {@link ChannelSelector} * instance that specifies which channels are * {@linkplain ChannelSelector#getRequiredChannels(Event) required} and which @@ -73,6 +74,7 @@ public class ChannelProcessor implements Configurable { /** * The Context of the associated Source is passed. + * * @param context */ @Override @@ -103,7 +105,7 @@ public class ChannelProcessor implements Configurable { if (type == null) { LOG.error("Type not specified for interceptor " + interceptorName); throw new FlumeException("Interceptor.Type not specified for " + - interceptorName); + interceptorName); } try { Interceptor.Builder builder = factory.newInstance(type); @@ -132,7 +134,7 @@ public class ChannelProcessor implements Configurable { * Attempts to {@linkplain Channel#put(Event) put} the given events into each * configured channel. If any {@code required} channel throws a * {@link ChannelException}, that exception will be propagated. - * + * <p> * <p>Note that if multiple channels are configured, some {@link Transaction}s * may have already been committed while others may be rolled back in the * case of an exception. 
@@ -165,7 +167,7 @@ public class ChannelProcessor implements Configurable { List<Channel> optChannels = selector.getOptionalChannels(event); - for (Channel ch: optChannels) { + for (Channel ch : optChannels) { List<Event> eventQueue = optChannelQueue.get(ch); if (eventQueue == null) { eventQueue = new ArrayList<Event>(); @@ -193,9 +195,10 @@ public class ChannelProcessor implements Configurable { } catch (Throwable t) { tx.rollback(); if (t instanceof Error) { - LOG.error("Error while writing to required channel: " + - reqChannel, t); + LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; + } else if (t instanceof ChannelException) { + throw (ChannelException) t; } else { throw new ChannelException("Unable to put batch on required " + "channel: " + reqChannel, t); @@ -216,7 +219,7 @@ public class ChannelProcessor implements Configurable { List<Event> batch = optChannelQueue.get(optChannel); - for (Event event : batch ) { + for (Event event : batch) { optChannel.put(event); } @@ -239,7 +242,7 @@ public class ChannelProcessor implements Configurable { * Attempts to {@linkplain Channel#put(Event) put} the given event into each * configured channel. If any {@code required} channel throws a * {@link ChannelException}, that exception will be propagated. - * + * <p> * <p>Note that if multiple channels are configured, some {@link Transaction}s * may have already been committed while others may be rolled back in the * case of an exception. 
@@ -268,9 +271,10 @@ public class ChannelProcessor implements Configurable { } catch (Throwable t) { tx.rollback(); if (t instanceof Error) { - LOG.error("Error while writing to required channel: " + - reqChannel, t); + LOG.error("Error while writing to required channel: " + reqChannel, t); throw (Error) t; + } else if (t instanceof ChannelException) { + throw (ChannelException) t; } else { throw new ChannelException("Unable to put event on required " + "channel: " + reqChannel, t); http://git-wip-us.apache.org/repos/asf/flume/blob/1422f733/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java index d88cc1d..c8c7cda 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java +++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import org.apache.flume.ChannelException; +import org.apache.flume.ChannelFullException; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDrivenSource; @@ -44,7 +45,7 @@ import java.util.concurrent.TimeUnit; import static org.apache.flume.source.SpoolDirectorySourceConfigurationConstants.*; public class SpoolDirectorySource extends AbstractSource - implements Configurable, EventDrivenSource { + implements Configurable, EventDrivenSource { private static final Logger logger = LoggerFactory.getLogger(SpoolDirectorySource.class); @@ -70,6 +71,7 @@ public class SpoolDirectorySource extends AbstractSource private ScheduledExecutorService executor; private boolean backoff = true; private boolean hitChannelException = false; + 
private boolean hitChannelFullException = false; private int maxBackoff; private ConsumeOrder consumeOrder; private int pollDelay; @@ -158,7 +160,7 @@ public class SpoolDirectorySource extends AbstractSource inputCharset = context.getString(INPUT_CHARSET, DEFAULT_INPUT_CHARSET); decodeErrorPolicy = DecodeErrorPolicy.valueOf( context.getString(DECODE_ERROR_POLICY, DEFAULT_DECODE_ERROR_POLICY) - .toUpperCase(Locale.ENGLISH)); + .toUpperCase(Locale.ENGLISH)); ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT); trackerDirPath = context.getString(TRACKER_DIR, DEFAULT_TRACKER_DIR); @@ -196,10 +198,10 @@ public class SpoolDirectorySource extends AbstractSource } - /** * The class always backs off, this exists only so that we can test without * taking a really long time. + * * @param backoff - whether the source should backoff if the channel is full */ @VisibleForTesting @@ -208,11 +210,16 @@ public class SpoolDirectorySource extends AbstractSource } @VisibleForTesting - protected boolean hitChannelException() { + protected boolean didHitChannelException() { return hitChannelException; } @VisibleForTesting + protected boolean didHitChannelFullException() { + return hitChannelFullException; + } + + @VisibleForTesting protected SourceCounter getSourceCounter() { return sourceCounter; } @@ -227,7 +234,7 @@ public class SpoolDirectorySource extends AbstractSource private SourceCounter sourceCounter; public SpoolDirectoryRunnable(ReliableSpoolingFileEventReader reader, - SourceCounter sourceCounter) { + SourceCounter sourceCounter) { this.reader = reader; this.sourceCounter = sourceCounter; } @@ -247,17 +254,19 @@ public class SpoolDirectorySource extends AbstractSource try { getChannelProcessor().processEventBatch(events); reader.commit(); - } catch (ChannelException ex) { + } catch (ChannelFullException ex) { logger.warn("The channel is full, and cannot write data now. 
The " + - "source will try again after " + String.valueOf(backoffInterval) + + "source will try again after " + backoffInterval + + " milliseconds"); + hitChannelFullException = true; + backoffInterval = waitAndGetNewBackoffInterval(backoffInterval); + continue; + } catch (ChannelException ex) { + logger.warn("The channel threw an exception, and cannot write data now. The " + + "source will try again after " + backoffInterval + " milliseconds"); hitChannelException = true; - if (backoff) { - TimeUnit.MILLISECONDS.sleep(backoffInterval); - backoffInterval = backoffInterval << 1; - backoffInterval = backoffInterval >= maxBackoff ? maxBackoff : - backoffInterval; - } + backoffInterval = waitAndGetNewBackoffInterval(backoffInterval); continue; } backoffInterval = 250; @@ -272,5 +281,15 @@ public class SpoolDirectorySource extends AbstractSource Throwables.propagate(t); } } + + private int waitAndGetNewBackoffInterval(int backoffInterval) throws InterruptedException { + if (backoff) { + TimeUnit.MILLISECONDS.sleep(backoffInterval); + backoffInterval = backoffInterval << 1; + backoffInterval = backoffInterval >= maxBackoff ? 
maxBackoff : + backoffInterval; + } + return backoffInterval; + } } } http://git-wip-us.apache.org/repos/asf/flume/blob/1422f733/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java index 82c5351..0182d21 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java +++ b/flume-ng-core/src/test/java/org/apache/flume/source/TestSpoolDirectorySource.java @@ -74,6 +74,7 @@ public class TestSpoolDirectorySource { /** * Helper method to recursively clean up testing directory + * * @param directory the directory to clean up */ private void deleteFiles(File directory) { @@ -87,7 +88,7 @@ public class TestSpoolDirectorySource { } } - @Test (expected = IllegalArgumentException.class) + @Test(expected = IllegalArgumentException.class) public void testInvalidSortOrder() { Context context = new Context(); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, @@ -102,7 +103,7 @@ public class TestSpoolDirectorySource { Context context = new Context(); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, tmpDir.getAbsolutePath()); - context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, + context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, "oLdESt"); Configurables.configure(source, context); context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, @@ -110,17 +111,17 @@ public class TestSpoolDirectorySource { Configurables.configure(source, context); context.put(SpoolDirectorySourceConfigurationConstants.CONSUME_ORDER, "rAnDom"); - Configurables.configure(source, context); + Configurables.configure(source, context); } - + @Test public void testPutFilenameHeader() throws IOException, 
InterruptedException { Context context = new Context(); File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + - "file1line5\nfile1line6\nfile1line7\nfile1line8\n", - f1, Charsets.UTF_8); + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + f1, Charsets.UTF_8); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, tmpDir.getAbsolutePath()); @@ -155,7 +156,7 @@ public class TestSpoolDirectorySource { File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + - "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", f1, Charsets.UTF_8); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, @@ -192,18 +193,18 @@ public class TestSpoolDirectorySource { Assert.assertTrue("source directories must be created", directoriesCreated); final String FILE_NAME = "recursion_file.txt"; - File f1 = new File(subDir, FILE_NAME); + File f1 = new File(subDir, FILE_NAME); String origBody = "file1line1\nfile1line2\nfile1line3\nfile1line4\n" + "file1line5\nfile1line6\nfile1line7\nfile1line8\n"; Files.write(origBody, f1, Charsets.UTF_8); Context context = new Context(); context.put(SpoolDirectorySourceConfigurationConstants.RECURSIVE_DIRECTORY_SEARCH, - "true"); // enable recursion, so we should find the file we created above + "true"); // enable recursion, so we should find the file we created above context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, - tmpDir.getAbsolutePath()); // spool set to root dir + tmpDir.getAbsolutePath()); // spool set to root dir context.put(SpoolDirectorySourceConfigurationConstants.FILENAME_HEADER, - "true"); // put the file name in the "file" header + "true"); // put the file name in the "file" header Configurables.configure(source, context); source.start(); @@ -224,7 +225,7 @@ public class TestSpoolDirectorySource { 
Assert.assertNotNull("Event headers must not be null", e.getHeaders()); Assert.assertTrue("File header value did not end with expected filename", - e.getHeaders().get("file").endsWith(FILE_NAME)); + e.getHeaders().get("file").endsWith(FILE_NAME)); ByteArrayOutputStream baos = new ByteArrayOutputStream(); do { // collecting the whole body @@ -256,7 +257,6 @@ public class TestSpoolDirectorySource { Assert.assertTrue("source directories must be created", directoriesCreated); - File f1 = new File(subDir.getAbsolutePath() + "/file1.txt"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + @@ -364,12 +364,12 @@ public class TestSpoolDirectorySource { File f1 = new File(tmpDir.getAbsolutePath() + "/file1"); Files.write("file1line1\nfile1line2\nfile1line3\nfile1line4\n" + - "file1line5\nfile1line6\nfile1line7\nfile1line8\n", + "file1line5\nfile1line6\nfile1line7\nfile1line8\n", f1, Charsets.UTF_8); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, - tmpDir.getAbsolutePath()); + tmpDir.getAbsolutePath()); context.put(SpoolDirectorySourceConfigurationConstants.BATCH_SIZE, "2"); Configurables.configure(source, context); @@ -377,10 +377,16 @@ public class TestSpoolDirectorySource { source.start(); // Wait for the source to read enough events to fill up the channel. 
- while (!source.hitChannelException()) { - Thread.sleep(50); + + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 5000 && !source.didHitChannelFullException()) { + Thread.sleep(10); } + Assert.assertTrue("Expected to hit ChannelFullException, but did not!", + source.didHitChannelFullException()); + + List<String> dataOut = Lists.newArrayList(); for (int i = 0; i < 8; ) { @@ -399,8 +405,6 @@ public class TestSpoolDirectorySource { tx.commit(); tx.close(); } - Assert.assertTrue("Expected to hit ChannelException, but did not!", - source.hitChannelException()); Assert.assertEquals(8, dataOut.size()); source.stop(); } @@ -422,7 +426,7 @@ public class TestSpoolDirectorySource { Files.touch(f4); context.put(SpoolDirectorySourceConfigurationConstants.SPOOL_DIRECTORY, - tmpDir.getAbsolutePath()); + tmpDir.getAbsolutePath()); Configurables.configure(source, context); source.start(); @@ -431,7 +435,7 @@ public class TestSpoolDirectorySource { Assert.assertFalse("Server did not error", source.hasFatalError()); Assert.assertEquals("One message was read", - 1, source.getSourceCounter().getEventAcceptedCount()); + 1, source.getSourceCounter().getEventAcceptedCount()); source.stop(); } }
