Author: srowen
Date: Mon Apr 18 21:42:12 2011
New Revision: 1094776

URL: http://svn.apache.org/viewvc?rev=1094776&view=rev
Log:
Maybe fix issue with accidentally reading _SUCCESS files from Cloudera distro

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java?rev=1094776&r1=1094775&r2=1094776&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java	(original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/df/mapreduce/partial/Step0Job.java	Mon Apr 18 21:42:12 2011
@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -42,8 +41,9 @@ import org.apache.hadoop.mapreduce.lib.i
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
-import org.apache.mahout.df.DFUtils;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -129,22 +129,20 @@ public class Step0Job {
     Configuration conf = job.getConfiguration();
     
     log.info("mapred.map.tasks = {}", conf.getInt("mapred.map.tasks", -1));
-    
-    FileSystem fs = outputPath.getFileSystem(conf);
-    
-    Path[] outfiles = DFUtils.listOutputFiles(fs, outputPath);
-    
+
     List<Integer> keys = new ArrayList<Integer>();
     List<Step0Output> values = new ArrayList<Step0Output>();
-    
+
     // read all the outputs
-    for (Path path : outfiles) {
-      for (Pair<IntWritable,Step0Output> record : new SequenceFileIterable<IntWritable,Step0Output>(path, conf)) {
-        keys.add(record.getFirst().get());
-        values.add(record.getSecond());
-      }
+    for (Pair<IntWritable,Step0Output> record
+         : new SequenceFileDirIterable<IntWritable,Step0Output>(outputPath,
+                                                                PathType.LIST,
+                                                                PathFilters.logsCRCFilter(), 
+                                                                conf)) {
+      keys.add(record.getFirst().get());
+      values.add(record.getSecond());
     }
-    
+
     return processOutput(keys, values);
   }
   


Reply via email to