Repository: flume Updated Branches: refs/heads/trunk dffe1dcbc -> b252267ed
FLUME-3101 Add maxBatchCount config property to Taildir Source. If there are multiple files in the path(s) that need to be tailed and there is a file written by high frequency, then Taildir can read the batchSize size events from that file every time. This can lead to an endless loop and Taildir will only read data from the busy file, while other files will not be processed. Another problem is that in this case TaildirSource will be unresponsive to stop requests too. This commit handles this situation by introducing a new config property called maxBatchCount. It controls the number of batches being read consecutively from the same file. After reading maxBatchCount rounds from a file, Taildir will switch to another file / will have a break in the processing. This change is based on hunshenshi's patch. This closes #240 Reviewers: Ferenc Szabo, Endre Major (Peter Turcsanyi via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b252267e Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b252267e Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b252267e Branch: refs/heads/trunk Commit: b252267ed297b849a8c3d900f7263e4abe5101c9 Parents: dffe1dc Author: Peter Turcsanyi <[email protected]> Authored: Thu Nov 22 17:12:57 2018 +0100 Committer: Ferenc Szabo <[email protected]> Committed: Thu Nov 22 17:12:57 2018 +0100 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 5 ++ flume-ng-sources/flume-taildir-source/pom.xml | 2 +- .../flume/source/taildir/TaildirSource.java | 13 +++++ .../TaildirSourceConfigurationConstants.java | 4 ++ .../flume/source/taildir/TestTaildirSource.java | 57 ++++++++++++++++++++ 5 files changed, 80 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index c6d947a..01eb81d 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1375,6 +1375,10 @@ skipToEnd false Whether to sk idleTimeout 120000 Time (ms) to close inactive files. If the closed file is appended new lines to, this source will automatically re-open it. writePosInterval 3000 Interval time (ms) to write the last position of each file on the position file. batchSize 100 Max number of lines to read and send to the channel at a time. Using the default is usually fine. +maxBatchCount Long.MAX_VALUE Controls the number of batches being read consecutively from the same file. + If the source is tailing multiple files and one of them is written at a fast rate, + it can prevent other files to be processed, because the busy file would be read in an endless loop. + In this case lower this value. backoffSleepIncrement 1000 The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data. maxBackoffSleep 5000 The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data. cachePatternMatching true Listing directories and applying the filename regex pattern may be time consuming for directories @@ -1401,6 +1405,7 @@ Example for agent named a1: a1.sources.r1.headers.f2.headerKey1 = value2 a1.sources.r1.headers.f2.headerKey2 = value2-2 a1.sources.r1.fileHeader = true + a1.sources.ri.maxBatchCount = 1000 Twitter 1% firehose Source (experimental) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-sources/flume-taildir-source/pom.xml ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/pom.xml b/flume-ng-sources/flume-taildir-source/pom.xml index 9cc07a3..192aaeb 100644 --- a/flume-ng-sources/flume-taildir-source/pom.xml +++ b/flume-ng-sources/flume-taildir-source/pom.xml @@ -32,7 +32,7 @@ limitations under the License. <properties> <!-- TODO fix spotbugs/pmd violations --> - <spotbugs.maxAllowedViolations>13</spotbugs.maxAllowedViolations> + <spotbugs.maxAllowedViolations>14</spotbugs.maxAllowedViolations> <pmd.maxAllowedViolations>3</pmd.maxAllowedViolations> </properties> http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index e121a2b..15ba507 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -87,6 +87,7 @@ public class TaildirSource extends AbstractSource implements private Long maxBackOffSleepInterval; private boolean fileHeader; private String fileHeaderKey; + private Long maxBatchCount; @Override public synchronized void start() { @@ -185,6 +186,12 @@ public class TaildirSource extends AbstractSource implements DEFAULT_FILE_HEADER); fileHeaderKey = context.getString(FILENAME_HEADER_KEY, DEFAULT_FILENAME_HEADER_KEY); + maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT); + if (maxBatchCount <= 0) { + maxBatchCount = DEFAULT_MAX_BATCH_COUNT; + logger.warn("Invalid maxBatchCount specified, initializing source " + + "default maxBatchCount of {}", maxBatchCount); + } if (sourceCounter == null) { sourceCounter = new SourceCounter(getName()); @@ -258,6 +265,7 @@ public class TaildirSource extends AbstractSource implements private void tailFileProcess(TailFile tf, boolean backoffWithoutNL) throws IOException, InterruptedException { + long batchCount = 0; while (true) { reader.setCurrentFile(tf); List<Event> events = reader.readEvents(batchSize, backoffWithoutNL); @@ -282,6 +290,11 @@ public class TaildirSource extends AbstractSource implements sourceCounter.addToEventAcceptedCount(events.size()); sourceCounter.incrementAppendBatchAcceptedCount(); if (events.size() < batchSize) { + logger.debug("The events taken from " + tf.getPath() + " is less than " + batchSize); + break; + } + if (++batchCount >= maxBatchCount) { + logger.debug("The batches read from the same file is larger than " + maxBatchCount ); break; } } http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java index f2347f3..c614e26 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java @@ -63,4 +63,8 @@ public class TaildirSourceConfigurationConstants { /** Whether to include absolute path filename in a header. */ public static final String FILENAME_HEADER = "fileHeader"; public static final boolean DEFAULT_FILE_HEADER = false; + + /** The max number of batch reads from a file in one loop */ + public static final String MAX_BATCH_COUNT = "maxBatchCount"; + public static final Long DEFAULT_MAX_BATCH_COUNT = Long.MAX_VALUE; } http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java index 6825cc5..416e82a 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java @@ -55,6 +55,8 @@ import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstant import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER; import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.BATCH_SIZE; +import static org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.MAX_BATCH_COUNT; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -380,4 +382,59 @@ public class TestTaildirSource { source.stop(); } + @Test + public void testMaxBatchCount() throws IOException { + File f1 = new File(tmpDir, "file1"); + File f2 = new File(tmpDir, "file2"); + Files.write("file1line1\nfile1line2\n" + + "file1line3\nfile1line4\n", f1, Charsets.UTF_8); + Files.write("file2line1\nfile2line2\n" + + "file2line3\nfile2line4\n", f2, Charsets.UTF_8); + + Context context = new Context(); + context.put(POSITION_FILE, posFilePath); + context.put(FILE_GROUPS, "fg"); + context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*"); + context.put(BATCH_SIZE, String.valueOf(1)); + context.put(MAX_BATCH_COUNT, String.valueOf(2)); + + Configurables.configure(source, context); + source.start(); + + // 2 x 4 lines will be processed in 2 rounds + source.process(); + source.process(); + + List<Event> eventList = new ArrayList<Event>(); + for (int i = 0; i < 8; i++) { + Transaction txn = channel.getTransaction(); + txn.begin(); + Event e = channel.take(); + txn.commit(); + txn.close(); + if (e == null) { + break; + } + eventList.add(e); + } + + assertEquals("1", context.getString(BATCH_SIZE)); + assertEquals("2", context.getString(MAX_BATCH_COUNT)); + + assertEquals(8, eventList.size()); + + // the processing order of the files is not deterministic + String firstFile = new String(eventList.get(0).getBody()).substring(0, 5); + String secondFile = firstFile.equals("file1") ? "file2" : "file1"; + + assertEquals(firstFile + "line1", new String(eventList.get(0).getBody())); + assertEquals(firstFile + "line2", new String(eventList.get(1).getBody())); + assertEquals(secondFile + "line1", new String(eventList.get(2).getBody())); + assertEquals(secondFile + "line2", new String(eventList.get(3).getBody())); + assertEquals(firstFile + "line3", new String(eventList.get(4).getBody())); + assertEquals(firstFile + "line4", new String(eventList.get(5).getBody())); + assertEquals(secondFile + "line3", new String(eventList.get(6).getBody())); + assertEquals(secondFile + "line4", new String(eventList.get(7).getBody())); + } + }
