Repository: flume Updated Branches: refs/heads/trunk b252267ed -> 0602cb032
FLUME-3294 Fix polling logic in TaildirSource TaildirSource.process() implements the correct polling logic now. It returns Status.READY / Status.BACKOFF which controls the common backoff sleeping mechanism implemented in PollableSourceRunner.PollingRunner (instead of always returning Status.READY and sleeping inside the method which was an incorrect behaviour). This closes #241 Reviewers: Endre Major, Denes Arvay (Peter Turcsanyi via Ferenc Szabo) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0602cb03 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0602cb03 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0602cb03 Branch: refs/heads/trunk Commit: 0602cb032b44cba1d660c3ff4405b3504e63118b Parents: b252267 Author: Peter Turcsanyi <[email protected]> Authored: Thu Nov 22 21:51:28 2018 +0100 Committer: Ferenc Szabo <[email protected]> Committed: Thu Nov 22 21:51:28 2018 +0100 ---------------------------------------------------------------------- .../flume/source/taildir/TaildirSource.java | 20 +++++------ .../flume/source/taildir/TestTaildirSource.java | 35 ++++++++++++++++++++ 2 files changed, 44 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/0602cb03/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java index 15ba507..9ecccd7 100644 --- a/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/main/java/org/apache/flume/source/taildir/TaildirSource.java @@ -229,22 +229,20 @@ public class TaildirSource extends AbstractSource implements @Override public Status process() { - Status status = Status.READY; + Status status = Status.BACKOFF; try { existingInodes.clear(); existingInodes.addAll(reader.updateTailFiles()); for (long inode : existingInodes) { TailFile tf = reader.getTailFiles().get(inode); if (tf.needTail()) { - tailFileProcess(tf, true); + boolean hasMoreLines = tailFileProcess(tf, true); + if (hasMoreLines) { + status = Status.READY; + } } } closeTailFiles(); - try { - TimeUnit.MILLISECONDS.sleep(retryInterval); - } catch (InterruptedException e) { - logger.info("Interrupted while sleeping"); - } } catch (Throwable t) { logger.error("Unable to tail files", t); sourceCounter.incrementEventReadFail(); @@ -263,14 +261,14 @@ public class TaildirSource extends AbstractSource implements return maxBackOffSleepInterval; } - private void tailFileProcess(TailFile tf, boolean backoffWithoutNL) + private boolean tailFileProcess(TailFile tf, boolean backoffWithoutNL) throws IOException, InterruptedException { long batchCount = 0; while (true) { reader.setCurrentFile(tf); List<Event> events = reader.readEvents(batchSize, backoffWithoutNL); if (events.isEmpty()) { - break; + return false; } sourceCounter.addToEventReceivedCount(events.size()); sourceCounter.incrementAppendBatchReceivedCount(); @@ -291,11 +289,11 @@ public class TaildirSource extends AbstractSource implements sourceCounter.incrementAppendBatchAcceptedCount(); if (events.size() < batchSize) { logger.debug("The events taken from " + tf.getPath() + " is less than " + batchSize); - break; + return false; } if (++batchCount >= maxBatchCount) { logger.debug("The batches read from the same file is larger than " + maxBatchCount ); - break; + return true; } } } http://git-wip-us.apache.org/repos/asf/flume/blob/0602cb03/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java ---------------------------------------------------------------------- diff --git a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java index 416e82a..1c30cd4 100644 --- a/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java +++ b/flume-ng-sources/flume-taildir-source/src/test/java/org/apache/flume/source/taildir/TestTaildirSource.java @@ -29,6 +29,7 @@ import org.apache.flume.ChannelException; import org.apache.flume.ChannelSelector; import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.PollableSource.Status; import org.apache.flume.Transaction; import org.apache.flume.channel.ChannelProcessor; import org.apache.flume.channel.MemoryChannel; @@ -437,4 +438,38 @@ public class TestTaildirSource { assertEquals(secondFile + "line4", new String(eventList.get(7).getBody())); } + @Test + public void testStatus() throws IOException { + File f1 = new File(tmpDir, "file1"); + File f2 = new File(tmpDir, "file2"); + Files.write("file1line1\nfile1line2\n" + + "file1line3\nfile1line4\nfile1line5\n", f1, Charsets.UTF_8); + Files.write("file2line1\nfile2line2\n" + + "file2line3\n", f2, Charsets.UTF_8); + + Context context = new Context(); + context.put(POSITION_FILE, posFilePath); + context.put(FILE_GROUPS, "fg"); + context.put(FILE_GROUPS_PREFIX + "fg", tmpDir.getAbsolutePath() + "/file.*"); + context.put(BATCH_SIZE, String.valueOf(1)); + context.put(MAX_BATCH_COUNT, String.valueOf(2)); + + Configurables.configure(source, context); + source.start(); + + Status status; + + status = source.process(); + assertEquals(Status.READY, status); + + status = source.process(); + assertEquals(Status.READY, status); + + status = source.process(); + assertEquals(Status.BACKOFF, status); + + status = source.process(); + assertEquals(Status.BACKOFF, status); + } + }
