Author: ssc
Date: Sun Jul  7 23:51:07 2013
New Revision: 1500553

URL: http://svn.apache.org/r1500553
Log:
MAHOUT-1272 Parallel SGD matrix factorizer for SVDrecommender

Added:
    
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizer.java
    
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizerTest.java
Modified:
    mahout/trunk/CHANGELOG

Modified: mahout/trunk/CHANGELOG
URL: 
http://svn.apache.org/viewvc/mahout/trunk/CHANGELOG?rev=1500553&r1=1500552&r2=1500553&view=diff
==============================================================================
--- mahout/trunk/CHANGELOG (original)
+++ mahout/trunk/CHANGELOG Sun Jul  7 23:51:07 2013
@@ -2,6 +2,8 @@ Mahout Change Log
 
 Release 0.8 - unreleased
 
+  MAHOUT-1272: Parallel SGD matrix factorizer for SVDrecommender (Peng Cheng 
via ssc)
+
   MAHOUT-1271: classify-20newsgroups.sh fails during the seqdirectory step 
(smarthi)
   
   MAHOUT-1269: Cleanup deprecated Lucene 3.x API calls in lucene2seq utility 
unit tests (smarthi)

Added: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizer.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizer.java?rev=1500553&view=auto
==============================================================================
--- 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizer.java
 (added)
