Author: sslavic
Date: Wed Aug 28 19:35:36 2013
New Revision: 1518338

URL: http://svn.apache.org/r1518338
Log:
MAHOUT-1302: Replaced the PrefixAdditionFilter FileFilter with an equivalent
implementation built on commons-io DirectoryWalker for walking the directory tree.

Modified:
    mahout/trunk/integration/pom.xml
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
    mahout/trunk/pom.xml

Modified: mahout/trunk/integration/pom.xml
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/pom.xml?rev=1518338&r1=1518337&r2=1518338&view=diff
==============================================================================
--- mahout/trunk/integration/pom.xml (original)
+++ mahout/trunk/integration/pom.xml Wed Aug 28 19:35:36 2013
@@ -116,6 +116,11 @@
     </dependency>
 
     <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.solr</groupId>
       <artifactId>solr-commons-csv</artifactId>
       <version>3.5.0</version>

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java?rev=1518338&r1=1518337&r2=1518338&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java Wed Aug 28 19:35:36 2013
@@ -20,6 +20,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 
+import org.apache.commons.io.DirectoryWalker;
 import org.apache.commons.io.comparator.CompositeFileComparator;
 import org.apache.commons.io.comparator.DirectoryFileComparator;
 import org.apache.commons.io.comparator.PathFileComparator;
@@ -42,11 +43,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.ArrayDeque;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Comparator;
+import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.regex.Pattern;
@@ -77,22 +80,14 @@ public final class SequenceFilesFromMail
 
   private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
 
-  @SuppressWarnings("unchecked")
-  private static final Comparator<File> FILE_COMPARATOR = new 
CompositeFileComparator(
-      DirectoryFileComparator.DIRECTORY_REVERSE, 
PathFileComparator.PATH_COMPARATOR);
-
   public void createSequenceFiles(MailOptions options) throws IOException {
     ChunkedWriter writer = new ChunkedWriter(getConf(), 
options.getChunkSize(), new Path(options.getOutputDir()));
     MailProcessor processor = new MailProcessor(options, options.getPrefix(), 
writer);
     try {
       if (options.getInput().isDirectory()) {
-        File[] inputFilesAndDirs = options.getInput().listFiles();
-        Arrays.sort(inputFilesAndDirs, FILE_COMPARATOR);
-        PrefixAdditionFilter filter = new PrefixAdditionFilter(processor, 
writer);
-        for (File aFile : inputFilesAndDirs) {
-          filter.accept(aFile);
-        }
-        log.info("Parsed {} messages from {}", filter.getMessageCount(), 
options.getInput().getAbsolutePath());
+        PrefixAdditionDirectoryWalker walker = new 
PrefixAdditionDirectoryWalker(processor, writer);
+        walker.walk(options.getInput());
+        log.info("Parsed {} messages from {}", walker.getMessageCount(), 
options.getInput().getAbsolutePath());
       } else {
         long start = System.currentTimeMillis();
         long cnt = processor.parseMboxLineByLine(options.getInput());
@@ -104,44 +99,73 @@ public final class SequenceFilesFromMail
     }
   }
 
-  public class PrefixAdditionFilter implements FileFilter {
-    private final MailProcessor processor;
+  private static class PrefixAdditionDirectoryWalker extends 
DirectoryWalker<Object> {
+
+    @SuppressWarnings("unchecked")
+    private static final Comparator<File> FILE_COMPARATOR = new 
CompositeFileComparator(
+        DirectoryFileComparator.DIRECTORY_REVERSE, 
PathFileComparator.PATH_COMPARATOR);
+
+    private final Deque<MailProcessor> processors = new 
ArrayDeque<MailProcessor>();
     private final ChunkedWriter writer;
-    private long messageCount;
+    private Deque<Long> messageCounts = new ArrayDeque<Long>();
 
-    public PrefixAdditionFilter(MailProcessor processor, ChunkedWriter writer) 
{
-      this.processor = processor;
+    public PrefixAdditionDirectoryWalker(MailProcessor processor, 
ChunkedWriter writer) {
+      processors.addFirst(processor);
       this.writer = writer;
-      this.messageCount = 0;
+      messageCounts.addFirst(0L);
+    }
+
+    public void walk(File startDirectory) throws IOException {
+      super.walk(startDirectory, null);
     }
 
     public long getMessageCount() {
-      return messageCount;
+      return messageCounts.getFirst();
     }
 
     @Override
-    public boolean accept(File current) {
-      if (current.isDirectory()) {
+    protected void handleDirectoryStart(File current, int depth, 
Collection<Object> results) throws IOException {
+      if (depth > 0) {
         log.info("At {}", current.getAbsolutePath());
-        PrefixAdditionFilter nested = new PrefixAdditionFilter(
-          new MailProcessor(processor.getOptions(), processor.getPrefix()
-            + File.separator + current.getName(), writer), writer);
-        File[] nestedInputFilesAndDirs = current.listFiles();
-        Arrays.sort(nestedInputFilesAndDirs, FILE_COMPARATOR);
-        for (File aFile : nestedInputFilesAndDirs) {
-          nested.accept(aFile);
-        }
-        long dirCount = nested.getMessageCount();
-        log.info("Parsed {} messages from directory {}", dirCount, 
current.getAbsolutePath());
-        messageCount += dirCount;
-      } else {
-        try {
-          messageCount += processor.parseMboxLineByLine(current);
-        } catch (IOException e) {
-          throw new IllegalStateException("Error processing " + current, e);
-        }
+        MailProcessor processor = processors.getFirst();
+        MailProcessor subDirProcessor = new 
MailProcessor(processor.getOptions(), processor.getPrefix()
+            + File.separator + current.getName(), writer);
+        processors.push(subDirProcessor);
+        messageCounts.push(0L);
+      }
+    }
+
+    @Override
+    protected File[] filterDirectoryContents(File directory, int depth, File[] 
files) throws IOException {
+      Arrays.sort(files, FILE_COMPARATOR);
+      return files;
+    }
+
+    @Override
+    protected void handleFile(File current, int depth, Collection<Object> 
results) throws IOException {
+      MailProcessor processor = processors.getFirst();
+      long currentDirMessageCount = messageCounts.pop();
+      try {
+        currentDirMessageCount += processor.parseMboxLineByLine(current);
+      } catch (IOException e) {
+        throw new IllegalStateException("Error processing " + current, e);
+      }
+      messageCounts.push(currentDirMessageCount);
+    }
+
+    @Override
+    protected void handleDirectoryEnd(File current, int depth, 
Collection<Object> results) throws IOException {
+      if (depth > 0) {
+        final long currentDirMessageCount = messageCounts.pop();
+        log.info("Parsed {} messages from directory {}", 
currentDirMessageCount, current.getAbsolutePath());
+
+        processors.pop();
+
+        // aggregate message counts
+        long parentDirMessageCount = messageCounts.pop();
+        parentDirMessageCount += currentDirMessageCount;
+        messageCounts.push(parentDirMessageCount);
       }
-      return false;
     }
   }
 

Modified: mahout/trunk/pom.xml
URL: http://svn.apache.org/viewvc/mahout/trunk/pom.xml?rev=1518338&r1=1518337&r2=1518338&view=diff
==============================================================================
--- mahout/trunk/pom.xml (original)
+++ mahout/trunk/pom.xml Wed Aug 28 19:35:36 2013
@@ -366,6 +366,12 @@
       </dependency>
 
       <dependency>
+        <groupId>commons-io</groupId>
+        <artifactId>commons-io</artifactId>
+        <version>2.4</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.5</version>


Reply via email to