Author: gsingers
Date: Thu Sep 8 12:04:39 2011
New Revision: 1166645
URL: http://svn.apache.org/viewvc?rev=1166645&view=rev
Log:
MAHOUT-802: count the users if they haven't already been counted
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=1166645&r1=1166644&r2=1166645&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
Thu Sep 8 12:04:39 2011
@@ -35,6 +35,8 @@ import org.apache.mahout.cf.taste.hadoop
import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
import org.apache.mahout.cf.taste.hadoop.similarity.item.ToItemVectorsReducer;
import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.math.VarIntWritable;
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.VectorWritable;
@@ -161,7 +163,7 @@ public final class RecommenderJob extend
itemIDIndex.waitForCompletion(true);
}
- int numberOfUsers = 0;
+ int numberOfUsers = -1;
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job toUserVector = prepareJob(
inputPath, userVectorPath, TextInputFormat.class,
@@ -195,6 +197,9 @@ public final class RecommenderJob extend
/* Once DistributedRowMatrix uses the hadoop 0.20 API, we should
refactor this call to something like
* new DistributedRowMatrix(...).rowSimilarity(...) */
try {
+ if (numberOfUsers == -1){
+ numberOfUsers = (int) HadoopUtil.countRecords(userVectorPath,
PathType.LIST, null, getConf());
+ }
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
"-Dmapred.input.dir=" + itemUserMatrixPath,
"-Dmapred.output.dir=" + similarityMatrixPath,
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java?rev=1166645&r1=1166644&r2=1166645&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
(original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/HadoopUtil.java
Thu Sep 8 12:04:39 2011
@@ -25,7 +25,10 @@ import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator;
import
org.apache.mahout.common.iterator.sequencefile.SequenceFileValueIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -63,6 +66,25 @@ public final class HadoopUtil {
return count;
}
+ /**
+ * Count all the records in a directory using a {@link
org.apache.mahout.common.iterator.sequencefile.SequenceFileDirValueIterator}
+ * @param path The {@link org.apache.hadoop.fs.Path} to count
+ * @param pt The {@link
org.apache.mahout.common.iterator.sequencefile.PathType}
+ * @param filter Apply the {@link org.apache.hadoop.fs.PathFilter}. May be
null
+ * @param conf The Hadoop {@link org.apache.hadoop.conf.Configuration}
+ * @return The number of records
+ * @throws IOException if there was an IO error
+ */
+ public static long countRecords(Path path, PathType pt, PathFilter filter,
Configuration conf) throws IOException {
+ long count = 0;
+ Iterator<?> iterator = new SequenceFileDirValueIterator<Writable>(path,
pt, filter, null, true, conf);
+ while (iterator.hasNext()) {
+ iterator.next();
+ count++;
+ }
+ return count;
+ }
+
public static InputStream openStream(Path path, Configuration conf) throws
IOException {
FileSystem fs = FileSystem.get(path.toUri(), conf);
return fs.open(path.makeQualified(fs));