Author: sslavic
Date: Wed Aug 28 19:35:36 2013
New Revision: 1518338
URL: http://svn.apache.org/r1518338
Log:
MAHOUT-1302 Replaced PrefixAddition FileFilter with equivalent implementation
of commons-io DirectoryWalker, for walking the directory tree
Modified:
mahout/trunk/integration/pom.xml
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
mahout/trunk/pom.xml
Modified: mahout/trunk/integration/pom.xml
URL:
http://svn.apache.org/viewvc/mahout/trunk/integration/pom.xml?rev=1518338&r1=1518337&r2=1518338&view=diff
==============================================================================
--- mahout/trunk/integration/pom.xml (original)
+++ mahout/trunk/integration/pom.xml Wed Aug 28 19:35:36 2013
@@ -116,6 +116,11 @@
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-commons-csv</artifactId>
<version>3.5.0</version>
Modified:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java?rev=1518338&r1=1518337&r2=1518338&view=diff
==============================================================================
---
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
(original)
+++
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromMailArchives.java
Wed Aug 28 19:35:36 2013
@@ -20,6 +20,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
+import org.apache.commons.io.DirectoryWalker;
import org.apache.commons.io.comparator.CompositeFileComparator;
import org.apache.commons.io.comparator.DirectoryFileComparator;
import org.apache.commons.io.comparator.PathFileComparator;
@@ -42,11 +43,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.io.FileFilter;
import java.io.IOException;
import java.nio.charset.Charset;
+import java.util.ArrayDeque;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
+import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
@@ -77,22 +80,14 @@ public final class SequenceFilesFromMail
private static final int MAX_JOB_SPLIT_LOCATIONS = 1000000;
- @SuppressWarnings("unchecked")
- private static final Comparator<File> FILE_COMPARATOR = new
CompositeFileComparator(
- DirectoryFileComparator.DIRECTORY_REVERSE,
PathFileComparator.PATH_COMPARATOR);
-
public void createSequenceFiles(MailOptions options) throws IOException {
ChunkedWriter writer = new ChunkedWriter(getConf(),
options.getChunkSize(), new Path(options.getOutputDir()));
MailProcessor processor = new MailProcessor(options, options.getPrefix(),
writer);
try {
if (options.getInput().isDirectory()) {
- File[] inputFilesAndDirs = options.getInput().listFiles();
- Arrays.sort(inputFilesAndDirs, FILE_COMPARATOR);
- PrefixAdditionFilter filter = new PrefixAdditionFilter(processor,
writer);
- for (File aFile : inputFilesAndDirs) {
- filter.accept(aFile);
- }
- log.info("Parsed {} messages from {}", filter.getMessageCount(),
options.getInput().getAbsolutePath());
+ PrefixAdditionDirectoryWalker walker = new
PrefixAdditionDirectoryWalker(processor, writer);
+ walker.walk(options.getInput());
+ log.info("Parsed {} messages from {}", walker.getMessageCount(),
options.getInput().getAbsolutePath());
} else {
long start = System.currentTimeMillis();
long cnt = processor.parseMboxLineByLine(options.getInput());
@@ -104,44 +99,73 @@ public final class SequenceFilesFromMail
}
}
- public class PrefixAdditionFilter implements FileFilter {
- private final MailProcessor processor;
+ private static class PrefixAdditionDirectoryWalker extends
DirectoryWalker<Object> {
+
+ @SuppressWarnings("unchecked")
+ private static final Comparator<File> FILE_COMPARATOR = new
CompositeFileComparator(
+ DirectoryFileComparator.DIRECTORY_REVERSE,
PathFileComparator.PATH_COMPARATOR);
+
+ private final Deque<MailProcessor> processors = new
ArrayDeque<MailProcessor>();
private final ChunkedWriter writer;
- private long messageCount;
+ private Deque<Long> messageCounts = new ArrayDeque<Long>();
- public PrefixAdditionFilter(MailProcessor processor, ChunkedWriter writer)
{
- this.processor = processor;
+ public PrefixAdditionDirectoryWalker(MailProcessor processor,
ChunkedWriter writer) {
+ processors.addFirst(processor);
this.writer = writer;
- this.messageCount = 0;
+ messageCounts.addFirst(0L);
+ }
+
+ public void walk(File startDirectory) throws IOException {
+ super.walk(startDirectory, null);
}
public long getMessageCount() {
- return messageCount;
+ return messageCounts.getFirst();
}
@Override
- public boolean accept(File current) {
- if (current.isDirectory()) {
+ protected void handleDirectoryStart(File current, int depth,
Collection<Object> results) throws IOException {
+ if (depth > 0) {
log.info("At {}", current.getAbsolutePath());
- PrefixAdditionFilter nested = new PrefixAdditionFilter(
- new MailProcessor(processor.getOptions(), processor.getPrefix()
- + File.separator + current.getName(), writer), writer);
- File[] nestedInputFilesAndDirs = current.listFiles();
- Arrays.sort(nestedInputFilesAndDirs, FILE_COMPARATOR);
- for (File aFile : nestedInputFilesAndDirs) {
- nested.accept(aFile);
- }
- long dirCount = nested.getMessageCount();
- log.info("Parsed {} messages from directory {}", dirCount,
current.getAbsolutePath());
- messageCount += dirCount;
- } else {
- try {
- messageCount += processor.parseMboxLineByLine(current);
- } catch (IOException e) {
- throw new IllegalStateException("Error processing " + current, e);
- }
+ MailProcessor processor = processors.getFirst();
+ MailProcessor subDirProcessor = new
MailProcessor(processor.getOptions(), processor.getPrefix()
+ + File.separator + current.getName(), writer);
+ processors.push(subDirProcessor);
+ messageCounts.push(0L);
+ }
+ }
+
+ @Override
+ protected File[] filterDirectoryContents(File directory, int depth, File[]
files) throws IOException {
+ Arrays.sort(files, FILE_COMPARATOR);
+ return files;
+ }
+
+ @Override
+ protected void handleFile(File current, int depth, Collection<Object>
results) throws IOException {
+ MailProcessor processor = processors.getFirst();
+ long currentDirMessageCount = messageCounts.pop();
+ try {
+ currentDirMessageCount += processor.parseMboxLineByLine(current);
+ } catch (IOException e) {
+ throw new IllegalStateException("Error processing " + current, e);
+ }
+ messageCounts.push(currentDirMessageCount);
+ }
+
+ @Override
+ protected void handleDirectoryEnd(File current, int depth,
Collection<Object> results) throws IOException {
+ if (depth > 0) {
+ final long currentDirMessageCount = messageCounts.pop();
+ log.info("Parsed {} messages from directory {}",
currentDirMessageCount, current.getAbsolutePath());
+
+ processors.pop();
+
+ // aggregate message counts
+ long parentDirMessageCount = messageCounts.pop();
+ parentDirMessageCount += currentDirMessageCount;
+ messageCounts.push(parentDirMessageCount);
}
- return false;
}
}
Modified: mahout/trunk/pom.xml
URL:
http://svn.apache.org/viewvc/mahout/trunk/pom.xml?rev=1518338&r1=1518337&r2=1518338&view=diff
==============================================================================
--- mahout/trunk/pom.xml (original)
+++ mahout/trunk/pom.xml Wed Aug 28 19:35:36 2013
@@ -366,6 +366,12 @@
</dependency>
<dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>