Repository: flume
Updated Branches:
  refs/heads/trunk a582c100f -> f979b2683


FLUME-2502. Improve Spool Directory Source's performance by not listing files 
each time.

(Prateek Rungta via Hari)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/f979b268
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/f979b268
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/f979b268

Branch: refs/heads/trunk
Commit: f979b2683fc48d85806ae7593ee0e393bd812260
Parents: a582c10
Author: Hari Shreedharan <[email protected]>
Authored: Wed Oct 15 21:29:46 2014 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Wed Oct 15 21:29:46 2014 -0700

----------------------------------------------------------------------
 .../avro/ReliableSpoolingFileEventReader.java   | 48 +++++++++-----
 .../TestReliableSpoolingFileEventReader.java    | 66 +++++++++++++++++++-
 2 files changed, 97 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/f979b268/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index f858b56..1833076 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -19,6 +19,7 @@
 
 package org.apache.flume.client.avro;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
@@ -45,6 +46,7 @@ import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.*;
 import java.util.regex.Pattern;
+import java.util.ArrayList;
 
 /**
  * <p/>A {@link ReliableEventReader} which reads log data from files stored
@@ -99,6 +101,10 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
   private Optional<FileInfo> lastFileRead = Optional.absent();
   private boolean committed = true;
 
+  /** Instance var to Cache directory listing **/
+  private Iterator<File> candidateFileIter = null;
+  private int listFilesCount = 0;
+
   /**
    * Create a ReliableSpoolingFileEventReader to watch the given directory.
    */
@@ -195,6 +201,11 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
     this.metaFile = new File(trackerDirectory, metaFileName);
   }
 
+  @VisibleForTesting
+  int getListFilesCount() {
+    return listFilesCount;
+  }
+
   /** Return the filename which generated the data from the last successful
    * {@link #readEvents(int)} call. Returns null if called before any file
    * contents are read. */
