Author: ssc
Date: Tue Jul 30 05:15:22 2013
New Revision: 1508302
URL: http://svn.apache.org/r1508302
Log:
MAHOUT-1289 Move downsampling code into RowSimilarityJob
Modified:
mahout/trunk/CHANGELOG
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
Modified: mahout/trunk/CHANGELOG
URL:
http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Tue Jul 30 05:15:22 2013
@@ -2,6 +2,8 @@ Mahout Change Log
Release 0.9 - unreleased
+ MAHOUT-1289: Move downsampling code into RowSimilarityJob (ssc)
+
MAHOUT-1296: Remove deprecated algorithms (ssc)
MAHOUT-1295: Excluded all Maven's target directories from distribution
archives (sslavic)
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ALS.java
Tue Jul 30 05:15:22 2013
@@ -52,19 +52,6 @@ final class ALS {
return iterator.hasNext() ? iterator.next().get() : null;
}
- /**
- * assumes that first entry always exists
- *
- * @param vectors
- */
- public static Vector sum(Iterator<VectorWritable> vectors) {
- Vector sum = vectors.next().get();
- while (vectors.hasNext()) {
- sum.assign(vectors.next().get(), Functions.PLUS);
- }
- return sum;
- }
-
public static OpenIntObjectHashMap<Vector>
readMatrixByRowsFromDistributedCache(int numEntities,
Configuration conf) throws IOException {
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
Tue Jul 30 05:15:22 2013
@@ -51,6 +51,7 @@ import org.apache.mahout.common.iterator
import org.apache.mahout.common.mapreduce.MergeVectorsCombiner;
import org.apache.mahout.common.mapreduce.MergeVectorsReducer;
import org.apache.mahout.common.mapreduce.TransposeMapper;
+import org.apache.mahout.common.mapreduce.VectorSumCombiner;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.SequentialAccessSparseVector;
@@ -58,6 +59,7 @@ import org.apache.mahout.math.VarIntWrit
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -243,19 +245,6 @@ public class ParallelALSFactorizationJob
}
}
- static class VectorSumCombiner
- extends Reducer<WritableComparable<?>, VectorWritable,
WritableComparable<?>, VectorWritable> {
-
- private final VectorWritable result = new VectorWritable();
-
- @Override
- protected void reduce(WritableComparable<?> key, Iterable<VectorWritable>
values, Context ctx)
- throws IOException, InterruptedException {
- result.set(ALS.sum(values.iterator()));
- ctx.write(key, result);
- }
- }
-
static class VectorSumReducer
extends Reducer<WritableComparable<?>, VectorWritable,
WritableComparable<?>, VectorWritable> {
@@ -264,7 +253,7 @@ public class ParallelALSFactorizationJob
@Override
protected void reduce(WritableComparable<?> key, Iterable<VectorWritable>
values, Context ctx)
throws IOException, InterruptedException {
- Vector sum = ALS.sum(values.iterator());
+ Vector sum = Vectors.sum(values.iterator());
result.set(new SequentialAccessSparseVector(sum));
ctx.write(key, result);
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
Tue Jul 30 05:15:22 2013
@@ -94,7 +94,7 @@ public final class RecommenderJob extend
public static final String BOOLEAN_DATA = "booleanData";
private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
- private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
+ private static final int DEFAULT_MAX_PREFS = 500;
private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
@Override
@@ -116,14 +116,15 @@ public final class RecommenderJob extend
+ "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')',
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
addOption("maxSimilaritiesPerItem", "m", "Maximum number of similarities
considered per item ",
String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM));
- addOption("maxPrefsPerUserInItemSimilarity", "mppuiis", "max number of
preferences to consider per user in the "
- + "item similarity computation phase, users with more preferences
will be sampled down (default: "
- + DEFAULT_MAX_PREFS_PER_USER + ')',
String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
+ addOption("maxPrefsInItemSimilarity", "mpiis", "max number of preferences
to consider per user or item in the "
+ + "item similarity computation phase, users or items with more
preferences will be sampled down (default: "
+ + DEFAULT_MAX_PREFS + ')', String.valueOf(DEFAULT_MAX_PREFS));
addOption("similarityClassname", "s", "Name of distributed similarity
measures class to instantiate, "
+ "alternatively use one of the predefined similarities (" +
VectorSimilarityMeasures.list() + ')', true);
addOption("threshold", "tr", "discard item pairs with a similarity value
below this", false);
addOption("outputPathForSimilarityMatrix", "opfsm", "write the item
similarity matrix to this path (optional)",
false);
+ addOption("randomSeed", null, "use this seed for sampling", false);
Map<String, List<String>> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
@@ -138,11 +139,13 @@ public final class RecommenderJob extend
boolean booleanData = Boolean.valueOf(getOption("booleanData"));
int maxPrefsPerUser = Integer.parseInt(getOption("maxPrefsPerUser"));
int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser"));
- int maxPrefsPerUserInItemSimilarity =
Integer.parseInt(getOption("maxPrefsPerUserInItemSimilarity"));
+ int maxPrefsInItemSimilarity =
Integer.parseInt(getOption("maxPrefsInItemSimilarity"));
int maxSimilaritiesPerItem =
Integer.parseInt(getOption("maxSimilaritiesPerItem"));
String similarityClassname = getOption("similarityClassname");
double threshold = hasOption("threshold")
? Double.parseDouble(getOption("threshold")) :
RowSimilarityJob.NO_THRESHOLD;
+ long randomSeed = hasOption("randomSeed")
+ ? Long.parseLong(getOption("randomSeed")) :
RowSimilarityJob.NO_FIXED_RANDOM_SEED;
Path prepPath = getTempPath("preparePreferenceMatrix");
@@ -158,7 +161,6 @@ public final class RecommenderJob extend
ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
"--input", getInputPath().toString(),
"--output", prepPath.toString(),
- "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
"--minPrefsPerUser", String.valueOf(minPrefsPerUser),
"--booleanData", String.valueOf(booleanData),
"--tempDir", getTempPath().toString(),
@@ -176,17 +178,18 @@ public final class RecommenderJob extend
PathType.LIST, null, getConf());
}
- /* Once DistributedRowMatrix uses the hadoop 0.20 API, we should
refactor this call to something like
- * new DistributedRowMatrix(...).rowSimilarity(...) */
//calculate the co-occurrence matrix
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
"--input", new Path(prepPath,
PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
"--output", similarityMatrixPath.toString(),
"--numberOfColumns", String.valueOf(numberOfUsers),
"--similarityClassname", similarityClassname,
+ "--maxObservationsPerRow", String.valueOf(maxPrefsInItemSimilarity),
+ "--maxObservationsPerColumn", String.valueOf(maxPrefsInItemSimilarity),
"--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
"--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
"--threshold", String.valueOf(threshold),
+ "--randomSeed", String.valueOf(randomSeed),
"--tempDir", getTempPath().toString(),
});
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
Tue Jul 30 05:15:22 2013
@@ -56,8 +56,6 @@ public class PreparePreferenceMatrixJob
addInputOption();
addOutputOption();
- addOption("maxPrefsPerUser", "mppu", "max number of preferences to
consider per user, "
- + "users with more preferences will be sampled down");
addOption("minPrefsPerUser", "mp", "ignore users with less preferences
than this "
+ "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')',
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
addOption("booleanData", "b", "Treat input as without pref values",
Boolean.FALSE.toString());
@@ -107,12 +105,6 @@ public class PreparePreferenceMatrixJob
IntWritable.class, VectorWritable.class);
toItemVectors.setCombinerClass(ToItemVectorsReducer.class);
- /* configure sampling regarding the uservectors */
- if (hasOption("maxPrefsPerUser")) {
- int samplingSize = Integer.parseInt(getOption("maxPrefsPerUser"));
- toItemVectors.getConfiguration().setInt(ToItemVectorsMapper.SAMPLE_SIZE,
samplingSize);
- }
-
succeeded = toItemVectors.waitForCompletion(true);
if (!succeeded) {
return -1;
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
Tue Jul 30 05:15:22 2013
@@ -24,38 +24,20 @@ import org.apache.mahout.math.RandomAcce
import org.apache.mahout.math.VarLongWritable;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
import java.io.IOException;
public class ToItemVectorsMapper
extends Mapper<VarLongWritable,VectorWritable,IntWritable,VectorWritable> {
- public static final String SAMPLE_SIZE = ToItemVectorsMapper.class +
".sampleSize";
-
- enum Elements {
- USER_RATINGS_USED, USER_RATINGS_NEGLECTED
- }
-
private final IntWritable itemID = new IntWritable();
private final VectorWritable itemVectorWritable = new VectorWritable();
- private int sampleSize;
-
- @Override
- protected void setup(Context ctx) throws IOException, InterruptedException {
- sampleSize = ctx.getConfiguration().getInt(SAMPLE_SIZE, Integer.MAX_VALUE);
- }
-
@Override
protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable,
Context ctx)
throws IOException, InterruptedException {
Vector userRatings = vectorWritable.get();
- int numElementsBeforeSampling = userRatings.getNumNondefaultElements();
- userRatings = Vectors.maybeSample(userRatings, sampleSize);
- int numElementsAfterSampling = userRatings.getNumNondefaultElements();
-
int column = TasteHadoopUtils.idToIndex(rowIndex.get());
itemVectorWritable.setWritesLaxPrecision(true);
@@ -69,9 +51,6 @@ public class ToItemVectorsMapper
// reset vector for reuse
itemVector.setQuick(elem.index(), 0.0);
}
-
-
ctx.getCounter(Elements.USER_RATINGS_USED).increment(numElementsAfterSampling);
-
ctx.getCounter(Elements.USER_RATINGS_NEGLECTED).increment(numElementsBeforeSampling
- numElementsAfterSampling);
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java
Tue Jul 30 05:15:22 2013
@@ -86,7 +86,7 @@ public final class ItemSimilarityJob ext
public static final String MAX_SIMILARITIES_PER_ITEM =
ItemSimilarityJob.class.getName() + ".maxSimilarItemsPerItem";
private static final int DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM = 100;
- private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
+ private static final int DEFAULT_MAX_PREFS = 500;
private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
public static void main(String[] args) throws Exception {
@@ -103,13 +103,14 @@ public final class ItemSimilarityJob ext
addOption("maxSimilaritiesPerItem", "m", "try to cap the number of similar
items per item to this number "
+ "(default: " + DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM + ')',
String.valueOf(DEFAULT_MAX_SIMILAR_ITEMS_PER_ITEM));
- addOption("maxPrefsPerUser", "mppu", "max number of preferences to
consider per user, "
- + "users with more preferences will be sampled down (default: " +
DEFAULT_MAX_PREFS_PER_USER + ')',
- String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
+ addOption("maxPrefs", "mppu", "max number of preferences to consider per
user or item, "
+ + "users or items with more preferences will be sampled down (default:
" + DEFAULT_MAX_PREFS + ')',
+ String.valueOf(DEFAULT_MAX_PREFS));
addOption("minPrefsPerUser", "mp", "ignore users with less preferences
than this "
+ "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')',
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
addOption("booleanData", "b", "Treat input as without pref values",
String.valueOf(Boolean.FALSE));
addOption("threshold", "tr", "discard item pairs with a similarity value
below this", false);
+ addOption("randomSeed", null, "use this seed for sampling", false);
Map<String,List<String>> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
@@ -118,12 +119,14 @@ public final class ItemSimilarityJob ext
String similarityClassName = getOption("similarityClassname");
int maxSimilarItemsPerItem =
Integer.parseInt(getOption("maxSimilaritiesPerItem"));
- int maxPrefsPerUser = Integer.parseInt(getOption("maxPrefsPerUser"));
+ int maxPrefs = Integer.parseInt(getOption("maxPrefs"));
int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser"));
boolean booleanData = Boolean.valueOf(getOption("booleanData"));
double threshold = hasOption("threshold")
? Double.parseDouble(getOption("threshold")) :
RowSimilarityJob.NO_THRESHOLD;
+ long randomSeed = hasOption("randomSeed")
+ ? Long.parseLong(getOption("randomSeed")) :
RowSimilarityJob.NO_FIXED_RANDOM_SEED;
Path similarityMatrixPath = getTempPath("similarityMatrix");
Path prepPath = getTempPath("prepareRatingMatrix");
@@ -134,7 +137,6 @@ public final class ItemSimilarityJob ext
ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]
{
"--input", getInputPath().toString(),
"--output", prepPath.toString(),
- "--maxPrefsPerUser", String.valueOf(maxPrefsPerUser),
"--minPrefsPerUser", String.valueOf(minPrefsPerUser),
"--booleanData", String.valueOf(booleanData),
"--tempDir", getTempPath().toString(),
@@ -142,17 +144,19 @@ public final class ItemSimilarityJob ext
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- int numberOfUsers = HadoopUtil.readInt(new Path(prepPath,
PreparePreferenceMatrixJob.NUM_USERS),
- getConf());
+ int numberOfUsers = HadoopUtil.readInt(new Path(prepPath,
PreparePreferenceMatrixJob.NUM_USERS), getConf());
ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
"--input", new Path(prepPath,
PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
"--output", similarityMatrixPath.toString(),
"--numberOfColumns", String.valueOf(numberOfUsers),
"--similarityClassname", similarityClassName,
+ "--maxObservationsPerRow", String.valueOf(maxPrefs),
+ "--maxObservationsPerColumn", String.valueOf(maxPrefs),
"--maxSimilaritiesPerRow", String.valueOf(maxSimilarItemsPerItem),
"--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
"--threshold", String.valueOf(threshold),
+ "--randomSeed", String.valueOf(randomSeed),
"--tempDir", getTempPath().toString(),
});
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/common/mapreduce/VectorSumReducer.java
Tue Jul 30 05:15:22 2013
@@ -19,9 +19,8 @@ package org.apache.mahout.common.mapredu
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
-import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.Vectors;
import java.io.IOException;
@@ -31,14 +30,6 @@ public class VectorSumReducer
@Override
protected void reduce(WritableComparable<?> key, Iterable<VectorWritable>
values, Context ctx)
throws IOException, InterruptedException {
- Vector vector = null;
- for (VectorWritable v : values) {
- if (vector == null) {
- vector = v.get();
- } else {
- vector.assign(v.get(), Functions.PLUS);
- }
- }
- ctx.write(key, new VectorWritable(vector));
+ ctx.write(key, new VectorWritable(Vectors.sum(values.iterator())));
}
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJob.java
Tue Jul 30 05:15:22 2013
@@ -22,6 +22,7 @@ import com.google.common.primitives.Ints
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
@@ -29,7 +30,9 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.ClassUtils;
import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.RandomUtils;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
+import org.apache.mahout.common.mapreduce.VectorSumCombiner;
import org.apache.mahout.common.mapreduce.VectorSumReducer;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
@@ -45,11 +48,13 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
public class RowSimilarityJob extends AbstractJob {
public static final double NO_THRESHOLD = Double.MIN_VALUE;
+ public static final long NO_FIXED_RANDOM_SEED = Long.MIN_VALUE;
private static final String SIMILARITY_CLASSNAME = RowSimilarityJob.class +
".distributedSimilarityClassname";
private static final String NUMBER_OF_COLUMNS = RowSimilarityJob.class +
".numberOfColumns";
@@ -63,17 +68,25 @@ public class RowSimilarityJob extends Ab
private static final String NUM_NON_ZERO_ENTRIES_PATH =
RowSimilarityJob.class + ".nonZeroEntriesPath";
private static final int DEFAULT_MAX_SIMILARITIES_PER_ROW = 100;
+ private static final String OBSERVATIONS_PER_COLUMN_PATH =
RowSimilarityJob.class + ".observationsPerColumnPath";
+
+ private static final String MAX_OBSERVATIONS_PER_ROW =
RowSimilarityJob.class + ".maxObservationsPerRow";
+ private static final String MAX_OBSERVATIONS_PER_COLUMN =
RowSimilarityJob.class + ".maxObservationsPerColumn";
+ private static final String RANDOM_SEED = RowSimilarityJob.class +
".randomSeed";
+
+ private static final int DEFAULT_MAX_OBSERVATIONS_PER_ROW = 500;
+ private static final int DEFAULT_MAX_OBSERVATIONS_PER_COLUMN = 500;
+
private static final int NORM_VECTOR_MARKER = Integer.MIN_VALUE;
private static final int MAXVALUE_VECTOR_MARKER = Integer.MIN_VALUE + 1;
private static final int NUM_NON_ZERO_ENTRIES_VECTOR_MARKER =
Integer.MIN_VALUE + 2;
- enum Counters { ROWS, COOCCURRENCES, PRUNED_COOCCURRENCES }
+ enum Counters { ROWS, USED_OBSERVATIONS, NEGLECTED_OBSERVATIONS,
COOCCURRENCES, PRUNED_COOCCURRENCES }
public static void main(String[] args) throws Exception {
ToolRunner.run(new RowSimilarityJob(), args);
}
-
@Override
public int run(String[] args) throws Exception {
@@ -86,6 +99,11 @@ public class RowSimilarityJob extends Ab
+ DEFAULT_MAX_SIMILARITIES_PER_ROW + ')',
String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ROW));
addOption("excludeSelfSimilarity", "ess", "compute similarity of rows to
themselves?", String.valueOf(false));
addOption("threshold", "tr", "discard row pairs with a similarity value
below this", false);
+ addOption("maxObservationsPerRow", null, "sample rows down to this number
of entries",
+ String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_ROW));
+ addOption("maxObservationsPerColumn", null, "sample columns down to this
number of entries",
+ String.valueOf(DEFAULT_MAX_OBSERVATIONS_PER_COLUMN));
+ addOption("randomSeed", null, "use this seed for sampling", false);
addOption(DefaultOptionCreator.overwriteOption().create());
Map<String,List<String>> parsedArgs = parseArguments(args);
@@ -123,6 +141,11 @@ public class RowSimilarityJob extends Ab
boolean excludeSelfSimilarity =
Boolean.parseBoolean(getOption("excludeSelfSimilarity"));
double threshold = hasOption("threshold")
? Double.parseDouble(getOption("threshold")) : NO_THRESHOLD;
+ long randomSeed = hasOption("randomSeed")
+ ? Long.parseLong(getOption("randomSeed")) : NO_FIXED_RANDOM_SEED;
+
+ int maxObservationsPerRow =
Integer.parseInt(getOption("maxObservationsPerRow"));
+ int maxObservationsPerColumn =
Integer.parseInt(getOption("maxObservationsPerColumn"));
Path weightsPath = getTempPath("weights");
Path normsPath = getTempPath("norms.bin");
@@ -130,8 +153,18 @@ public class RowSimilarityJob extends Ab
Path maxValuesPath = getTempPath("maxValues.bin");
Path pairwiseSimilarityPath = getTempPath("pairwiseSimilarity");
+ Path observationsPerColumnPath = getTempPath("observationsPerColumn.bin");
+
AtomicInteger currentPhase = new AtomicInteger();
+ Job countObservations = prepareJob(getInputPath(), getTempPath("notUsed"),
CountObservationsMapper.class,
+ NullWritable.class, VectorWritable.class,
SumObservationsReducer.class, NullWritable.class,
+ VectorWritable.class);
+ countObservations.setCombinerClass(VectorSumCombiner.class);
+ countObservations.getConfiguration().set(OBSERVATIONS_PER_COLUMN_PATH,
observationsPerColumnPath.toString());
+ countObservations.setNumReduceTasks(1);
+ boolean observationsCounted = countObservations.waitForCompletion(true);
+ if (!observationsCounted) { return -1; } // VectorNormMapper.setup() reads the counts this job writes
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job normsAndTranspose = prepareJob(getInputPath(), weightsPath,
VectorNormMapper.class, IntWritable.class,
VectorWritable.class, MergeVectorsReducer.class, IntWritable.class,
VectorWritable.class);
@@ -142,6 +175,11 @@ public class RowSimilarityJob extends Ab
normsAndTransposeConf.set(NUM_NON_ZERO_ENTRIES_PATH,
numNonZeroEntriesPath.toString());
normsAndTransposeConf.set(MAXVALUES_PATH, maxValuesPath.toString());
normsAndTransposeConf.set(SIMILARITY_CLASSNAME, similarityClassname);
+ normsAndTransposeConf.set(OBSERVATIONS_PER_COLUMN_PATH,
observationsPerColumnPath.toString());
+ normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_ROW,
String.valueOf(maxObservationsPerRow));
+ normsAndTransposeConf.set(MAX_OBSERVATIONS_PER_COLUMN,
String.valueOf(maxObservationsPerColumn));
+ normsAndTransposeConf.set(RANDOM_SEED, String.valueOf(randomSeed));
+
boolean succeeded = normsAndTranspose.waitForCompletion(true);
if (!succeeded) {
return -1;
@@ -181,6 +219,35 @@ public class RowSimilarityJob extends Ab
return 0;
}
+ public static class CountObservationsMapper extends
Mapper<IntWritable,VectorWritable,NullWritable,VectorWritable> {
+ // running count of non-zero entries per column, across all rows this mapper sees
+ private Vector columnCounts = new
RandomAccessSparseVector(Integer.MAX_VALUE);
+
+ @Override
+ protected void map(IntWritable rowIndex, VectorWritable rowVectorWritable,
Context ctx)
+ throws IOException, InterruptedException {
+ // add one observation for every column in which this row is non-zero
+ Vector row = rowVectorWritable.get();
+ for (Vector.Element elem : row.nonZeroes()) {
+ columnCounts.setQuick(elem.index(),
columnCounts.getQuick(elem.index()) + 1);
+ }
+ }
+ // emit this mapper's partial counts once, under a single NullWritable key
+ @Override
+ protected void cleanup(Context ctx) throws IOException,
InterruptedException {
+ ctx.write(NullWritable.get(), new VectorWritable(columnCounts));
+ }
+ }
+
+ public static class SumObservationsReducer extends
Reducer<NullWritable,VectorWritable,NullWritable,VectorWritable> { // sums partial column counts, writes them to OBSERVATIONS_PER_COLUMN_PATH
+ @Override
+ protected void reduce(NullWritable nullWritable, Iterable<VectorWritable>
partialVectors, Context ctx)
+ throws IOException, InterruptedException {
+ Vector counts = Vectors.sum(partialVectors.iterator());
+ Vectors.write(counts, new
Path(ctx.getConfiguration().get(OBSERVATIONS_PER_COLUMN_PATH)),
ctx.getConfiguration()); // written as a side file; nothing is emitted through ctx
+ }
+ }
+
public static class VectorNormMapper extends
Mapper<IntWritable,VectorWritable,IntWritable,VectorWritable> {
private VectorSimilarityMeasure similarity;
@@ -189,21 +256,71 @@ public class RowSimilarityJob extends Ab
private Vector maxValues;
private double threshold;
+ private OpenIntIntHashMap observationsPerColumn;
+ private int maxObservationsPerRow;
+ private int maxObservationsPerColumn;
+
+ private Random random;
+
@Override
protected void setup(Context ctx) throws IOException, InterruptedException
{
- similarity =
ClassUtils.instantiateAs(ctx.getConfiguration().get(SIMILARITY_CLASSNAME),
- VectorSimilarityMeasure.class);
+ // read similarity, threshold and sampling settings from the job configuration
+ Configuration conf = ctx.getConfiguration();
+
+ similarity = ClassUtils.instantiateAs(conf.get(SIMILARITY_CLASSNAME),
VectorSimilarityMeasure.class);
norms = new RandomAccessSparseVector(Integer.MAX_VALUE);
nonZeroEntries = new RandomAccessSparseVector(Integer.MAX_VALUE);
maxValues = new RandomAccessSparseVector(Integer.MAX_VALUE);
- threshold = Double.parseDouble(ctx.getConfiguration().get(THRESHOLD));
+ threshold = Double.parseDouble(conf.get(THRESHOLD));
+ // per-column non-zero counts written by SumObservationsReducer, plus the sampling caps
+ observationsPerColumn = Vectors.readAsIntMap(new
Path(conf.get(OBSERVATIONS_PER_COLUMN_PATH)), conf);
+ maxObservationsPerRow = conf.getInt(MAX_OBSERVATIONS_PER_ROW,
DEFAULT_MAX_OBSERVATIONS_PER_ROW);
+ maxObservationsPerColumn = conf.getInt(MAX_OBSERVATIONS_PER_COLUMN,
DEFAULT_MAX_OBSERVATIONS_PER_COLUMN);
+ // NO_FIXED_RANDOM_SEED is the "no --randomSeed given" sentinel: use the unseeded factory
+ long seed = Long.parseLong(conf.get(RANDOM_SEED));
+ if (seed == NO_FIXED_RANDOM_SEED) {
+ random = RandomUtils.getRandom();
+ } else {
+ random = RandomUtils.getRandom(seed);
+ }
+ }
+
+ private Vector sampleDown(Vector rowVector, Context ctx) {
+ // keep each non-zero entry with probability min(rowSampleRate, columnSampleRate)
+ int observationsPerRow = rowVector.getNumNondefaultElements();
+ double rowSampleRate = (double) Math.min(maxObservationsPerRow,
observationsPerRow) / observationsPerRow;
+ // cast before dividing: int / int truncates every rate to 0 or 1 (and throws on an empty row)
+ Vector downsampledRow = rowVector.like();
+ long usedObservations = 0;
+ long neglectedObservations = 0;
+
+ for (Vector.Element elem : rowVector.nonZeroes()) {
+ // columnCount = non-zero entries of this column over the whole input (observationsPerColumn.bin)
+ int columnCount = observationsPerColumn.get(elem.index());
+ double columnSampleRate = (double) Math.min(maxObservationsPerColumn,
columnCount) / columnCount;
+
+ if (random.nextDouble() <= Math.min(rowSampleRate, columnSampleRate)) {
+ downsampledRow.setQuick(elem.index(), elem.get());
+ usedObservations++;
+ } else {
+ neglectedObservations++;
+ }
+
+ }
+
+ ctx.getCounter(Counters.USED_OBSERVATIONS).increment(usedObservations);
+
ctx.getCounter(Counters.NEGLECTED_OBSERVATIONS).increment(neglectedObservations);
+
+ return downsampledRow;
}
@Override
protected void map(IntWritable row, VectorWritable vectorWritable, Context
ctx)
throws IOException, InterruptedException {
- Vector rowVector = similarity.normalize(vectorWritable.get());
+ Vector sampledRowVector = sampleDown(vectorWritable.get(), ctx);
+
+ Vector rowVector = similarity.normalize(sampledRowVector);
int numNonZeroEntries = 0;
double maxValue = Double.MIN_VALUE;
@@ -230,8 +347,6 @@ public class RowSimilarityJob extends Ab
@Override
protected void cleanup(Context ctx) throws IOException,
InterruptedException {
- super.cleanup(ctx);
- // dirty trick
ctx.write(new IntWritable(NORM_VECTOR_MARKER), new
VectorWritable(norms));
ctx.write(new IntWritable(NUM_NON_ZERO_ENTRIES_VECTOR_MARKER), new
VectorWritable(nonZeroEntries));
ctx.write(new IntWritable(MAXVALUE_VECTOR_MARKER), new
VectorWritable(maxValues));
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/Vectors.java
Tue Jul 30 05:15:22 2013
@@ -34,6 +34,7 @@ import org.apache.mahout.math.Varint;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.Vector.Element;
import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.function.Functions;
import org.apache.mahout.math.map.OpenIntIntHashMap;
public final class Vectors {
@@ -91,6 +92,14 @@ public final class Vectors {
return accumulator;
}
+ public static Vector sum(Iterator<VectorWritable> vectors) { // requires at least one element: next() is called unconditionally
+ Vector sum = vectors.next().get(); // the first element's vector is the accumulator; later vectors are added into it
+ while (vectors.hasNext()) {
+ sum.assign(vectors.next().get(), Functions.PLUS);
+ }
+ return sum;
+ }
+
static class TemporaryElement implements Vector.Element {
private final int index;
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java?rev=1508302&r1=1508301&r2=1508302&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/math/hadoop/similarity/cooccurrence/RowSimilarityJobTest.java
Tue Jul 30 05:15:22 2013
@@ -24,6 +24,7 @@ import org.apache.mahout.common.MahoutTe
import org.apache.mahout.math.Matrix;
import org.apache.mahout.math.hadoop.MathHelper;
import
org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.TanimotoCoefficientSimilarity;
+import org.apache.mahout.math.map.OpenIntIntHashMap;
import org.junit.Test;
import java.io.File;
@@ -73,6 +74,15 @@ public class RowSimilarityJobTest extend
"--numberOfColumns", String.valueOf(5), "--similarityClassname",
TanimotoCoefficientSimilarity.class.getName(),
"--tempDir", tmpDir.getAbsolutePath() });
+
+ OpenIntIntHashMap observationsPerColumn =
+ Vectors.readAsIntMap(new Path(tmpDir.getAbsolutePath(),
"observationsPerColumn.bin"), conf);
+ assertEquals(4, observationsPerColumn.size());
+ assertEquals(1, observationsPerColumn.get(0));
+ assertEquals(2, observationsPerColumn.get(2));
+ assertEquals(2, observationsPerColumn.get(3));
+ assertEquals(1, observationsPerColumn.get(4));
+
Matrix similarityMatrix = MathHelper.readMatrix(conf, new
Path(outputDir.getAbsolutePath(), "part-r-00000"), 3, 3);
assertNotNull(similarityMatrix);