Author: ssc
Date: Thu Nov 10 15:09:01 2011
New Revision: 1200366
URL: http://svn.apache.org/viewvc?rev=1200366&view=rev
Log:
MAHOUT-878 Provide better examples for the parallel ALS recommender code
Added:
mahout/trunk/examples/bin/factorize-netflix.sh
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/hadoop/
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/hadoop/example/
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/hadoop/example/als/
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/hadoop/example/als/netflix/
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/hadoop/example/als/netflix/NetflixDatasetConverter.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
mahout/trunk/math/src/main/java/org/apache/mahout/math/als/AlternatingLeastSquaresSolver.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java?rev=1200366&r1=1200365&r2=1200366&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/FactorizationEvaluator.java
Thu Nov 10 15:09:01 2011
@@ -105,10 +105,8 @@ public class FactorizationEvaluator exte
protected double computeRmse(Path errors) {
RunningAverage average = new FullRunningAverage();
for (Pair<DoubleWritable,NullWritable> entry :
- new SequenceFileDirIterable<DoubleWritable, NullWritable>(errors,
-
PathType.LIST,
-
PathFilters.logsCRCFilter(),
- getConf())) {
+ new SequenceFileDirIterable<DoubleWritable, NullWritable>(errors,
PathType.LIST, PathFilters.logsCRCFilter(),
+ getConf())) {
DoubleWritable error = entry.getFirst();
average.addDatum(error.get() * error.get());
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java?rev=1200366&r1=1200365&r2=1200366&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/als/ParallelALSFactorizationJob.java
Thu Nov 10 15:09:01 2011
@@ -50,6 +50,8 @@ import org.apache.mahout.math.VectorWrit
import org.apache.mahout.math.als.AlternatingLeastSquaresSolver;
import
org.apache.mahout.math.als.ImplicitFeedbackAlternatingLeastSquaresSolver;
import org.apache.mahout.math.map.OpenIntObjectHashMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Iterator;
@@ -79,6 +81,8 @@ import java.util.Random;
*/
public class ParallelALSFactorizationJob extends AbstractJob {
+ private static final Logger log =
LoggerFactory.getLogger(ParallelALSFactorizationJob.class);
+
static final String NUM_FEATURES =
ParallelALSFactorizationJob.class.getName() + ".numFeatures";
static final String LAMBDA = ParallelALSFactorizationJob.class.getName() +
".lambda";
static final String ALPHA = ParallelALSFactorizationJob.class.getName() +
".alpha";
@@ -153,8 +157,11 @@ public class ParallelALSFactorizationJob
for (int currentIteration = 0; currentIteration < numIterations; currentIteration++) {
/* broadcast M, read A row-wise, recompute U row-wise */
+ // currentIteration is 0-based; report it 1-based so the last iteration logs "n/n" instead of "n-1/n"
+ log.info("Recomputing U (iteration {}/{})", currentIteration + 1, numIterations);
runSolver(pathToUserRatings(), pathToU(currentIteration), pathToM(currentIteration - 1));
/* broadcast U, read A' row-wise, recompute M row-wise */
+ log.info("Recomputing M (iteration {}/{})", currentIteration + 1, numIterations);
runSolver(pathToItemRatings(), pathToM(currentIteration), pathToU(currentIteration));
}
Added: mahout/trunk/examples/bin/factorize-netflix.sh
URL:
http://svn.apache.org/viewvc/mahout/trunk/examples/bin/factorize-netflix.sh?rev=1200366&view=auto
==============================================================================
--- mahout/trunk/examples/bin/factorize-netflix.sh (added)
+++ mahout/trunk/examples/bin/factorize-netflix.sh Thu Nov 10 15:09:01 2011
@@ -0,0 +1,86 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Instructions:
+#
+# You can only use this script in conjunction with the Netflix dataset. Unpack the Netflix dataset and provide the
+# following:
+#
+# 1) the path to the folder 'training_set' that contains all the movie rating files
+# 2) the path to the file 'qualifying.txt' that contains the user,item pairs to predict
+# 3) the path to the file 'judging.txt' that contains the ratings for the user,item pairs in 'qualifying.txt'
+#
+# The script may be started from any directory; bin/mahout is located relative to the script itself.
+#
+# To run:
+# ./factorize-netflix.sh /path/to/training_set/ /path/to/qualifying.txt /path/to/judging.txt
+
+if [ $# -ne 3 ]
+then
+  echo -e "Syntax: $0 /path/to/training_set/ /path/to/qualifying.txt /path/to/judging.txt\n"
+  exit 1
+fi
+
+# abort as soon as one step fails, instead of running the next steps on missing or partial data
+set -e
+
+# resolve bin/mahout relative to the location of this script, not the current working directory
+SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
+MAHOUT="${SCRIPT_DIR}/../../bin/mahout"
+
+WORK_DIR=/tmp/mahout-work-${USER}
+
+echo "Preparing data..."
+"$MAHOUT" org.apache.mahout.cf.taste.hadoop.example.als.netflix.NetflixDatasetConverter \
+  "$1" "$2" "$3" "${WORK_DIR}"
+
+# run distributed ALS-WR to factorize the rating matrix defined by the training set
+"$MAHOUT" parallelALS --input "${WORK_DIR}/trainingSet/ratings.tsv" --output "${WORK_DIR}/als/out" \
+  --tempDir "${WORK_DIR}/als/tmp" --numFeatures 25 --numIterations 10 --lambda 0.065
+
+# compute predictions against the probe set, measure the error
+"$MAHOUT" evaluateFactorization --input "${WORK_DIR}/probeSet/ratings.tsv" --output "${WORK_DIR}/als/rmse/" \
+  --userFeatures "${WORK_DIR}/als/out/U/" --itemFeatures "${WORK_DIR}/als/out/M/" --tempDir "${WORK_DIR}/als/tmp"
+
+if [ "$HADOOP_HOME" != "" ] && [ "$MAHOUT_LOCAL" == "" ] ; then
+  HADOOP="$HADOOP_HOME/bin/hadoop"
+  if [ ! -e "$HADOOP" ]; then
+    echo "Can't find hadoop in $HADOOP, exiting"
+    exit 1
+  fi
+
+  # print the error, should be around 0.923
+  echo -e "\nRMSE is:\n"
+  "$HADOOP" fs -tail "${WORK_DIR}/als/rmse/rmse.txt"
+  echo -e "\n"
+  echo "removing work directory"
+  # removing the work directory is best-effort
+  set +e
+  "$HADOOP" fs -rmr "${WORK_DIR}"
+
+else
+
+  # print the error, should be around 0.923
+  echo -e "\nRMSE is:\n"
+  cat "${WORK_DIR}/als/rmse/rmse.txt"
+  echo -e "\n"
+  echo "removing work directory"
+  rm -rf "${WORK_DIR}"
+
+fi
+
Added:
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/hadoop/example/als/netflix/NetflixDatasetConverter.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/hadoop/example/als/netflix/NetflixDatasetConverter.java?rev=1200366&view=auto
==============================================================================
---
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/hadoop/example/als/netflix/NetflixDatasetConverter.java
(added)
+++
mahout/trunk/examples/src/main/java/org/apache/mahout/cf/taste/hadoop/example/als/netflix/NetflixDatasetConverter.java
Thu Nov 10 15:09:01 2011
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.example.als.netflix;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.cf.taste.impl.model.GenericPreference;
+import org.apache.mahout.cf.taste.model.Preference;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.common.iterator.FileLineIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * Converts the raw files provided by Netflix into tab-separated {@code userID \t movieID \t rating} lines.
+ * Two files are created below the destination path:
+ * <ul>
+ *   <li>{@code trainingSet/ratings.tsv}, built from the per-movie rating files in {@code training_set}</li>
+ *   <li>{@code probeSet/ratings.tsv}, built by pairing the user,movie entries of {@code qualifying.txt}
+ *   positionally with the ratings listed in {@code judging.txt}</li>
+ * </ul>
+ */
+public class NetflixDatasetConverter {
+
+  private static final Logger log = LoggerFactory.getLogger(NetflixDatasetConverter.class);
+
+  private static final Pattern SEPARATOR = Pattern.compile(",");
+  private static final String MOVIE_DENOTER = ":";
+  private static final String TAB = "\t";
+  private static final String NEWLINE = "\n";
+
+  /** expected number of entries in qualifying.txt, only used to presize the list of probes */
+  private static final int EXPECTED_NUM_PROBES = 2817131;
+
+  /**
+   * Converts the training set and creates the probe set.
+   *
+   * @param args {@code /path/to/training_set/ /path/to/qualifying.txt /path/to/judging.txt /path/to/destination}
+   * @throws IOException if an input file cannot be read or an output file cannot be written
+   */
+  public static void main(String[] args) throws IOException {
+
+    if (args.length != 4) {
+      System.err.println("Usage: NetflixDatasetConverter /path/to/training_set/ /path/to/qualifying.txt " +
+          "/path/to/judging.txt /path/to/destination");
+      return;
+    }
+
+    File trainingDataDir = new File(args[0]);
+    File qualifyingTxt = new File(args[1]);
+    File judgingTxt = new File(args[2]);
+    Path outputPath = new Path(args[3]);
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(outputPath.toUri(), conf);
+
+    log.info("Creating training set at {}/trainingSet/ratings.tsv ...", outputPath);
+    convertTrainingSet(trainingDataDir, fs, new Path(outputPath, "trainingSet/ratings.tsv"));
+
+    log.info("Reading probes...");
+    List<Preference> probes = readProbes(qualifyingTxt);
+    log.info("{} probes read...", probes.size());
+
+    log.info("Reading ratings, creating probe set at {}/probeSet/ratings.tsv ...", outputPath);
+    writeProbeSet(judgingTxt, probes, fs, new Path(outputPath, "probeSet/ratings.tsv"));
+  }
+
+  /** Writes one line per rating found in the per-movie files of trainingDataDir to destination. */
+  private static void convertTrainingSet(File trainingDataDir, FileSystem fs, Path destination) throws IOException {
+    File[] movieRatingFiles = trainingDataDir.listFiles();
+    // listFiles() returns null instead of throwing if the path is not a readable directory
+    Preconditions.checkArgument(movieRatingFiles != null, "%s is not a readable directory", trainingDataDir);
+
+    BufferedWriter writer = null;
+    try {
+      FSDataOutputStream outputStream = fs.create(destination);
+      writer = new BufferedWriter(new OutputStreamWriter(outputStream, Charsets.UTF_8));
+
+      int ratingsProcessed = 0;
+      for (File movieRatings : movieRatingFiles) {
+        FileLineIterator lines = null;
+        try {
+          lines = new FileLineIterator(movieRatings);
+          if (!lines.hasNext()) {
+            continue;
+          }
+          // the first line of each file holds the movie id followed by MOVIE_DENOTER
+          String movieID = lines.next().replace(MOVIE_DENOTER, "");
+          while (lines.hasNext()) {
+            String line = lines.next();
+            if (line.isEmpty()) {
+              continue;
+            }
+            // all other lines start with userID,rating
+            String[] tokens = SEPARATOR.split(line);
+            Preconditions.checkState(tokens.length >= 2, "Malformed line in %s: %s", movieRatings, line);
+            writer.write(tokens[0] + TAB + movieID + TAB + tokens[1] + NEWLINE);
+            ratingsProcessed++;
+            if (ratingsProcessed % 1000000 == 0) {
+              log.info("{} ratings processed...", ratingsProcessed);
+            }
+          }
+        } finally {
+          Closeables.closeQuietly(lines);
+        }
+      }
+      // close explicitly: a failure to flush the last buffered ratings must not go unnoticed
+      writer.close();
+      log.info("{} ratings processed. done.", ratingsProcessed);
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
+
+  /**
+   * Reads the user,movie pairs to predict from qualifying.txt in file order. A line containing MOVIE_DENOTER
+   * sets the movie for all following lines; every other non-empty line starts with a userID.
+   */
+  private static List<Preference> readProbes(File qualifyingTxt) throws IOException {
+    List<Preference> probes = Lists.newArrayListWithExpectedSize(EXPECTED_NUM_PROBES);
+    long currentMovieID = -1;
+    for (String line : new FileLineIterable(qualifyingTxt)) {
+      if (line.contains(MOVIE_DENOTER)) {
+        currentMovieID = Long.parseLong(line.replace(MOVIE_DENOTER, ""));
+      } else if (!line.isEmpty()) {
+        long userID = Long.parseLong(SEPARATOR.split(line)[0]);
+        // the actual rating is not known yet, it is taken from judging.txt
+        probes.add(new GenericPreference(userID, currentMovieID, 0));
+      }
+    }
+    return probes;
+  }
+
+  /**
+   * Pairs the ratings from judging.txt positionally with the probes read from qualifying.txt and writes them to
+   * destination. Fails if the movie of a rating does not match the movie of the corresponding probe.
+   */
+  private static void writeProbeSet(File judgingTxt, List<Preference> probes, FileSystem fs, Path destination)
+      throws IOException {
+    BufferedWriter writer = null;
+    try {
+      FSDataOutputStream outputStream = fs.create(destination);
+      writer = new BufferedWriter(new OutputStreamWriter(outputStream, Charsets.UTF_8));
+
+      long currentMovieID = -1;
+      int ratingsProcessed = 0;
+      for (String line : new FileLineIterable(judgingTxt)) {
+        if (line.contains(MOVIE_DENOTER)) {
+          currentMovieID = Long.parseLong(line.replace(MOVIE_DENOTER, ""));
+        } else if (!line.isEmpty()) {
+          float rating = Float.parseFloat(SEPARATOR.split(line)[0]);
+          Preconditions.checkState(ratingsProcessed < probes.size(),
+              "%s contains more ratings than there are probes (%s)", judgingTxt, probes.size());
+          Preference pref = probes.get(ratingsProcessed);
+          Preconditions.checkState(pref.getItemID() == currentMovieID,
+              "Rating #%s in %s is for movie %s, but the corresponding probe is for movie %s",
+              ratingsProcessed, judgingTxt, currentMovieID, pref.getItemID());
+          ratingsProcessed++;
+          writer.write(pref.getUserID() + TAB + pref.getItemID() + TAB + rating + NEWLINE);
+          if (ratingsProcessed % 1000000 == 0) {
+            log.info("{} ratings processed...", ratingsProcessed);
+          }
+        }
+      }
+      // close explicitly: a failure to flush the last buffered ratings must not go unnoticed
+      writer.close();
+      if (ratingsProcessed != probes.size()) {
+        log.warn("Only {} of {} probes were matched with a rating from {}",
+            new Object[] { ratingsProcessed, probes.size(), judgingTxt });
+      }
+      log.info("{} ratings processed. done.", ratingsProcessed);
+    } finally {
+      Closeables.closeQuietly(writer);
+    }
+  }
+
+}
Modified:
mahout/trunk/math/src/main/java/org/apache/mahout/math/als/AlternatingLeastSquaresSolver.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/math/src/main/java/org/apache/mahout/math/als/AlternatingLeastSquaresSolver.java?rev=1200366&r1=1200365&r2=1200366&view=diff
==============================================================================
---
mahout/trunk/math/src/main/java/org/apache/mahout/math/als/AlternatingLeastSquaresSolver.java
(original)
+++
mahout/trunk/math/src/main/java/org/apache/mahout/math/als/AlternatingLeastSquaresSolver.java
Thu Nov 10 15:09:01 2011
@@ -70,7 +70,7 @@ public class AlternatingLeastSquaresSolv
int n = 0;
for (Vector featureVector : featureVectors) {
for (int m = 0; m < numFeatures; m++) {
- MiIi.setQuick(m, n, featureVector.get(m));
+ MiIi.setQuick(m, n, featureVector.getQuick(m));
}
n++;
}