@@ -409,29 +420,38 @@ public class ReliableSpoolingFileEventReader implements ReliableEventReader {
    * If two or more files are equally old/young, then the file name with
    * lower lexicographical value is returned.
    * If the {@link #consumeOrder} variable is {@link ConsumeOrder#RANDOM}
-   * then returns any arbitrary file in the directory.
+   * then cache the directory listing to amortize retreival cost, and return
+   * any arbitary file from the directory.
    */
   private Optional<FileInfo> getNextFile() {
-    /* Filter to exclude finished or hidden files */
-    FileFilter filter = new FileFilter() {
-      public boolean accept(File candidate) {
-        String fileName = candidate.getName();
-        if ((candidate.isDirectory()) ||
+    List<File> candidateFiles = Collections.emptyList();
+
+    if (consumeOrder != ConsumeOrder.RANDOM ||
+      candidateFileIter == null ||
+      !candidateFileIter.hasNext()) {
+      /* Filter to exclude finished or hidden files */
+      FileFilter filter = new FileFilter() {
+        public boolean accept(File candidate) {
+          String fileName = candidate.getName();
+          if ((candidate.isDirectory()) ||
             (fileName.endsWith(completedSuffix)) ||
             (fileName.startsWith(".")) ||
             ignorePattern.matcher(fileName).matches()) {
-          return false;
+            return false;
+          }
+          return true;
         }
-        return true;
-      }
-    };
-    List<File> candidateFiles = Arrays.asList(
-      spoolDirectory.listFiles(filter));
-    if (candidateFiles.isEmpty()) { // No matching file in spooling directory.
+      };
+      candidateFiles = Arrays.asList(spoolDirectory.listFiles(filter));
+      listFilesCount++;
+      candidateFileIter = candidateFiles.iterator();
+    }
+
+    if (!candidateFileIter.hasNext()) { // No matching file in spooling directory.
       return Optional.absent();
     }
 
-    File selectedFile = candidateFiles.get(0); // Select the first random file.
+    File selectedFile = candidateFileIter.next();
     if (consumeOrder == ConsumeOrder.RANDOM) { // Selected file is random.
       return openFile(selectedFile);
     } else if (consumeOrder == ConsumeOrder.YOUNGEST) {

http://git-wip-us.apache.org/repos/asf/flume/blob/f979b268/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
index 6a02612..a6b2473 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Files;
 import junit.framework.Assert;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.SystemUtils;
 import org.apache.flume.Event;
 import org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
 import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
@@ -38,6 +39,7 @@ import java.io.FileFilter;
 import java.io.IOException;
 import java.util.*;
 import java.util.Map.Entry;
+import java.util.concurrent.*;
 
 public class TestReliableSpoolingFileEventReader {
 
@@ -212,7 +214,7 @@ public class TestReliableSpoolingFileEventReader {
     FileUtils.write(fileName,
       "New file created in the end. Shoud be read randomly.\n");
     Set<String> actual = Sets.newHashSet();
-    readEventsForFilesInDir(WORK_DIR, reader, actual);      
+    readEventsForFilesInDir(WORK_DIR, reader, actual);
     Set<String> expected = Sets.newHashSet();
     createExpectedFromFilesInSetup(expected);
     expected.add("");
@@ -221,6 +223,52 @@ public class TestReliableSpoolingFileEventReader {
     Assert.assertEquals(expected, actual);    
   }
 
+  @Test
+  public void testConsumeFileRandomlyNewFile() throws Exception {
+    // Atomic moves are not supported in Windows.
+    if (SystemUtils.IS_OS_WINDOWS) {
+      return;
+    }
+    final ReliableEventReader reader
+      = new ReliableSpoolingFileEventReader.Builder()
+      .spoolDirectory(WORK_DIR)
+      .consumeOrder(ConsumeOrder.RANDOM)
+      .build();
+    File fileName = new File(WORK_DIR, "new-file");
+    FileUtils.write(fileName,
+      "New file created in the end. Shoud be read randomly.\n");
+    Set<String> expected = Sets.newHashSet();
+    File tempDir = Files.createTempDir();
+    File tempFile = new File(tempDir, "t");
+    File finalFile = new File(WORK_DIR, "t-file");
+    FileUtils.write(tempFile, "Last file");
+    final Set<String> actual = Sets.newHashSet();
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    final Semaphore semaphore = new Semaphore(0);
+    Future<Void> wait = executor.submit(
+      new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          readEventsForFilesInDir(WORK_DIR, reader, actual, semaphore);
+          return null;
+        }
+      }
+    );
+    semaphore.acquire();
+    tempFile.renameTo(finalFile);
+    wait.get();
+    finalFile.delete();
+    FileUtils.deleteQuietly(tempDir);
+    createExpectedFromFilesInSetup(expected);
+    expected.add("");
+    expected.add(
+      "New file created in the end. Shoud be read randomly.");
+    expected.add("Last file");
+    Assert.assertEquals(2, ((ReliableSpoolingFileEventReader)reader)
+      .getListFilesCount());
+    Assert.assertEquals(expected, actual);
+  }
+
 
   @Test
   public void testConsumeFileOldest() throws IOException, InterruptedException {
@@ -414,17 +462,29 @@ public class TestReliableSpoolingFileEventReader {
       deleteDir(dir);
     }
   }
+
+  private void readEventsForFilesInDir(File dir, ReliableEventReader reader,
+    Collection<String> actual) throws IOException {
+    readEventsForFilesInDir(dir, reader, actual, null);
+  }
     
   /* Read events, one for each file in the given directory. */
   private void readEventsForFilesInDir(File dir, ReliableEventReader reader, 
-      Collection<String> actual) throws IOException {
+      Collection<String> actual, Semaphore semaphore) throws IOException {
     List<Event> events;
     for (int i=0; i < listFiles(dir).size(); i++) {
       events = reader.readEvents(10);
-      for (Event e: events) {
+      for (Event e : events) {
         actual.add(new String(e.getBody()));
       }
       reader.commit();
+      try {
+        if (semaphore != null) {
+          semaphore.release();
+        }
+      } catch (Exception ex) {
+        throw new IOException(ex);
+      }
     }
   }    
   /* Create expected results out of the files created in the setup method. */

Reply via email to