http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java
new file mode 100644
index 0000000..fd6657f
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.als;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.mahout.math.VectorWritable;
+import org.apache.mahout.math.als.ImplicitFeedbackAlternatingLeastSquaresSolver;
+
+import java.io.IOException;
+
+/** Solving mapper that can be safely executed using multiple threads */
+public class SolveImplicitFeedbackMapper
+    extends SharingMapper<IntWritable,VectorWritable,IntWritable,VectorWritable,
+    ImplicitFeedbackAlternatingLeastSquaresSolver> {
+
+  private final VectorWritable uiOrmj = new VectorWritable();
+
+  @Override
+  ImplicitFeedbackAlternatingLeastSquaresSolver createSharedInstance(Context ctx) throws IOException {
+    Configuration conf = ctx.getConfiguration();
+
+    double lambda = Double.parseDouble(conf.get(ParallelALSFactorizationJob.LAMBDA));
+    double alpha = Double.parseDouble(conf.get(ParallelALSFactorizationJob.ALPHA));
+    int numFeatures = conf.getInt(ParallelALSFactorizationJob.NUM_FEATURES, -1);
+    int numEntities = Integer.parseInt(conf.get(ParallelALSFactorizationJob.NUM_ENTITIES));
+
+    Preconditions.checkArgument(numFeatures > 0, "numFeatures must be greater than 0!");
+
+    return new ImplicitFeedbackAlternatingLeastSquaresSolver(numFeatures, lambda, alpha,
+        ALS.readMatrixByRowsFromDistributedCache(numEntities, conf), 1);
+  }
+
+  @Override
+  protected void map(IntWritable userOrItemID, VectorWritable ratingsWritable, Context ctx)
+    throws IOException, InterruptedException {
+    ImplicitFeedbackAlternatingLeastSquaresSolver solver = getSharedInstance();
+    uiOrmj.set(solver.solve(ratingsWritable.get()));
+    ctx.write(userOrItemID, uiOrmj);
+  }
+
+}
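
For illustration only (not part of this patch): createSharedInstance() above reads four keys from the job configuration. A minimal driver sketch setting them, with placeholder values, could look like this:

    // Sketch: the keys are the constants read in createSharedInstance();
    // the numeric values are arbitrary placeholders, not recommended defaults.
    Configuration conf = new Configuration();
    conf.set(ParallelALSFactorizationJob.LAMBDA, "0.065");        // regularization
    conf.set(ParallelALSFactorizationJob.ALPHA, "40");            // confidence weight
    conf.setInt(ParallelALSFactorizationJob.NUM_FEATURES, 20);    // must be > 0
    conf.set(ParallelALSFactorizationJob.NUM_ENTITIES, "100000"); // rows of the fixed matrix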

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
new file mode 100644
index 0000000..b44fd5b
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.cf.taste.hadoop.MutableRecommendedItem;
+import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.TopItemsQueue;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.cf.taste.recommender.RecommendedItem;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.function.Functions;
+import org.apache.mahout.math.map.OpenIntLongHashMap;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>computes prediction values for each user</p>
+ *
+ * <pre>
+ * u = a user
+ * i = an item not yet rated by u
+ * N = all items similar to i (where similarity is usually computed by pairwise comparison of the item-vectors
+ * of the user-item matrix)
+ *
+ * Prediction(u,i) = sum(all n from N: similarity(i,n) * rating(u,n)) / sum(all n from N: abs(similarity(i,n)))
+ * </pre>
+ */
+public final class AggregateAndRecommendReducer extends
+    Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> {
+
+  private static final Logger log = LoggerFactory.getLogger(AggregateAndRecommendReducer.class);
+
+  static final String ITEMID_INDEX_PATH = "itemIDIndexPath";
+  static final String NUM_RECOMMENDATIONS = "numRecommendations";
+  static final int DEFAULT_NUM_RECOMMENDATIONS = 10;
+  static final String ITEMS_FILE = "itemsFile";
+
+  private boolean booleanData;
+  private int recommendationsPerUser;
+  private IDReader idReader;
+  private FastIDSet itemsToRecommendFor;
+  private OpenIntLongHashMap indexItemIDMap;
+
+  private final RecommendedItemsWritable recommendedItems = new RecommendedItemsWritable();
+
+  private static final float BOOLEAN_PREF_VALUE = 1.0f;
+
+  @Override
+  protected void setup(Context context) throws IOException {
+    Configuration conf = context.getConfiguration();
+    recommendationsPerUser = conf.getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS);
+    booleanData = conf.getBoolean(RecommenderJob.BOOLEAN_DATA, false);
+    indexItemIDMap = TasteHadoopUtils.readIDIndexMap(conf.get(ITEMID_INDEX_PATH), conf);
+
+    idReader = new IDReader(conf);
+    idReader.readIDs();
+    itemsToRecommendFor = idReader.getItemIds();
+  }
+
+  @Override
+  protected void reduce(VarLongWritable userID,
+                        Iterable<PrefAndSimilarityColumnWritable> values,
+                        Context context) throws IOException, InterruptedException {
+    if (booleanData) {
+      reduceBooleanData(userID, values, context);
+    } else {
+      reduceNonBooleanData(userID, values, context);
+    }
+  }
+
+  private void reduceBooleanData(VarLongWritable userID,
+                                 Iterable<PrefAndSimilarityColumnWritable> values,
+                                 Context context) throws IOException, InterruptedException {
+    /* with boolean data, each estimated preference can only be 1,
+     * which cannot be used to rank the recommended items,
+     * so we use the sum of similarities for that instead. */
+    Iterator<PrefAndSimilarityColumnWritable> columns = values.iterator();
+    Vector predictions = columns.next().getSimilarityColumn();
+    while (columns.hasNext()) {
+      predictions.assign(columns.next().getSimilarityColumn(), Functions.PLUS);
+    }
+    writeRecommendedItems(userID, predictions, context);
+  }
+
+  private void reduceNonBooleanData(VarLongWritable userID,
+                                    Iterable<PrefAndSimilarityColumnWritable> values,
+                                    Context context) throws IOException, InterruptedException {
+    /* each entry here is the sum in the numerator of the prediction formula */
+    Vector numerators = null;
+    /* each entry here is the sum in the denominator of the prediction formula */
+    Vector denominators = null;
+    /* each entry here is the number of similar items used in the prediction formula */
+    Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+
+    for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) {
+      Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn();
+      float prefValue = prefAndSimilarityColumn.getPrefValue();
+      /* count the number of items used for each prediction */
+      for (Element e : simColumn.nonZeroes()) {
+        int itemIDIndex = e.index();
+        numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1);
+      }
+
+      if (denominators == null) {
+        denominators = simColumn.clone();
+      } else {
+        denominators.assign(simColumn, Functions.PLUS_ABS);
+      }
+
+      if (numerators == null) {
+        numerators = simColumn.clone();
+        if (prefValue != BOOLEAN_PREF_VALUE) {
+          numerators.assign(Functions.MULT, prefValue);
+        }
+      } else {
+        if (prefValue != BOOLEAN_PREF_VALUE) {
+          simColumn.assign(Functions.MULT, prefValue);
+        }
+        numerators.assign(simColumn, Functions.PLUS);
+      }
+
+    }
+
+    if (numerators == null) {
+      return;
+    }
+
+    Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    for (Element element : numerators.nonZeroes()) {
+      int itemIDIndex = element.index();
+      /* preference estimations must be based on at least 2 datapoints */
+      if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) {
+        /* compute normalized prediction */
+        double prediction = element.get() / denominators.getQuick(itemIDIndex);
+        recommendationVector.setQuick(itemIDIndex, prediction);
+      }
+    }
+    writeRecommendedItems(userID, recommendationVector, context);
+  }
+
+  /**
+   * find the top entries in recommendationVector, map them to the real itemIDs and write back the result
+   */
+  private void writeRecommendedItems(VarLongWritable userID, Vector recommendationVector, Context context)
+    throws IOException, InterruptedException {
+    TopItemsQueue topKItems = new TopItemsQueue(recommendationsPerUser);
+    FastIDSet itemsForUser = null;
+
+    if (idReader != null && idReader.isUserItemFilterSpecified()) {
+      itemsForUser = idReader.getItemsToRecommendForUser(userID.get());
+    }
+
+    for (Element element : recommendationVector.nonZeroes()) {
+      int index = element.index();
+      long itemID;
+      if (indexItemIDMap != null && !indexItemIDMap.isEmpty()) {
+        itemID = indexItemIDMap.get(index);
+      } else { // we don't have any mappings, so just use the original
+        itemID = index;
+      }
+
+      if (shouldIncludeItemIntoRecommendations(itemID, itemsToRecommendFor, itemsForUser)) {
+
+        float value = (float) element.get();
+        if (!Float.isNaN(value)) {
+
+          MutableRecommendedItem topItem = topKItems.top();
+          if (value > topItem.getValue()) {
+            topItem.set(itemID, value);
+            topKItems.updateTop();
+          }
+        }
+      }
+    }
+
+    List<RecommendedItem> topItems = topKItems.getTopItems();
+    if (!topItems.isEmpty()) {
+      recommendedItems.set(topItems);
+      context.write(userID, recommendedItems);
+    }
+  }
+
+  private boolean shouldIncludeItemIntoRecommendations(long itemID, FastIDSet allItemsToRecommendFor,
+                                                       FastIDSet itemsForUser) {
+    if (allItemsToRecommendFor == null && itemsForUser == null) {
+      return true;
+    } else if (itemsForUser != null) {
+      return itemsForUser.contains(itemID);
+    } else {
+      return allItemsToRecommendFor.contains(itemID);
+    }
+  }
+
+}
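
To make the prediction formula from the class comment concrete, here is a toy calculation in plain Java (no Hadoop involved; the similarity and rating values are invented):

    // Prediction(u,i) = sum(sim(i,n) * rating(u,n)) / sum(abs(sim(i,n)))
    double[] similarities = {0.8, 0.5, -0.2}; // similarity(i, n) for n in N
    double[] ratings      = {4.0, 3.0,  5.0}; // rating(u, n)
    double numerator = 0.0;
    double denominator = 0.0;
    for (int n = 0; n < similarities.length; n++) {
      numerator   += similarities[n] * ratings[n];
      denominator += Math.abs(similarities[n]);
    }
    double prediction = numerator / denominator; // 3.7 / 1.5 = 2.466...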

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IDReader.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IDReader.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IDReader.java
new file mode 100644
index 0000000..7797fe9
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IDReader.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads user ids and item ids from files specified in usersFile, itemsFile or userItemFile options in item-based
+ * recommender. Composes a list of users and a list of items which can be used by
+ * {@link org.apache.mahout.cf.taste.hadoop.item.UserVectorSplitterMapper} and
+ * {@link org.apache.mahout.cf.taste.hadoop.item.AggregateAndRecommendReducer}.
+ */
+public class IDReader {
+
+  static final String USER_ITEM_FILE = "userItemFile";
+
+  private static final Logger log = LoggerFactory.getLogger(IDReader.class);
+  private static final Pattern SEPARATOR = Pattern.compile("[\t,]");
+
+  private Configuration conf;
+
+  private String usersFile;
+  private String itemsFile;
+  private String userItemFile;
+
+  private FastIDSet userIds;
+  private FastIDSet itemIds;
+
+  private FastIDSet emptySet;
+
+  /* Key - user id, value - a set of item ids to include into recommendations for this user */
+  private Map<Long, FastIDSet> userItemFilter;
+
+  /**
+   * Creates a new IDReader
+   * 
+   * @param conf Job configuration
+   */
+  public IDReader(Configuration conf) {
+    this.conf = conf;
+    emptySet = new FastIDSet();
+
+    usersFile = conf.get(UserVectorSplitterMapper.USERS_FILE);
+    itemsFile = conf.get(AggregateAndRecommendReducer.ITEMS_FILE);
+    userItemFile = conf.get(USER_ITEM_FILE);
+  }
+
+  /**
+   * Reads user ids and item ids from files specified in a job configuration
+   * 
+   * @throws IOException if an error occurs during file read operation
+   * 
+   * @throws IllegalStateException if userItemFile option is specified together with usersFile or itemsFile
+   */
+  public void readIDs() throws IOException, IllegalStateException {
+    if (isUserItemFileSpecified()) {
+      readUserItemFilterIfNeeded();
+    }
+
+    if (isUsersFileSpecified() || isUserItemFilterSpecified()) {
+      readUserIds();
+    }
+
+    if (isItemsFileSpecified() || isUserItemFilterSpecified()) {
+      readItemIds();
+    }
+  }
+
+  /**
+   * Gets a collection of items which should be recommended for a user
+   * 
+   * @param userId ID of a user we are interested in
+   * @return if a userItemFile option is specified, and that file contains at least one item ID for the user,
+   *         then this method returns a {@link FastIDSet} object populated with item IDs. Otherwise, this
+   *         method returns an empty set.
+   */
+  public FastIDSet getItemsToRecommendForUser(Long userId) {
+    if (isUserItemFilterSpecified() && userItemFilter.containsKey(userId)) {
+      return userItemFilter.get(userId);
+    } else {
+      return emptySet;
+    }
+  }
+
+  private void readUserIds() throws IOException, IllegalStateException {
+    if (isUsersFileSpecified() && !isUserItemFileSpecified()) {
+      userIds = readIDList(usersFile);
+    } else if (isUserItemFileSpecified() && !isUsersFileSpecified()) {
+      readUserItemFilterIfNeeded();
+      userIds = extractAllUserIdsFromUserItemFilter(userItemFilter);
+    } else if (!isUsersFileSpecified()) {
+      throw new IllegalStateException("Neither usersFile nor userItemFile 
options are specified");
+    } else {
+      throw new IllegalStateException("usersFile and userItemFile options 
cannot be used simultaneously");
+    }
+  }
+
+  private void readItemIds() throws IOException, IllegalStateException {
+    if (isItemsFileSpecified() && !isUserItemFileSpecified()) {
+      itemIds = readIDList(itemsFile);
+    } else if (isUserItemFileSpecified() && !isItemsFileSpecified()) {
+      readUserItemFilterIfNeeded();
+      itemIds = extractAllItemIdsFromUserItemFilter(userItemFilter);
+    } else if (!isItemsFileSpecified()) {
+      throw new IllegalStateException("Neither itemsFile nor userItemFile 
options are specified");
+    } else {
+      throw new IllegalStateException("itemsFile and userItemFile options 
cannot be specified simultaneously");
+    }
+  }
+
+  private void readUserItemFilterIfNeeded() throws IOException {
+    if (!isUserItemFilterSpecified() && isUserItemFileSpecified()) {
+      userItemFilter = readUserItemFilter(userItemFile);
+    }
+  }
+
+  private Map<Long, FastIDSet> readUserItemFilter(String pathString) throws IOException {
+    Map<Long, FastIDSet> result = new HashMap<>();
+
+    try (InputStream in = openFile(pathString)) {
+      for (String line : new FileLineIterable(in)) {
+        try {
+          String[] tokens = SEPARATOR.split(line);
+          Long userId = Long.parseLong(tokens[0]);
+          Long itemId = Long.parseLong(tokens[1]);
+
+          addUserAndItemIdToUserItemFilter(result, userId, itemId);
+        } catch (NumberFormatException nfe) {
+          log.warn("userItemFile line ignored: {}", line);
+        }
+      }
+    }
+
+    return result;
+  }
+
+  void addUserAndItemIdToUserItemFilter(Map<Long, FastIDSet> filter, Long userId, Long itemId) {
+    FastIDSet itemIds;
+
+    if (filter.containsKey(userId)) {
+      itemIds = filter.get(userId);
+    } else {
+      itemIds = new FastIDSet();
+      filter.put(userId, itemIds);
+    }
+
+    itemIds.add(itemId);
+  }
+
+  static FastIDSet extractAllUserIdsFromUserItemFilter(Map<Long, FastIDSet> filter) {
+    FastIDSet result = new FastIDSet();
+
+    for (Long userId : filter.keySet()) {
+      result.add(userId);
+    }
+
+    return result;
+  }
+
+  private FastIDSet extractAllItemIdsFromUserItemFilter(Map<Long, FastIDSet> filter) {
+    FastIDSet result = new FastIDSet();
+
+    for (FastIDSet itemIds : filter.values()) {
+      result.addAll(itemIds);
+    }
+
+    return result;
+  }
+
+  private FastIDSet readIDList(String pathString) throws IOException {
+    FastIDSet result = null;
+
+    if (pathString != null) {
+      result = new FastIDSet();
+
+      try (InputStream in = openFile(pathString)) {
+        for (String line : new FileLineIterable(in)) {
+          try {
+            result.add(Long.parseLong(line));
+          } catch (NumberFormatException nfe) {
+            log.warn("line ignored: {}", line);
+          }
+        }
+      }
+    }
+
+    return result;
+  }
+
+  private InputStream openFile(String pathString) throws IOException {
+    return HadoopUtil.openStream(new Path(pathString), conf);
+  }
+
+  public boolean isUsersFileSpecified() {
+    return usersFile != null;
+  }
+  
+  public boolean isItemsFileSpecified() {
+    return itemsFile != null;
+  }
+  
+  public boolean isUserItemFileSpecified() {
+    return userItemFile != null;
+  }
+
+  public boolean isUserItemFilterSpecified() {
+    return userItemFilter != null;
+  }
+
+  public FastIDSet getUserIds() {
+    return userIds;
+  }
+
+  public FastIDSet getItemIds() {
+    return itemIds;
+  }
+}
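
A minimal usage sketch, assuming a userItemFile at a hypothetical path whose lines use the [\t,] separator parsed above:

    // Hypothetical /tmp/userItem.txt:
    //   3,103
    //   3,107
    //   5,101
    Configuration conf = new Configuration();
    conf.set(IDReader.USER_ITEM_FILE, "/tmp/userItem.txt"); // placeholder path
    IDReader idReader = new IDReader(conf);
    idReader.readIDs();
    FastIDSet itemsForUser3 = idReader.getItemsToRecommendForUser(3L); // {103, 107}
    FastIDSet allUserIds = idReader.getUserIds();                      // {3, 5}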

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java
new file mode 100644
index 0000000..4415a55
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+
+/**
+ * we use a neat little trick to explicitly filter items for some users: we inject a NaN summand into the preference
+ * estimation for those items, which makes {@link org.apache.mahout.cf.taste.hadoop.item.AggregateAndRecommendReducer}
+ * automatically exclude them
+ */
+public class ItemFilterAsVectorAndPrefsReducer
+    extends Reducer<VarLongWritable,VarLongWritable,VarIntWritable,VectorAndPrefsWritable> {
+
+  private final VarIntWritable itemIDIndexWritable = new VarIntWritable();
+  private final VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable();
+
+  @Override
+  protected void reduce(VarLongWritable itemID, Iterable<VarLongWritable> values, Context ctx)
+    throws IOException, InterruptedException {
+    
+    int itemIDIndex = TasteHadoopUtils.idToIndex(itemID.get());
+    Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
+    /* artificial NaN summand to exclude this item from the recommendations for all users specified in userIDs */
+    vector.set(itemIDIndex, Double.NaN);
+
+    List<Long> userIDs = new ArrayList<>();
+    List<Float> prefValues = new ArrayList<>();
+    for (VarLongWritable userID : values) {
+      userIDs.add(userID.get());
+      prefValues.add(1.0f);
+    }
+
+    itemIDIndexWritable.set(itemIDIndex);
+    vectorAndPrefs.set(vector, userIDs, prefValues);
+    ctx.write(itemIDIndexWritable, vectorAndPrefs);
+  }
+}
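
The NaN trick can be checked in isolation: NaN is absorbing under addition, so any preference estimate that includes the injected summand stays NaN, and the Float.isNaN() check in AggregateAndRecommendReducer then drops the item:

    double estimate = 0.9 * 4.0 + Double.NaN;  // one NaN summand poisons the sum
    boolean excluded = Double.isNaN(estimate); // true -> item is never recommended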

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java
new file mode 100644
index 0000000..cdc1ddf
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarLongWritable;
+
+import java.io.IOException;
+import java.util.regex.Pattern;
+
+/**
+ * map out all user/item pairs to filter, keyed by the itemID
+ */
+public class ItemFilterMapper extends Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> {
+
+  private static final Pattern SEPARATOR = Pattern.compile("[\t,]");
+
+  private final VarLongWritable itemIDWritable = new VarLongWritable();
+  private final VarLongWritable userIDWritable = new VarLongWritable();
+
+  @Override
+  protected void map(LongWritable key, Text line, Context ctx) throws IOException, InterruptedException {
+    String[] tokens = SEPARATOR.split(line.toString());
+    long userID = Long.parseLong(tokens[0]);
+    long itemID = Long.parseLong(tokens[1]);
+    itemIDWritable.set(itemID);
+    userIDWritable.set(userID);
+    ctx.write(itemIDWritable, userIDWritable);
+  }
+}
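
For reference, a single filter line is split on tab or comma and emitted item-first (IDs invented):

    String[] tokens = SEPARATOR.split("42,17"); // {"42", "17"}
    long userID = Long.parseLong(tokens[0]);    // 42
    long itemID = Long.parseLong(tokens[1]);    // 17 -> becomes the map output key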

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
new file mode 100644
index 0000000..ac8597e
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+
+public final class ItemIDIndexMapper extends
+    Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> {
+
+  private boolean transpose;
+
+  private final VarIntWritable indexWritable = new VarIntWritable();
+  private final VarLongWritable itemIDWritable = new VarLongWritable();
+
+  @Override
+  protected void setup(Context context) {
+    Configuration jobConf = context.getConfiguration();
+    transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false);
+  }
+  
+  @Override
+  protected void map(LongWritable key,
+                     Text value,
+                     Context context) throws IOException, InterruptedException {
+    String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString());
+    long itemID = Long.parseLong(tokens[transpose ? 0 : 1]);
+    int index = TasteHadoopUtils.idToIndex(itemID);
+    indexWritable.set(index);
+    itemIDWritable.set(itemID);
+    context.write(indexWritable, itemIDWritable);
+  }  
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
new file mode 100644
index 0000000..d9ecf5e
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+
+public final class ItemIDIndexReducer extends
+    Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> {
+
+  private final VarLongWritable minimumItemIDWritable = new VarLongWritable();
+
+  @Override
+  protected void reduce(VarIntWritable index,
+                        Iterable<VarLongWritable> possibleItemIDs,
+                        Context context) throws IOException, InterruptedException {
+    long minimumItemID = Long.MAX_VALUE;
+    for (VarLongWritable varLongWritable : possibleItemIDs) {
+      long itemID = varLongWritable.get();
+      if (itemID < minimumItemID) {
+        minimumItemID = itemID;
+      }
+    }
+    if (minimumItemID != Long.MAX_VALUE) {
+      minimumItemIDWritable.set(minimumItemID);
+      context.write(index, minimumItemIDWritable);
+    }
+  }
+  
+}
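
Taken together, the mapper and reducer above build the item dictionary: ItemIDIndexMapper hashes each long item ID to an int index via TasteHadoopUtils.idToIndex(), and ItemIDIndexReducer keeps the smallest ID per index, so hash collisions resolve deterministically. Schematically (IDs invented):

    // mapper emits: (idToIndex(10L), 10L), (idToIndex(900L), 900L), ...
    // if idToIndex(10L) == idToIndex(900L), the reducer emits (index, 10L):
    // the minimum colliding ID wins, so the index -> ID mapping stays stable
    int index = TasteHadoopUtils.idToIndex(10L);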

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
new file mode 100644
index 0000000..0e818f3
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+
+/**
+ * maps similar items and their preference values per user
+ */
+public final class PartialMultiplyMapper extends
+    Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> {
+
+  private final VarLongWritable userIDWritable = new VarLongWritable();
+  private final PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable();
+
+  @Override
+  protected void map(VarIntWritable key,
+                     VectorAndPrefsWritable vectorAndPrefsWritable,
+                     Context context) throws IOException, InterruptedException {
+
+    Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector();
+    List<Long> userIDs = vectorAndPrefsWritable.getUserIDs();
+    List<Float> prefValues = vectorAndPrefsWritable.getValues();
+
+    for (int i = 0; i < userIDs.size(); i++) {
+      long userID = userIDs.get(i);
+      float prefValue = prefValues.get(i);
+      if (!Float.isNaN(prefValue)) {
+        prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn);
+        userIDWritable.set(userID);
+        context.write(userIDWritable, prefAndSimilarityColumn);
+      }
+    }
+  }
+
+}
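
Schematically, one (itemIndex, VectorAndPrefs) input fans out to one record per user that has a real preference (values invented):

    // userIDs    = [1L, 2L]
    // prefValues = [4.0f, Float.NaN]
    // emits (userID 1, {prefValue = 4.0f, similarityMatrixColumn});
    // user 2 is skipped because its preference is NaN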

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
new file mode 100644
index 0000000..704c74a
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.RandomUtils;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public final class PrefAndSimilarityColumnWritable implements Writable {
+
+  private float prefValue;
+  private Vector similarityColumn;
+
+  public PrefAndSimilarityColumnWritable() {
+  }
+
+  public PrefAndSimilarityColumnWritable(float prefValue, Vector similarityColumn) {
+    set(prefValue, similarityColumn);
+  }
+
+  public void set(float prefValue, Vector similarityColumn) {
+    this.prefValue = prefValue;
+    this.similarityColumn = similarityColumn;
+  }
+
+  public float getPrefValue() {
+    return prefValue;
+  }
+
+  public Vector getSimilarityColumn() {
+    return similarityColumn;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    prefValue = in.readFloat();
+    VectorWritable vw = new VectorWritable();
+    vw.readFields(in);
+    similarityColumn = vw.get();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeFloat(prefValue);
+    VectorWritable vw = new VectorWritable(similarityColumn);
+    vw.setWritesLaxPrecision(true);
+    vw.write(out);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj instanceof PrefAndSimilarityColumnWritable) {
+      PrefAndSimilarityColumnWritable other = (PrefAndSimilarityColumnWritable) obj;
+      return prefValue == other.prefValue && similarityColumn.equals(other.similarityColumn);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return RandomUtils.hashFloat(prefValue) + 31 * similarityColumn.hashCode();
+  }
+
+}
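
A quick in-memory round trip through the Writable contract, as a sanity sketch (no Hadoop job needed; the vector cardinality is arbitrary):

    PrefAndSimilarityColumnWritable written =
        new PrefAndSimilarityColumnWritable(3.5f, new RandomAccessSparseVector(10));
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    written.write(new DataOutputStream(bytes));
    PrefAndSimilarityColumnWritable read = new PrefAndSimilarityColumnWritable();
    read.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    // read.getPrefValue() == 3.5f; the similarity column comes back at the
    // lax (single) precision requested in write()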

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
new file mode 100644
index 0000000..129db1d
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable;
+import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable;
+import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;
+import org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob;
+import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * <p>Runs a completely distributed recommender job as a series of mapreduces.</p>
+ * <p/>
+ * <p>Preferences in the input file should look like {@code userID, itemID[, preferencevalue]}</p>
+ * <p/>
+ * <p>
+ * Preference value is optional to accommodate applications that have no notion of a preference value (that is, the user
+ * simply expresses a preference for an item, but no degree of preference).
+ * </p>
+ * <p/>
+ * <p>
+ * The preference value is assumed to be parseable as a {@code double}. The user IDs and item IDs are
+ * parsed as {@code long}s.
+ * </p>
+ * <p/>
+ * <p>Command line arguments specific to this class are:</p>
+ * <p/>
+ * <ol>
+ * <li>--input(path): Directory containing one or more text files with the preference data</li>
+ * <li>--output(path): output path where recommender output should go</li>
+ * <li>--similarityClassname (classname): Name of vector similarity class to instantiate or a predefined similarity
+ * from {@link org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li>
+ * <li>--usersFile (path): only compute recommendations for user IDs contained in this file (optional)</li>
+ * <li>--itemsFile (path): only include item IDs from this file in the recommendations (optional)</li>
+ * <li>--filterFile (path): file containing comma-separated userID,itemID pairs. Used to exclude the item from the
+ * recommendations for that user (optional)</li>
+ * <li>--numRecommendations (integer): Number of recommendations to compute per user (10)</li>
+ * <li>--booleanData (boolean): Treat input data as having no pref values (false)</li>
+ * <li>--maxPrefsPerUser (integer): Maximum number of preferences considered per user in final
+ *   recommendation phase (10)</li>
+ * <li>--maxSimilaritiesPerItem (integer): Maximum number of similarities considered per item (100)</li>
+ * <li>--minPrefsPerUser (integer): ignore users with fewer preferences than this in the similarity computation (1)</li>
+ * <li>--maxPrefsInItemSimilarity (integer): max number of preferences to consider per user in
+ *   the item similarity computation phase,
+ * users with more preferences will be sampled down (500)</li>
+ * <li>--threshold (double): discard item pairs with a similarity value below this</li>
+ * </ol>
+ * <p/>
+ * <p>General command line options are documented in {@link AbstractJob}.</p>
+ * <p/>
+ * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other
+ * arguments.</p>
+ */
+public final class RecommenderJob extends AbstractJob {
+
+  public static final String BOOLEAN_DATA = "booleanData";
+  public static final String DEFAULT_PREPARE_PATH = "preparePreferenceMatrix";
+
+  private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100;
+  private static final int DEFAULT_MAX_PREFS = 500;
+  private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption("numRecommendations", "n", "Number of recommendations per user",
+            String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS));
+    addOption("usersFile", null, "File of users to recommend for", null);
+    addOption("itemsFile", null, "File of items to recommend for", null);
+    addOption("filterFile", "f", "File containing comma-separated 
userID,itemID pairs. Used to exclude the item from "
+            + "the recommendations for that user (optional)", null);
+    addOption("userItemFile", "uif", "File containing comma-separated 
userID,itemID pairs (optional). "
+            + "Used to include only these items into recommendations. "
+            + "Cannot be used together with usersFile or itemsFile", null);
+    addOption("booleanData", "b", "Treat input as without pref values", 
Boolean.FALSE.toString());
+    addOption("maxPrefsPerUser", "mxp",
+            "Maximum number of preferences considered per user in final 
recommendation phase",
+            
String.valueOf(UserVectorSplitterMapper.DEFAULT_MAX_PREFS_PER_USER_CONSIDERED));
+    addOption("minPrefsPerUser", "mp", "ignore users with less preferences 
than this in the similarity computation "
+            + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', 
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
+    addOption("maxSimilaritiesPerItem", "m", "Maximum number of similarities 
considered per item ",
+            String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM));
+    addOption("maxPrefsInItemSimilarity", "mpiis", "max number of preferences 
to consider per user or item in the "
+            + "item similarity computation phase, users or items with more 
preferences will be sampled down (default: "
+        + DEFAULT_MAX_PREFS + ')', String.valueOf(DEFAULT_MAX_PREFS));
+    addOption("similarityClassname", "s", "Name of distributed similarity 
measures class to instantiate, " 
+            + "alternatively use one of the predefined similarities (" + 
VectorSimilarityMeasures.list() + ')', true);
+    addOption("threshold", "tr", "discard item pairs with a similarity value 
below this", false);
+    addOption("outputPathForSimilarityMatrix", "opfsm", "write the item 
similarity matrix to this path (optional)",
+        false);
+    addOption("randomSeed", null, "use this seed for sampling", false);
+    addFlag("sequencefileOutput", null, "write the output into a SequenceFile 
instead of a text file");
+
+    Map<String, List<String>> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    Path outputPath = getOutputPath();
+    int numRecommendations = Integer.parseInt(getOption("numRecommendations"));
+    String usersFile = getOption("usersFile");
+    String itemsFile = getOption("itemsFile");
+    String filterFile = getOption("filterFile");
+    String userItemFile = getOption("userItemFile");
+    boolean booleanData = Boolean.valueOf(getOption("booleanData"));
+    int maxPrefsPerUser = Integer.parseInt(getOption("maxPrefsPerUser"));
+    int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser"));
+    int maxPrefsInItemSimilarity = Integer.parseInt(getOption("maxPrefsInItemSimilarity"));
+    int maxSimilaritiesPerItem = Integer.parseInt(getOption("maxSimilaritiesPerItem"));
+    String similarityClassname = getOption("similarityClassname");
+    double threshold = hasOption("threshold")
+        ? Double.parseDouble(getOption("threshold")) : RowSimilarityJob.NO_THRESHOLD;
+    long randomSeed = hasOption("randomSeed")
+        ? Long.parseLong(getOption("randomSeed")) : RowSimilarityJob.NO_FIXED_RANDOM_SEED;
+
+
+    Path prepPath = getTempPath(DEFAULT_PREPARE_PATH);
+    Path similarityMatrixPath = getTempPath("similarityMatrix");
+    Path explicitFilterPath = getTempPath("explicitFilterPath");
+    Path partialMultiplyPath = getTempPath("partialMultiply");
+
+    AtomicInteger currentPhase = new AtomicInteger();
+
+    int numberOfUsers = -1;
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{
+        "--input", getInputPath().toString(),
+        "--output", prepPath.toString(),
+        "--minPrefsPerUser", String.valueOf(minPrefsPerUser),
+        "--booleanData", String.valueOf(booleanData),
+        "--tempDir", getTempPath().toString(),
+      });
+
+      numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf());
+    }
+
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+
+      /* special behavior if phase 1 is skipped */
+      if (numberOfUsers == -1) {
+        numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
+                PathType.LIST, null, getConf());
+      }
+
+      //calculate the co-occurrence matrix
+      ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{
+        "--input", new Path(prepPath, 
PreparePreferenceMatrixJob.RATING_MATRIX).toString(),
+        "--output", similarityMatrixPath.toString(),
+        "--numberOfColumns", String.valueOf(numberOfUsers),
+        "--similarityClassname", similarityClassname,
+        "--maxObservationsPerRow", String.valueOf(maxPrefsInItemSimilarity),
+        "--maxObservationsPerColumn", String.valueOf(maxPrefsInItemSimilarity),
+        "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem),
+        "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE),
+        "--threshold", String.valueOf(threshold),
+        "--randomSeed", String.valueOf(randomSeed),
+        "--tempDir", getTempPath().toString(),
+      });
+
+      // write out the similarity matrix if the user specified that behavior
+      if (hasOption("outputPathForSimilarityMatrix")) {
+        Path outputPathForSimilarityMatrix = new Path(getOption("outputPathForSimilarityMatrix"));
+
+        Job outputSimilarityMatrix = prepareJob(similarityMatrixPath, outputPathForSimilarityMatrix,
+            SequenceFileInputFormat.class, ItemSimilarityJob.MostSimilarItemPairsMapper.class,
+            EntityEntityWritable.class, DoubleWritable.class, ItemSimilarityJob.MostSimilarItemPairsReducer.class,
+            EntityEntityWritable.class, DoubleWritable.class, TextOutputFormat.class);
+
+        Configuration mostSimilarItemsConf = outputSimilarityMatrix.getConfiguration();
+        mostSimilarItemsConf.set(ItemSimilarityJob.ITEM_ID_INDEX_PATH_STR,
+            new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
+        mostSimilarItemsConf.setInt(ItemSimilarityJob.MAX_SIMILARITIES_PER_ITEM, maxSimilaritiesPerItem);
+        outputSimilarityMatrix.waitForCompletion(true);
+      }
+    }
+
+    //start the multiplication of the co-occurrence matrix by the user vectors
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Job partialMultiply = Job.getInstance(getConf(), "partialMultiply");
+      Configuration partialMultiplyConf = partialMultiply.getConfiguration();
+
+      MultipleInputs.addInputPath(partialMultiply, similarityMatrixPath, SequenceFileInputFormat.class,
+                                  SimilarityMatrixRowWrapperMapper.class);
+      MultipleInputs.addInputPath(partialMultiply, new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS),
+          SequenceFileInputFormat.class, UserVectorSplitterMapper.class);
+      partialMultiply.setJarByClass(ToVectorAndPrefReducer.class);
+      partialMultiply.setMapOutputKeyClass(VarIntWritable.class);
+      partialMultiply.setMapOutputValueClass(VectorOrPrefWritable.class);
+      partialMultiply.setReducerClass(ToVectorAndPrefReducer.class);
+      partialMultiply.setOutputFormatClass(SequenceFileOutputFormat.class);
+      partialMultiply.setOutputKeyClass(VarIntWritable.class);
+      partialMultiply.setOutputValueClass(VectorAndPrefsWritable.class);
+      partialMultiplyConf.setBoolean("mapred.compress.map.output", true);
+      partialMultiplyConf.set("mapred.output.dir", 
partialMultiplyPath.toString());
+
+      if (usersFile != null) {
+        partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile);
+      }
+      
+      if (userItemFile != null) {
+        partialMultiplyConf.set(IDReader.USER_ITEM_FILE, userItemFile);
+      }
+      
+      partialMultiplyConf.setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED, maxPrefsPerUser);
+
+      boolean succeeded = partialMultiply.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+    }
+
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      //filter out any users we don't care about
+      /* convert the user/item pairs to filter if a filterFile has been specified */
+      if (filterFile != null) {
+        Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class,
+                ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,
+                ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,
+                SequenceFileOutputFormat.class);
+        boolean succeeded = itemFiltering.waitForCompletion(true);
+        if (!succeeded) {
+          return -1;
+        }
+      }
+
+      String aggregateAndRecommendInput = partialMultiplyPath.toString();
+      if (filterFile != null) {
+        aggregateAndRecommendInput += "," + explicitFilterPath;
+      }
+
+      Class<? extends OutputFormat> outputFormat = parsedArgs.containsKey("--sequencefileOutput")
+          ? SequenceFileOutputFormat.class : TextOutputFormat.class;
+
+      //extract out the recommendations
+      Job aggregateAndRecommend = prepareJob(
+              new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class,
+              PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,
+              AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,
+              outputFormat);
+      Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration();
+      if (itemsFile != null) {
+        aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile);
+      }
+      
+      if (userItemFile != null) {
+        aggregateAndRecommendConf.set(IDReader.USER_ITEM_FILE, userItemFile);
+      }
+
+      if (filterFile != null) {
+        setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath);
+      }
+      setIOSort(aggregateAndRecommend);
+      aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH,
+              new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString());
+      aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations);
+      aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData);
+      boolean succeeded = aggregateAndRecommend.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+    }
+
+    return 0;
+  }
+
+  private static void setIOSort(JobContext job) {
+    Configuration conf = job.getConfiguration();
+    conf.setInt("io.sort.factor", 100);
+    String javaOpts = conf.get("mapred.map.child.java.opts"); // new arg name
+    if (javaOpts == null) {
+      javaOpts = conf.get("mapred.child.java.opts"); // old arg name
+    }
+    int assumedHeapSize = 512;
+    if (javaOpts != null) {
+      Matcher m = Pattern.compile("-Xmx([0-9]+)([mMgG])").matcher(javaOpts);
+      if (m.find()) {
+        assumedHeapSize = Integer.parseInt(m.group(1));
+        String megabyteOrGigabyte = m.group(2);
+        if ("g".equalsIgnoreCase(megabyteOrGigabyte)) {
+          assumedHeapSize *= 1024;
+        }
+      }
+    }
+    // Cap this at 1024MB now; see https://issues.apache.org/jira/browse/MAPREDUCE-2308
+    conf.setInt("io.sort.mb", Math.min(assumedHeapSize / 2, 1024));
+    // For some reason the Merger doesn't report status for a long time; increase
+    // timeout when running these jobs
+    conf.setInt("mapred.task.timeout", 60 * 60 * 1000);
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new RecommenderJob(), args);
+  }
+}
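
The heap-size heuristic in setIOSort() above parses whatever -Xmx value it
finds in the child JVM options, halves it for io.sort.mb, and caps the result
at 1024 MB. A minimal standalone sketch of that parsing step (the class name
and sample values are illustrative, not part of the patch):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HeapSizeProbe {

  // Same regex as setIOSort: captures the number and the m/g unit of -Xmx.
  private static final Pattern XMX = Pattern.compile("-Xmx([0-9]+)([mMgG])");

  static int assumedHeapSizeMb(String javaOpts) {
    int assumedHeapSize = 512; // default when no -Xmx flag is found
    if (javaOpts != null) {
      Matcher m = XMX.matcher(javaOpts);
      if (m.find()) {
        assumedHeapSize = Integer.parseInt(m.group(1));
        if ("g".equalsIgnoreCase(m.group(2))) {
          assumedHeapSize *= 1024; // normalize gigabytes to megabytes
        }
      }
    }
    return assumedHeapSize;
  }

  public static void main(String[] args) {
    int heap = assumedHeapSizeMb("-server -Xmx2g");
    // -Xmx2g -> 2048 MB heap -> io.sort.mb = min(2048 / 2, 1024) = 1024
    System.out.println("io.sort.mb = " + Math.min(heap / 2, 1024));
  }
}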

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
new file mode 100644
index 0000000..8ae8215
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * Maps a row of the similarity matrix to a {@link VectorOrPrefWritable}.
+ *
+ * Strictly speaking, a column of that matrix has to be used, but as the similarity matrix is symmetric,
+ * we can use a row instead of having to transpose it.
+ */
+public final class SimilarityMatrixRowWrapperMapper extends
+    Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
+
+  private final VarIntWritable index = new VarIntWritable();
+  private final VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
+
+  @Override
+  protected void map(IntWritable key,
+                     VectorWritable value,
+                     Context context) throws IOException, InterruptedException {
+    Vector similarityMatrixRow = value.get();
+    /* remove self similarity */
+    similarityMatrixRow.set(key.get(), Double.NaN);
+
+    index.set(key.get());
+    vectorOrPref.set(similarityMatrixRow);
+
+    context.write(index, vectorOrPref);
+  }
+
+}
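
The mapper's only transformation is NaN-ing out the diagonal entry, i.e. an
item's similarity with itself, before re-emitting the row under its own index.
A small sketch of that step with Mahout's vector types, outside any MapReduce
context (toy values, assuming mahout-math on the classpath):

import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class SelfSimilarityDemo {
  public static void main(String[] args) {
    int itemIndex = 1;
    // a toy similarity-matrix row for the item at index 1
    Vector row = new DenseVector(new double[] {0.3, 1.0, 0.7});
    // remove self-similarity so the item never recommends itself
    row.set(itemIndex, Double.NaN);
    System.out.println(row); // entry 1 is now NaN
  }
}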

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java
new file mode 100644
index 0000000..e6e47fd
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+/**
+ * <h1>Input</h1>
+ * 
+ * <p>
+ * Takes user IDs as {@link VarLongWritable} mapped to all associated item IDs and preference values, as
+ * {@link EntityPrefWritable}s.
+ * </p>
+ * 
+ * <h1>Output</h1>
+ * 
+ * <p>
+ * The same user ID mapped to a {@link RandomAccessSparseVector} representation of the same item IDs and
+ * preference values. Item IDs are used as vector indexes; they are hashed into ints to work as indexes with
+ * {@link TasteHadoopUtils#idToIndex(long)}. The mapping is remembered for later with a combination of
+ * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}.
+ * </p>
+ */
+public final class ToUserVectorsReducer extends
+    Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> {
+
+  public static final String MIN_PREFERENCES_PER_USER = ToUserVectorsReducer.class.getName()
+      + ".minPreferencesPerUser";
+
+  private int minPreferences;
+
+  public enum Counters { USERS }
+
+  private final VectorWritable userVectorWritable = new VectorWritable();
+
+  @Override
+  protected void setup(Context ctx) throws IOException, InterruptedException {
+    super.setup(ctx);
+    minPreferences = ctx.getConfiguration().getInt(MIN_PREFERENCES_PER_USER, 1);
+  }
+
+  @Override
+  protected void reduce(VarLongWritable userID,
+                        Iterable<VarLongWritable> itemPrefs,
+                        Context context) throws IOException, InterruptedException {
+    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
+    for (VarLongWritable itemPref : itemPrefs) {
+      int index = TasteHadoopUtils.idToIndex(itemPref.get());
+      float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f;
+      userVector.set(index, value);
+    }
+
+    if (userVector.getNumNondefaultElements() >= minPreferences) {
+      userVectorWritable.set(userVector);
+      userVectorWritable.setWritesLaxPrecision(true);
+      context.getCounter(Counters.USERS).increment(1);
+      context.write(userID, userVectorWritable);
+    }
+  }
+  
+}
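
The heart of the reducer is the loop that hashes each 64-bit item ID down to
an int vector index and stores the preference value there. A self-contained
sketch of that aggregation (the IDs and ratings are made up; assumes the
Mahout jars on the classpath):

import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class UserVectorDemo {
  public static void main(String[] args) {
    // same construction as the reducer: huge logical size, small initial capacity
    Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100);
    long[] itemIDs = {123L, 456L, 789L};
    float[] prefs = {3.0f, 5.0f, 4.5f};
    for (int i = 0; i < itemIDs.length; i++) {
      // idToIndex maps the long item ID to a non-negative int vector index
      userVector.set(TasteHadoopUtils.idToIndex(itemIDs[i]), prefs[i]);
    }
    System.out.println(userVector.getNumNondefaultElements() + " prefs stored");
  }
}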

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
new file mode 100644
index 0000000..9167437
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.Vector;
+
+public final class ToVectorAndPrefReducer extends
+    Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> {
+
+  private final VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable();
+
+  @Override
+  protected void reduce(VarIntWritable key,
+                        Iterable<VectorOrPrefWritable> values,
+                        Context context) throws IOException, InterruptedException {
+
+    List<Long> userIDs = new ArrayList<>();
+    List<Float> prefValues = new ArrayList<>();
+    Vector similarityMatrixColumn = null;
+    for (VectorOrPrefWritable value : values) {
+      if (value.getVector() == null) {
+        // Then this is a user-pref value
+        userIDs.add(value.getUserID());
+        prefValues.add(value.getValue());
+      } else {
+        // Then this is the column vector
+        if (similarityMatrixColumn != null) {
+          throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get());
+        }
+        similarityMatrixColumn = value.getVector();
+      }
+    }
+
+    if (similarityMatrixColumn == null) {
+      return;
+    }
+
+    vectorAndPrefs.set(similarityMatrixColumn, userIDs, prefValues);
+    context.write(key, vectorAndPrefs);
+  }
+
+}
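
Each reduce group mixes at most one similarity-matrix column with arbitrarily
many user prefs, and the loop tells them apart by checking getVector() for
null. The same separation with toy inputs and no Hadoop context (a sketch,
not test code from the patch):

import java.util.ArrayList;
import java.util.List;

import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class VectorOrPrefDemo {
  public static void main(String[] args) {
    List<VectorOrPrefWritable> values = new ArrayList<>();
    Vector column = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
    column.set(7, 0.9);
    values.add(new VectorOrPrefWritable(column));    // the column vector
    values.add(new VectorOrPrefWritable(42L, 4.0f)); // a user pref

    List<Long> userIDs = new ArrayList<>();
    List<Float> prefValues = new ArrayList<>();
    Vector similarityMatrixColumn = null;
    for (VectorOrPrefWritable value : values) {
      if (value.getVector() == null) {
        userIDs.add(value.getUserID()); // pref case
        prefValues.add(value.getValue());
      } else {
        similarityMatrixColumn = value.getVector(); // column case
      }
    }
    System.out.println(userIDs + " / " + prefValues + " / " + similarityMatrixColumn);
  }
}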

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
new file mode 100644
index 0000000..2290d06
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.util.PriorityQueue;
+import org.apache.mahout.cf.taste.impl.common.FastIDSet;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class UserVectorSplitterMapper extends
+    Mapper<VarLongWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> {
+
+  private static final Logger log = LoggerFactory.getLogger(UserVectorSplitterMapper.class);
+
+  static final String USERS_FILE = "usersFile";
+  static final String MAX_PREFS_PER_USER_CONSIDERED = "maxPrefsPerUserConsidered";
+  static final int DEFAULT_MAX_PREFS_PER_USER_CONSIDERED = 10;
+
+  private int maxPrefsPerUserConsidered;
+  private FastIDSet usersToRecommendFor;
+
+  private final VarIntWritable itemIndexWritable = new VarIntWritable();
+  private final VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable();
+
+  @Override
+  protected void setup(Context context) throws IOException {
+    Configuration jobConf = context.getConfiguration();
+    maxPrefsPerUserConsidered = jobConf.getInt(MAX_PREFS_PER_USER_CONSIDERED, DEFAULT_MAX_PREFS_PER_USER_CONSIDERED);
+    
+    IDReader idReader = new IDReader(jobConf);
+    idReader.readIDs();
+    usersToRecommendFor = idReader.getUserIds();
+  }
+
+  @Override
+  protected void map(VarLongWritable key,
+                     VectorWritable value,
+                     Context context) throws IOException, InterruptedException {
+    long userID = key.get();
+
+    log.info("UserID = {}", userID);
+
+    if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) {
+      return;
+    }
+    Vector userVector = maybePruneUserVector(value.get());
+
+    for (Element e : userVector.nonZeroes()) {
+      itemIndexWritable.set(e.index());
+      vectorOrPref.set(userID, (float) e.get());
+      context.write(itemIndexWritable, vectorOrPref);
+    }
+  }
+
+  private Vector maybePruneUserVector(Vector userVector) {
+    if (userVector.getNumNondefaultElements() <= maxPrefsPerUserConsidered) {
+      return userVector;
+    }
+
+    float smallestLargeValue = findSmallestLargeValue(userVector);
+
+    // "Blank out" small-sized prefs to reduce the amount of partial products
+    // generated later. They're not zeroed, but NaN-ed, so they come through
+    // and can be used to exclude these items from prefs.
+    for (Element e : userVector.nonZeroes()) {
+      float absValue = Math.abs((float) e.get());
+      if (absValue < smallestLargeValue) {
+        e.set(Float.NaN);
+      }
+    }
+
+    return userVector;
+  }
+
+  private float findSmallestLargeValue(Vector userVector) {
+
+    PriorityQueue<Float> topPrefValues = new PriorityQueue<Float>(maxPrefsPerUserConsidered) {
+      @Override
+      protected boolean lessThan(Float f1, Float f2) {
+        return f1 < f2;
+      }
+    };
+
+    for (Element e : userVector.nonZeroes()) {
+      float absValue = Math.abs((float) e.get());
+      topPrefValues.insertWithOverflow(absValue);
+    }
+    return topPrefValues.top();
+  }
+
+}
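
findSmallestLargeValue() keeps the maxPrefsPerUserConsidered largest absolute
pref values in a bounded heap and returns the smallest of them, which becomes
the pruning cutoff. The same idea with java.util.PriorityQueue instead of
Lucene's (a sketch; it assumes k is not larger than the number of prefs):

import java.util.PriorityQueue;

public class PruneThresholdDemo {

  // returns the k-th largest absolute value; prefs below it would be NaN-ed
  static float smallestLargeValue(float[] prefs, int k) {
    PriorityQueue<Float> top = new PriorityQueue<>(k); // min-heap of the top k
    for (float p : prefs) {
      float abs = Math.abs(p);
      if (top.size() < k) {
        top.add(abs);
      } else if (abs > top.peek()) {
        top.poll(); // evict the smallest of the current top k
        top.add(abs);
      }
    }
    return top.peek();
  }

  public static void main(String[] args) {
    float[] prefs = {5.0f, -1.0f, 3.0f, 2.0f, 4.0f};
    // keeping the 3 largest {5, 4, 3} makes 3.0 the cutoff
    System.out.println(smallestLargeValue(prefs, 3));
  }
}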

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java
new file mode 100644
index 0000000..11d496f
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Varint;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public final class VectorAndPrefsWritable implements Writable {
+
+  private Vector vector;
+  private List<Long> userIDs;
+  private List<Float> values;
+
+  public VectorAndPrefsWritable() {
+  }
+
+  public VectorAndPrefsWritable(Vector vector, List<Long> userIDs, List<Float> values) {
+    set(vector, userIDs, values);
+  }
+
+  public void set(Vector vector, List<Long> userIDs, List<Float> values) {
+    this.vector = vector;
+    this.userIDs = userIDs;
+    this.values = values;
+  }
+
+  public Vector getVector() {
+    return vector;
+  }
+
+  public List<Long> getUserIDs() {
+    return userIDs;
+  }
+
+  public List<Float> getValues() {
+    return values;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    VectorWritable vw = new VectorWritable(vector);
+    vw.setWritesLaxPrecision(true);
+    vw.write(out);
+    Varint.writeUnsignedVarInt(userIDs.size(), out);
+    for (int i = 0; i < userIDs.size(); i++) {
+      Varint.writeSignedVarLong(userIDs.get(i), out);
+      out.writeFloat(values.get(i));
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    VectorWritable writable = new VectorWritable();
+    writable.readFields(in);
+    vector = writable.get();
+    int size = Varint.readUnsignedVarInt(in);
+    userIDs = new ArrayList<>(size);
+    values = new ArrayList<>(size);
+    for (int i = 0; i < size; i++) {
+      userIDs.add(Varint.readSignedVarLong(in));
+      values.add(in.readFloat());
+    }
+  }
+
+  @Override
+  public String toString() {
+    return vector + "\t" + userIDs + '\t' + values;
+  }
+}
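
Like any Writable, the class round-trips through plain DataOutput/DataInput
streams, so the encoding (lax-precision vector, then a varint count, then
user IDs interleaved with float prefs) can be checked without a cluster. A
round-trip sketch with toy data (assumes the Mahout jars on the classpath):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.Arrays;

import org.apache.mahout.cf.taste.hadoop.item.VectorAndPrefsWritable;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class VectorAndPrefsRoundTrip {
  public static void main(String[] args) throws Exception {
    Vector v = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
    v.set(3, 0.5);
    VectorAndPrefsWritable out =
        new VectorAndPrefsWritable(v, Arrays.asList(1L, 2L), Arrays.asList(4.0f, 5.0f));

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    out.write(new DataOutputStream(bytes)); // serialize

    VectorAndPrefsWritable in = new VectorAndPrefsWritable();
    in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(in); // vector, then [1, 2], then [4.0, 5.0]
  }
}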

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
new file mode 100644
index 0000000..515d7ea
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.item;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.mahout.math.Varint;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+public final class VectorOrPrefWritable implements Writable {
+
+  private Vector vector;
+  private long userID;
+  private float value;
+
+  public VectorOrPrefWritable() {
+  }
+
+  public VectorOrPrefWritable(Vector vector) {
+    this.vector = vector;
+  }
+
+  public VectorOrPrefWritable(long userID, float value) {
+    this.userID = userID;
+    this.value = value;
+  }
+
+  public Vector getVector() {
+    return vector;
+  }
+
+  public long getUserID() {
+    return userID;
+  }
+
+  public float getValue() {
+    return value;
+  }
+
+  void set(Vector vector) {
+    this.vector = vector;
+    this.userID = Long.MIN_VALUE;
+    this.value = Float.NaN;
+  }
+
+  public void set(long userID, float value) {
+    this.vector = null;
+    this.userID = userID;
+    this.value = value;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    if (vector == null) {
+      out.writeBoolean(false);
+      Varint.writeSignedVarLong(userID, out);
+      out.writeFloat(value);
+    } else {
+      out.writeBoolean(true);
+      VectorWritable vw = new VectorWritable(vector);
+      vw.setWritesLaxPrecision(true);
+      vw.write(out);
+    }
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    boolean hasVector = in.readBoolean();
+    if (hasVector) {
+      VectorWritable writable = new VectorWritable();
+      writable.readFields(in);
+      set(writable.get());
+    } else {
+      long theUserID = Varint.readSignedVarLong(in);
+      float theValue = in.readFloat();
+      set(theUserID, theValue);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return vector == null ? userID + ":" + value : vector.toString();
+  }
+}
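
The leading boolean written by write() is the tag that tells readFields()
which of the two payloads follows. A minimal round-trip of the pref-only
case (a sketch, assuming the classes above on the classpath):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class VectorOrPrefRoundTrip {
  public static void main(String[] args) throws Exception {
    VectorOrPrefWritable out = new VectorOrPrefWritable(42L, 3.5f);

    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    out.write(new DataOutputStream(bytes)); // false tag, varint user ID, float pref

    VectorOrPrefWritable in = new VectorOrPrefWritable();
    in.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(in.getVector() == null);               // true: carries a pref
    System.out.println(in.getUserID() + ":" + in.getValue()); // 42:3.5
  }
}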

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
new file mode 100644
index 0000000..c64ee38
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.preparation;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable;
+import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper;
+import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer;
+import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
+import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorsReducer;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.math.VarIntWritable;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.VectorWritable;
+
+import java.util.List;
+import java.util.Map;
+
+public class PreparePreferenceMatrixJob extends AbstractJob {
+
+  public static final String NUM_USERS = "numUsers.bin";
+  public static final String ITEMID_INDEX = "itemIDIndex";
+  public static final String USER_VECTORS = "userVectors";
+  public static final String RATING_MATRIX = "ratingMatrix";
+
+  private static final int DEFAULT_MIN_PREFS_PER_USER = 1;
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new PreparePreferenceMatrixJob(), args);
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+
+    addInputOption();
+    addOutputOption();
+    addOption("minPrefsPerUser", "mp", "ignore users with less preferences 
than this "
+            + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', 
String.valueOf(DEFAULT_MIN_PREFS_PER_USER));
+    addOption("booleanData", "b", "Treat input as without pref values", 
Boolean.FALSE.toString());
+    addOption("ratingShift", "rs", "shift ratings by this value", "0.0");
+
+    Map<String, List<String>> parsedArgs = parseArguments(args);
+    if (parsedArgs == null) {
+      return -1;
+    }
+
+    int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser"));
+    boolean booleanData = Boolean.valueOf(getOption("booleanData"));
+    float ratingShift = Float.parseFloat(getOption("ratingShift"));
+    //convert items to an internal index
+    Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class,
+            ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class,
+            VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class);
+    itemIDIndex.setCombinerClass(ItemIDIndexReducer.class);
+    boolean succeeded = itemIDIndex.waitForCompletion(true);
+    if (!succeeded) {
+      return -1;
+    }
+    //convert user preferences into a vector per user
+    Job toUserVectors = prepareJob(getInputPath(),
+                                   getOutputPath(USER_VECTORS),
+                                   TextInputFormat.class,
+                                   ToItemPrefsMapper.class,
+                                   VarLongWritable.class,
+                                   booleanData ? VarLongWritable.class : EntityPrefWritable.class,
+                                   ToUserVectorsReducer.class,
+                                   VarLongWritable.class,
+                                   VectorWritable.class,
+                                   SequenceFileOutputFormat.class);
+    toUserVectors.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData);
+    toUserVectors.getConfiguration().setInt(ToUserVectorsReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser);
+    toUserVectors.getConfiguration().set(ToEntityPrefsMapper.RATING_SHIFT, String.valueOf(ratingShift));
+    succeeded = toUserVectors.waitForCompletion(true);
+    if (!succeeded) {
+      return -1;
+    }
+    //we need the number of users later
+    int numberOfUsers = (int) toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue();
+    HadoopUtil.writeInt(numberOfUsers, getOutputPath(NUM_USERS), getConf());
+    //build the rating matrix
+    Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX),
+            ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class, ToItemVectorsReducer.class,
+            IntWritable.class, VectorWritable.class);
+            IntWritable.class, VectorWritable.class);
+    toItemVectors.setCombinerClass(ToItemVectorsReducer.class);
+
+    succeeded = toItemVectors.waitForCompletion(true);
+    if (!succeeded) {
+      return -1;
+    }
+
+    return 0;
+  }
+}
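
Since the job is a standard Hadoop Tool, it can be driven programmatically as
well as from the hadoop CLI. A hedged invocation sketch; the paths are
placeholders, and --input/--output are the standard options AbstractJob adds:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob;

public class PrepareMatrixDriver {
  public static void main(String[] args) throws Exception {
    // placeholder paths; the input is expected as userID,itemID[,pref] text lines
    int exitCode = ToolRunner.run(new Configuration(), new PreparePreferenceMatrixJob(),
        new String[] {
            "--input", "/tmp/prefs.csv",
            "--output", "/tmp/prepared",
            "--minPrefsPerUser", "2",
            "--booleanData", "false"
        });
    System.exit(exitCode);
  }
}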

http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
----------------------------------------------------------------------
diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
new file mode 100644
index 0000000..5a4144c
--- /dev/null
+++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.cf.taste.hadoop.preparation;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.VarLongWritable;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+
+import java.io.IOException;
+
+public class ToItemVectorsMapper
+    extends Mapper<VarLongWritable,VectorWritable,IntWritable,VectorWritable> {
+
+  private final IntWritable itemID = new IntWritable();
+  private final VectorWritable itemVectorWritable = new VectorWritable();
+
+  @Override
+  protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
+    throws IOException, InterruptedException {
+    Vector userRatings = vectorWritable.get();
+
+    int column = TasteHadoopUtils.idToIndex(rowIndex.get());
+
+    itemVectorWritable.setWritesLaxPrecision(true);
+
+    Vector itemVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
+    for (Vector.Element elem : userRatings.nonZeroes()) {
+      itemID.set(elem.index());
+      itemVector.setQuick(column, elem.get());
+      itemVectorWritable.set(itemVector);
+      ctx.write(itemID, itemVectorWritable);
+      // reset vector for reuse
+      itemVector.setQuick(elem.index(), 0.0);
+    }
+  }
+
+}
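
The mapper transposes the preference matrix one cell at a time: every element
of a user's row is re-emitted as an (item index, single-entry column vector)
pair for the reducers to merge. A plain-Java sketch of that emission pattern
(toy IDs; here the reuse reset zeroes the column slot that was just written):

import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class TransposeByEmissionDemo {
  public static void main(String[] args) {
    long userID = 123L;
    Vector userRatings = new RandomAccessSparseVector(Integer.MAX_VALUE, 10);
    userRatings.set(5, 4.0); // item at index 5 rated 4.0
    userRatings.set(9, 2.5); // item at index 9 rated 2.5

    int column = TasteHadoopUtils.idToIndex(userID); // the user's column index
    Vector itemVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
    for (Vector.Element elem : userRatings.nonZeroes()) {
      itemVector.setQuick(column, elem.get());
      // in the mapper, (elem.index(), itemVector) is written to the context here
      System.out.println(elem.index() + " -> " + itemVector);
      itemVector.setQuick(column, 0.0); // reset the reused vector
    }
  }
}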
