Repository: flume
Updated Branches:
  refs/heads/trunk dffe1dcbc -> b252267ed


FLUME-3101 Add maxBatchCount config property to Taildir Source.

If there are multiple files in the path(s) that need to be tailed and one of
them is written at a high rate, then Taildir can read a full batch (batchSize
events) from that file every time. This can lead to an endless loop in which
Taildir only reads data from the busy file, while the other files are never
processed.
Another problem is that in this case TaildirSource will not respond to stop
requests either.

This commit handles this situation by introducing a new config property called
maxBatchCount. It controls the number of batches being read consecutively
from the same file. After reading maxBatchCount rounds from a file, Taildir
will switch to another file / will have a break in the processing.

This change is based on hunshenshi's patch.

This closes #240

Reviewers: Ferenc Szabo, Endre Major

(Peter Turcsanyi via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b252267e
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b252267e
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b252267e

Branch: refs/heads/trunk
Commit: b252267ed297b849a8c3d900f7263e4abe5101c9
Parents: dffe1dc
Author: Peter Turcsanyi <[email protected]>
Authored: Thu Nov 22 17:12:57 2018 +0100
Committer: Ferenc Szabo <[email protected]>
Committed: Thu Nov 22 17:12:57 2018 +0100

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst          |  5 ++
 flume-ng-sources/flume-taildir-source/pom.xml   |  2 +-
 .../flume/source/taildir/TaildirSource.java     | 13 +++++
 .../TaildirSourceConfigurationConstants.java    |  4 ++
 .../flume/source/taildir/TestTaildirSource.java | 57 ++++++++++++++++++++
 5 files changed, 80 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst 
b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index c6d947a..01eb81d 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -1375,6 +1375,10 @@ skipToEnd                           false                
          Whether to sk
 idleTimeout                         120000                         Time (ms) 
to close inactive files. If the closed file is appended new lines to, this 
source will automatically re-open it.
 writePosInterval                    3000                           Interval 
time (ms) to write the last position of each file on the position file.
 batchSize                           100                            Max number 
of lines to read and send to the channel at a time. Using the default is 
usually fine.
+maxBatchCount                       Long.MAX_VALUE                 Controls 
the number of batches being read consecutively from the same file.
+                                                                   If the 
source is tailing multiple files and one of them is written at a fast rate,
+                                                                   it can 
prevent other files from being processed, because the busy file would be read in an 
endless loop.
+                                                                   In this 
case lower this value.
 backoffSleepIncrement               1000                           The 
increment for time delay before reattempting to poll for new data, when the 
last attempt did not find any new data.
 maxBackoffSleep                     5000                           The max 
time delay between each reattempt to poll for new data, when the last attempt 
did not find any new data.
 cachePatternMatching                true                           Listing 
directories and applying the filename regex pattern may be time consuming for 
directories
@@ -1401,6 +1405,7 @@ Example for agent named a1:
   a1.sources.r1.headers.f2.headerKey1 = value2
   a1.sources.r1.headers.f2.headerKey2 = value2-2
   a1.sources.r1.fileHeader = true
+  a1.sources.r1.maxBatchCount = 1000
 
 Twitter 1% firehose Source (experimental)
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-sources/flume-taildir-source/pom.xml
----------------------------------------------------------------------
diff --git a/flume-ng-sources/flume-taildir-source/pom.xml 
b/flume-ng-sources/flume-taildir-source/pom.xml
index 9cc07a3..192aaeb 100644
--- a/flume-ng-sources/flume-taildir-source/pom.xml
+++ b/flume-ng-sources/flume-taildir-source/pom.xml
@@ -32,7 +32,7 @@ limitations under the License.
 
   <properties>
     <!-- TODO fix spotbugs/pmd violations -->
-    <spotbugs.maxAllowedViolations>13</spotbugs.maxAllowedViolations>
+    <spotbugs.maxAllowedViolations>14</spotbugs.maxAllowedViolations>
     <pmd.maxAllowedViolations>3</pmd.maxAllowedViolations>
   </properties>
 

http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
index e121a2b..15ba507 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java
@@ -87,6 +87,7 @@ public class TaildirSource extends AbstractSource implements
   private Long maxBackOffSleepInterval;
   private boolean fileHeader;
   private String fileHeaderKey;
+  private Long maxBatchCount;
 
   @Override
   public synchronized void start() {
@@ -185,6 +186,12 @@ public class TaildirSource extends AbstractSource 
implements
             DEFAULT_FILE_HEADER);
     fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
             DEFAULT_FILENAME_HEADER_KEY);
