Author: srowen
Date: Mon Apr 18 21:42:12 2011
New Revision: 1094776
URL: http://svn.apache.org/viewvc?rev=1094776&view=rev
Log:
Possibly fix an issue where Step0Job accidentally read _SUCCESS files (created by the Cloudera distribution) from its output directory
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java?rev=1094776&r1=1094775&r2=1094776&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java	(original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java	Mon Apr 18 21:42:12 2011
@@ -28,7 +28,6 @@ import java.util.List;
import org.apache.commons.lang.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -42,8 +41,9 @@ import org.apache.hadoop.mapreduce.lib.i
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
-import org.apache.mahout.df.DFUtils;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -129,22 +129,20 @@ public class Step0Job {
Configuration conf = job.getConfiguration();
log.info("mapred.map.tasks = {}", conf.getInt("mapred.map.tasks", -1));
-
- FileSystem fs = outputPath.getFileSystem(conf);
-
- Path[] outfiles = DFUtils.listOutputFiles(fs, outputPath);
-
+
List<Integer> keys = new ArrayList<Integer>();
List<Step0Output> values = new ArrayList<Step0Output>();
-
+
// read all the outputs
- for (Path path : outfiles) {
- for (Pair<IntWritable,Step0Output> record : new SequenceFileIterable<IntWritable,Step0Output>(path, conf)) {
- keys.add(record.getFirst().get());
- values.add(record.getSecond());
- }
+ for (Pair<IntWritable,Step0Output> record
+ : new SequenceFileDirIterable<IntWritable,Step0Output>(outputPath,
+ PathType.LIST,
+ PathFilters.logsCRCFilter(),
+ conf)) {
+ keys.add(record.getFirst().get());
+ values.add(record.getSecond());
}
-
+
return processOutput(keys, values);
}