Author: ssc
Date: Tue Jul 30 05:15:22 2013
New Revision: 1508302

URL: http://svn.apache.org/r1508302
Log:
MAHOUT-1289 Move downsampling code into RowSimilarityJob

Modified:
    mahout/trunk/CHANGELOG
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
    mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
    mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
    mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java

Modified: mahout/trunk/CHANGELOG
URL: http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Tue Jul 30 05:15:22 2013
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 0.9 - unreleased
 
+  MAHOUT-1289: Move downsampling code into RowSimilarityJob (ssc)
+
   MAHOUT-1296: Remove deprecated algorithms (ssc)
 
   MAHOUT-1295: Excluded all Maven's target directories from distribution archives (sslavic)

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java Tue Jul 30 05:15:22 2013
@@ -52,19 +52,6 @@ final class ALS {
     return iterator.hasNext() ? iterator.next().get() : null;
   }
 
-  /**
-   * assumes that first entry always exists
-   *
-   * @param vectors
-   */
-  public static Vector sum(Iterator<VectorWritable> vectors) {
-    Vector sum = vectors.next().get();
-    while (vectors.hasNext()) {
-      sum.assign(vectors.next().get(), Functions.PLUS);
-    }
-    return sum;
-  }
-
   public static OpenIntObjectHashMap<Vector> readMatrixByRowsFromDistributedCache(int numEntities,
       Configuration conf) throws IOException {
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java Tue Jul 30 05:15:22 2013
@@ -51,6 +51,7 @@ import org.apache.mahout.common.iterator
 import org.apache.mahout.common.mapreduce.MergeVectorsCombiner;
 import org.apache.mahout.common.mapreduce.MergeVectorsReducer;
 import org.apache.mahout.common.mapreduce.TransposeMapper;
+import org.apache.mahout.common.mapreduce.VectorSumCombiner;
 import org.apache.mahout.math.DenseVector;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.SequentialAccessSparseVector;
@@ -58,6 +59,7 @@ import org.apache.mahout.math.VarIntWrit
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -243,19 +245,6 @@ public class ParallelALSFactorizationJob
     }
   }
 
-  static class VectorSumCombiner
-      extends Reducer<WritableComparable<?>, VectorWritable, 
WritableComparable<?>, VectorWritable> {
-
-    private final VectorWritable result = new VectorWritable();
-
-    @Override
-    protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> 
values, Context ctx)
-      throws IOException, InterruptedException {
-      result.set(ALS.sum(values.iterator()));
-      ctx.write(key, result);
-    }
-  }
-
   static class VectorSumReducer
       extends Reducer<WritableComparable<?>, VectorWritable, 
WritableComparable<?>, VectorWritable> {
 
@@ -264,7 +253,7 @@ public class ParallelALSFactorizationJob
     @Override
     protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> 
values, Context ctx)
       throws IOException, InterruptedException {
-      Vector sum = ALS.sum(values.iterator());
+      Vector sum = Vectors.sum(values.iterator());
       result.set(new SequentialAccessSparseVector(sum));
       ctx.write(key, result);
     }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java Tue Jul 30 05:15:22 2013
@@ -94,7 +94,7 @@ public final class RecommenderJob extend
   public static final String BOOLEAN_DATA = "booleanData";
 
   private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
-  private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
+  private static final int DEFAULT_MAX_PREFS = 500;
   private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
 
   @Override
@@ -116,14 +116,15 @@ public final class RecommenderJob extend
             + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', 
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
     addOption("maxSimilaritiesPerItem", "m", "Maximum number of similarities 
considered per item ",
             String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM));