+    maxBatchCount = context.getLong(MAX_BATCH_COUNT, DEFAULT_MAX_BATCH_COUNT);
+    if (maxBatchCount <= 0) {
+      maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
+      logger.warn("Invalid maxBatchCount specified, initializing source "
+          + "default maxBatchCount of {}", maxBatchCount);
+    }
 
     if (sourceCounter == null) {
       sourceCounter = new SourceCounter(getName());
@@ -258,6 +265,7 @@ public class TaildirSource extends AbstractSource implements
 
   private void tailFileProcess(TailFile tf, boolean backoffWithoutNL)
       throws IOException, InterruptedException {
+    long batchCount = 0;
     while (true) {
       reader.setCurrentFile(tf);
       List<Event> events = reader.readEvents(batchSize, backoffWithoutNL);
@@ -282,6 +290,11 @@ public class TaildirSource extends AbstractSource 
implements
       sourceCounter.addToEventAcceptedCount(events.size());
       sourceCounter.incrementAppendBatchAcceptedCount();
       if (events.size() < batchSize) {
+        logger.debug("The events taken from " + tf.getPath() + " is less than 
" + batchSize);
+        break;
+      }
+      if (++batchCount >= maxBatchCount) {
+        logger.debug("The batches read from the same file is larger than " + 
maxBatchCount );
         break;
       }
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
index f2347f3..c614e26 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSourceConfigurationConstants.java
@@ -63,4 +63,8 @@ public class TaildirSourceConfigurationConstants {
   /** Whether to include absolute path filename in a header. */
   public static final String FILENAME_HEADER = "fileHeader";
   public static final boolean DEFAULT_FILE_HEADER = false;
+
+  /** The max number of batch reads from a file in one loop */
+  public static final String MAX_BATCH_COUNT = "maxBatchCount";
+  public static final Long DEFAULT_MAX_BATCH_COUNT = Long.MAX_VALUE;
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/b252267e/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
----------------------------------------------------------------------
diff --git 
a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
 
b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
index 6825cc5..416e82a 100644
--- 
a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
+++ 
b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java
@@ -55,6 +55,8 @@ import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstant
 import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.POSITION_FILE;
 import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER;
 import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.FILENAME_HEADER_KEY;
+import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.BATCH_SIZE;
+import static 
org.apache.flume.source.taildir.TaildirSourceConfigurationConstants.MAX_BATCH_COUNT;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -380,4 +382,59 @@ public class TestTaildirSource {
     source.stop();
   }
 
+  @Test
+  public void testMaxBatchCount() throws IOException {
+    File f1 = new File(tmpDir, "file1");
+    File f2 = new File(tmpDir, "file2");
+    Files.write("file1line1\nfile1line2\n" +
+        "file1line3\nfile1line4\n", f1, Charsets.UTF_8);
+    Files.write("file2line1\nfile2line2\n" +
+        "file2line3\nfile2line4\n", f2, Charsets.UTF_8);
+
+    Context context = new Context();
+    context.put(POSITION_FILE, posFilePath);
+    context.put(FILE_GROUPS, "fg");
+    context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + 
"/file.*");
+    context.put(BATCH_SIZE, String.valueOf(1));
+    context.put(MAX_BATCH_COUNT, String.valueOf(2));
+
+    Configurables.configure(source, context);
+    source.start();
+
+    // 2 x 4 lines will be processed in 2 rounds
+    source.process();
+    source.process();
+
+    List<Event> eventList = new ArrayList<Event>();
+    for (int i = 0; i < 8; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+      Event e = channel.take();
+      txn.commit();
+      txn.close();
+      if (e == null) {
+        break;
+      }
+      eventList.add(e);
+    }
+
+    assertEquals("1", context.getString(BATCH_SIZE));
+    assertEquals("2", context.getString(MAX_BATCH_COUNT));
+
+    assertEquals(8, eventList.size());
+
+    // the processing order of the files is not deterministic
+    String firstFile = new String(eventList.get(0).getBody()).substring(0, 5);
+    String secondFile = firstFile.equals("file1") ? "file2" : "file1";
+
+    assertEquals(firstFile + "line1", new String(eventList.get(0).getBody()));
+    assertEquals(firstFile + "line2", new String(eventList.get(1).getBody()));
+    assertEquals(secondFile + "line1", new String(eventList.get(2).getBody()));
+    assertEquals(secondFile + "line2", new String(eventList.get(3).getBody()));
+    assertEquals(firstFile + "line3", new String(eventList.get(4).getBody()));
+    assertEquals(firstFile + "line4", new String(eventList.get(5).getBody()));
+    assertEquals(secondFile + "line3", new String(eventList.get(6).getBody()));
+    assertEquals(secondFile + "line4", new String(eventList.get(7).getBody()));
+  }
+
 }

Reply via email to