Author: gsingers
Date: Thu Nov 3 19:52:07 2011
New Revision: 1197292
URL: http://svn.apache.org/viewvc?rev=1197292&view=rev
Log:
added some comments
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java?rev=1197292&r1=1197291&r2=1197292&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
Thu Nov 3 19:52:07 2011
@@ -46,26 +46,26 @@ import java.util.regex.Pattern;
/**
* <p>Runs a completely distributed recommender job as a series of
mapreduces.</p>
- *
- * <p>Preferences in the input file should look like {@code
userID,itemID[,preferencevalue]}</p>
- *
+ * <p/>
+ * <p>Preferences in the input file should look like {@code userID,itemID[,preferencevalue]}</p>
+ * <p/>
* <p>
* Preference value is optional to accommodate applications that have no
notion of a preference value (that is, the user
* simply expresses a preference for an item, but no degree of preference).
* </p>
- *
+ * <p/>
* <p>
* The preference value is assumed to be parseable as a {@code double}. The
user IDs and item IDs are
* parsed as {@code long}s.
* </p>
- *
+ * <p/>
* <p>Command line arguments specific to this class are:</p>
- *
+ * <p/>
* <ol>
* <li>--input(path): Directory containing one or more text files with the
preference data</li>
* <li>--output(path): output path where recommender output should go</li>
* <li>--similarityClassname (classname): Name of vector similarity class to
instantiate or a predefined similarity
- * from {@link
org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li>
+ * from {@link
org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li>
* <li>--usersFile (path): only compute recommendations for user IDs contained
in this file (optional)</li>
* <li>--itemsFile (path): only include item IDs from this file in the
recommendations (optional)</li>
* <li>--filterFile (path): file containing comma-separated userID,itemID
pairs. Used to exclude the item from the
@@ -79,16 +79,16 @@ import java.util.regex.Pattern;
* users with more preferences will be sampled down (1000)</li>
* <li>--threshold (double): discard item pairs with a similarity value below
this</li>
* </ol>
- *
+ * <p/>
* <p>General command line options are documented in {@link AbstractJob}.</p>
- *
+ * <p/>
* <p>Note that because of how Hadoop parses arguments, all "-D" arguments
must appear before all other
* arguments.</p>
*/
public final class RecommenderJob extends AbstractJob {
public static final String BOOLEAN_DATA = "booleanData";
-
+
private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
private static final int DEFAULT_MAX_PREFS_PER_USER = 1000;
private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
@@ -99,27 +99,27 @@ public final class RecommenderJob extend
addInputOption();
addOutputOption();
addOption("numRecommendations", "n", "Number of recommendations per user",
-
String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS));
+
String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS));
addOption("usersFile", null, "File of users to recommend for", null);
addOption("itemsFile", null, "File of items to recommend for", null);
addOption("filterFile", "f", "File containing comma-separated
userID,itemID pairs. Used to exclude the item from "
- + "the recommendations for that user (optional)", null);
+ + "the recommendations for that user (optional)", null);
addOption("booleanData", "b", "Treat input as without pref values",
Boolean.FALSE.toString());
addOption("maxPrefsPerUser", "mxp",
- "Maximum number of preferences considered per user in final
recommendation phase",
-
String.valueOf(UserVectorSplitterMapper.DEFAULT_MAX_PREFS_PER_USER_CONSIDERED));
+ "Maximum number of preferences considered per user in final
recommendation phase",
+
String.valueOf(UserVectorSplitterMapper.DEFAULT_MAX_PREFS_PER_USER_CONSIDERED));
addOption("minPrefsPerUser", "mp", "ignore users with less preferences
than this in the similarity computation "
- + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')',
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
+ + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')',
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
addOption("maxSimilaritiesPerItem", "m", "Maximum number of similarities
considered per item ",
- String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM));
+ String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM));
addOption("maxPrefsPerUserInItemSimilarity", "mppuiis", "max number of
preferences to consider per user in the " +
- "item similarity computation phase, users with more preferences will
be sampled down (default: " +
- DEFAULT_MAX_PREFS_PER_USER + ')',
String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
+ "item similarity computation phase, users with more preferences
will be sampled down (default: " +
+ DEFAULT_MAX_PREFS_PER_USER + ')',
String.valueOf(DEFAULT_MAX_PREFS_PER_USER));
addOption("similarityClassname", "s", "Name of distributed similarity
measures class to instantiate, " +
- "alternatively use one of the predefined similarities (" +
VectorSimilarityMeasures.list() + ')', true);
+ "alternatively use one of the predefined similarities (" +
VectorSimilarityMeasures.list() + ')', true);
addOption("threshold", "tr", "discard item pairs with a similarity value
below this", false);
- Map<String,String> parsedArgs = parseArguments(args);
+ Map<String, String> parsedArgs = parseArguments(args);
if (parsedArgs == null) {
return -1;
}
@@ -136,7 +136,7 @@ public final class RecommenderJob extend
int maxSimilaritiesPerItem =
Integer.parseInt(parsedArgs.get("--maxSimilaritiesPerItem"));
String similarityClassname = parsedArgs.get("--similarityClassname");
double threshold = parsedArgs.containsKey("--threshold") ?
- Double.parseDouble(parsedArgs.get("--threshold")) :
RowSimilarityJob.NO_THRESHOLD;
+ Double.parseDouble(parsedArgs.get("--threshold")) :
RowSimilarityJob.NO_THRESHOLD;
Path prepPath = getTempPath("preparePreferenceMatrix");
@@ -151,13 +151,13 @@ public final class RecommenderJob extend
int numberOfUsers = -1;
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
- ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]
{
- "--input", getInputPath().toString(),
- "--output", prepPath.toString(),
- "--maxPrefsPerUser", String.valueOf(maxPrefsPerUserInItemSimilarity),
- "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
- "--booleanData", String.valueOf(booleanData),
- "--tempDir", getTempPath().toString() });
+ ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
+ "--input", getInputPath().toString(),
+ "--output", prepPath.toString(),
+ "--maxPrefsPerUser",
String.valueOf(maxPrefsPerUserInItemSimilarity),
+ "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
+ "--booleanData", String.valueOf(booleanData),
+ "--tempDir", getTempPath().toString()});
numberOfUsers = TasteHadoopUtils.readInt(new Path(prepPath,
PreparePreferenceMatrixJob.NUM_USERS), getConf());
}
@@ -168,58 +168,60 @@ public final class RecommenderJob extend
/* special behavior if phase 1 is skipped */
if (numberOfUsers == -1) {
numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath,
PreparePreferenceMatrixJob.USER_VECTORS),
- PathType.LIST, null, getConf());
+ PathType.LIST, null, getConf());
}
/* Once DistributedRowMatrix uses the hadoop 0.20 API, we should
refactor this call to something like
* new DistributedRowMatrix(...).rowSimilarity(...) */
- ToolRunner.run(getConf(), new RowSimilarityJob(), new String[] {
- "--input", new Path(prepPath,
PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
- "--output", similarityMatrixPath.toString(),
- "--numberOfColumns", String.valueOf(numberOfUsers),
- "--similarityClassname", similarityClassname,
- "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
- "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
- "--threshold", String.valueOf(threshold),
- "--tempDir", getTempPath().toString() });
+ //calculate the co-occurrence matrix
+ ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
+ "--input", new Path(prepPath,
PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
+ "--output", similarityMatrixPath.toString(),
+ "--numberOfColumns", String.valueOf(numberOfUsers),
+ "--similarityClassname", similarityClassname,
+ "--maxSimilaritiesPerRow",
String.valueOf(maxSimilaritiesPerItem),
+ "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
+ "--threshold", String.valueOf(threshold),
+ "--tempDir", getTempPath().toString()});
}
+ //start the multiplication of the co-occurrence matrix by the user vectors
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
Job prePartialMultiply1 = prepareJob(
- similarityMatrixPath, prePartialMultiplyPath1,
SequenceFileInputFormat.class,
- SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class,
VectorOrPrefWritable.class,
- Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
- SequenceFileOutputFormat.class);
+ similarityMatrixPath, prePartialMultiplyPath1,
SequenceFileInputFormat.class,
+ SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class,
VectorOrPrefWritable.class,
+ Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,
+ SequenceFileOutputFormat.class);
prePartialMultiply1.waitForCompletion(true);
-
+ //continue the multiplication
Job prePartialMultiply2 = prepareJob(new Path(prepPath,
PreparePreferenceMatrixJob.USER_VECTORS),
- prePartialMultiplyPath2, SequenceFileInputFormat.class,
UserVectorSplitterMapper.class, VarIntWritable.class,
- VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class,
VectorOrPrefWritable.class,
- SequenceFileOutputFormat.class);
+ prePartialMultiplyPath2, SequenceFileInputFormat.class,
UserVectorSplitterMapper.class, VarIntWritable.class,
+ VectorOrPrefWritable.class, Reducer.class, VarIntWritable.class,
VectorOrPrefWritable.class,
+ SequenceFileOutputFormat.class);
if (usersFile != null) {
prePartialMultiply2.getConfiguration().set(UserVectorSplitterMapper.USERS_FILE,
usersFile);
}
prePartialMultiply2.getConfiguration().setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED,
- maxPrefsPerUser);
+ maxPrefsPerUser);
prePartialMultiply2.waitForCompletion(true);
-
+ //combine the two intermediate outputs into the partial-multiply input
Job partialMultiply = prepareJob(
- new Path(prePartialMultiplyPath1 + "," + prePartialMultiplyPath2),
partialMultiplyPath,
- SequenceFileInputFormat.class, Mapper.class, VarIntWritable.class,
VectorOrPrefWritable.class,
- ToVectorAndPrefReducer.class, VarIntWritable.class,
VectorAndPrefsWritable.class,
- SequenceFileOutputFormat.class);
+ new Path(prePartialMultiplyPath1 + "," +
prePartialMultiplyPath2), partialMultiplyPath,
+ SequenceFileInputFormat.class, Mapper.class,
VarIntWritable.class, VectorOrPrefWritable.class,
+ ToVectorAndPrefReducer.class, VarIntWritable.class,
VectorAndPrefsWritable.class,
+ SequenceFileOutputFormat.class);
setS3SafeCombinedInputPath(partialMultiply, getTempPath(),
prePartialMultiplyPath1, prePartialMultiplyPath2);
partialMultiply.waitForCompletion(true);
}
if (shouldRunNextPhase(parsedArgs, currentPhase)) {
-
+ //filter out any user/item pairs listed in the filter file
/* convert the user/item pairs to filter if a filterfile has been
specified */
if (filterFile != null) {
Job itemFiltering = prepareJob(new Path(filterFile),
explicitFilterPath, TextInputFormat.class,
- ItemFilterMapper.class, VarLongWritable.class,
VarLongWritable.class,
- ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class,
VectorAndPrefsWritable.class,
- SequenceFileOutputFormat.class);
+ ItemFilterMapper.class, VarLongWritable.class,
VarLongWritable.class,
+ ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class,
VectorAndPrefsWritable.class,
+ SequenceFileOutputFormat.class);
itemFiltering.waitForCompletion(true);
}
@@ -227,12 +229,12 @@ public final class RecommenderJob extend
if (filterFile != null) {
aggregateAndRecommendInput += "," + explicitFilterPath;
}
-
+ //extract out the recommendations
Job aggregateAndRecommend = prepareJob(
- new Path(aggregateAndRecommendInput), outputPath,
SequenceFileInputFormat.class,
- PartialMultiplyMapper.class, VarLongWritable.class,
PrefAndSimilarityColumnWritable.class,
- AggregateAndRecommendReducer.class, VarLongWritable.class,
RecommendedItemsWritable.class,
- TextOutputFormat.class);
+ new Path(aggregateAndRecommendInput), outputPath,
SequenceFileInputFormat.class,
+ PartialMultiplyMapper.class, VarLongWritable.class,
PrefAndSimilarityColumnWritable.class,
+ AggregateAndRecommendReducer.class, VarLongWritable.class,
RecommendedItemsWritable.class,
+ TextOutputFormat.class);
Configuration aggregateAndRecommendConf =
aggregateAndRecommend.getConfiguration();
if (itemsFile != null) {
aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE,
itemsFile);
@@ -243,7 +245,7 @@ public final class RecommenderJob extend
}
setIOSort(aggregateAndRecommend);
aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
- new Path(prepPath,
PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
+ new Path(prepPath,
PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS,
numRecommendations);
aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
aggregateAndRecommend.waitForCompletion(true);