-    addOption("maxPrefsPerUserInItemSimilarity", "mppuiis", "max number of 
preferences to consider per user in the " 
-            + "item similarity computation phase, users with more preferences 
will be sampled down (default: "
-        + DEFAULT_MAX_PREFS_PER_USER + ')', 
String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
+    addOption("maxPrefsInItemSimilarity", "mpiis", "max number of preferences 
to consider per user or item in the "
+            + "item similarity computation phase, users or items with more 
preferences will be sampled down (default: "
+        + DEFAULT_MAX_PREFS + ')', String.valueOf(DEFAULT_MAX_PREFS));
     addOption("similarityClassname", "s", "Name of distributed similarity 
measures class to instantiate, " 
             + "alternatively use one of the predefined similarities (" + 
VectorSimilarityMeasures.list() + ')', true);
     addOption("threshold", "tr", "discard item pairs with a similarity value 
below this", false);
     addOption("outputPathForSimilarityMatrix", "opfsm", "write the item 
similarity matrix to this path (optional)",
         false);
+    addOption("randomSeed", null, "use this seed for sampling", false);
 
     Map<String, List<String>> parsedArgs = parseArguments(args);
     if (parsedArgs == null) {
@@ -138,11 +139,13 @@ public final class RecommenderJob extend
     boolean booleanData = Boolean.valueOf(getOption("booleanData"));
     int maxPrefsPerUser = Integer.parseInt(getOption("maxPrefsPerUser"));
     int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser"));
-    int maxPrefsPerUserInItemSimilarity = 
Integer.parseInt(getOption("maxPrefsPerUserInItemSimilarity"));
+    int maxPrefsInItemSimilarity = 
Integer.parseInt(getOption("maxPrefsInItemSimilarity"));
     int maxSimilaritiesPerItem = 
Integer.parseInt(getOption("maxSimilaritiesPerItem"));
     String similarityClassname = getOption("similarityClassname");
     double threshold = hasOption("threshold")
         ? Double.parseDouble(getOption("threshold")) : 
RowSimilarityJob.NO_THRESHOLD;
+    long randomSeed = hasOption("randomSeed")
+        ? Long.parseLong(getOption("randomSeed")) : 
RowSimilarityJob.NO_FIXED_RANDOM_SEED;
 
 
     Path prepPath = getTempPath("preparePreferenceMatrix");
@@ -158,7 +161,6 @@ public final class RecommenderJob extend
       ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
         "--input", getInputPath().toString(),
         "--output", prepPath.toString(),
-        "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
         "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
         "--booleanData", String.valueOf(booleanData),
         "--tempDir", getTempPath().toString(),
@@ -176,17 +178,18 @@ public final class RecommenderJob extend
                 PathType.LIST, null, getConf());
       }
 
-      /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should 
refactor this call to something like
-       * new DistributedRowMatrix(...).rowSimilarity(...) */
       //calculate the co-occurrence matrix
       ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
         "--input", new Path(prepPath, 
PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
         "--output", similarityMatrixPath.toString(),
         "--numberOfColumns", String.valueOf(numberOfUsers),
         "--similarityClassname", similarityClassname,
+        "--maxObservationsPerRow", String.valueOf(maxPrefsInItemSimilarity),
+        "--maxObservationsPerColumn", String.valueOf(maxPrefsInItemSimilarity),
         "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
         "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
         "--threshold", String.valueOf(threshold),
+        "--randomSeed", String.valueOf(randomSeed),
         "--tempDir", getTempPath().toString(),
       });
 

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java Tue Jul 30 05:15:22 2013
@@ -56,8 +56,6 @@ public class PreparePreferenceMatrixJob 
 
     addInputOption();
     addOutputOption();
-    addOption("maxPrefsPerUser", "mppu", "max number of preferences to 
consider per user, " 
-            + "users with more preferences will be sampled down");
     addOption("minPrefsPerUser", "mp", "ignore users with less preferences 
than this "
             + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', 
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
     addOption("booleanData", "b", "Treat input as without pref values", 
Boolean.FALSE.toString());
@@ -107,12 +105,6 @@ public class PreparePreferenceMatrixJob 
             IntWritable.class, VectorWritable.class);
     toItemVectors.setCombinerClass(ToItemVectorsReducer.class);
 
-    /* configure sampling regarding the uservectors */
-    if (hasOption("maxPrefsPerUser")) {
-      int samplingSize = Integer.parseInt(getOption("maxPrefsPerUser"));
-      toItemVectors.getConfiguration().setInt(ToItemVectorsMapper.SAMPLE_SIZE, 
samplingSize);
-    }
-
     succeeded = toItemVectors.waitForCompletion(true);
     if (!succeeded) {
       return -1;

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java Tue Jul 30 05:15:22 2013
@@ -24,38 +24,20 @@ import org.apache.mahout.math.RandomAcce
 import org.apache.mahout.math.VarLongWritable;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
 
 import java.io.IOException;
 
 public class ToItemVectorsMapper
     extends Mapper<VarLongWritable,VectorWritable,IntWritable,VectorWritable> {
 
-  public static final String SAMPLE_SIZE = ToItemVectorsMapper.class + 
".sampleSize";
-
-  enum Elements {
-    USER_RATINGS_USED, USER_RATINGS_NEGLECTED
-  }
-
   private final IntWritable itemID = new IntWritable();
   private final VectorWritable itemVectorWritable = new VectorWritable();
 
-  private int sampleSize;
-
-  @Override
-  protected void setup(Context ctx) throws IOException, InterruptedException {
-    sampleSize = ctx.getConfiguration().getInt(SAMPLE_SIZE, Integer.MAX_VALUE);
-  }
-
   @Override
   protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, 
Context ctx)
     throws IOException, InterruptedException {
     Vector userRatings = vectorWritable.get();
 
-    int numElementsBeforeSampling = userRatings.getNumNondefaultElements();
-    userRatings = Vectors.maybeSample(userRatings, sampleSize);
-    int numElementsAfterSampling = userRatings.getNumNondefaultElements();
-
     int column = TasteHadoopUtils.idToIndex(rowIndex.get());
 
     itemVectorWritable.setWritesLaxPrecision(true);
@@ -69,9 +51,6 @@ public class ToItemVectorsMapper
       // reset vector for reuse
       itemVector.setQuick(elem.index(), 0.0);
     }
-
-    
ctx.getCounter(Elements.USER_RATINGS_USED).increment(numElementsAfterSampling);
-    
ctx.getCounter(Elements.USER_RATINGS_NEGLECTED).increment(numElementsBeforeSampling
 - numElementsAfterSampling);
   }
 
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Tue Jul 30 05:15:22 2013
@@ -86,7 +86,7 @@ public final class ItemSimilarityJob ext
   public static final String MAX_SIMILARITIES_PER_ITEM = 
ItemSimilarityJob.class.getName() + ".maxSimilarItemsPerItem";
 
   private static final int DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM = 100;
-  private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
+  private static final int DEFAULT_MAX_PREFS = 500;
   private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
 
   public static void main(String[] args) throws Exception {
@@ -103,13 +103,14 @@ public final class ItemSimilarityJob ext
     addOption("maxSimilaritiesPerItem", "m", "try to cap the number of similar 
items per item to this number "
         + "(default: " + DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM + ')',
         String.valueOf(DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM));
-    addOption("maxPrefsPerUser", "mppu", "max number of preferences to 
consider per user, " 
-        + "users with more preferences will be sampled down (default: " + 
DEFAULT_MAX_PREFS_PER_USER + ')',
-        String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
+    addOption("maxPrefs", "mppu", "max number of preferences to consider per 
user or item, " 
+        + "users or items with more preferences will be sampled down (default: 
" + DEFAULT_MAX_PREFS + ')',
+        String.valueOf(DEFAULT_MAX_PREFS));
     addOption("minPrefsPerUser", "mp", "ignore users with less preferences 
than this "
         + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', 
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
     addOption("booleanData", "b", "Treat input as without pref values", 
String.valueOf(Boolean.FALSE));
     addOption("threshold", "tr", "discard item pairs with a similarity value 
below this", false);
+    addOption("randomSeed", null, "use this seed for sampling", false);
 
     Map<String,List<String>> parsedArgs = parseArguments(args);
     if (parsedArgs == null) {
@@ -118,12 +119,14 @@ public final class ItemSimilarityJob ext
 
     String similarityClassName = getOption("similarityClassname");
     int maxSimilarItemsPerItem = 
Integer.parseInt(getOption("maxSimilaritiesPerItem"));
-    int maxPrefsPerUser = Integer.parseInt(getOption("maxPrefsPerUser"));
+    int maxPrefs = Integer.parseInt(getOption("maxPrefs"));
     int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser"));
     boolean booleanData = Boolean.valueOf(getOption("booleanData"));
 
     double threshold = hasOption("threshold")
         ? Double.parseDouble(getOption("threshold")) : 
RowSimilarityJob.NO_THRESHOLD;
+    long randomSeed = hasOption("randomSeed")
+        ? Long.parseLong(getOption("randomSeed")) : 
RowSimilarityJob.NO_FIXED_RANDOM_SEED;
 
     Path similarityMatrixPath = getTempPath("similarityMatrix");
     Path prepPath = getTempPath("prepareRatingMatrix");
@@ -134,7 +137,6 @@ public final class ItemSimilarityJob ext
       ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[] 
{
         "--input", getInputPath().toString(),
         "--output", prepPath.toString(),
-        "--maxPrefsPerUser", String.valueOf(maxPrefsPerUser),
         "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
         "--booleanData", String.valueOf(booleanData),
         "--tempDir", getTempPath().toString(),
@@ -142,17 +144,19 @@ public final class ItemSimilarityJob ext
     }
 
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-      int numberOfUsers = HadoopUtil.readInt(new Path(prepPath, 
PreparePreferenceMatrixJob.NUM_USERS),
-          getConf());
+      int numberOfUsers = HadoopUtil.readInt(new Path(prepPath, 
PreparePreferenceMatrixJob.NUM_USERS), getConf());
 
       ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
         "--input", new Path(prepPath, 
PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
         "--output", similarityMatrixPath.toString(),
         "--numberOfColumns", String.valueOf(numberOfUsers),
         "--similarityClassname", similarityClassName,
+        "--maxObservationsPerRow", String.valueOf(maxPrefs),
+        "--maxObservationsPerColumn", String.valueOf(maxPrefs),
         "--maxSimilaritiesPerRow", String.valueOf(maxSimilarItemsPerItem),
         "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
         "--threshold", String.valueOf(threshold),
+        "--randomSeed", String.valueOf(randomSeed),
         "--tempDir", getTempPath().toString(),
       });
     }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java Tue Jul 30 05:15:22 2013
@@ -19,9 +19,8 @@ package org.apache.mahout.common.mapredu
 
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
 
 import java.io.IOException;
 
@@ -31,14 +30,6 @@ public class VectorSumReducer
   @Override
   protected void reduce(WritableComparable<?> key, Iterable<VectorWritable> 
values, Context ctx)
     throws IOException, InterruptedException {
-    Vector vector = null;
-    for (VectorWritable v : values) {
-      if (vector == null) {
-        vector = v.get();
-      } else {
-        vector.assign(v.get(), Functions.PLUS);
-      }
-    }
-    ctx.write(key, new VectorWritable(vector));
+    ctx.write(key, new VectorWritable(Vectors.sum(values.iterator())));
   }
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java Tue Jul 30 05:15:22 2013
@@ -22,6 +22,7 @@ import com.google.common.primitives.Ints
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -29,7 +30,9 @@ import org.apache.hadoop.util.ToolRunner
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.ClassUtils;
 import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.RandomUtils;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.mapreduce.VectorSumCombiner;
 import org.apache.mahout.common.mapreduce.VectorSumReducer;
 import org.apache.mahout.math.RandomAccessSparseVector;
 import org.apache.mahout.math.Vector;
@@ -45,11 +48,13 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class RowSimilarityJob extends AbstractJob {
 
   public static final double NO_THRESHOLD = Double.MIN_VALUE;
+  public static final long NO_FIXED_RANDOM_SEED = Long.MIN_VALUE;
 
   private static final String SIMILARITY_CLASSNAME = RowSimilarityJob.class + 
".distributedSimilarityClassname";
   private static final String NUMBER_OF_COLUMNS = RowSimilarityJob.class + 
".numberOfColumns";
@@ -63,17 +68,25 @@ public class RowSimilarityJob extends Ab
   private static final String NUM_NON_ZERO_ENTRIES_PATH = 
RowSimilarityJob.class + ".nonZeroEntriesPath";
   private static final int DEFAULT_MAX_SIMILARITIES_PER_ROW = 100;
 
+  private static final String OBSERVATIONS_PER_COLUMN_PATH = 
RowSimilarityJob.class + ".observationsPerColumnPath";
+
+  private static final String MAX_OBSERVATIONS_PER_ROW = 
RowSimilarityJob.class + ".maxObservationsPerRow";
+  private static final String MAX_OBSERVATIONS_PER_COLUMN = 
RowSimilarityJob.class + ".maxObservationsPerColumn";
+  private static final String RANDOM_SEED = RowSimilarityJob.class + 
".randomSeed";
+
+  private static final int DEFAULT_MAX_OBSERVATIONS_PER_ROW = 500;
+  private static final int DEFAULT_MAX_OBSERVATIONS_PER_COLUMN = 500;
+
   private static final int NORM_VECTOR_MARKER = Integer.MIN_VALUE;
   private static final int MAXVALUE_VECTOR_MARKER = Integer.MIN_VALUE + 1;
   private static final int NUM_NON_ZERO_ENTRIES_VECTOR_MARKER = 
Integer.MIN_VALUE + 2;
 
-  enum Counters { ROWS, COOCCURRENCES, PRUNED_COOCCURRENCES }
+  enum Counters { ROWS, USED_OBSERVATIONS, NEGLECTED_OBSERVATIONS, 
COOCCURRENCES, PRUNED_COOCCURRENCES }
 
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new RowSimilarityJob(), args);
   }
 
-
   @Override
   public int run(String[] args) throws Exception {
 
@@ -86,6 +99,11 @@ public class RowSimilarityJob extends Ab
         + DEFAULT_MAX_SIMILARITIES_PER_ROW + ')', 
String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ROW));
     addOption("excludeSelfSimilarity", "ess", "compute similarity of rows to 
themselves?", String.valueOf(false));
     addOption("threshold", "tr", "discard row pairs with a similarity value 
below this", false);
+    addOption("maxObservationsPerRow", null, "sample rows down to this number 
of entries",
+        String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_ROW));
+    addOption("maxObservationsPerColumn", null, "sample columns down to this 
number of entries",
+        String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_COLUMN));
+    addOption("randomSeed", null, "use this seed for sampling", false);
     addOption(DefaultOptionCreator.overwriteOption().create());
 
     Map<String,List<String>> parsedArgs = parseArguments(args);
@@ -123,6 +141,11 @@ public class RowSimilarityJob extends Ab
     boolean excludeSelfSimilarity = 
Boolean.parseBoolean(getOption("excludeSelfSimilarity"));
     double threshold = hasOption("threshold")
         ? Double.parseDouble(getOption("threshold")) : NO_THRESHOLD;
+    long randomSeed = hasOption("randomSeed")
+        ? Long.parseLong(getOption("randomSeed")) : NO_FIXED_RANDOM_SEED;
+
+    int maxObservationsPerRow = 
Integer.parseInt(getOption("maxObservationsPerRow"));
+    int maxObservationsPerColumn = 
Integer.parseInt(getOption("maxObservationsPerColumn"));
 
     Path weightsPath = getTempPath("weights");
     Path normsPath = getTempPath("norms.bin");
@@ -130,8 +153,18 @@ public class RowSimilarityJob extends Ab
     Path maxValuesPath = getTempPath("maxValues.bin");
     Path pairwiseSimilarityPath = getTempPath("pairwiseSimilarity");
 
+    Path observationsPerColumnPath = getTempPath("observationsPerColumn.bin");
+
     AtomicInteger currentPhase = new AtomicInteger();
 
+    Job countObservations = prepareJob(getInputPath(), getTempPath("notUsed"), 
CountObservationsMapper.class,
+        NullWritable.class, VectorWritable.class, 
SumObservationsReducer.class, NullWritable.class,
+        VectorWritable.class);
+    countObservations.setCombinerClass(VectorSumCombiner.class);
+    countObservations.getConfiguration().set(OBSERVATIONS_PER_COLUMN_PATH, 
observationsPerColumnPath.toString());
+    countObservations.setNumReduceTasks(1);
+    countObservations.waitForCompletion(true);
+
     if (shouldRunNextPhase(parsedArgs, currentPhase)) {
       Job normsAndTranspose = prepareJob(getInputPath(), weightsPath, 
VectorNormMapper.class, IntWritable.class,
           VectorWritable.class, MergeVectorsReducer.class, IntWritable.class, 
VectorWritable.class);
@@ -142,6 +175,11 @@ public class RowSimilarityJob extends Ab
       normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH, 
numNonZeroEntriesPath.toString());
       normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
       normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
+      normsAndTransposeConf.set(OBSERVATIONS_PER_COLUMN_PATH, 
observationsPerColumnPath.toString());
+      normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_ROW, 
String.valueOf(maxObservationsPerRow));
+      normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_COLUMN, 
String.valueOf(maxObservationsPerColumn));
+      normsAndTransposeConf.set(RANDOM_SEED, String.valueOf(randomSeed));
+
       boolean succeeded = normsAndTranspose.waitForCompletion(true);
       if (!succeeded) {
         return -1;
@@ -181,6 +219,35 @@ public class RowSimilarityJob extends Ab
     return 0;
   }
 
+  public static class CountObservationsMapper extends 
Mapper<IntWritable,VectorWritable,NullWritable,VectorWritable> {
+
+    private Vector columnCounts = new 
RandomAccessSparseVector(Integer.MAX_VALUE);
+
+    @Override
+    protected void map(IntWritable rowIndex, VectorWritable rowVectorWritable, 
Context ctx)
+      throws IOException, InterruptedException {
+
+      Vector row = rowVectorWritable.get();
+      for (Vector.Element elem : row.nonZeroes()) {
+        columnCounts.setQuick(elem.index(), 
columnCounts.getQuick(elem.index()) + 1);
+      }
+    }
+
+    @Override
+    protected void cleanup(Context ctx) throws IOException, 
InterruptedException {
+      ctx.write(NullWritable.get(), new VectorWritable(columnCounts));
+    }
+  }
+
+  public static class SumObservationsReducer extends 
Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> {
+    @Override
+    protected void reduce(NullWritable nullWritable, Iterable<VectorWritable> 
partialVectors, Context ctx)
+    throws IOException, InterruptedException {
+      Vector counts = Vectors.sum(partialVectors.iterator());
+      Vectors.write(counts, new 
Path(ctx.getConfiguration().get(OBSERVATIONS_PER_COLUMN_PATH)), 
ctx.getConfiguration());
+    }
+  }
+
   public static class VectorNormMapper extends 
Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
 
     private VectorSimilarityMeasure similarity;
@@ -189,21 +256,71 @@ public class RowSimilarityJob extends Ab
     private Vector maxValues;
     private double threshold;
 
+    private OpenIntIntHashMap observationsPerColumn;
+    private int maxObservationsPerRow;
+    private int maxObservationsPerColumn;
+
+    private Random random;
+
     @Override
     protected void setup(Context ctx) throws IOException, InterruptedException 
{
-      similarity = 
ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
-          VectorSimilarityMeasure.class);
+
+      Configuration conf = ctx.getConfiguration();
+
+      similarity = ClassUtils.instantiateAs(conf.get(SIMILARITY_CLASSNAME), 
VectorSimilarityMeasure.class);
       norms = new RandomAccessSparseVector(Integer.MAX_VALUE);
       nonZeroEntries = new RandomAccessSparseVector(Integer.MAX_VALUE);
       maxValues = new RandomAccessSparseVector(Integer.MAX_VALUE);
-      threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+      threshold = Double.parseDouble(conf.get(THRESHOLD));
+
+      observationsPerColumn = Vectors.readAsIntMap(new 
Path(conf.get(OBSERVATIONS_PER_COLUMN_PATH)), conf);
+      maxObservationsPerRow = conf.getInt(MAX_OBSERVATIONS_PER_ROW, 
DEFAULT_MAX_OBSERVATIONS_PER_ROW);
+      maxObservationsPerColumn = conf.getInt(MAX_OBSERVATIONS_PER_COLUMN, 
DEFAULT_MAX_OBSERVATIONS_PER_COLUMN);
+
+      long seed = Long.parseLong(conf.get(RANDOM_SEED));
+      if (seed == NO_FIXED_RANDOM_SEED) {
+        random = RandomUtils.getRandom();
+      } else {
+        random = RandomUtils.getRandom(seed);
+      }
+    }
+
+    private Vector sampleDown(Vector rowVector, Context ctx) {
+
+      int observationsPerRow = rowVector.getNumNondefaultElements();
+      double rowSampleRate = Math.min(maxObservationsPerRow, 
observationsPerRow) / observationsPerRow;
+
+      Vector downsampledRow = rowVector.like();
+      long usedObservations = 0;
+      long neglectedObservations = 0;
+
+      for (Vector.Element elem : rowVector.nonZeroes()) {
+
+        int columnCount = observationsPerColumn.get(elem.index());
+        double columnSampleRate = Math.min(maxObservationsPerColumn, 
columnCount) / columnCount;
+
+        if (random.nextDouble() <= Math.min(rowSampleRate, columnSampleRate)) {
+          downsampledRow.setQuick(elem.index(), elem.get());
+          usedObservations++;
+        } else {
+          neglectedObservations++;
+        }
+
+      }
+
+      ctx.getCounter(Counters.USED_OBSERVATIONS).increment(usedObservations);
+      
ctx.getCounter(Counters.NEGLECTED_OBSERVATIONS).increment(neglectedObservations);
+
+      return downsampledRow;
     }
 
     @Override
     protected void map(IntWritable row, VectorWritable vectorWritable, Context 
ctx)
       throws IOException, InterruptedException {
 
-      Vector rowVector = similarity.normalize(vectorWritable.get());
+      Vector sampledRowVector = sampleDown(vectorWritable.get(), ctx);
+
+      Vector rowVector = similarity.normalize(sampledRowVector);
 
       int numNonZeroEntries = 0;
       double maxValue = Double.MIN_VALUE;
@@ -230,8 +347,6 @@ public class RowSimilarityJob extends Ab
 
     @Override
     protected void cleanup(Context ctx) throws IOException, 
InterruptedException {
-      super.cleanup(ctx);
-      // dirty trick
       ctx.write(new IntWritable(NORM_VECTOR_MARKER), new 
VectorWritable(norms));
       ctx.write(new IntWritable(NUM_NON_ZERO_ENTRIES_VECTOR_MARKER), new 
VectorWritable(nonZeroEntries));
       ctx.write(new IntWritable(MAXVALUE_VECTOR_MARKER), new 
VectorWritable(maxValues));

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java Tue Jul 30 05:15:22 2013
@@ -34,6 +34,7 @@ import org.apache.mahout.math.Varint;
 import org.apache.mahout.math.Vector;
 import org.apache.mahout.math.Vector.Element;
 import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
 import org.apache.mahout.math.map.OpenIntIntHashMap;
 
 public final class Vectors {
@@ -91,6 +92,14 @@ public final class Vectors {
     return accumulator;
   }
 
+  /**
+   * Adds up, element-wise, every vector delivered by {@code vectors}.
+   *
+   * <p>No new vector is allocated. The first vector taken from the iterator serves as the
+   * accumulator: it is modified in place and is the instance returned.</p>
+   *
+   * @param vectors the vectors to add; must yield at least one element, and is fully consumed
+   * @return the first vector, now holding the sum of all vectors
+   * @throws java.util.NoSuchElementException if {@code vectors} is empty (per the
+   *         {@link Iterator#next()} contract)
+   */
+  public static Vector sum(Iterator<VectorWritable> vectors) {
+    // the first vector doubles as the accumulator
+    Vector total = vectors.next().get();
+    while (vectors.hasNext()) {
+      Vector addend = vectors.next().get();
+      total.assign(addend, Functions.PLUS);
+    }
+    return total;
+  }
+
   static class TemporaryElement implements Vector.Element {
 
     private final int index;

Modified: mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java (original)
+++ mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java Tue Jul 30 05:15:22 2013
@@ -24,6 +24,7 @@ import org.apache.mahout.common.MahoutTe
 import org.apache.mahout.math.Matrix;
 import org.apache.mahout.math.hadoop.MathHelper;
 import 
org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.TanimotoCoefficientSimilarity;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
 import org.junit.Test;
 
 import java.io.File;
@@ -73,6 +74,15 @@ public class RowSimilarityJobTest extend
         "--numberOfColumns", String.valueOf(5), "--similarityClassname", 
TanimotoCoefficientSimilarity.class.getName(),
         "--tempDir", tmpDir.getAbsolutePath() });
 
+
+    OpenIntIntHashMap observationsPerColumn =
+        Vectors.readAsIntMap(new Path(tmpDir.getAbsolutePath(), 
"observationsPerColumn.bin"), conf);
+    assertEquals(4, observationsPerColumn.size());
+    assertEquals(1, observationsPerColumn.get(0));
+    assertEquals(2, observationsPerColumn.get(2));
+    assertEquals(2, observationsPerColumn.get(3));
+    assertEquals(1, observationsPerColumn.get(4));
+
     Matrix similarityMatrix = MathHelper.readMatrix(conf, new 
Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3);
 
     assertNotNull(similarityMatrix);


Reply via email to