+++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizer.java
 Sun Jul  7 23:51:07 2013
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.impl.recommender.svd;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.mahout.cf.taste.common.TasteException;
+import org.apache.mahout.cf.taste.impl.common.FullRunningAverage;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.impl.common.RunningAverage;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.model.Preference;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.RandomWrapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Minimalistic implementation of Parallel SGD factorizer based on
+ * <a href="http://www.sze.hu/~gtakacs/download/jmlr_2009.pdf">
+ * "Scalable Collaborative Filtering Approaches for Large Recommender 
Systems"</a>
+ * and
+ * <a href="http://www.cs.wisc.edu/~brecht/papers/hogwildTR.pdf">
+ * "Hogwild!: A Lock-Free Approach to Parallelizing Stochastic Gradient 
Descent"</a> */
+public class ParallelSGDFactorizer extends AbstractFactorizer {
+
+  private final DataModel dataModel;
+  /** Parameter used to prevent overfitting. */
+  private final double lambda;
+  /** Number of features used to compute this factorization */
+  private final int rank;
+  /** Number of iterations */
+  private final int numEpochs;
+
+  private int numThreads;
+
+  // these next two control decayFactor^steps exponential type of annealing 
learning rate and decay factor
+  private double mu0 = 0.01;
+  private double decayFactor = 1;
+  // these next two control 1/steps^forget type annealing
+  private int stepOffset = 0;
+  // -1 equals even weighting of all examples, 0 means only use exponential 
annealing
+  private double forgettingExponent = 0;
+
+  // The following two should be inversely proportional :)
+  private double biasMuRatio = 0.5;
+  private double biasLambdaRatio = 0.1;
+
+  /** TODO: this is not safe as += is not atomic on many processors, can be 
replaced with AtomicDoubleArray
+   * but it works just fine right now  */
+  /** user features */
+  protected volatile double[][] userVectors;
+  /** item features */
+  protected volatile double[][] itemVectors;
+
+  private final PreferenceShuffler shuffler;
+
+  private int epoch = 1;
+  /** place in user vector where the bias is stored */
+  private static final int USER_BIAS_INDEX = 1;
+  /** place in item vector where the bias is stored */
+  private static final int ITEM_BIAS_INDEX = 2;
+  private static final int FEATURE_OFFSET = 3;
+  /** Standard deviation for random initialization of features */
+  private static final double NOISE = 0.02;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(ParallelSGDFactorizer.class);
+
+  protected static class PreferenceShuffler {
+
+    private Preference[] preferences;
+    private Preference[] unstagedPreferences;
+
+    protected final RandomWrapper random = RandomUtils.getRandom();
+
+    public PreferenceShuffler(DataModel dataModel) throws TasteException {
+      cachePreferences(dataModel);
+      shuffle();
+      stage();
+    }
+
+    private int countPreferences(DataModel dataModel) throws TasteException {
+      int numPreferences = 0;
+      LongPrimitiveIterator userIDs = dataModel.getUserIDs();
+      while (userIDs.hasNext()) {
+        PreferenceArray preferencesFromUser = 
dataModel.getPreferencesFromUser(userIDs.nextLong());
+        numPreferences += preferencesFromUser.length();
+      }
+      return numPreferences;
+    }
+
+    private void cachePreferences(DataModel dataModel) throws TasteException {
+      int numPreferences = countPreferences(dataModel);
+      preferences = new Preference[numPreferences];
+
+      LongPrimitiveIterator userIDs = dataModel.getUserIDs();
+      int index = 0;
+      while (userIDs.hasNext()) {
+        long userID = userIDs.nextLong();
+        PreferenceArray preferencesFromUser = 
dataModel.getPreferencesFromUser(userID);
+        for (Preference preference : preferencesFromUser) {
+          preferences[index++] = preference;
+        }
+      }
+    }
+
+    public void shuffle() {
+      unstagedPreferences = preferences.clone();
+      /* Durstenfeld shuffle */
+      for (int i = unstagedPreferences.length - 1; i > 0; i--) {
+        int rand = random.nextInt(i + 1);
+        swapCachedPreferences(i, rand);
+      }
+    }
+
+    // Merging this into shuffle() makes the compiler/optimizer do some really absurd things
+    // (tested on OpenJDK 7), so the swap is kept as a separate method.
+    private void swapCachedPreferences(int x, int y) {
+      Preference p = unstagedPreferences[x];
+
+      unstagedPreferences[x] = unstagedPreferences[y];
+      unstagedPreferences[y] = p;
+    }
+
+    public void stage() {
+      preferences = unstagedPreferences;
+    }
+
+    public Preference get(int i) {
+      return preferences[i];
+    }
+
+    public int size() {
+      return preferences.length;
+    }
+
+  }
+
+  /**
+   * Creates a factorizer that uses the default learning-rate schedule and bias ratios.
+   *
+   * @param dataModel   source of the preferences to factorize
+   * @param numFeatures number of latent features; {@code FEATURE_OFFSET} additional slots are
+   *                    added to every vector for the global average and the user/item biases
+   * @param lambda      regularization parameter used to prevent overfitting
+   * @param numEpochs   number of passes over the shuffled preferences
+   * @throws TasteException if the preferences cannot be read from the data model
+   */
+  public ParallelSGDFactorizer(DataModel dataModel, int numFeatures, double lambda, int numEpochs)
+    throws TasteException {
+    super(dataModel);
+    this.dataModel = dataModel;
+    this.rank = numFeatures + FEATURE_OFFSET;
+    this.lambda = lambda;
+    this.numEpochs = numEpochs;
+
+    shuffler = new PreferenceShuffler(dataModel);
+
+    // Max thread count is set to n^0.25 as suggested by the Hogwild! paper, but never below one:
+    // for an empty model the truncated power is 0, which would break both the thread pool
+    // creation and the slice-size division in factorize().
+    int hogwildLimit = (int) Math.pow((double) shuffler.size(), 0.25);
+    numThreads = Math.max(1, Math.min(Runtime.getRuntime().availableProcessors(), hogwildLimit));
+  }
+
+  public ParallelSGDFactorizer(DataModel dataModel, int numFeatures, double 
lambda, int numIterations,
+      double mu0, double decayFactor, int stepOffset, double 
forgettingExponent) throws TasteException {
+    this(dataModel, numFeatures, lambda, numIterations);
+
+    this.mu0 = mu0;
+    this.decayFactor = decayFactor;
+    this.stepOffset = stepOffset;
+    this.forgettingExponent = forgettingExponent;
+  }
+
+  public ParallelSGDFactorizer(DataModel dataModel, int numFeatures, double 
lambda, int numIterations,
+      double mu0, double decayFactor, int stepOffset, double 
forgettingExponent, int numThreads) throws TasteException {
+    this(dataModel, numFeatures, lambda, numIterations, mu0, decayFactor, 
stepOffset, forgettingExponent);
+
+    this.numThreads = numThreads;
+  }
+
+  public ParallelSGDFactorizer(DataModel dataModel, int numFeatures, double 
lambda, int numIterations,
+      double mu0, double decayFactor, int stepOffset, double 
forgettingExponent,
+      double biasMuRatio, double biasLambdaRatio) throws TasteException {
+    this(dataModel, numFeatures, lambda, numIterations, mu0, decayFactor, 
stepOffset, forgettingExponent);
+
+    this.biasMuRatio = biasMuRatio;
+    this.biasLambdaRatio = biasLambdaRatio;
+  }
+
+  public ParallelSGDFactorizer(DataModel dataModel, int numFeatures, double 
lambda, int numIterations,
+      double mu0, double decayFactor, int stepOffset, double 
forgettingExponent,
+      double biasMuRatio, double biasLambdaRatio, int numThreads) throws 
TasteException {
+    this(dataModel, numFeatures, lambda, numIterations, mu0, decayFactor, 
stepOffset, forgettingExponent, biasMuRatio,
+         biasLambdaRatio);
+
+    this.numThreads = numThreads;
+  }
+
+  protected void initialize() throws TasteException {
+    RandomWrapper random = RandomUtils.getRandom();
+    userVectors = new double[dataModel.getNumUsers()][rank];
+    itemVectors = new double[dataModel.getNumItems()][rank];
+
+    double globalAverage = getAveragePreference();
+    for (int userIndex = 0; userIndex < userVectors.length; userIndex++) {
+      userVectors[userIndex][0] = globalAverage;
+      userVectors[userIndex][USER_BIAS_INDEX] = 0; // will store user bias
+      userVectors[userIndex][ITEM_BIAS_INDEX] = 1; // corresponding item 
feature contains item bias
+      for (int feature = FEATURE_OFFSET; feature < rank; feature++) {
+        userVectors[userIndex][feature] = random.nextGaussian() * NOISE;
+      }
+    }
+    for (int itemIndex = 0; itemIndex < itemVectors.length; itemIndex++) {
+      itemVectors[itemIndex][0] = 1; // corresponding user feature contains 
global average
+      itemVectors[itemIndex][USER_BIAS_INDEX] = 1; // corresponding user 
feature contains user bias
+      itemVectors[itemIndex][ITEM_BIAS_INDEX] = 0; // will store item bias
+      for (int feature = FEATURE_OFFSET; feature < rank; feature++) {
+        itemVectors[itemIndex][feature] = random.nextGaussian() * NOISE;
+      }
+    }
+  }
+
+  //TODO: needs optimization
+  private double getMu(int i) {
+    return mu0 * Math.pow(decayFactor, i - 1) * Math.pow(i + stepOffset, 
forgettingExponent);
+  }
+
+  /**
+   * Computes the factorization with {@code numEpochs} passes of parallel SGD.
+   *
+   * <p>After initializing the user and item vectors, each epoch stages the most recent shuffle,
+   * derives the learning rate from {@link #getMu(int)}, and splits the staged preferences into
+   * {@code numThreads} contiguous slices that are processed concurrently via
+   * {@link #update(Preference, double)}. The order for the next epoch is shuffled while the
+   * workers are running. If the workers do not finish within the timeout, an error is logged and
+   * processing continues.
+   *
+   * @return the factorization built from the learned user and item vectors
+   * @throws TasteException if the data model cannot be read or the wait for the workers is
+   *                        interrupted
+   */
+  @Override
+  public Factorization factorize() throws TasteException {
+    initialize();
+
+    if (logger.isInfoEnabled()) {
+      logger.info("starting to compute the factorization...");
+    }
+
+    for (epoch = 1; epoch <= numEpochs; epoch++) {
+      shuffler.stage();
+
+      final double mu = getMu(epoch);
+      // +1 so that numThreads slices always cover every preference
+      int subSize = shuffler.size() / numThreads + 1;
+
+      ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+
+      try {
+        for (int t = 0; t < numThreads; t++) {
+          final int iStart = t * subSize;
+          final int iEnd = Math.min((t + 1) * subSize, shuffler.size());
+
+          executor.execute(new Runnable() {
+            @Override
+            public void run() {
+              for (int i = iStart; i < iEnd; i++) {
+                update(shuffler.get(i), mu);
+              }
+            }
+          });
+        }
+      } finally {
+        executor.shutdown();
+        // prepare the next epoch's order; this works on a separate copy of the staged array
+        shuffler.shuffle();
+
+        try {
+          // Widen before multiplying: the int product overflows on large models, and a
+          // wrapped-around (negative) timeout makes awaitTermination() return without waiting.
+          long timeoutMicros = (long) numEpochs * shuffler.size();
+          boolean terminated = executor.awaitTermination(timeoutMicros, TimeUnit.MICROSECONDS);
+          if (!terminated) {
+            logger.error("subtasks takes forever, return anyway");
+          }
+        } catch (InterruptedException e) {
+          // restore the interrupt flag so that callers can still observe the interruption
+          Thread.currentThread().interrupt();
+          throw new TasteException("waiting fof termination interrupted", e);
+        }
+      }
+
+    }
+
+    return createFactorization(userVectors, itemVectors);
+  }
+
+  double getAveragePreference() throws TasteException {
+    RunningAverage average = new FullRunningAverage();
+    LongPrimitiveIterator it = dataModel.getUserIDs();
+    while (it.hasNext()) {
+      for (Preference pref : dataModel.getPreferencesFromUser(it.nextLong())) {
+        average.addDatum(pref.getValue());
+      }
+    }
+    return average.getAverage();
+  }
+
+  /**
+   * Performs one SGD step for a single preference: computes the prediction error for the
+   * (user, item) pair and adjusts the latent features and the bias entries of both vectors
+   * in place.
+   *
+   * TODO: this is the vanilla SGD by Takacs 2009. I speculate that using the scaling technique
+   * proposed in "Towards Optimal One Pass Large Scale Learning with Averaged Stochastic Gradient
+   * Descent" (section 5, page 6) can be beneficial in terms of both speed and accuracy.
+   *
+   * Takacs' method doesn't calculate the gradient of the regularization correctly: that gradient
+   * has non-zero elements everywhere in the matrix, while Takacs' method only updates a single
+   * row/column. If one user has a lot of recommendations, her vector will be more affected by
+   * regularization. Using an isolated scaling factor for both user vectors and item vectors can
+   * remove this issue without inducing more update cost; it even reduces it a bit by only
+   * performing one addition and one multiplication.
+   *
+   * BAD SIDE1: the scaling factor decreases fast, so it has to be scaled up from time to time
+   *            before it drops to zero or causes roundoff error
+   * BAD SIDE2: nobody has experimented with it before, and people generally use a very small
+   *            lambda, so its impact on accuracy may still be unknown.
+   * BAD SIDE3: don't know how to make it work for L1-regularization or
+   *            "pseudorank?" (sum of singular values)-regularization
+   *
+   * @param preference the observed preference to learn from
+   * @param mu         learning rate applied to this step
+   */
+  protected void update(Preference preference, double mu) {
+    int userIndex = userIndex(preference.getUserID());
+    int itemIndex = itemIndex(preference.getItemID());
+
+    double[] userVector = userVectors[userIndex];
+    double[] itemVector = itemVectors[itemIndex];
+
+    // the dot product runs over all slots, so it includes the global average and both biases
+    double prediction = dot(userVector, itemVector);
+    double err = preference.getValue() - prediction;
+
+    // adjust features
+    for (int k = FEATURE_OFFSET; k < rank; k++) {
+      // read both values first so that each update uses the other side's pre-step value
+      double userFeature = userVector[k];
+      double itemFeature = itemVector[k];
+
+      userVector[k] += mu * (err * itemFeature - lambda * userFeature);
+      itemVector[k] += mu * (err * userFeature - lambda * itemFeature);
+    }
+
+    // adjust user and item bias
+    // biases use a scaled learning rate (biasMuRatio) and scaled regularization (biasLambdaRatio)
+    userVector[USER_BIAS_INDEX] += biasMuRatio * mu * (err - biasLambdaRatio * 
lambda * userVector[USER_BIAS_INDEX]);
+    itemVector[ITEM_BIAS_INDEX] += biasMuRatio * mu * (err - biasLambdaRatio * 
lambda * itemVector[ITEM_BIAS_INDEX]);
+  }
+
+  private double dot(double[] userVector, double[] itemVector) {
+    double sum = 0;
+    for (int k = 0; k < rank; k++) {
+      sum += userVector[k] * itemVector[k];
+    }
+    return sum;
+  }
+}
\ No newline at end of file

