http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java new file mode 100644 index 0000000..fd6657f --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/als/SolveImplicitFeedbackMapper.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.hadoop.als; + +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IntWritable; +import org.apache.mahout.math.VectorWritable; +import org.apache.mahout.math.als.ImplicitFeedbackAlternatingLeastSquaresSolver; + +import java.io.IOException; + +/** Solving mapper that can be safely executed using multiple threads */ +public class SolveImplicitFeedbackMapper + extends SharingMapper<IntWritable,VectorWritable,IntWritable,VectorWritable, + ImplicitFeedbackAlternatingLeastSquaresSolver> { + + private final VectorWritable uiOrmj = new VectorWritable(); + + @Override + ImplicitFeedbackAlternatingLeastSquaresSolver createSharedInstance(Context ctx) throws IOException { + Configuration conf = ctx.getConfiguration(); + + double lambda = Double.parseDouble(conf.get(ParallelALSFactorizationJob.LAMBDA)); + double alpha = Double.parseDouble(conf.get(ParallelALSFactorizationJob.ALPHA)); + int numFeatures = conf.getInt(ParallelALSFactorizationJob.NUM_FEATURES, -1); + int numEntities = Integer.parseInt(conf.get(ParallelALSFactorizationJob.NUM_ENTITIES)); + + Preconditions.checkArgument(numFeatures > 0, "numFeatures must be greater than 0!"); + + return new ImplicitFeedbackAlternatingLeastSquaresSolver(numFeatures, lambda, alpha, + ALS.readMatrixByRowsFromDistributedCache(numEntities, conf), 1); + } + + @Override + protected void map(IntWritable userOrItemID, VectorWritable ratingsWritable, Context ctx) + throws IOException, InterruptedException { + ImplicitFeedbackAlternatingLeastSquaresSolver solver = getSharedInstance(); + uiOrmj.set(solver.solve(ratingsWritable.get())); + ctx.write(userOrItemID, uiOrmj); + } + +}
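A note on the solver above: assuming ImplicitFeedbackAlternatingLeastSquaresSolver follows the usual implicit-feedback ALS formulation of Hu, Koren and Volinsky ("Collaborative Filtering for Implicit Feedback Datasets"), each solve() call answers a small k-by-k linear system built from the fixed factor matrix. The sketch below shows that math with plain Java arrays and a naive Gaussian elimination; the class and method names are illustrative and not part of this commit.

import java.util.Arrays;

/**
 * Sketch of the per-user implicit-feedback ALS update (illustration only):
 * solve (Y'CuY + lambda*I) x = Y'Cu p(u), where Cu is the diagonal confidence
 * matrix with cu_i = 1 + alpha * r(u,i), and p(u,i) = 1 iff r(u,i) > 0.
 * Note Y'CuY = Y'Y + Y'(Cu - I)Y, the form used in the paper.
 */
public final class ImplicitAlsSketch {

  static double[] solveUser(double[][] Y, double[] ratings, double lambda, double alpha) {
    int k = Y[0].length;
    double[][] a = new double[k][k];
    double[] b = new double[k];
    for (int i = 0; i < Y.length; i++) {
      double c = 1.0 + alpha * ratings[i];     // confidence for item i
      double p = ratings[i] > 0 ? 1.0 : 0.0;   // binarized preference
      for (int r = 0; r < k; r++) {
        b[r] += c * p * Y[i][r];               // accumulates Y'Cu p(u)
        for (int s = 0; s < k; s++) {
          a[r][s] += c * Y[i][r] * Y[i][s];    // accumulates Y'Cu Y
        }
      }
    }
    for (int r = 0; r < k; r++) {
      a[r][r] += lambda;                       // regularization term lambda*I
    }
    return gaussianSolve(a, b);
  }

  /** naive Gaussian elimination with partial pivoting; fine for the tiny k used here */
  static double[] gaussianSolve(double[][] a, double[] b) {
    int n = b.length;
    for (int col = 0; col < n; col++) {
      int pivot = col;
      for (int row = col + 1; row < n; row++) {
        if (Math.abs(a[row][col]) > Math.abs(a[pivot][col])) {
          pivot = row;
        }
      }
      double[] tmpRow = a[col]; a[col] = a[pivot]; a[pivot] = tmpRow;
      double tmp = b[col]; b[col] = b[pivot]; b[pivot] = tmp;
      for (int row = col + 1; row < n; row++) {
        double f = a[row][col] / a[col][col];
        for (int c = col; c < n; c++) {
          a[row][c] -= f * a[col][c];
        }
        b[row] -= f * b[col];
      }
    }
    double[] x = new double[n];
    for (int row = n - 1; row >= 0; row--) {
      double sum = b[row];
      for (int c = row + 1; c < n; c++) {
        sum -= a[row][c] * x[c];
      }
      x[row] = sum / a[row][row];
    }
    return x;
  }

  public static void main(String[] args) {
    double[][] itemFeatures = {{0.1, 0.9}, {0.8, 0.2}, {0.5, 0.5}}; // Y, k = 2
    double[] userRatings = {3.0, 0.0, 1.0};  // implicit counts; 0 = unobserved
    System.out.println(Arrays.toString(solveUser(itemFeatures, userRatings, 0.065, 40.0)));
  }
}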
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java new file mode 100644 index 0000000..b44fd5b --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/AggregateAndRecommendReducer.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.cf.taste.hadoop.MutableRecommendedItem; +import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable; +import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils; +import org.apache.mahout.cf.taste.hadoop.TopItemsQueue; +import org.apache.mahout.cf.taste.impl.common.FastIDSet; +import org.apache.mahout.cf.taste.recommender.RecommendedItem; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.VarLongWritable; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.Vector.Element; +import org.apache.mahout.math.function.Functions; +import org.apache.mahout.math.map.OpenIntLongHashMap; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p>computes prediction values for each user</p> + * + * <pre> + * u = a user + * i = an item not yet rated by u + * N = all items similar to i (where similarity is usually computed by pairwise comparison of the item-vectors + * of the user-item matrix) + * + * Prediction(u,i) = sum(all n from N: similarity(i,n) * rating(u,n)) / sum(all n from N: abs(similarity(i,n))) + * </pre> + */ +public final class AggregateAndRecommendReducer extends + Reducer<VarLongWritable,PrefAndSimilarityColumnWritable,VarLongWritable,RecommendedItemsWritable> { + + private static final Logger log = LoggerFactory.getLogger(AggregateAndRecommendReducer.class); + + static final String ITEMID_INDEX_PATH = "itemIDIndexPath"; + static final String NUM_RECOMMENDATIONS = "numRecommendations"; + static final int DEFAULT_NUM_RECOMMENDATIONS = 10; + static final String ITEMS_FILE = "itemsFile"; + + private boolean booleanData; + private int recommendationsPerUser; + private IDReader idReader; + private FastIDSet itemsToRecommendFor; + private OpenIntLongHashMap indexItemIDMap; + + private final RecommendedItemsWritable
recommendedItems = new RecommendedItemsWritable(); + + private static final float BOOLEAN_PREF_VALUE = 1.0f; + + @Override + protected void setup(Context context) throws IOException { + Configuration conf = context.getConfiguration(); + recommendationsPerUser = conf.getInt(NUM_RECOMMENDATIONS, DEFAULT_NUM_RECOMMENDATIONS); + booleanData = conf.getBoolean(RecommenderJob.BOOLEAN_DATA, false); + indexItemIDMap = TasteHadoopUtils.readIDIndexMap(conf.get(ITEMID_INDEX_PATH), conf); + + idReader = new IDReader(conf); + idReader.readIDs(); + itemsToRecommendFor = idReader.getItemIds(); + } + + @Override + protected void reduce(VarLongWritable userID, + Iterable<PrefAndSimilarityColumnWritable> values, + Context context) throws IOException, InterruptedException { + if (booleanData) { + reduceBooleanData(userID, values, context); + } else { + reduceNonBooleanData(userID, values, context); + } + } + + private void reduceBooleanData(VarLongWritable userID, + Iterable<PrefAndSimilarityColumnWritable> values, + Context context) throws IOException, InterruptedException { + /* with boolean data, each estimated preference can only be 1; + * since that cannot be used to rank the recommended items, + * we use the sum of similarities instead. */ + Iterator<PrefAndSimilarityColumnWritable> columns = values.iterator(); + Vector predictions = columns.next().getSimilarityColumn(); + while (columns.hasNext()) { + predictions.assign(columns.next().getSimilarityColumn(), Functions.PLUS); + } + writeRecommendedItems(userID, predictions, context); + } + + private void reduceNonBooleanData(VarLongWritable userID, + Iterable<PrefAndSimilarityColumnWritable> values, + Context context) throws IOException, InterruptedException { + /* each entry here is the sum in the numerator of the prediction formula */ + Vector numerators = null; + /* each entry here is the sum in the denominator of the prediction formula */ + Vector denominators = null; + /* each entry here is the number of similar items used in the prediction formula */ + Vector numberOfSimilarItemsUsed = new RandomAccessSparseVector(Integer.MAX_VALUE, 100); + + for (PrefAndSimilarityColumnWritable prefAndSimilarityColumn : values) { + Vector simColumn = prefAndSimilarityColumn.getSimilarityColumn(); + float prefValue = prefAndSimilarityColumn.getPrefValue(); + /* count the number of items used for each prediction */ + for (Element e : simColumn.nonZeroes()) { + int itemIDIndex = e.index(); + numberOfSimilarItemsUsed.setQuick(itemIDIndex, numberOfSimilarItemsUsed.getQuick(itemIDIndex) + 1); + } + + if (denominators == null) { + denominators = simColumn.clone(); + } else { + denominators.assign(simColumn, Functions.PLUS_ABS); + } + + if (numerators == null) { + numerators = simColumn.clone(); + if (prefValue != BOOLEAN_PREF_VALUE) { + numerators.assign(Functions.MULT, prefValue); + } + } else { + if (prefValue != BOOLEAN_PREF_VALUE) { + simColumn.assign(Functions.MULT, prefValue); + } + numerators.assign(simColumn, Functions.PLUS); + } + + } + + if (numerators == null) { + return; + } + + Vector recommendationVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100); + for (Element element : numerators.nonZeroes()) { + int itemIDIndex = element.index(); + /* preference estimations must be based on at least 2 data points */ + if (numberOfSimilarItemsUsed.getQuick(itemIDIndex) > 1) { + /* compute normalized prediction */ + double prediction = element.get() / denominators.getQuick(itemIDIndex); + recommendationVector.setQuick(itemIDIndex, prediction); + } + } + +
writeRecommendedItems(userID, recommendationVector, context); + } + + /** + * find the top entries in recommendationVector, map them to the real itemIDs and write back the result + */ + private void writeRecommendedItems(VarLongWritable userID, Vector recommendationVector, Context context) + throws IOException, InterruptedException { + TopItemsQueue topKItems = new TopItemsQueue(recommendationsPerUser); + FastIDSet itemsForUser = null; + + if (idReader != null && idReader.isUserItemFilterSpecified()) { + itemsForUser = idReader.getItemsToRecommendForUser(userID.get()); + } + + for (Element element : recommendationVector.nonZeroes()) { + int index = element.index(); + long itemID; + if (indexItemIDMap != null && !indexItemIDMap.isEmpty()) { + itemID = indexItemIDMap.get(index); + } else { // we don't have any mappings, so just use the original + itemID = index; + } + + if (shouldIncludeItemIntoRecommendations(itemID, itemsToRecommendFor, itemsForUser)) { + + float value = (float) element.get(); + if (!Float.isNaN(value)) { + + MutableRecommendedItem topItem = topKItems.top(); + if (value > topItem.getValue()) { + topItem.set(itemID, value); + topKItems.updateTop(); + } + } + } + } + + List<RecommendedItem> topItems = topKItems.getTopItems(); + if (!topItems.isEmpty()) { + recommendedItems.set(topItems); + context.write(userID, recommendedItems); + } + } + + private boolean shouldIncludeItemIntoRecommendations(long itemID, FastIDSet allItemsToRecommendFor, + FastIDSet itemsForUser) { + if (allItemsToRecommendFor == null && itemsForUser == null) { + return true; + } else if (itemsForUser != null) { + return itemsForUser.contains(itemID); + } else { + return allItemsToRecommendFor.contains(itemID); + } + } + +}
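To make the aggregation above concrete, the following small sketch evaluates the prediction formula from the class javadoc with plain arrays in place of Mahout vectors. It is an illustration only, not part of the commit; note that the reducer additionally requires at least two contributing similar items before it emits an estimate.

/**
 * Worked example of the item-based prediction formula (illustration only):
 * Prediction(u,i) = sum(similarity(i,n) * rating(u,n)) / sum(abs(similarity(i,n)))
 */
public final class PredictionSketch {

  static double predict(double[] similarities, double[] ratings) {
    double numerator = 0.0;
    double denominator = 0.0;
    for (int n = 0; n < similarities.length; n++) {
      numerator += similarities[n] * ratings[n];  // weighted sum of the user's known ratings
      denominator += Math.abs(similarities[n]);   // normalizes by the total similarity mass
    }
    return numerator / denominator;
  }

  public static void main(String[] args) {
    // two items similar to the target item i, rated 4.0 and 2.0 by the user
    double[] sims = {0.8, 0.4};
    double[] ratings = {4.0, 2.0};
    System.out.println(predict(sims, ratings)); // (0.8*4 + 0.4*2) / (0.8 + 0.4) = 3.33...
  }
}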
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IDReader.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IDReader.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IDReader.java new file mode 100644 index 0000000..7797fe9 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/IDReader.java @@ -0,0 +1,244 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.mahout.cf.taste.impl.common.FastIDSet; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.iterator.FileLineIterable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reads user ids and item ids from files specified in usersFile, itemsFile or userItemFile options in the item-based + * recommender. Composes a list of users and a list of items which can be used by + * {@link org.apache.mahout.cf.taste.hadoop.item.UserVectorSplitterMapper} and + * {@link org.apache.mahout.cf.taste.hadoop.item.AggregateAndRecommendReducer}. + */ +public class IDReader { + + static final String USER_ITEM_FILE = "userItemFile"; + + private static final Logger log = LoggerFactory.getLogger(IDReader.class); + private static final Pattern SEPARATOR = Pattern.compile("[\t,]"); + + private Configuration conf; + + private String usersFile; + private String itemsFile; + private String userItemFile; + + private FastIDSet userIds; + private FastIDSet itemIds; + + private FastIDSet emptySet; + + /* Key - user id, value - a set of item ids to include into recommendations for this user */ + private Map<Long, FastIDSet> userItemFilter; + + /** + * Creates a new IDReader + * + * @param conf Job configuration + */ + public IDReader(Configuration conf) { + this.conf = conf; + emptySet = new FastIDSet(); + + usersFile = conf.get(UserVectorSplitterMapper.USERS_FILE); + itemsFile = conf.get(AggregateAndRecommendReducer.ITEMS_FILE); + userItemFile = conf.get(USER_ITEM_FILE); + } + + /** + * Reads user ids and item ids from files specified in a job configuration + * + * @throws IOException if an error occurs during file read operation + * + * @throws IllegalStateException if userItemFile option is specified together with usersFile or itemsFile + */ + public void readIDs() throws IOException, IllegalStateException { + if (isUserItemFileSpecified()) { + readUserItemFilterIfNeeded(); + } + + if (isUsersFileSpecified() || isUserItemFilterSpecified()) { + readUserIds(); + } + + if (isItemsFileSpecified() || isUserItemFilterSpecified()) { + readItemIds(); + } + } + + /** + * Gets a collection of items which should be recommended for a user + * + * @param userId ID of a user we are interested in + * @return if a userItemFile option is specified, and that file contains at least one item ID for the user, + * then this method returns a {@link FastIDSet} object populated with item IDs. Otherwise, this + * method returns an empty set.
+ */ + public FastIDSet getItemsToRecommendForUser(Long userId) { + if (isUserItemFilterSpecified() && userItemFilter.containsKey(userId)) { + return userItemFilter.get(userId); + } else { + return emptySet; + } + } + + private void readUserIds() throws IOException, IllegalStateException { + if (isUsersFileSpecified() && !isUserItemFileSpecified()) { + userIds = readIDList(usersFile); + } else if (isUserItemFileSpecified() && !isUsersFileSpecified()) { + readUserItemFilterIfNeeded(); + userIds = extractAllUserIdsFromUserItemFilter(userItemFilter); + } else if (!isUsersFileSpecified()) { + throw new IllegalStateException("Neither usersFile nor userItemFile options are specified"); + } else { + throw new IllegalStateException("usersFile and userItemFile options cannot be used simultaneously"); + } + } + + private void readItemIds() throws IOException, IllegalStateException { + if (isItemsFileSpecified() && !isUserItemFileSpecified()) { + itemIds = readIDList(itemsFile); + } else if (isUserItemFileSpecified() && !isItemsFileSpecified()) { + readUserItemFilterIfNeeded(); + itemIds = extractAllItemIdsFromUserItemFilter(userItemFilter); + } else if (!isItemsFileSpecified()) { + throw new IllegalStateException("Neither itemsFile nor userItemFile options are specified"); + } else { + throw new IllegalStateException("itemsFile and userItemFile options cannot be specified simultaneously"); + } + } + + private void readUserItemFilterIfNeeded() throws IOException { + if (!isUserItemFilterSpecified() && isUserItemFileSpecified()) { + userItemFilter = readUserItemFilter(userItemFile); + } + } + + private Map<Long, FastIDSet> readUserItemFilter(String pathString) throws IOException { + Map<Long, FastIDSet> result = new HashMap<>(); + + try (InputStream in = openFile(pathString)) { + for (String line : new FileLineIterable(in)) { + try { + String[] tokens = SEPARATOR.split(line); + Long userId = Long.parseLong(tokens[0]); + Long itemId = Long.parseLong(tokens[1]); + + addUserAndItemIdToUserItemFilter(result, userId, itemId); + } catch (NumberFormatException nfe) { + log.warn("userItemFile line ignored: {}", line); + } + } + } + + return result; + } + + void addUserAndItemIdToUserItemFilter(Map<Long, FastIDSet> filter, Long userId, Long itemId) { + FastIDSet itemIds; + + if (filter.containsKey(userId)) { + itemIds = filter.get(userId); + } else { + itemIds = new FastIDSet(); + filter.put(userId, itemIds); + } + + itemIds.add(itemId); + } + + static FastIDSet extractAllUserIdsFromUserItemFilter(Map<Long, FastIDSet> filter) { + FastIDSet result = new FastIDSet(); + + for (Long userId : filter.keySet()) { + result.add(userId); + } + + return result; + } + + private FastIDSet extractAllItemIdsFromUserItemFilter(Map<Long, FastIDSet> filter) { + FastIDSet result = new FastIDSet(); + + for (FastIDSet itemIds : filter.values()) { + result.addAll(itemIds); + } + + return result; + } + + private FastIDSet readIDList(String pathString) throws IOException { + FastIDSet result = null; + + if (pathString != null) { + result = new FastIDSet(); + + try (InputStream in = openFile(pathString)){ + for (String line : new FileLineIterable(in)) { + try { + result.add(Long.parseLong(line)); + } catch (NumberFormatException nfe) { + log.warn("line ignored: {}", line); + } + } + } + } + + return result; + } + + private InputStream openFile(String pathString) throws IOException { + return HadoopUtil.openStream(new Path(pathString), conf); + } + + public boolean isUsersFileSpecified () { + return usersFile != null; + } + + public 
boolean isItemsFileSpecified () { + return itemsFile != null; + } + + public boolean isUserItemFileSpecified () { + return userItemFile != null; + } + + public boolean isUserItemFilterSpecified() { + return userItemFilter != null; + } + + public FastIDSet getUserIds() { + return userIds; + } + + public FastIDSet getItemIds() { + return itemIds; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java new file mode 100644 index 0000000..4415a55 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterAsVectorAndPrefsReducer.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.VarIntWritable; +import org.apache.mahout.math.VarLongWritable; +import org.apache.mahout.math.Vector; + +/** + * we use a neat little trick to explicitly filter items for some users: we inject a NaN summand into the preference + * estimation for those items, which makes {@link org.apache.mahout.cf.taste.hadoop.item.AggregateAndRecommendReducer} + * automatically exclude them + */ +public class ItemFilterAsVectorAndPrefsReducer + extends Reducer<VarLongWritable,VarLongWritable,VarIntWritable,VectorAndPrefsWritable> { + + private final VarIntWritable itemIDIndexWritable = new VarIntWritable(); + private final VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(); + + @Override + protected void reduce(VarLongWritable itemID, Iterable<VarLongWritable> values, Context ctx) + throws IOException, InterruptedException { + + int itemIDIndex = TasteHadoopUtils.idToIndex(itemID.get()); + Vector vector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1); + /* artificial NaN summand to exclude this item from the recommendations for all users specified in userIDs */ + vector.set(itemIDIndex, Double.NaN); + + List<Long> userIDs = new ArrayList<>(); + List<Float> prefValues = new ArrayList<>(); + for (VarLongWritable userID : values) { + userIDs.add(userID.get()); + prefValues.add(1.0f); + } + + itemIDIndexWritable.set(itemIDIndex); + vectorAndPrefs.set(vector, userIDs, prefValues); + ctx.write(itemIDIndexWritable, vectorAndPrefs); + } +}
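The NaN injection above works because of IEEE 754 semantics: any sum that includes the artificial NaN summand is itself NaN, and AggregateAndRecommendReducer then drops such estimates via its Float.isNaN check before filling the top-N queue. A minimal illustration (not part of the commit):

public final class NanFilterSketch {
  public static void main(String[] args) {
    // the injected NaN poisons the whole prediction sum for the filtered item
    double estimate = Double.NaN + 0.8 * 4.0;
    System.out.println(Double.isNaN(estimate)); // true -> the item never enters the top-N queue
  }
}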
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java new file mode 100644 index 0000000..cdc1ddf --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemFilterMapper.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.math.VarLongWritable; + +import java.io.IOException; +import java.util.regex.Pattern; + +/** + * map out all user/item pairs to filter, keyed by the itemID + */ +public class ItemFilterMapper extends Mapper<LongWritable,Text,VarLongWritable,VarLongWritable> { + + private static final Pattern SEPARATOR = Pattern.compile("[\t,]"); + + private final VarLongWritable itemIDWritable = new VarLongWritable(); + private final VarLongWritable userIDWritable = new VarLongWritable(); + + @Override + protected void map(LongWritable key, Text line, Context ctx) throws IOException, InterruptedException { + String[] tokens = SEPARATOR.split(line.toString()); + long userID = Long.parseLong(tokens[0]); + long itemID = Long.parseLong(tokens[1]); + itemIDWritable.set(itemID); + userIDWritable.set(userID); + ctx.write(itemIDWritable, userIDWritable); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java new file mode 100644 index 0000000..ac8597e --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexMapper.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils; +import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper; +import org.apache.mahout.math.VarIntWritable; +import org.apache.mahout.math.VarLongWritable; + +public final class ItemIDIndexMapper extends + Mapper<LongWritable,Text, VarIntWritable, VarLongWritable> { + + private boolean transpose; + + private final VarIntWritable indexWritable = new VarIntWritable(); + private final VarLongWritable itemIDWritable = new VarLongWritable(); + + @Override + protected void setup(Context context) { + Configuration jobConf = context.getConfiguration(); + transpose = jobConf.getBoolean(ToEntityPrefsMapper.TRANSPOSE_USER_ITEM, false); + } + + @Override + protected void map(LongWritable key, + Text value, + Context context) throws IOException, InterruptedException { + String[] tokens = TasteHadoopUtils.splitPrefTokens(value.toString()); + long itemID = Long.parseLong(tokens[transpose ? 0 : 1]); + int index = TasteHadoopUtils.idToIndex(itemID); + indexWritable.set(index); + itemIDWritable.set(itemID); + context.write(indexWritable, itemIDWritable); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java new file mode 100644 index 0000000..d9ecf5e --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ItemIDIndexReducer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.math.VarIntWritable; +import org.apache.mahout.math.VarLongWritable; + +public final class ItemIDIndexReducer extends + Reducer<VarIntWritable, VarLongWritable, VarIntWritable,VarLongWritable> { + + private final VarLongWritable minimumItemIDWritable = new VarLongWritable(); + + @Override + protected void reduce(VarIntWritable index, + Iterable<VarLongWritable> possibleItemIDs, + Context context) throws IOException, InterruptedException { + long minimumItemID = Long.MAX_VALUE; + for (VarLongWritable varLongWritable : possibleItemIDs) { + long itemID = varLongWritable.get(); + if (itemID < minimumItemID) { + minimumItemID = itemID; + } + } + if (minimumItemID != Long.MAX_VALUE) { + minimumItemIDWritable.set(minimumItemID); + context.write(index, minimumItemIDWritable); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java new file mode 100644 index 0000000..0e818f3 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PartialMultiplyMapper.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.math.VarIntWritable; +import org.apache.mahout.math.VarLongWritable; +import org.apache.mahout.math.Vector; + +/** + * maps similar items and their preference values per user + */ +public final class PartialMultiplyMapper extends + Mapper<VarIntWritable,VectorAndPrefsWritable,VarLongWritable,PrefAndSimilarityColumnWritable> { + + private final VarLongWritable userIDWritable = new VarLongWritable(); + private final PrefAndSimilarityColumnWritable prefAndSimilarityColumn = new PrefAndSimilarityColumnWritable(); + + @Override + protected void map(VarIntWritable key, + VectorAndPrefsWritable vectorAndPrefsWritable, + Context context) throws IOException, InterruptedException { + + Vector similarityMatrixColumn = vectorAndPrefsWritable.getVector(); + List<Long> userIDs = vectorAndPrefsWritable.getUserIDs(); + List<Float> prefValues = vectorAndPrefsWritable.getValues(); + + for (int i = 0; i < userIDs.size(); i++) { + long userID = userIDs.get(i); + float prefValue = prefValues.get(i); + if (!Float.isNaN(prefValue)) { + prefAndSimilarityColumn.set(prefValue, similarityMatrixColumn); + userIDWritable.set(userID); + context.write(userIDWritable, prefAndSimilarityColumn); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java new file mode 100644 index 0000000..704c74a --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/PrefAndSimilarityColumnWritable.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.mahout.common.RandomUtils; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +public final class PrefAndSimilarityColumnWritable implements Writable { + + private float prefValue; + private Vector similarityColumn; + + public PrefAndSimilarityColumnWritable() { + } + + public PrefAndSimilarityColumnWritable(float prefValue, Vector similarityColumn) { + set(prefValue, similarityColumn); + } + + public void set(float prefValue, Vector similarityColumn) { + this.prefValue = prefValue; + this.similarityColumn = similarityColumn; + } + + public float getPrefValue() { + return prefValue; + } + + public Vector getSimilarityColumn() { + return similarityColumn; + } + + @Override + public void readFields(DataInput in) throws IOException { + prefValue = in.readFloat(); + VectorWritable vw = new VectorWritable(); + vw.readFields(in); + similarityColumn = vw.get(); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeFloat(prefValue); + VectorWritable vw = new VectorWritable(similarityColumn); + vw.setWritesLaxPrecision(true); + vw.write(out); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof PrefAndSimilarityColumnWritable) { + PrefAndSimilarityColumnWritable other = (PrefAndSimilarityColumnWritable) obj; + return prefValue == other.prefValue && similarityColumn.equals(other.similarityColumn); + } + return false; + } + + @Override + public int hashCode() { + return RandomUtils.hashFloat(prefValue) + 31 * similarityColumn.hashCode(); + } + + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java new file mode 100644 index 0000000..129db1d --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/RecommenderJob.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.lib.input.MultipleInputs; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable; +import org.apache.mahout.cf.taste.hadoop.RecommendedItemsWritable; +import org.apache.mahout.cf.taste.hadoop.preparation.PreparePreferenceMatrixJob; +import org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.math.VarIntWritable; +import org.apache.mahout.math.VarLongWritable; +import org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob; +import org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasures; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * <p>Runs a completely distributed recommender job as a series of mapreduces.</p> + * <p/> + * <p>Preferences in the input file should look like {@code userID, itemID[, preferencevalue]}</p> + * <p/> + * <p> + * Preference value is optional to accommodate applications that have no notion of a preference value (that is, the user + * simply expresses a preference for an item, but no degree of preference). + * </p> + * <p/> + * <p> + * The preference value is assumed to be parseable as a {@code double}. The user IDs and item IDs are + * parsed as {@code long}s. + * </p> + * <p/> + * <p>Command line arguments specific to this class are:</p> + * <p/> + * <ol> + * <li>--input(path): Directory containing one or more text files with the preference data</li> + * <li>--output(path): output path where recommender output should go</li> + * <li>--similarityClassname (classname): Name of vector similarity class to instantiate or a predefined similarity + * from {@link org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.VectorSimilarityMeasure}</li> + * <li>--usersFile (path): only compute recommendations for user IDs contained in this file (optional)</li> + * <li>--itemsFile (path): only include item IDs from this file in the recommendations (optional)</li> + * <li>--filterFile (path): file containing comma-separated userID,itemID pairs. 
Used to exclude the item from the + * recommendations for that user (optional)</li> + * <li>--numRecommendations (integer): Number of recommendations to compute per user (10)</li> + * <li>--booleanData (boolean): Treat input data as having no pref values (false)</li> + * <li>--maxPrefsPerUser (integer): Maximum number of preferences considered per user in final + * recommendation phase (10)</li> + * <li>--maxSimilaritiesPerItem (integer): Maximum number of similarities considered per item (100)</li> + * <li>--minPrefsPerUser (integer): ignore users with fewer preferences than this in the similarity computation (1)</li> + * <li>--maxPrefsInItemSimilarity (integer): max number of preferences to consider per user in + * the item similarity computation phase; + * users with more preferences will be sampled down (500)</li> + * <li>--threshold (double): discard item pairs with a similarity value below this</li> + * </ol> + * <p/> + * <p>General command line options are documented in {@link AbstractJob}.</p> + * <p/> + * <p>Note that because of how Hadoop parses arguments, all "-D" arguments must appear before all other + * arguments.</p> + */ +public final class RecommenderJob extends AbstractJob { + + public static final String BOOLEAN_DATA = "booleanData"; + public static final String DEFAULT_PREPARE_PATH = "preparePreferenceMatrix"; + + private static final int DEFAULT_MAX_SIMILARITIES_PER_ITEM = 100; + private static final int DEFAULT_MAX_PREFS = 500; + private static final int DEFAULT_MIN_PREFS_PER_USER = 1; + + @Override + public int run(String[] args) throws Exception { + + addInputOption(); + addOutputOption(); + addOption("numRecommendations", "n", "Number of recommendations per user", + String.valueOf(AggregateAndRecommendReducer.DEFAULT_NUM_RECOMMENDATIONS)); + addOption("usersFile", null, "File of users to recommend for", null); + addOption("itemsFile", null, "File of items to recommend for", null); + addOption("filterFile", "f", "File containing comma-separated userID,itemID pairs. Used to exclude the item from " + + "the recommendations for that user (optional)", null); + addOption("userItemFile", "uif", "File containing comma-separated userID,itemID pairs (optional). " + + "Used to include only these items in recommendations. 
" + + "Cannot be used together with usersFile or itemsFile", null); + addOption("booleanData", "b", "Treat input as without pref values", Boolean.FALSE.toString()); + addOption("maxPrefsPerUser", "mxp", + "Maximum number of preferences considered per user in final recommendation phase", + String.valueOf(UserVectorSplitterMapper.DEFAULT_MAX_PREFS_PER_USER_CONSIDERED)); + addOption("minPrefsPerUser", "mp", "ignore users with less preferences than this in the similarity computation " + + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER)); + addOption("maxSimilaritiesPerItem", "m", "Maximum number of similarities considered per item ", + String.valueOf(DEFAULT_MAX_SIMILARITIES_PER_ITEM)); + addOption("maxPrefsInItemSimilarity", "mpiis", "max number of preferences to consider per user or item in the " + + "item similarity computation phase, users or items with more preferences will be sampled down (default: " + + DEFAULT_MAX_PREFS + ')', String.valueOf(DEFAULT_MAX_PREFS)); + addOption("similarityClassname", "s", "Name of distributed similarity measures class to instantiate, " + + "alternatively use one of the predefined similarities (" + VectorSimilarityMeasures.list() + ')', true); + addOption("threshold", "tr", "discard item pairs with a similarity value below this", false); + addOption("outputPathForSimilarityMatrix", "opfsm", "write the item similarity matrix to this path (optional)", + false); + addOption("randomSeed", null, "use this seed for sampling", false); + addFlag("sequencefileOutput", null, "write the output into a SequenceFile instead of a text file"); + + Map<String, List<String>> parsedArgs = parseArguments(args); + if (parsedArgs == null) { + return -1; + } + + Path outputPath = getOutputPath(); + int numRecommendations = Integer.parseInt(getOption("numRecommendations")); + String usersFile = getOption("usersFile"); + String itemsFile = getOption("itemsFile"); + String filterFile = getOption("filterFile"); + String userItemFile = getOption("userItemFile"); + boolean booleanData = Boolean.valueOf(getOption("booleanData")); + int maxPrefsPerUser = Integer.parseInt(getOption("maxPrefsPerUser")); + int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser")); + int maxPrefsInItemSimilarity = Integer.parseInt(getOption("maxPrefsInItemSimilarity")); + int maxSimilaritiesPerItem = Integer.parseInt(getOption("maxSimilaritiesPerItem")); + String similarityClassname = getOption("similarityClassname"); + double threshold = hasOption("threshold") + ? Double.parseDouble(getOption("threshold")) : RowSimilarityJob.NO_THRESHOLD; + long randomSeed = hasOption("randomSeed") + ? 
Long.parseLong(getOption("randomSeed")) : RowSimilarityJob.NO_FIXED_RANDOM_SEED; + + + Path prepPath = getTempPath(DEFAULT_PREPARE_PATH); + Path similarityMatrixPath = getTempPath("similarityMatrix"); + Path explicitFilterPath = getTempPath("explicitFilterPath"); + Path partialMultiplyPath = getTempPath("partialMultiply"); + + AtomicInteger currentPhase = new AtomicInteger(); + + int numberOfUsers = -1; + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + ToolRunner.run(getConf(), new PreparePreferenceMatrixJob(), new String[]{ + "--input", getInputPath().toString(), + "--output", prepPath.toString(), + "--minPrefsPerUser", String.valueOf(minPrefsPerUser), + "--booleanData", String.valueOf(booleanData), + "--tempDir", getTempPath().toString(), + }); + + numberOfUsers = HadoopUtil.readInt(new Path(prepPath, PreparePreferenceMatrixJob.NUM_USERS), getConf()); + } + + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + + /* special behavior if phase 1 is skipped */ + if (numberOfUsers == -1) { + numberOfUsers = (int) HadoopUtil.countRecords(new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS), + PathType.LIST, null, getConf()); + } + + //calculate the co-occurrence matrix + ToolRunner.run(getConf(), new RowSimilarityJob(), new String[]{ + "--input", new Path(prepPath, PreparePreferenceMatrixJob.RATING_MATRIX).toString(), + "--output", similarityMatrixPath.toString(), + "--numberOfColumns", String.valueOf(numberOfUsers), + "--similarityClassname", similarityClassname, + "--maxObservationsPerRow", String.valueOf(maxPrefsInItemSimilarity), + "--maxObservationsPerColumn", String.valueOf(maxPrefsInItemSimilarity), + "--maxSimilaritiesPerRow", String.valueOf(maxSimilaritiesPerItem), + "--excludeSelfSimilarity", String.valueOf(Boolean.TRUE), + "--threshold", String.valueOf(threshold), + "--randomSeed", String.valueOf(randomSeed), + "--tempDir", getTempPath().toString(), + }); + + // write out the similarity matrix if the user specified that behavior + if (hasOption("outputPathForSimilarityMatrix")) { + Path outputPathForSimilarityMatrix = new Path(getOption("outputPathForSimilarityMatrix")); + + Job outputSimilarityMatrix = prepareJob(similarityMatrixPath, outputPathForSimilarityMatrix, + SequenceFileInputFormat.class, ItemSimilarityJob.MostSimilarItemPairsMapper.class, + EntityEntityWritable.class, DoubleWritable.class, ItemSimilarityJob.MostSimilarItemPairsReducer.class, + EntityEntityWritable.class, DoubleWritable.class, TextOutputFormat.class); + + Configuration mostSimilarItemsConf = outputSimilarityMatrix.getConfiguration(); + mostSimilarItemsConf.set(ItemSimilarityJob.ITEM_ID_INDEX_PATH_STR, + new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString()); + mostSimilarItemsConf.setInt(ItemSimilarityJob.MAX_SIMILARITIES_PER_ITEM, maxSimilaritiesPerItem); + outputSimilarityMatrix.waitForCompletion(true); + } + } + + //start the multiplication of the co-occurrence matrix by the user vectors + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + Job partialMultiply = Job.getInstance(getConf(), "partialMultiply"); + Configuration partialMultiplyConf = partialMultiply.getConfiguration(); + + MultipleInputs.addInputPath(partialMultiply, similarityMatrixPath, SequenceFileInputFormat.class, + SimilarityMatrixRowWrapperMapper.class); + MultipleInputs.addInputPath(partialMultiply, new Path(prepPath, PreparePreferenceMatrixJob.USER_VECTORS), + SequenceFileInputFormat.class, UserVectorSplitterMapper.class); + partialMultiply.setJarByClass(ToVectorAndPrefReducer.class); + 
partialMultiply.setMapOutputKeyClass(VarIntWritable.class); + partialMultiply.setMapOutputValueClass(VectorOrPrefWritable.class); + partialMultiply.setReducerClass(ToVectorAndPrefReducer.class); + partialMultiply.setOutputFormatClass(SequenceFileOutputFormat.class); + partialMultiply.setOutputKeyClass(VarIntWritable.class); + partialMultiply.setOutputValueClass(VectorAndPrefsWritable.class); + partialMultiplyConf.setBoolean("mapred.compress.map.output", true); + partialMultiplyConf.set("mapred.output.dir", partialMultiplyPath.toString()); + + if (usersFile != null) { + partialMultiplyConf.set(UserVectorSplitterMapper.USERS_FILE, usersFile); + } + + if (userItemFile != null) { + partialMultiplyConf.set(IDReader.USER_ITEM_FILE, userItemFile); + } + + partialMultiplyConf.setInt(UserVectorSplitterMapper.MAX_PREFS_PER_USER_CONSIDERED, maxPrefsPerUser); + + boolean succeeded = partialMultiply.waitForCompletion(true); + if (!succeeded) { + return -1; + } + } + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + //filter out any users we don't care about + /* convert the user/item pairs to filter if a filterfile has been specified */ + if (filterFile != null) { + Job itemFiltering = prepareJob(new Path(filterFile), explicitFilterPath, TextInputFormat.class, + ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class, + ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class, + SequenceFileOutputFormat.class); + boolean succeeded = itemFiltering.waitForCompletion(true); + if (!succeeded) { + return -1; + } + } + + String aggregateAndRecommendInput = partialMultiplyPath.toString(); + if (filterFile != null) { + aggregateAndRecommendInput += "," + explicitFilterPath; + } + + Class<? extends OutputFormat> outputFormat = parsedArgs.containsKey("--sequencefileOutput") + ? 
SequenceFileOutputFormat.class : TextOutputFormat.class; + + //extract out the recommendations + Job aggregateAndRecommend = prepareJob( + new Path(aggregateAndRecommendInput), outputPath, SequenceFileInputFormat.class, + PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class, + AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class, + outputFormat); + Configuration aggregateAndRecommendConf = aggregateAndRecommend.getConfiguration(); + if (itemsFile != null) { + aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMS_FILE, itemsFile); + } + + if (userItemFile != null) { + aggregateAndRecommendConf.set(IDReader.USER_ITEM_FILE, userItemFile); + } + + if (filterFile != null) { + setS3SafeCombinedInputPath(aggregateAndRecommend, getTempPath(), partialMultiplyPath, explicitFilterPath); + } + setIOSort(aggregateAndRecommend); + aggregateAndRecommendConf.set(AggregateAndRecommendReducer.ITEMID_INDEX_PATH, + new Path(prepPath, PreparePreferenceMatrixJob.ITEMID_INDEX).toString()); + aggregateAndRecommendConf.setInt(AggregateAndRecommendReducer.NUM_RECOMMENDATIONS, numRecommendations); + aggregateAndRecommendConf.setBoolean(BOOLEAN_DATA, booleanData); + boolean succeeded = aggregateAndRecommend.waitForCompletion(true); + if (!succeeded) { + return -1; + } + } + + return 0; + } + + private static void setIOSort(JobContext job) { + Configuration conf = job.getConfiguration(); + conf.setInt("io.sort.factor", 100); + String javaOpts = conf.get("mapred.map.child.java.opts"); // new arg name + if (javaOpts == null) { + javaOpts = conf.get("mapred.child.java.opts"); // old arg name + } + int assumedHeapSize = 512; + if (javaOpts != null) { + Matcher m = Pattern.compile("-Xmx([0-9]+)([mMgG])").matcher(javaOpts); + if (m.find()) { + assumedHeapSize = Integer.parseInt(m.group(1)); + String megabyteOrGigabyte = m.group(2); + if ("g".equalsIgnoreCase(megabyteOrGigabyte)) { + assumedHeapSize *= 1024; + } + } + } + // Cap this at 1024MB now; see https://issues.apache.org/jira/browse/MAPREDUCE-2308 + conf.setInt("io.sort.mb", Math.min(assumedHeapSize / 2, 1024)); + // For some reason the Merger doesn't report status for a long time; increase + // timeout when running these jobs + conf.setInt("mapred.task.timeout", 60 * 60 * 1000); + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new Configuration(), new RecommenderJob(), args); + } +}
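The -Xmx heuristic inside setIOSort is easy to exercise in isolation; a minimal sketch of the same parsing and sizing logic (class name and sample options are illustrative, not part of the commit):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public final class HeapParseSketch {
  public static void main(String[] args) {
    String javaOpts = "-server -Xmx2g -XX:+UseParallelGC"; // hypothetical child JVM options
    int assumedHeapSize = 512;                             // default when no -Xmx flag is found
    Matcher m = Pattern.compile("-Xmx([0-9]+)([mMgG])").matcher(javaOpts);
    if (m.find()) {
      assumedHeapSize = Integer.parseInt(m.group(1));
      if ("g".equalsIgnoreCase(m.group(2))) {
        assumedHeapSize *= 1024;                           // gigabytes -> megabytes
      }
    }
    // half the heap, capped at 1024 MB (see MAPREDUCE-2308)
    System.out.println(Math.min(assumedHeapSize / 2, 1024)); // prints 1024
  }
}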
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java new file mode 100644 index 0000000..8ae8215 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/SimilarityMatrixRowWrapperMapper.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.IOException; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.math.VarIntWritable; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +/** + * maps a row of the similarity matrix to a {@link VectorOrPrefWritable} + * + * strictly, a column of that matrix would have to be used, but since the similarity matrix is symmetric, + * we can use a row instead of having to transpose it + */ +public final class SimilarityMatrixRowWrapperMapper extends + Mapper<IntWritable,VectorWritable,VarIntWritable,VectorOrPrefWritable> { + + private final VarIntWritable index = new VarIntWritable(); + private final VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable(); + + @Override + protected void map(IntWritable key, + VectorWritable value, + Context context) throws IOException, InterruptedException { + Vector similarityMatrixRow = value.get(); + /* remove self similarity */ + similarityMatrixRow.set(key.get(), Double.NaN); + + index.set(key.get()); + vectorOrPref.set(similarityMatrixRow); + + context.write(index, vectorOrPref); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java new file mode 100644 index 0000000..e6e47fd --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToUserVectorsReducer.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable; +import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.VarLongWritable; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +/** + * <h1>Input</h1> + * + * <p> + * Takes user IDs as {@link VarLongWritable} mapped to all associated item IDs and preference values, as + * {@link EntityPrefWritable}s. + * </p> + * + * <h1>Output</h1> + * + * <p> + * The same user ID mapped to a {@link RandomAccessSparseVector} representation of the same item IDs and + * preference values. Item IDs are used as vector indexes; they are hashed into ints to work as indexes with + * {@link TasteHadoopUtils#idToIndex(long)}. The mapping is remembered for later with a combination of + * {@link ItemIDIndexMapper} and {@link ItemIDIndexReducer}. + * </p> + */ +public final class ToUserVectorsReducer extends + Reducer<VarLongWritable,VarLongWritable,VarLongWritable,VectorWritable> { + + public static final String MIN_PREFERENCES_PER_USER = ToUserVectorsReducer.class.getName() + + ".minPreferencesPerUser"; + + private int minPreferences; + + public enum Counters { USERS } + + private final VectorWritable userVectorWritable = new VectorWritable(); + + @Override + protected void setup(Context ctx) throws IOException, InterruptedException { + super.setup(ctx); + minPreferences = ctx.getConfiguration().getInt(MIN_PREFERENCES_PER_USER, 1); + } + + @Override + protected void reduce(VarLongWritable userID, + Iterable<VarLongWritable> itemPrefs, + Context context) throws IOException, InterruptedException { + Vector userVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 100); + for (VarLongWritable itemPref : itemPrefs) { + int index = TasteHadoopUtils.idToIndex(itemPref.get()); + float value = itemPref instanceof EntityPrefWritable ? ((EntityPrefWritable) itemPref).getPrefValue() : 1.0f; + userVector.set(index, value); + } + + if (userVector.getNumNondefaultElements() >= minPreferences) { + userVectorWritable.set(userVector); + userVectorWritable.setWritesLaxPrecision(true); + context.getCounter(Counters.USERS).increment(1); + context.write(userID, userVectorWritable); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java new file mode 100644 index 0000000..9167437 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/ToVectorAndPrefReducer.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.mahout.math.VarIntWritable; +import org.apache.mahout.math.Vector; + +public final class ToVectorAndPrefReducer extends + Reducer<VarIntWritable,VectorOrPrefWritable,VarIntWritable,VectorAndPrefsWritable> { + + private final VectorAndPrefsWritable vectorAndPrefs = new VectorAndPrefsWritable(); + + @Override + protected void reduce(VarIntWritable key, + Iterable<VectorOrPrefWritable> values, + Context context) throws IOException, InterruptedException { + + List<Long> userIDs = new ArrayList<>(); + List<Float> prefValues = new ArrayList<>(); + Vector similarityMatrixColumn = null; + for (VectorOrPrefWritable value : values) { + if (value.getVector() == null) { + // Then this is a user-pref value + userIDs.add(value.getUserID()); + prefValues.add(value.getValue()); + } else { + // Then this is the column vector + if (similarityMatrixColumn != null) { + throw new IllegalStateException("Found two similarity-matrix columns for item index " + key.get()); + } + similarityMatrixColumn = value.getVector(); + } + } + + if (similarityMatrixColumn == null) { + return; + } + + vectorAndPrefs.set(similarityMatrixColumn, userIDs, prefValues); + context.write(key, vectorAndPrefs); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java new file mode 100644 index 0000000..2290d06 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/UserVectorSplitterMapper.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.lucene.util.PriorityQueue; +import org.apache.mahout.cf.taste.impl.common.FastIDSet; +import org.apache.mahout.math.VarIntWritable; +import org.apache.mahout.math.VarLongWritable; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.Vector.Element; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class UserVectorSplitterMapper extends + Mapper<VarLongWritable,VectorWritable, VarIntWritable,VectorOrPrefWritable> { + + private static final Logger log = LoggerFactory.getLogger(UserVectorSplitterMapper.class); + + static final String USERS_FILE = "usersFile"; + static final String MAX_PREFS_PER_USER_CONSIDERED = "maxPrefsPerUserConsidered"; + static final int DEFAULT_MAX_PREFS_PER_USER_CONSIDERED = 10; + + private int maxPrefsPerUserConsidered; + private FastIDSet usersToRecommendFor; + + private final VarIntWritable itemIndexWritable = new VarIntWritable(); + private final VectorOrPrefWritable vectorOrPref = new VectorOrPrefWritable(); + + @Override + protected void setup(Context context) throws IOException { + Configuration jobConf = context.getConfiguration(); + maxPrefsPerUserConsidered = jobConf.getInt(MAX_PREFS_PER_USER_CONSIDERED, DEFAULT_MAX_PREFS_PER_USER_CONSIDERED); + + IDReader idReader = new IDReader (jobConf); + idReader.readIDs(); + usersToRecommendFor = idReader.getUserIds(); + } + + @Override + protected void map(VarLongWritable key, + VectorWritable value, + Context context) throws IOException, InterruptedException { + long userID = key.get(); + + log.info("UserID = {}", userID); + + if (usersToRecommendFor != null && !usersToRecommendFor.contains(userID)) { + return; + } + Vector userVector = maybePruneUserVector(value.get()); + + for (Element e : userVector.nonZeroes()) { + itemIndexWritable.set(e.index()); + vectorOrPref.set(userID, (float) e.get()); + context.write(itemIndexWritable, vectorOrPref); + } + } + + private Vector maybePruneUserVector(Vector userVector) { + if (userVector.getNumNondefaultElements() <= maxPrefsPerUserConsidered) { + return userVector; + } + + float smallestLargeValue = findSmallestLargeValue(userVector); + + // "Blank out" small-sized prefs to reduce the amount of partial products + // generated later. They're not zeroed, but NaN-ed, so they come through + // and can be used to exclude these items from prefs. 
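+ // Illustrative example (hypothetical numbers, not from the original source): with + // maxPrefsPerUserConsidered = 2 and prefs {0.5, 3.0, -4.0}, findSmallestLargeValue() + // below returns 3.0 (the smallest of the two largest magnitudes), so 0.5 becomes NaN + // while 3.0 and -4.0 pass through; the comparison is a strict '<', so values equal to + // the threshold survive.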
+ for (Element e : userVector.nonZeroes()) { + float absValue = Math.abs((float) e.get()); + if (absValue < smallestLargeValue) { + e.set(Float.NaN); + } + } + + return userVector; + } + + private float findSmallestLargeValue(Vector userVector) { + + PriorityQueue<Float> topPrefValues = new PriorityQueue<Float>(maxPrefsPerUserConsidered) { + @Override + protected boolean lessThan(Float f1, Float f2) { + return f1 < f2; + } + }; + + for (Element e : userVector.nonZeroes()) { + float absValue = Math.abs((float) e.get()); + topPrefValues.insertWithOverflow(absValue); + } + return topPrefValues.top(); + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java new file mode 100644 index 0000000..11d496f --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorAndPrefsWritable.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.Writable; +import org.apache.mahout.math.Varint; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +public final class VectorAndPrefsWritable implements Writable { + + private Vector vector; + private List<Long> userIDs; + private List<Float> values; + + public VectorAndPrefsWritable() { + } + + public VectorAndPrefsWritable(Vector vector, List<Long> userIDs, List<Float> values) { + set(vector, userIDs, values); + } + + public void set(Vector vector, List<Long> userIDs, List<Float> values) { + this.vector = vector; + this.userIDs = userIDs; + this.values = values; + } + + public Vector getVector() { + return vector; + } + + public List<Long> getUserIDs() { + return userIDs; + } + + public List<Float> getValues() { + return values; + } + + @Override + public void write(DataOutput out) throws IOException { + VectorWritable vw = new VectorWritable(vector); + vw.setWritesLaxPrecision(true); + vw.write(out); + Varint.writeUnsignedVarInt(userIDs.size(), out); + for (int i = 0; i < userIDs.size(); i++) { + Varint.writeSignedVarLong(userIDs.get(i), out); + out.writeFloat(values.get(i)); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + VectorWritable writable = new VectorWritable(); + writable.readFields(in); + vector = writable.get(); + int size = Varint.readUnsignedVarInt(in); + userIDs = new ArrayList<>(size); + values = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + userIDs.add(Varint.readSignedVarLong(in)); + values.add(in.readFloat()); + } + } + + @Override + public String toString() { + return vector + "\t" + userIDs + '\t' + values; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java new file mode 100644 index 0000000..515d7ea --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/item/VectorOrPrefWritable.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.item; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.mahout.math.Varint; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +public final class VectorOrPrefWritable implements Writable { + + private Vector vector; + private long userID; + private float value; + + public VectorOrPrefWritable() { + } + + public VectorOrPrefWritable(Vector vector) { + this.vector = vector; + } + + public VectorOrPrefWritable(long userID, float value) { + this.userID = userID; + this.value = value; + } + + public Vector getVector() { + return vector; + } + + public long getUserID() { + return userID; + } + + public float getValue() { + return value; + } + + void set(Vector vector) { + this.vector = vector; + this.userID = Long.MIN_VALUE; + this.value = Float.NaN; + } + + public void set(long userID, float value) { + this.vector = null; + this.userID = userID; + this.value = value; + } + + @Override + public void write(DataOutput out) throws IOException { + if (vector == null) { + out.writeBoolean(false); + Varint.writeSignedVarLong(userID, out); + out.writeFloat(value); + } else { + out.writeBoolean(true); + VectorWritable vw = new VectorWritable(vector); + vw.setWritesLaxPrecision(true); + vw.write(out); + } + } + + @Override + public void readFields(DataInput in) throws IOException { + boolean hasVector = in.readBoolean(); + if (hasVector) { + VectorWritable writable = new VectorWritable(); + writable.readFields(in); + set(writable.get()); + } else { + long theUserID = Varint.readSignedVarLong(in); + float theValue = in.readFloat(); + set(theUserID, theValue); + } + } + + @Override + public String toString() { + return vector == null ? userID + ":" + value : vector.toString(); + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java new file mode 100644 index 0000000..c64ee38 --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/PreparePreferenceMatrixJob.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.preparation; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable; +import org.apache.mahout.cf.taste.hadoop.ToEntityPrefsMapper; +import org.apache.mahout.cf.taste.hadoop.ToItemPrefsMapper; +import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexMapper; +import org.apache.mahout.cf.taste.hadoop.item.ItemIDIndexReducer; +import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob; +import org.apache.mahout.cf.taste.hadoop.item.ToUserVectorsReducer; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.math.VarIntWritable; +import org.apache.mahout.math.VarLongWritable; +import org.apache.mahout.math.VectorWritable; + +import java.util.List; +import java.util.Map; + +public class PreparePreferenceMatrixJob extends AbstractJob { + + public static final String NUM_USERS = "numUsers.bin"; + public static final String ITEMID_INDEX = "itemIDIndex"; + public static final String USER_VECTORS = "userVectors"; + public static final String RATING_MATRIX = "ratingMatrix"; + + private static final int DEFAULT_MIN_PREFS_PER_USER = 1; + + public static void main(String[] args) throws Exception { + ToolRunner.run(new PreparePreferenceMatrixJob(), args); + } + + @Override + public int run(String[] args) throws Exception { + + addInputOption(); + addOutputOption(); + addOption("minPrefsPerUser", "mp", "ignore users with fewer preferences than this " + + "(default: " + DEFAULT_MIN_PREFS_PER_USER + ')', String.valueOf(DEFAULT_MIN_PREFS_PER_USER)); + addOption("booleanData", "b", "treat input as having no preference values", Boolean.FALSE.toString()); + addOption("ratingShift", "rs", "shift ratings by this value", "0.0"); + + Map<String, List<String>> parsedArgs = parseArguments(args); + if (parsedArgs == null) { + return -1; + } + + int minPrefsPerUser = Integer.parseInt(getOption("minPrefsPerUser")); + boolean booleanData = Boolean.valueOf(getOption("booleanData")); + float ratingShift = Float.parseFloat(getOption("ratingShift")); + //convert items to an internal index + Job itemIDIndex = prepareJob(getInputPath(), getOutputPath(ITEMID_INDEX), TextInputFormat.class, + ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class, ItemIDIndexReducer.class, + VarIntWritable.class, VarLongWritable.class, SequenceFileOutputFormat.class); + itemIDIndex.setCombinerClass(ItemIDIndexReducer.class); + boolean succeeded = itemIDIndex.waitForCompletion(true); + if (!succeeded) { + return -1; + } + //convert user preferences into a vector per user + Job toUserVectors = prepareJob(getInputPath(), + getOutputPath(USER_VECTORS), + TextInputFormat.class, + ToItemPrefsMapper.class, + VarLongWritable.class, + booleanData ? 
VarLongWritable.class : EntityPrefWritable.class, + ToUserVectorsReducer.class, + VarLongWritable.class, + VectorWritable.class, + SequenceFileOutputFormat.class); + toUserVectors.getConfiguration().setBoolean(RecommenderJob.BOOLEAN_DATA, booleanData); + toUserVectors.getConfiguration().setInt(ToUserVectorsReducer.MIN_PREFERENCES_PER_USER, minPrefsPerUser); + toUserVectors.getConfiguration().set(ToEntityPrefsMapper.RATING_SHIFT, String.valueOf(ratingShift)); + succeeded = toUserVectors.waitForCompletion(true); + if (!succeeded) { + return -1; + } + //we need the number of users later + int numberOfUsers = (int) toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue(); + HadoopUtil.writeInt(numberOfUsers, getOutputPath(NUM_USERS), getConf()); + //build the rating matrix + Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX), + ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class, ToItemVectorsReducer.class, + IntWritable.class, VectorWritable.class); + toItemVectors.setCombinerClass(ToItemVectorsReducer.class); + + succeeded = toItemVectors.waitForCompletion(true); + if (!succeeded) { + return -1; + } + + return 0; + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java new file mode 100644 index 0000000..5a4144c --- /dev/null +++ b/community/mahout-mr/mr/src/main/java/org/apache/mahout/cf/taste/hadoop/preparation/ToItemVectorsMapper.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.hadoop.preparation; + +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.cf.taste.hadoop.TasteHadoopUtils; +import org.apache.mahout.math.RandomAccessSparseVector; +import org.apache.mahout.math.VarLongWritable; +import org.apache.mahout.math.Vector; +import org.apache.mahout.math.VectorWritable; + +import java.io.IOException; + +public class ToItemVectorsMapper + extends Mapper<VarLongWritable,VectorWritable,IntWritable,VectorWritable> { + + private final IntWritable itemID = new IntWritable(); + private final VectorWritable itemVectorWritable = new VectorWritable(); + + @Override + protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx) + throws IOException, InterruptedException { + Vector userRatings = vectorWritable.get(); + + int column = TasteHadoopUtils.idToIndex(rowIndex.get()); + + itemVectorWritable.setWritesLaxPrecision(true); + + Vector itemVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1); + for (Vector.Element elem : userRatings.nonZeroes()) { + itemID.set(elem.index()); + itemVector.setQuick(column, elem.get()); + itemVectorWritable.set(itemVector); + ctx.write(itemID, itemVectorWritable); + // reset vector for reuse + itemVector.setQuick(elem.index(), 0.0); + } + } + +}
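
For reference, here is a minimal standalone sketch of the transposition that ToItemVectorsMapper performs above. It assumes only mahout-math on the classpath; the class name and the sample indexes/ratings are illustrative, not part of the patch. Each user row is re-emitted as one single-entry item vector per rated item, keyed by the item index, with the user's hashed column holding the rating.

import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;

public class TransposeSketch {
  public static void main(String[] args) {
    int userColumn = 42; // stands in for TasteHadoopUtils.idToIndex(userID)
    Vector userRatings = new RandomAccessSparseVector(Integer.MAX_VALUE, 2);
    userRatings.setQuick(7, 3.0); // item index 7 rated 3.0
    userRatings.setQuick(9, 5.0); // item index 9 rated 5.0
    for (Vector.Element elem : userRatings.nonZeroes()) {
      // one single-entry vector per rated item, as in the mapper's loop
      Vector itemVector = new RandomAccessSparseVector(Integer.MAX_VALUE, 1);
      itemVector.setQuick(userColumn, elem.get());
      // the mapper writes this pair keyed by elem.index(); the downstream
      // reducer merges the single-entry vectors into one row per item
      System.out.println(elem.index() + " -> " + itemVector);
    }
  }
}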

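Similarly, a minimal round-trip sketch for the VectorOrPrefWritable wire format shown earlier (assuming the mahout-mr and mahout-math jars on the classpath; the class name is illustrative). The leading boolean written by write() selects between the vector branch and the (userID, value) branch, so readFields() can reconstruct whichever variant was serialized.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.mahout.cf.taste.hadoop.item.VectorOrPrefWritable;

public class VectorOrPrefRoundTrip {
  public static void main(String[] args) throws IOException {
    VectorOrPrefWritable pref = new VectorOrPrefWritable(123L, 4.5f);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    pref.write(new DataOutputStream(bytes)); // false, varlong userID, float value
    VectorOrPrefWritable copy = new VectorOrPrefWritable();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    System.out.println(copy); // toString() prints "123:4.5" for the pref variant
  }
}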