Author: gsingers
Date: Thu Sep  8 12:04:39 2011
New Revision: 1166645

URL: http://svn.apache.org/viewvc?rev=1166645&view=rev
Log:
MAHOUT-802: count the users if they haven't already been counted

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=1166645&r1=1166644&r2=1166645&view=diff
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
 (original)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
 Thu Sep  8 12:04:39 2011
@@ -35,6 +35,8 @@ import org.apache.mahout.cf.taste.hadoop
 import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
 import org.apache.mahout.cf.taste.hadoop.similarity.item.ToItemVectorsReducer;
 import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
 import org.apache.mahout.math.VarIntWritable;
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.VectorWritable;
@@ -161,7 +163,7 @@ public final class RecommenderJob extend
       itemIDIndex.waitForCompletion(true);
     }
 
-    int numberOfUsers = 0;
+    int numberOfUsers = -1;
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
       Job toUserVector = prepareJob(
         inputPath, userVectorPath, TextInputFormat.class,
@@ -195,6 +197,9 @@ public final class RecommenderJob extend
       /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should 
refactor this call to something like
        * new DistributedRowMatrix(...).rowSimilarity(...) */
       try {
+        if (numberOfUsers == -1){
+           numberOfUsers = (int) HadoopUtil.countRecords(userVectorPath, 
PathType.LIST, null, getConf());
+        }
         ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
           "-Dmapred.input.dir=" + itemUserMatrixPath,
           "-Dmapred.output.dir=" + similarityMatrixPath,

Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1166645&r1=1166644&r2=1166645&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java 
(original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java 
Thu Sep  8 12:04:39 2011
@@ -25,7 +25,10 @@ import java.util.Iterator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
 import 
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +66,25 @@ public final class HadoopUtil {
     return count;
   }
 
+  /**
+   * Counts the records under a path by iterating a
+   * {@link org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator}
+   * until it is exhausted and tallying how many values it yields. The values
+   * themselves are discarded.
+   *
+   * @param path The {@link org.apache.hadoop.fs.Path} to count
+   * @param pt The {@link org.apache.mahout.common.iterator.sequencefile.PathType}
+   *           handed to the iterator; presumably controls how {@code path} is
+   *           resolved to files (verify against PathType)
+   * @param filter The {@link org.apache.hadoop.fs.PathFilter} handed to the
+   *           iterator.  May be null
+   * @param conf The Hadoop {@link org.apache.hadoop.conf.Configuration}
+   * @return The number of records, i.e. the number of values the iterator produced
+   * @throws IOException if there was an IO error
+   */
+  public static long countRecords(Path path, PathType pt, PathFilter filter, 
Configuration conf) throws IOException {
+    long count = 0;
+    Iterator<?> iterator = new SequenceFileDirValueIterator<Writable>(path, 
pt, filter, null, true, conf); // NOTE(review): meaning of the null/true arguments is not visible here; confirm against the constructor
+    while (iterator.hasNext()) {
+      iterator.next(); // advance only; the value is not inspected
+      count++;
+    }
+    return count;
+  }
+
   public static InputStream openStream(Path path, Configuration conf) throws 
IOException {
     FileSystem fs = FileSystem.get(path.toUri(), conf);
     return fs.open(path.makeQualified(fs));


Reply via email to