Added: 
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizerTest.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizerTest.java?rev=1500553&view=auto
==============================================================================
--- 
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizerTest.java
 (added)
+++ 
mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/impl/recommender/svd/ParallelSGDFactorizerTest.java
 Sun Jul  7 23:51:07 2013
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.impl.recommender.svd;
+
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+import org.apache.mahout.cf.taste.impl.TasteTestCase;
+import org.apache.mahout.cf.taste.impl.common.FastByIDMap;
+import org.apache.mahout.cf.taste.impl.common.FullRunningAverage;
+import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
+import org.apache.mahout.cf.taste.impl.common.RunningAverage;
+import org.apache.mahout.cf.taste.impl.model.GenericDataModel;
+import org.apache.mahout.cf.taste.impl.model.GenericPreference;
+import org.apache.mahout.cf.taste.impl.model.GenericUserPreferenceArray;
+import 
org.apache.mahout.cf.taste.impl.recommender.svd.ParallelSGDFactorizer.PreferenceShuffler;
+import org.apache.mahout.cf.taste.model.DataModel;
+import org.apache.mahout.cf.taste.model.Preference;
+import org.apache.mahout.cf.taste.model.PreferenceArray;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.common.RandomWrapper;
+import org.apache.mahout.math.DenseMatrix;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.Matrix;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.function.DoubleFunction;
+import org.apache.mahout.math.function.VectorFunction;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ParallelSGDFactorizerTest extends TasteTestCase {
+
+  protected DataModel dataModel;
+
+  protected int rank;
+  protected double lambda;
+  protected int numIterations;
+
+  private RandomWrapper random = (RandomWrapper) RandomUtils.getRandom();
+
+  protected Factorizer factorizer;
+  protected SVDRecommender svdRecommender;
+
+  private static final Logger logger = 
LoggerFactory.getLogger(ParallelSGDFactorizerTest.class);
+
+  private Matrix randomMatrix(int numRows, int numColumns, double range) {
+    double[][] data = new double[numRows][numColumns];
+    for (int i = 0; i < numRows; i++) {
+      for (int j = 0; j < numColumns; j++) {
+        double sqrtUniform = random.nextDouble();
+        data[i][j] = sqrtUniform * range;
+      }
+    }
+    return new DenseMatrix(data);
+  }
+
+  private void normalize(Matrix source, final double range) {
+    final double max = source.aggregateColumns(new VectorFunction() {
+      @Override
+      public double apply(Vector column) {
+        return column.maxValue();
+      }
+    }).maxValue();
+
+    final double min = source.aggregateColumns(new VectorFunction() {
+      @Override
+      public double apply(Vector column) {
+        return column.minValue();
+      }
+    }).minValue();
+
+    source.assign(new DoubleFunction() {
+      @Override
+      public double apply(double value) {
+        return (value - min) * range / (max - min);
+      }
+    });
+  }
+
+  public void setUpSyntheticData() throws Exception {
+
+    int numUsers = 2000;
+    int numItems = 1000;
+    double sparsity = 0.5;
+
+    this.rank = 20;
+    this.lambda = 0.000000001;
+    this.numIterations = 100;
+
+    Matrix users = randomMatrix(numUsers, rank, 1);
+    Matrix items = randomMatrix(rank, numItems, 1);
+    Matrix ratings = users.times(items);
+    normalize(ratings, 5);
+
+    FastByIDMap<PreferenceArray> userData = new FastByIDMap<PreferenceArray>();
+    for (int userIndex = 0; userIndex < numUsers; userIndex++) {
+      List<Preference> row= Lists.newArrayList();
+      for (int itemIndex = 0; itemIndex < numItems; itemIndex++) {
+        if (random.nextDouble() <= sparsity) {
+          row.add(new GenericPreference(userIndex, itemIndex, (float) 
ratings.get(userIndex, itemIndex)));
+        }
+      }
+
+      userData.put(userIndex, new GenericUserPreferenceArray(row));
+    }
+
+    dataModel = new GenericDataModel(userData);
+  }
+
+  public void setUpToyData() throws Exception {
+    this.rank = 3;
+    this.lambda = 0.01;
+    this.numIterations = 1000;
+
+    FastByIDMap<PreferenceArray> userData = new FastByIDMap<PreferenceArray>();
+
+    userData.put(1L, new GenericUserPreferenceArray(Arrays.asList(new 
GenericPreference(1L, 1L, 5.0f),
+        new GenericPreference(1L, 2L, 5.0f),
+        new GenericPreference(1L, 3L, 2.0f))));
+
+    userData.put(2L, new GenericUserPreferenceArray(Arrays.asList(new 
GenericPreference(2L, 1L, 2.0f),
+        new GenericPreference(2L, 3L, 3.0f),
+        new GenericPreference(2L, 4L, 5.0f))));
+
+    userData.put(3L, new GenericUserPreferenceArray(Arrays.asList(new 
GenericPreference(3L, 2L, 5.0f),
+        new GenericPreference(3L, 4L, 3.0f))));
+
+    userData.put(4L, new GenericUserPreferenceArray(Arrays.asList(new 
GenericPreference(4L, 1L, 3.0f),
+        new GenericPreference(4L, 4L, 5.0f))));
+    dataModel = new GenericDataModel(userData);
+  }
+
+  @Test
+  public void testPreferenceShufflerWithSyntheticData() throws Exception {
+    setUpSyntheticData();
+
+    ParallelSGDFactorizer.PreferenceShuffler shuffler = new 
PreferenceShuffler(dataModel);
+    shuffler.shuffle();
+    shuffler.stage();
+
+    FastByIDMap<FastByIDMap<Boolean>> checked = new 
FastByIDMap<FastByIDMap<Boolean>>();
+
+    for (int i = 0; i < shuffler.size(); i++) {
+      Preference pref=shuffler.get(i);
+
+      float value = dataModel.getPreferenceValue(pref.getUserID(), 
pref.getItemID());
+      assertEquals(pref.getValue(), value, 0.0);
+      if (!checked.containsKey(pref.getUserID())) {
+        checked.put(pref.getUserID(), new FastByIDMap<Boolean>());
+      }
+
+      assertNull(checked.get(pref.getUserID()).get(pref.getItemID()));
+
+      checked.get(pref.getUserID()).put(pref.getItemID(), true);
+    }
+
+    LongPrimitiveIterator userIDs = dataModel.getUserIDs();
+    int index=0;
+    while (userIDs.hasNext()) {
+      long userID = userIDs.nextLong();
+      PreferenceArray preferencesFromUser = 
dataModel.getPreferencesFromUser(userID);
+      for (Preference preference : preferencesFromUser) {
+        
assertTrue(checked.get(preference.getUserID()).get(preference.getItemID()));
+        index++;
+      }
+    }
+    assertEquals(index, shuffler.size());
+  }
+
+  /**
+   * Factorizes the toy data set and checks that the ratings reconstructed from the user and item
+   * feature vectors have an RMSE below 0.2. The regularized loss and the elapsed time are only
+   * logged, not asserted.
+   */
+  @Test
+  public void testFactorizerWithToyData() throws Exception {
+
+    setUpToyData();
+
+    long start = System.currentTimeMillis();
+
+    factorizer = new ParallelSGDFactorizer(dataModel, rank, lambda, numIterations, 0.01, 1, 0, 0);
+
+    Factorization factorization = factorizer.factorize();
+
+    long duration = System.currentTimeMillis() - start;
+
+    /* a hold out test would be better, but this is just a toy example so we only check that the
+     * factorization is close to the original matrix */
+    RunningAverage avg = new FullRunningAverage();
+    LongPrimitiveIterator userIDs = dataModel.getUserIDs();
+    LongPrimitiveIterator itemIDs;
+
+    while (userIDs.hasNext()) {
+      long userID = userIDs.nextLong();
+      for (Preference pref : dataModel.getPreferencesFromUser(userID)) {
+        double rating = pref.getValue();
+        Vector userVector = new DenseVector(factorization.getUserFeatures(userID));
+        Vector itemVector = new DenseVector(factorization.getItemFeatures(pref.getItemID()));
+        double estimate = userVector.dot(itemVector);
+        double err = rating - estimate;
+
+        avg.addDatum(err * err);
+      }
+    }
+
+    // sum of squared norms of all user and item vectors, used for the regularization term
+    double sum = 0.0;
+
+    userIDs = dataModel.getUserIDs();
+    while (userIDs.hasNext()) {
+      long userID = userIDs.nextLong();
+      Vector userVector = new DenseVector(factorization.getUserFeatures(userID));
+      double regularization = userVector.dot(userVector);
+      sum += regularization;
+    }
+
+    itemIDs = dataModel.getItemIDs();
+    while (itemIDs.hasNext()) {
+      long itemID = itemIDs.nextLong();
+      // item vectors must be read from the item side of the factorization, not the user side
+      Vector itemVector = new DenseVector(factorization.getItemFeatures(itemID));
+      double regularization = itemVector.dot(itemVector);
+      sum += regularization;
+    }
+
+    double rmse = Math.sqrt(avg.getAverage());
+    double loss = avg.getAverage() / 2 + lambda / 2 * sum;
+    logger.info("RMSE: " + rmse + ";\tLoss: " + loss + ";\tTime Used: " + duration);
+    assertTrue(rmse < 0.2);
+  }
+
+  @Test
+  public void testRecommenderWithToyData() throws Exception {
+
+    setUpToyData();
+
+    factorizer = new ParallelSGDFactorizer(dataModel, rank, lambda, 
numIterations, 0.01, 1, 0,0);
+    svdRecommender = new SVDRecommender(dataModel, factorizer);
+
+    /* a hold out test would be better, but this is just a toy example so we 
only check that the
+     * factorization is close to the original matrix */
+    RunningAverage avg = new FullRunningAverage();
+    LongPrimitiveIterator userIDs = dataModel.getUserIDs();
+    while (userIDs.hasNext()) {
+      long userID = userIDs.nextLong();
+      for (Preference pref : dataModel.getPreferencesFromUser(userID)) {
+        double rating = pref.getValue();
+        double estimate = svdRecommender.estimatePreference(userID, 
pref.getItemID());
+        double err = rating - estimate;
+        avg.addDatum(err * err);
+      }
+    }
+
+    double rmse = Math.sqrt(avg.getAverage());
+    logger.info("rmse: " + rmse);
+    assertTrue(rmse < 0.2);
+  }
+
+  /**
+   * Factorizes the synthetic data set and checks that the ratings reconstructed from the user
+   * and item feature vectors have an RMSE below 0.2. The regularized loss and the elapsed time
+   * are only logged, not asserted.
+   */
+  @Test
+  public void testFactorizerWithWithSyntheticData() throws Exception {
+
+    setUpSyntheticData();
+
+    long start = System.currentTimeMillis();
+
+    factorizer = new ParallelSGDFactorizer(dataModel, rank, lambda, numIterations, 0.01, 1, 0, 0);
+
+    Factorization factorization = factorizer.factorize();
+
+    long duration = System.currentTimeMillis() - start;
+
+    /* a hold out test would be better; here we only check that the
+     * factorization is close to the original matrix */
+    RunningAverage avg = new FullRunningAverage();
+    LongPrimitiveIterator userIDs = dataModel.getUserIDs();
+    LongPrimitiveIterator itemIDs;
+
+    while (userIDs.hasNext()) {
+      long userID = userIDs.nextLong();
+      for (Preference pref : dataModel.getPreferencesFromUser(userID)) {
+        double rating = pref.getValue();
+        Vector userVector = new DenseVector(factorization.getUserFeatures(userID));
+        Vector itemVector = new DenseVector(factorization.getItemFeatures(pref.getItemID()));
+        double estimate = userVector.dot(itemVector);
+        double err = rating - estimate;
+
+        avg.addDatum(err * err);
+      }
+    }
+
+    // sum of squared norms of all user and item vectors, used for the regularization term
+    double sum = 0.0;
+
+    userIDs = dataModel.getUserIDs();
+    while (userIDs.hasNext()) {
+      long userID = userIDs.nextLong();
+      Vector userVector = new DenseVector(factorization.getUserFeatures(userID));
+      double regularization = userVector.dot(userVector);
+      sum += regularization;
+    }
+
+    itemIDs = dataModel.getItemIDs();
+    while (itemIDs.hasNext()) {
+      long itemID = itemIDs.nextLong();
+      // item vectors must be read from the item side of the factorization, not the user side
+      Vector itemVector = new DenseVector(factorization.getItemFeatures(itemID));
+      double regularization = itemVector.dot(itemVector);
+      sum += regularization;
+    }
+
+    double rmse = Math.sqrt(avg.getAverage());
+    double loss = avg.getAverage() / 2 + lambda / 2 * sum;
+    logger.info("RMSE: " + rmse + ";\tLoss: " + loss + ";\tTime Used: " + duration + "ms");
+    assertTrue(rmse < 0.2);
+  }
+
+  @Test
+  public void testRecommenderWithSyntheticData() throws Exception {
+
+    setUpSyntheticData();
+
+    factorizer= new ParallelSGDFactorizer(dataModel, rank, lambda, 
numIterations, 0.01, 1, 0, 0);
+    svdRecommender = new SVDRecommender(dataModel, factorizer);
+
+    /* a hold out test would be better, but this is just a toy example so we 
only check that the
+     * factorization is close to the original matrix */
+    RunningAverage avg = new FullRunningAverage();
+    LongPrimitiveIterator userIDs = dataModel.getUserIDs();
+    while (userIDs.hasNext()) {
+      long userID = userIDs.nextLong();
+      for (Preference pref : dataModel.getPreferencesFromUser(userID)) {
+        double rating = pref.getValue();
+        double estimate = svdRecommender.estimatePreference(userID, 
pref.getItemID());
+        double err = rating - estimate;
+        avg.addDatum(err * err);
+      }
+    }
+
+    double rmse = Math.sqrt(avg.getAverage());
+    logger.info("rmse: " + rmse);
+    assertTrue(rmse < 0.2);
+  }
+}
\ No newline at end of file


Reply via email to