http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java new file mode 100644 index 0000000..752bb48 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToPrefsDriver.java @@ -0,0 +1,274 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.example.email; + +import com.google.common.io.Closeables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.mahout.common.AbstractJob; +import org.apache.mahout.common.HadoopUtil; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.commandline.DefaultOptionCreator; +import org.apache.mahout.common.iterator.sequencefile.PathFilters; +import org.apache.mahout.common.iterator.sequencefile.PathType; +import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable; +import org.apache.mahout.math.VarIntWritable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Convert the Mail archives (see {@link org.apache.mahout.text.SequenceFilesFromMailArchives}) to a preference + * file that can be consumed by the {@link org.apache.mahout.cf.taste.hadoop.item.RecommenderJob}. 
+ * <p/>
+ * This assumes the input is a Sequence File, that the key is: filename/message id and the value is a list
+ * (separated by the user's choosing) containing the from email and any references
+ * <p/>
+ * The output is a matrix where either the from or to are the rows (represented as longs) and the columns are the
+ * message ids that the user has interacted with (as a VectorWritable). This class currently does not account for
+ * thread hijacking.
+ * <p/>
+ * It also outputs a side table mapping the row ids to their original and the message ids to the message thread id
+ * <p/>
+ * NOTE(review): the final job below is configured with {@code TextOutputFormat}, {@code Text} keys and
+ * {@code NullWritable} values, so what is actually written looks like text lines rather than VectorWritables --
+ * confirm and update the paragraph above.
+ */
+public final class MailToPrefsDriver extends AbstractJob {
+
+  private static final Logger log = LoggerFactory.getLogger(MailToPrefsDriver.class);
+
+  /** Glob, relative to a job output directory, that selects the files read back when building a dictionary. */
+  private static final String OUTPUT_FILES_PATTERN = "part-*";
+  /** Fixed per-entry byte allowance added when estimating the size of a dictionary chunk. */
+  private static final int DICTIONARY_BYTE_OVERHEAD = 4;
+
+  /**
+   * Command-line entry point: runs this driver through {@link ToolRunner} with a fresh {@link Configuration}.
+   * <p/>
+   * NOTE(review): the status returned by {@code ToolRunner.run} is discarded, so a {@code -1} from
+   * {@link #run(String[])} is not turned into a process exit code here.
+   *
+   * @param args command-line arguments, parsed in {@link #run(String[])}
+   */
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new MailToPrefsDriver(), args);
+  }
+
+  /**
+   * Registers the command-line options and then runs up to three phases, each gated by
+   * {@code shouldRunNextPhase}:
+   * <ol>
+   *   <li>a job over the input using {@code MsgIdToDictionaryMapper}, written to {@code <output>/msgIds} and then
+   *       re-written as numbered {@code msgIds-dictionary-N} chunks directly under the output directory;</li>
+   *   <li>the same for sender addresses using {@code FromEmailToDictionaryMapper}, written to
+   *       {@code <output>/fromIds} and {@code fromIds-dictionary-N};</li>
+   *   <li>one {@code MailToRecMapper}/{@code MailToRecReducer} job per (from chunk, message id chunk) pair, whose
+   *       part files end up as {@code <output>/recInput/chunk-i-j-k}.</li>
+   * </ol>
+   * The third phase only runs when both chunk lists were produced by this invocation; otherwise it is skipped
+   * without any message and the method still returns 0.
+   *
+   * @param args command-line arguments
+   * @return 0 when every job that was started succeeded, -1 as soon as one job reports failure
+   */
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("chunkSize", "cs", "The size of chunks to write. Default is 100 mb", "100");
+    addOption("separator", "sep", "The separator used in the input file to separate to, from, subject. Default is \\n",
+        "\n");
+    addOption("from", "f", "The position in the input text (value) where the from email is located, starting from "
+        + "zero (0).", "0");
+    addOption("refs", "r", "The position in the input text (value) where the reference ids are located, "
+        + "starting from zero (0).", "1");
+    addOption(buildOption("useCounts", "u", "If set, then use the number of times the user has interacted with a "
+        + "thread as an indication of their preference. Otherwise, use boolean preferences.", false, false,
+        String.valueOf(true)));
+    Map<String, List<String>> parsedArgs = parseArguments(args);
+
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    int chunkSize = Integer.parseInt(getOption("chunkSize"));
+    String separator = getOption("separator");
+    Configuration conf = getConf();
+    boolean useCounts = hasOption("useCounts");
+    AtomicInteger currentPhase = new AtomicInteger();
+    // single-element array used as an out-parameter of createDictionaryChunks
+    int[] msgDim = new int[1];
+    //TODO: mod this to not do so many passes over the data. Dictionary creation could probably be a chain mapper
+    List<Path> msgIdChunks = null;
+    boolean overwrite = hasOption(DefaultOptionCreator.OVERWRITE_OPTION);
+    // create the dictionary between message ids and longs
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      //TODO: there seems to be a pattern emerging for dictionary creation
+      // -- sparse vectors from seq files also has this.
+      Path msgIdsPath = new Path(output, "msgIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, msgIdsPath);
+      }
+      log.info("Creating Msg Id Dictionary");
+      Job createMsgIdDictionary = prepareJob(input,
+              msgIdsPath,
+              SequenceFileInputFormat.class,
+              MsgIdToDictionaryMapper.class,
+              Text.class,
+              VarIntWritable.class,
+              MailToDictionaryReducer.class,
+              Text.class,
+              VarIntWritable.class,
+              SequenceFileOutputFormat.class);
+
+      boolean succeeded = createMsgIdDictionary.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+      //write out the dictionary at the top level
+      msgIdChunks = createDictionaryChunks(msgIdsPath, output, "msgIds-dictionary-",
+          createMsgIdDictionary.getConfiguration(), chunkSize, msgDim);
+    }
+    //create the dictionary between from email addresses and longs
+    List<Path> fromChunks = null;
+    if (shouldRunNextPhase(parsedArgs, currentPhase)) {
+      Path fromIdsPath = new Path(output, "fromIds");
+      if (overwrite) {
+        HadoopUtil.delete(conf, fromIdsPath);
+      }
+      log.info("Creating From Id Dictionary");
+      Job createFromIdDictionary = prepareJob(input,
+              fromIdsPath,
+              SequenceFileInputFormat.class,
+              FromEmailToDictionaryMapper.class,
+              Text.class,
+              VarIntWritable.class,
+              MailToDictionaryReducer.class,
+              Text.class,
+              VarIntWritable.class,
+              SequenceFileOutputFormat.class);
+      // only this job's configuration receives the separator at this point
+      createFromIdDictionary.getConfiguration().set(EmailUtility.SEPARATOR, separator);
+      boolean succeeded = createFromIdDictionary.waitForCompletion(true);
+      if (!succeeded) {
+        return -1;
+      }
+      //write out the dictionary at the top level
+      // fromDim is filled in by createDictionaryChunks but never read afterwards; the only use (FROM_DIMENSION
+      // below) is commented out
+      int[] fromDim = new int[1];
+      fromChunks = createDictionaryChunks(fromIdsPath, output, "fromIds-dictionary-",
+          createFromIdDictionary.getConfiguration(), chunkSize, fromDim);
+    }
+    //OK, we have our dictionaries, let's output the real thing we need: <from_id -> <msgId, msgId, msgId, ...>>
+    if (shouldRunNextPhase(parsedArgs, currentPhase) && fromChunks != null && msgIdChunks != null) {
+      //Job map
+      //may be a way to do this so that we can load the from ids in memory, if they are small enough so that
+      // we don't need the double loop
+      log.info("Creating recommendation matrix");
+      Path vecPath = new Path(output, "recInput");
+      if (overwrite) {
+        HadoopUtil.delete(conf, vecPath);
+      }
+      // These settings go on the driver's own Configuration, before prepareJob is called inside the loop below.
+      //conf.set(EmailUtility.FROM_DIMENSION, String.valueOf(fromDim[0]));
+      conf.set(EmailUtility.MSG_ID_DIMENSION, String.valueOf(msgDim[0]));
+      conf.set(EmailUtility.FROM_PREFIX, "fromIds-dictionary-");
+      conf.set(EmailUtility.MSG_IDS_PREFIX, "msgIds-dictionary-");
+      conf.set(EmailUtility.FROM_INDEX, getOption("from"));
+      conf.set(EmailUtility.REFS_INDEX, getOption("refs"));
+      conf.set(EmailUtility.SEPARATOR, separator);
+      conf.set(MailToRecReducer.USE_COUNTS_PREFERENCE, String.valueOf(useCounts));
+      // i indexes the from-chunk; j is never reset, so it is a running count of the jobs launched so far
+      int j = 0;
+      int i = 0;
+      for (Path fromChunk : fromChunks) {
+        for (Path idChunk : msgIdChunks) {
+          Path out = new Path(vecPath, "tmp-" + i + '-' + j);
+          // exactly one from-chunk and one message-id-chunk are registered as cache files for this job
+          DistributedCache.setCacheFiles(new URI[]{fromChunk.toUri(), idChunk.toUri()}, conf);
+          Job createRecMatrix = prepareJob(input, out, SequenceFileInputFormat.class,
+                  MailToRecMapper.class, Text.class, LongWritable.class, MailToRecReducer.class, Text.class,
+                  NullWritable.class, TextOutputFormat.class);
+          createRecMatrix.getConfiguration().set("mapred.output.compress", "false");
+          boolean succeeded = createRecMatrix.waitForCompletion(true);
+          if (!succeeded) {
+            return -1;
+          }
+          //copy the results up a level
+          //HadoopUtil.copyMergeSeqFiles(out.getFileSystem(conf), out, vecPath.getFileSystem(conf), outPath, true,
+          // conf, "");
+          FileStatus[] fs = HadoopUtil.getFileStatus(new Path(out, "*"), PathType.GLOB, PathFilters.partFilter(), null,
+              conf);
+          for (int k = 0; k < fs.length; k++) {
+            FileStatus f = fs[k];
+            Path outPath = new Path(vecPath, "chunk-" + i + '-' + j + '-' + k);
+            // presumably the literal true is FileUtil.copy's deleteSource flag, making this a move -- confirm
+            FileUtil.copy(f.getPath().getFileSystem(conf), f.getPath(), outPath.getFileSystem(conf), outPath, true,
+                overwrite, conf);
+          }
+          // remove the temporary per-job output directory
+          HadoopUtil.delete(conf, out);
+          j++;
+        }
+        i++;
+      }
+      //concat the files together
+      /*Path mergePath = new Path(output, "vectors.dat");
+      if (overwrite) {
+        HadoopUtil.delete(conf, mergePath);
+      }
+      log.info("Merging together output vectors to vectors.dat in {}", output);*/
+      //HadoopUtil.copyMergeSeqFiles(vecPath.getFileSystem(conf), vecPath, mergePath.getFileSystem(conf), mergePath,
+      // false, conf, "\n");
+    }
+
+    return 0;
+  }
+
+  /**
+   * Re-writes the keys found in a job's output as one or more dictionary SequenceFiles that map each key to a
+   * consecutive integer id.
+   * <p/>
+   * Every record matching {@code inputPath/part-*} is read; its key is appended, together with the next id
+   * (ids start at 1), to the current chunk file {@code dictionaryPathBase/name + chunkIndex}. A new chunk is
+   * started when the running size estimate of the current one has exceeded the limit; the estimate is
+   * {@link #DICTIONARY_BYTE_OVERHEAD} plus two bytes per character of the key plus the byte size of an int.
+   *
+   * @param inputPath            directory holding the job output to read
+   * @param dictionaryPathBase   directory the chunk files are created in
+   * @param name                 file name prefix of the chunks; the chunk index is appended to it
+   * @param baseConf             configuration that is copied and used for reading and writing
+   * @param chunkSizeInMegabytes size limit of a chunk, in megabytes
+   * @param maxTermDimension     out-parameter: element 0 receives the last assigned id plus one
+   * @return the paths of all chunk files, in creation order (always at least one)
+   * @throws IOException if reading the input or writing a chunk fails
+   */
+  private static List<Path> createDictionaryChunks(Path inputPath,
+                                                   Path dictionaryPathBase,
+                                                   String name,
+                                                   Configuration baseConf,
+                                                   int chunkSizeInMegabytes, int[] maxTermDimension)
+    throws IOException {
+    List<Path> chunkPaths = new ArrayList<>();
+
+    Configuration conf = new Configuration(baseConf);
+
+    // the file system is resolved from the input path but also used to write the chunk files
+    FileSystem fs = FileSystem.get(inputPath.toUri(), conf);
+
+    long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;
+    int chunkIndex = 0;
+    Path chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+    chunkPaths.add(chunkPath);
+
+    SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+
+    try {
+      long currentChunkSize = 0;
+      Path filesPattern = new Path(inputPath, OUTPUT_FILES_PATTERN);
+      int i = 1; //start at 1, since a miss in the OpenObjectIntHashMap returns a 0
+      for (Pair<Writable, Writable> record
+              : new SequenceFileDirIterable<>(filesPattern, PathType.GLOB, null, null, true, conf)) {
+        // roll over to a fresh chunk file once the estimate for the current one has passed the limit
+        if (currentChunkSize > chunkSizeLimit) {
+          Closeables.close(dictWriter, false);
+          chunkIndex++;
+
+          chunkPath = new Path(dictionaryPathBase, name + chunkIndex);
+          chunkPaths.add(chunkPath);
+
+          dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);
+          currentChunkSize = 0;
+        }
+
+        Writable key = record.getFirst();
+        int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;
+        currentChunkSize += fieldSize;
+        dictWriter.append(key, new IntWritable(i++));
+      }
+      // i was post-incremented after the last append, so this is (number of entries + 1)
+      maxTermDimension[0] = i;
+    } finally {
+      Closeables.close(dictWriter, false);
+    }
+
+    return chunkPaths;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java new file mode 100644 index 0000000..91bbd17 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecMapper.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.example.email; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.math.map.OpenObjectIntHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public final class MailToRecMapper extends Mapper<Text, Text, Text, LongWritable> { + + private static final Logger log = LoggerFactory.getLogger(MailToRecMapper.class); + + private final OpenObjectIntHashMap<String> fromDictionary = new OpenObjectIntHashMap<>(); + private final OpenObjectIntHashMap<String> msgIdDictionary = new OpenObjectIntHashMap<>(); + private String separator = "\n"; + private int fromIdx; + private int refsIdx; + + public enum Counters { + REFERENCE, ORIGINAL + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + Configuration conf = context.getConfiguration(); + String fromPrefix = conf.get(EmailUtility.FROM_PREFIX); + String msgPrefix = conf.get(EmailUtility.MSG_IDS_PREFIX); + fromIdx = conf.getInt(EmailUtility.FROM_INDEX, 0); + refsIdx = conf.getInt(EmailUtility.REFS_INDEX, 1); + EmailUtility.loadDictionaries(conf, fromPrefix, fromDictionary, msgPrefix, msgIdDictionary); + log.info("From Dictionary size: {} Msg Id Dictionary size: {}", fromDictionary.size(), msgIdDictionary.size()); + separator = context.getConfiguration().get(EmailUtility.SEPARATOR); + } + + @Override + protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { + + int msgIdKey = Integer.MIN_VALUE; + + + int fromKey = Integer.MIN_VALUE; + String valStr = value.toString(); + String[] splits = StringUtils.splitByWholeSeparatorPreserveAllTokens(valStr, separator); + + if (splits != null && splits.length > 0) { + if (splits.length > refsIdx) { + 
String from = EmailUtility.cleanUpEmailAddress(splits[fromIdx]); + fromKey = fromDictionary.get(from); + } + //get the references + if (splits.length > refsIdx) { + String[] theRefs = EmailUtility.parseReferences(splits[refsIdx]); + if (theRefs != null && theRefs.length > 0) { + //we have a reference, the first one is the original message id, so map to that one if it exists + msgIdKey = msgIdDictionary.get(theRefs[0]); + context.getCounter(Counters.REFERENCE).increment(1); + } + } + } + //we don't have any references, so use the msg id + if (msgIdKey == Integer.MIN_VALUE) { + //get the msg id and the from and output the associated ids + String keyStr = key.toString(); + int idx = keyStr.lastIndexOf('/'); + if (idx != -1) { + String msgId = keyStr.substring(idx + 1); + msgIdKey = msgIdDictionary.get(msgId); + context.getCounter(Counters.ORIGINAL).increment(1); + } + } + + if (msgIdKey != Integer.MIN_VALUE && fromKey != Integer.MIN_VALUE) { + context.write(new Text(fromKey + "," + msgIdKey), new LongWritable(1)); + } + } + + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java new file mode 100644 index 0000000..ee36a41 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MailToRecReducer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.example.email; + +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; + +public class MailToRecReducer extends Reducer<Text, LongWritable, Text, NullWritable> { + //if true, then output weight + private boolean useCounts = true; + /** + * We can either ignore how many times the user interacted (boolean) or output the number of times they interacted. 
+ */ + public static final String USE_COUNTS_PREFERENCE = "useBooleanPreferences"; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + useCounts = context.getConfiguration().getBoolean(USE_COUNTS_PREFERENCE, true); + } + + @Override + protected void reduce(Text key, Iterable<LongWritable> values, Context context) + throws IOException, InterruptedException { + if (useCounts) { + long sum = 0; + for (LongWritable value : values) { + sum++; + } + context.write(new Text(key.toString() + ',' + sum), null); + } else { + context.write(new Text(key.toString()), null); + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java new file mode 100644 index 0000000..f3de847 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/email/MsgIdToDictionaryMapper.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.example.email; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.mahout.math.VarIntWritable; + +import java.io.IOException; + +/** + * Assumes the input is in the format created by {@link org.apache.mahout.text.SequenceFilesFromMailArchives} + */ +public final class MsgIdToDictionaryMapper extends Mapper<Text, Text, Text, VarIntWritable> { + + @Override + protected void map(Text key, Text value, Context context) throws IOException, InterruptedException { + //message id is in the key: /201008/[email protected] + String keyStr = key.toString(); + int idx = keyStr.lastIndexOf('@'); //find the last @ + if (idx == -1) { + context.getCounter(EmailUtility.Counters.NO_MESSAGE_ID).increment(1); + } else { + //found the @, now find the last slash before the @ and grab everything after that + idx = keyStr.lastIndexOf('/', idx); + String msgId = keyStr.substring(idx + 1); + if (EmailUtility.WHITESPACE.matcher(msgId).matches()) { + context.getCounter(EmailUtility.Counters.NO_MESSAGE_ID).increment(1); + } else { + context.write(new Text(msgId), new VarIntWritable(1)); + } + } + } +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java 
b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java new file mode 100644 index 0000000..c358021 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterable.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.example.kddcup; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; + +import org.apache.mahout.cf.taste.model.PreferenceArray; +import org.apache.mahout.common.Pair; + +public final class DataFileIterable implements Iterable<Pair<PreferenceArray,long[]>> { + + private final File dataFile; + + public DataFileIterable(File dataFile) { + this.dataFile = dataFile; + } + + @Override + public Iterator<Pair<PreferenceArray, long[]>> iterator() { + try { + return new DataFileIterator(dataFile); + } catch (IOException ioe) { + throw new IllegalStateException(ioe); + } + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java new file mode 100644 index 0000000..786e080 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/DataFileIterator.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.example.kddcup; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.util.regex.Pattern; + +import com.google.common.collect.AbstractIterator; +import com.google.common.io.Closeables; +import org.apache.mahout.cf.taste.impl.common.SkippingIterator; +import org.apache.mahout.cf.taste.impl.model.GenericUserPreferenceArray; +import org.apache.mahout.cf.taste.model.PreferenceArray; +import org.apache.mahout.common.iterator.FileLineIterator; +import org.apache.mahout.common.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p>An {@link java.util.Iterator} which iterates over any of the KDD Cup's rating files. These include the files + * {train,test,validation}Idx{1,2}}.txt. See http://kddcup.yahoo.com/. Each element in the iteration corresponds + * to one user's ratings as a {@link PreferenceArray} and corresponding timestamps as a parallel {@code long} + * array.</p> + * + * <p>Timestamps in the data set are relative to some unknown point in time, for anonymity. 
They are assumed + * to be relative to the epoch, time 0, or January 1 1970, for purposes here.</p> + */ +public final class DataFileIterator + extends AbstractIterator<Pair<PreferenceArray,long[]>> + implements SkippingIterator<Pair<PreferenceArray,long[]>>, Closeable { + + private static final Pattern COLON_PATTERN = Pattern.compile(":"); + private static final Pattern PIPE_PATTERN = Pattern.compile("\\|"); + private static final Pattern TAB_PATTERN = Pattern.compile("\t"); + + private final FileLineIterator lineIterator; + + private static final Logger log = LoggerFactory.getLogger(DataFileIterator.class); + + public DataFileIterator(File dataFile) throws IOException { + if (dataFile == null || dataFile.isDirectory() || !dataFile.exists()) { + throw new IllegalArgumentException("Bad data file: " + dataFile); + } + lineIterator = new FileLineIterator(dataFile); + } + + @Override + protected Pair<PreferenceArray, long[]> computeNext() { + + if (!lineIterator.hasNext()) { + return endOfData(); + } + + String line = lineIterator.next(); + // First a userID|ratingsCount line + String[] tokens = PIPE_PATTERN.split(line); + + long userID = Long.parseLong(tokens[0]); + int ratingsLeftToRead = Integer.parseInt(tokens[1]); + int ratingsRead = 0; + + PreferenceArray currentUserPrefs = new GenericUserPreferenceArray(ratingsLeftToRead); + long[] timestamps = new long[ratingsLeftToRead]; + + while (ratingsLeftToRead > 0) { + + line = lineIterator.next(); + + // Then a data line. May be 1-4 tokens depending on whether preference info is included (it's not in test data) + // or whether date info is included (not inluded in track 2). Item ID is always first, and date is the last + // two fields if it exists. 
+ tokens = TAB_PATTERN.split(line); + boolean hasPref = tokens.length == 2 || tokens.length == 4; + boolean hasDate = tokens.length > 2; + + long itemID = Long.parseLong(tokens[0]); + + currentUserPrefs.setUserID(0, userID); + currentUserPrefs.setItemID(ratingsRead, itemID); + if (hasPref) { + float preference = Float.parseFloat(tokens[1]); + currentUserPrefs.setValue(ratingsRead, preference); + } + + if (hasDate) { + long timestamp; + if (hasPref) { + timestamp = parseFakeTimestamp(tokens[2], tokens[3]); + } else { + timestamp = parseFakeTimestamp(tokens[1], tokens[2]); + } + timestamps[ratingsRead] = timestamp; + } + + ratingsRead++; + ratingsLeftToRead--; + } + + return new Pair<>(currentUserPrefs, timestamps); + } + + @Override + public void skip(int n) { + for (int i = 0; i < n; i++) { + if (lineIterator.hasNext()) { + String line = lineIterator.next(); + // First a userID|ratingsCount line + String[] tokens = PIPE_PATTERN.split(line); + int linesToSKip = Integer.parseInt(tokens[1]); + lineIterator.skip(linesToSKip); + } else { + break; + } + } + } + + @Override + public void close() { + endOfData(); + try { + Closeables.close(lineIterator, true); + } catch (IOException e) { + log.error(e.getMessage(), e); + } + } + + /** + * @param dateString "date" in days since some undisclosed date, which we will arbitrarily assume to be the + * epoch, January 1 1970. 
+ * @param timeString time of day in HH:mm:ss format + * @return the UNIX timestamp for this moment in time + */ + private static long parseFakeTimestamp(String dateString, CharSequence timeString) { + int days = Integer.parseInt(dateString); + String[] timeTokens = COLON_PATTERN.split(timeString); + int hours = Integer.parseInt(timeTokens[0]); + int minutes = Integer.parseInt(timeTokens[1]); + int seconds = Integer.parseInt(timeTokens[2]); + return 86400L * days + 3600L + hours + 60L * minutes + seconds; + } + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java new file mode 100644 index 0000000..4b62050 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/KDDCupDataModel.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.example.kddcup; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + +import com.google.common.base.Preconditions; +import org.apache.mahout.cf.taste.common.Refreshable; +import org.apache.mahout.cf.taste.common.TasteException; +import org.apache.mahout.cf.taste.impl.common.FastByIDMap; +import org.apache.mahout.cf.taste.impl.common.FastIDSet; +import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator; +import org.apache.mahout.cf.taste.impl.model.GenericDataModel; +import org.apache.mahout.cf.taste.model.DataModel; +import org.apache.mahout.cf.taste.model.PreferenceArray; +import org.apache.mahout.common.Pair; +import org.apache.mahout.common.iterator.SamplingIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p>An {@link DataModel} which reads into memory any of the KDD Cup's rating files; it is really + * meant for use with training data in the files trainIdx{1,2}}.txt. + * See http://kddcup.yahoo.com/.</p> + * + * <p>Timestamps in the data set are relative to some unknown point in time, for anonymity. 
They are assumed + * to be relative to the epoch, time 0, or January 1 1970, for purposes here.</p> + */
public final class KDDCupDataModel implements DataModel {

  private static final Logger log = LoggerFactory.getLogger(KDDCupDataModel.class);

  /** Parent directory of the rating file this model was loaded from. */
  private final File dataFileDirectory;
  /** In-memory model that answers the delegated {@link DataModel} queries. */
  private final DataModel delegate;

  /**
   * Loads every user from the file without storing timestamps
   * (equivalent to {@code KDDCupDataModel(dataFile, false, 1.0)}).
   *
   * @param dataFile training rating file
   * @throws IOException declared by the three-argument constructor this delegates to
   */
  public KDDCupDataModel(File dataFile) throws IOException {
    this(dataFile, false, 1.0);
  }

  /**
   * Reads the rating file into memory and wraps the result in a {@link GenericDataModel}.
   *
   * @param dataFile training rating file
   * @param storeDates if true, dates are parsed and stored, otherwise not
   * @param samplingRate fraction of users to keep, in the range (0.0, 1.0]; can be used to
   *  reduce memory requirements
   * @throws IllegalArgumentException if {@code samplingRate} is NaN or outside (0.0, 1.0]
   * @throws IOException declared for failures while opening or reading the data file
   */
  public KDDCupDataModel(File dataFile, boolean storeDates, double samplingRate) throws IOException {

    Preconditions.checkArgument(!Double.isNaN(samplingRate) && samplingRate > 0.0 && samplingRate <= 1.0,
        "Must be: 0.0 < samplingRate <= 1.0");

    dataFileDirectory = dataFile.getParentFile();

    Iterator<Pair<PreferenceArray,long[]>> dataIterator = new DataFileIterator(dataFile);
    if (samplingRate < 1.0) {
      dataIterator = new SamplingIterator<>(dataIterator, samplingRate);
    }

    FastByIDMap<PreferenceArray> userData = new FastByIDMap<>();
    FastByIDMap<FastByIDMap<Long>> timestamps = new FastByIDMap<>();

    while (dataIterator.hasNext()) {

      Pair<PreferenceArray,long[]> pair = dataIterator.next();
      PreferenceArray userPrefs = pair.getFirst();
      long[] timestampsForPrefs = pair.getSecond();
      long userID = userPrefs.getUserID(0);

      userData.put(userID, userPrefs);
      if (storeDates) {
        FastByIDMap<Long> itemTimestamps = new FastByIDMap<>();
        for (int i = 0; i < timestampsForPrefs.length; i++) {
          long timestamp = timestampsForPrefs[i];
          // Only positive timestamps are recorded.
          if (timestamp > 0L) {
            itemTimestamps.put(userPrefs.getItemID(i), timestamp);
          }
        }
        // Register this user's timestamps. Previously the map was built and then
        // discarded, so the delegate never saw any timestamp even when storeDates was true.
        timestamps.put(userID, itemTimestamps);
      }

    }

    if (storeDates) {
      delegate = new GenericDataModel(userData, timestamps);
    } else {
      delegate = new GenericDataModel(userData);
    }

    Runtime runtime = Runtime.getRuntime();
    log.info("Loaded data model in about {}MB heap", (runtime.totalMemory() - runtime.freeMemory()) / 1000000);
  }

  /**
   * @return the parent directory of the data file passed to the constructor
   */
  public File getDataFileDirectory() {
    return dataFileDirectory;
  }

  /**
   * @return the first existing file in the directory whose name starts with {@code trainIdx}
   * @throws IllegalArgumentException if no candidate file exists
   */
  public static File getTrainingFile(File dataFileDirectory) {
    return getFile(dataFileDirectory, "trainIdx");
  }

  /**
   * @return the first existing file in the directory whose name starts with {@code validationIdx}
   * @throws IllegalArgumentException if no candidate file exists
   */
  public static File getValidationFile(File dataFileDirectory) {
    return getFile(dataFileDirectory, "validationIdx");
  }

  /**
   * @return the first existing file in the directory whose name starts with {@code testIdx}
   * @throws IllegalArgumentException if no candidate file exists
   */
  public static File getTestFile(File dataFileDirectory) {
    return getFile(dataFileDirectory, "testIdx");
  }

  /**
   * @return the first existing file in the directory whose name starts with {@code trackData}
   * @throws IllegalArgumentException if no candidate file exists
   */
  public static File getTrackFile(File dataFileDirectory) {
    return getFile(dataFileDirectory, "trackData");
  }

  /**
   * Probes {@code prefix + set + [".firstLines"] + ".txt" + [".gz"]} for set 1 then 2, trying
   * the full file before the {@code .firstLines} sample and the gzipped name before the plain
   * one, and returns the first candidate that exists.
   *
   * @throws IllegalArgumentException if none of the candidates exists
   */
  private static File getFile(File dataFileDirectory, String prefix) {
    // Works on set 1 or 2
    for (int set : new int[] {1,2}) {
      // Works on sample data from before contest or real data
      for (String firstLinesOrNot : new String[] {"", ".firstLines"}) {
        for (String gzippedOrNot : new String[] {".gz", ""}) {
          File dataFile = new File(dataFileDirectory, prefix + set + firstLinesOrNot + ".txt" + gzippedOrNot);
          if (dataFile.exists()) {
            return dataFile;
          }
        }
      }
    }
    throw new IllegalArgumentException("Can't find " + prefix + " file in " + dataFileDirectory);
  }

  @Override
  public LongPrimitiveIterator getUserIDs() throws TasteException {
    return delegate.getUserIDs();
  }

  @Override
  public PreferenceArray getPreferencesFromUser(long userID) throws TasteException {
    return delegate.getPreferencesFromUser(userID);
  }

  @Override
  public FastIDSet getItemIDsFromUser(long userID) throws TasteException {
    return delegate.getItemIDsFromUser(userID);
  }

  @Override
  public LongPrimitiveIterator getItemIDs() throws TasteException {
    return delegate.getItemIDs();
  }

  @Override
  public PreferenceArray getPreferencesForItem(long itemID) throws TasteException {
    return delegate.getPreferencesForItem(itemID);
  }

  @Override
  public Float getPreferenceValue(long userID, long itemID) throws TasteException {
    return delegate.getPreferenceValue(userID, itemID);
  }

  @Override
  public Long getPreferenceTime(long userID, long itemID) throws TasteException {
    return delegate.getPreferenceTime(userID, itemID);
  }

  @Override
  public int getNumItems() throws TasteException {
    return delegate.getNumItems();
  }

  @Override
  public int getNumUsers() throws TasteException {
    return delegate.getNumUsers();
  }

  @Override
  public int getNumUsersWithPreferenceFor(long itemID) throws TasteException {
    return delegate.getNumUsersWithPreferenceFor(itemID);
  }

  @Override
  public int getNumUsersWithPreferenceFor(long itemID1, long itemID2) throws TasteException {
    return delegate.getNumUsersWithPreferenceFor(itemID1, itemID2);
  }

  @Override
  public void setPreference(long userID, long itemID, float value) throws TasteException {
    delegate.setPreference(userID, itemID, value);
  }

  @Override
  public void removePreference(long userID, long itemID) throws TasteException {
    delegate.removePreference(userID, itemID);
  }

  @Override
  public boolean hasPreferenceValues() {
    return delegate.hasPreferenceValues();
  }

  /** @return the fixed upper bound, 100, rather than a value taken from the delegate */
  @Override
  public float getMaxPreference() {
    return 100.0f;
  }

  /** @return the fixed lower bound, 0, rather than a value taken from the delegate */
  @Override
  public float getMinPreference() {
    return 0.0f;
  }

  @Override
  public void refresh(Collection<Refreshable> alreadyRefreshed) {
    // do nothing
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java new file mode 100644 index 0000000..3f4a732 --- /dev/null +++ 
b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/ToCSV.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.example.kddcup; + +import org.apache.commons.io.Charsets; +import org.apache.mahout.cf.taste.model.PreferenceArray; +import org.apache.mahout.common.Pair; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.zip.GZIPOutputStream; + +/** + * <p>This class converts a KDD Cup input file into a compressed CSV format. The output format is + * {@code userID,itemID,score,timestamp}. 
It can optionally restrict its output to exclude + * score and/or timestamp.</p> + * + * <p>Run as: {@code ToCSV (input file) (output file) [num columns to output]}</p> + */
public final class ToCSV {

  private ToCSV() {
  }

  /**
   * Converts the input file to gzip-compressed CSV, one line per preference.
   *
   * @param args {@code args[0]} input file, {@code args[1]} output file, optional {@code args[2]}
   *  number of columns to write (default 4; the score is written when it is greater than 2, the
   *  timestamp when it is greater than 3; user ID and item ID are always written)
   * @throws IllegalArgumentException if fewer than two arguments are given
   * @throws NumberFormatException if the column count is not an integer
   */
  public static void main(String[] args) throws Exception {

    // Fail with a usage message instead of a bare ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      throw new IllegalArgumentException("Usage: ToCSV (input file) (output file) [num columns to output]");
    }

    File inputFile = new File(args[0]);
    File outputFile = new File(args[1]);
    int columnsToOutput = 4;
    if (args.length >= 3) {
      columnsToOutput = Integer.parseInt(args[2]);
    }

    // Every stream is declared in the resource list so the file handle is released even if
    // constructing the gzip stream (which writes a header) or the writer fails.
    try (OutputStream fileStream = new FileOutputStream(outputFile);
         OutputStream outStream = new GZIPOutputStream(fileStream);
         Writer outWriter = new BufferedWriter(new OutputStreamWriter(outStream, Charsets.UTF_8))) {
      for (Pair<PreferenceArray,long[]> user : new DataFileIterable(inputFile)) {
        PreferenceArray prefs = user.getFirst();
        long[] timestamps = user.getSecond();
        for (int i = 0; i < prefs.length(); i++) {
          outWriter.write(String.valueOf(prefs.getUserID(i)));
          outWriter.write(',');
          outWriter.write(String.valueOf(prefs.getItemID(i)));
          if (columnsToOutput > 2) {
            outWriter.write(',');
            outWriter.write(String.valueOf(prefs.getValue(i)));
          }
          if (columnsToOutput > 3) {
            outWriter.write(',');
            outWriter.write(String.valueOf(timestamps[i]));
          }
          outWriter.write('\n');
        }
      }
    }
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java new file mode 100644 index 0000000..0112ab9 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/EstimateConverter.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache 
Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.mahout.cf.taste.example.kddcup.track1;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helper that turns a preference estimate into the single byte written to the
 * track 1 output.
 */
public final class EstimateConverter {

  private static final Logger log = LoggerFactory.getLogger(EstimateConverter.class);

  private EstimateConverter() {}

  /**
   * Scales the estimate by 2.55, truncates it to an int, clamps it to [0, 255] and narrows it
   * to a byte. A NaN estimate is logged and mapped to {@code 0x7F}.
   *
   * @param estimate estimated preference value
   * @param userID user the estimate belongs to (used only in the warning)
   * @param itemID item the estimate belongs to (used only in the warning)
   * @return the scaled estimate as a byte
   */
  public static byte convert(double estimate, long userID, long itemID) {
    // Guard clause: no usable estimate.
    if (Double.isNaN(estimate)) {
      log.warn("Unable to compute estimate for user {}, item {}", userID, itemID);
      return 0x7F;
    }
    int scaled = (int) (estimate * 2.55);
    int clamped = Math.max(0, Math.min(255, scaled));
    return (byte) clamped;
  }
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java new file mode 100644 
index 0000000..72056da --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Callable.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.mahout.cf.taste.example.kddcup.track1;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.mahout.cf.taste.common.NoSuchItemException;
import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.model.PreferenceArray;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Task that estimates a preference for every item in one user's test set and returns the
 * estimates as bytes, one per item, in test-set order.
 */
final class Track1Callable implements Callable<byte[]> {

  private static final Logger log = LoggerFactory.getLogger(Track1Callable.class);
  /** Number of completed tasks across all instances; used only for progress logging. */
  private static final AtomicInteger COUNT = new AtomicInteger();

  private final Recommender recommender;
  private final PreferenceArray userTest;

  /**
   * @param recommender recommender used to estimate preferences
   * @param userTest test preferences of a single user
   */
  Track1Callable(Recommender recommender, PreferenceArray userTest) {
    this.recommender = recommender;
    this.userTest = userTest;
  }

  /**
   * @return one converted estimate per test item; entries for items the recommender does not
   *  know are left at 0
   * @throws TasteException if estimating fails for a reason other than an unknown item
   */
  @Override
  public byte[] call() throws TasteException {
    long userID = userTest.get(0).getUserID();
    byte[] result = new byte[userTest.length()];
    for (int i = 0; i < userTest.length(); i++) {
      long itemID = userTest.getItemID(i);
      double estimate;
      try {
        estimate = recommender.estimatePreference(userID, itemID);
      } catch (NoSuchItemException nsie) {
        // OK in the sample data provided before the contest, should never happen otherwise
        log.warn("Unknown item {}; OK unless this is the real contest data", itemID);
        continue;
      }
      result[i] = EstimateConverter.convert(estimate, userID, itemID);
    }

    // Log the value this task obtained from the increment. Re-reading the counter could
    // report a number already advanced by another thread.
    int completed = COUNT.incrementAndGet();
    if (completed % 10000 == 0) {
      log.info("Completed {} users", completed);
    }

    return result;
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java new file mode 100644 index 0000000..067daf5 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Recommender.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.mahout.cf.taste.example.kddcup.track1;

import java.util.Collection;
import java.util.List;

import org.apache.mahout.cf.taste.common.Refreshable;
import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.UncenteredCosineSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.IDRescorer;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.ItemSimilarity;

/**
 * {@link Recommender} for track 1 that forwards every call to an item-based recommender
 * built in the constructor.
 */
public final class Track1Recommender implements Recommender {

  /** Wrapped recommender that does the actual work. */
  private final Recommender delegate;

  /**
   * @param dataModel model the wrapped recommender and its similarity are built on
   * @throws TasteException declared for failures while building the similarity or recommender
   */
  public Track1Recommender(DataModel dataModel) throws TasteException {
    // Change this to whatever you like!
    ItemSimilarity itemSimilarity = new UncenteredCosineSimilarity(dataModel);
    this.delegate = new GenericItemBasedRecommender(dataModel, itemSimilarity);
  }

  @Override
  public List<RecommendedItem> recommend(long userID, int howMany) throws TasteException {
    return delegate.recommend(userID, howMany);
  }

  /** Same as the four-argument overload with a {@code null} rescorer. */
  @Override
  public List<RecommendedItem> recommend(long userID, int howMany, boolean includeKnownItems) throws TasteException {
    return delegate.recommend(userID, howMany, null, includeKnownItems);
  }

  /** Same as the four-argument overload with {@code includeKnownItems} set to false. */
  @Override
  public List<RecommendedItem> recommend(long userID, int howMany, IDRescorer rescorer) throws TasteException {
    return delegate.recommend(userID, howMany, rescorer, false);
  }

  @Override
  public List<RecommendedItem> recommend(long userID, int howMany, IDRescorer rescorer, boolean includeKnownItems)
    throws TasteException {
    return delegate.recommend(userID, howMany, rescorer, includeKnownItems);
  }

  @Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    return delegate.estimatePreference(userID, itemID);
  }

  @Override
  public void setPreference(long userID, long itemID, float value) throws TasteException {
    delegate.setPreference(userID, itemID, value);
  }

  @Override
  public void removePreference(long userID, long itemID) throws TasteException {
    delegate.removePreference(userID, itemID);
  }

  @Override
  public DataModel getDataModel() {
    return delegate.getDataModel();
  }

  @Override
  public void refresh(Collection<Refreshable> alreadyRefreshed) {
    delegate.refresh(alreadyRefreshed);
  }

  @Override
  public String toString() {
    return "Track1Recommender[recommender:" + delegate + ']';
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java ---------------------------------------------------------------------- diff 
--git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java new file mode 100644 index 0000000..6b9fe1b --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderBuilder.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.mahout.cf.taste.example.kddcup.track1;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.Recommender;

/**
 * {@link RecommenderBuilder} that always produces a {@link Track1Recommender}.
 */
final class Track1RecommenderBuilder implements RecommenderBuilder {

  /**
   * @param dataModel model handed to the new recommender
   * @return a new {@link Track1Recommender} over {@code dataModel}
   * @throws TasteException if constructing the recommender fails
   */
  @Override
  public Recommender buildRecommender(DataModel dataModel) throws TasteException {
    Recommender track1Recommender = new Track1Recommender(dataModel);
    return track1Recommender;
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java new file mode 100644 index 0000000..bcd0a3d --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluator.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.example.kddcup.track1; + +import java.io.File; +import java.util.Collection; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.Lists; +import org.apache.mahout.cf.taste.common.TasteException; +import org.apache.mahout.cf.taste.eval.DataModelBuilder; +import org.apache.mahout.cf.taste.eval.RecommenderBuilder; +import org.apache.mahout.cf.taste.example.kddcup.DataFileIterable; +import org.apache.mahout.cf.taste.example.kddcup.KDDCupDataModel; +import org.apache.mahout.cf.taste.impl.common.FullRunningAverage; +import org.apache.mahout.cf.taste.impl.common.FullRunningAverageAndStdDev; +import org.apache.mahout.cf.taste.impl.common.RunningAverage; +import org.apache.mahout.cf.taste.impl.common.RunningAverageAndStdDev; +import org.apache.mahout.cf.taste.impl.eval.AbstractDifferenceRecommenderEvaluator; +import org.apache.mahout.cf.taste.model.DataModel; +import org.apache.mahout.cf.taste.model.Preference; +import org.apache.mahout.cf.taste.model.PreferenceArray; +import org.apache.mahout.cf.taste.recommender.Recommender; +import org.apache.mahout.common.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Attempts to run an evaluation just like that dictated for Yahoo's KDD Cup, Track 1. + * It will compute the RMSE of a validation data set against the predicted ratings from + * the training data set. 
+ */ +public final class Track1RecommenderEvaluator extends AbstractDifferenceRecommenderEvaluator { + + private static final Logger log = LoggerFactory.getLogger(Track1RecommenderEvaluator.class); + + private RunningAverage average; + private final File dataFileDirectory; + + public Track1RecommenderEvaluator(File dataFileDirectory) { + setMaxPreference(100.0f); + setMinPreference(0.0f); + average = new FullRunningAverage(); + this.dataFileDirectory = dataFileDirectory; + } + + @Override + public double evaluate(RecommenderBuilder recommenderBuilder, + DataModelBuilder dataModelBuilder, + DataModel dataModel, + double trainingPercentage, + double evaluationPercentage) throws TasteException { + + Recommender recommender = recommenderBuilder.buildRecommender(dataModel); + + Collection<Callable<Void>> estimateCallables = Lists.newArrayList(); + AtomicInteger noEstimateCounter = new AtomicInteger(); + for (Pair<PreferenceArray,long[]> userData + : new DataFileIterable(KDDCupDataModel.getValidationFile(dataFileDirectory))) { + PreferenceArray validationPrefs = userData.getFirst(); + long userID = validationPrefs.get(0).getUserID(); + estimateCallables.add( + new PreferenceEstimateCallable(recommender, userID, validationPrefs, noEstimateCounter)); + } + + RunningAverageAndStdDev timing = new FullRunningAverageAndStdDev(); + execute(estimateCallables, noEstimateCounter, timing); + + double result = computeFinalEvaluation(); + log.info("Evaluation result: {}", result); + return result; + } + + // Use RMSE scoring: + + @Override + protected void reset() { + average = new FullRunningAverage(); + } + + @Override + protected void processOneEstimate(float estimatedPreference, Preference realPref) { + double diff = realPref.getValue() - estimatedPreference; + average.addDatum(diff * diff); + } + + @Override + protected double computeFinalEvaluation() { + return Math.sqrt(average.getAverage()); + } + +} 
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java new file mode 100644 index 0000000..deadc00 --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1RecommenderEvaluatorRunner.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.mahout.cf.taste.example.kddcup.track1;

import java.io.File;
import java.io.IOException;

import org.apache.commons.cli2.OptionException;
import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.example.TasteOptionParser;
import org.apache.mahout.cf.taste.example.kddcup.KDDCupDataModel;
import org.apache.mahout.cf.taste.model.DataModel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line entry point that runs {@link Track1RecommenderEvaluator} over a data
 * directory and logs the resulting score.
 */
public final class Track1RecommenderEvaluatorRunner {

  private static final Logger log = LoggerFactory.getLogger(Track1RecommenderEvaluatorRunner.class);

  private Track1RecommenderEvaluatorRunner() {
  }

  /**
   * @param args command-line options, parsed by {@link TasteOptionParser#getRatings}
   * @throws IllegalArgumentException if no directory is given or it is not an existing directory
   */
  public static void main(String... args) throws IOException, TasteException, OptionException {
    File dataFileDirectory = TasteOptionParser.getRatings(args);
    if (dataFileDirectory == null) {
      throw new IllegalArgumentException("No data directory");
    }
    boolean usableDirectory = dataFileDirectory.exists() && dataFileDirectory.isDirectory();
    if (!usableDirectory) {
      throw new IllegalArgumentException("Bad data file directory: " + dataFileDirectory);
    }

    Track1RecommenderEvaluator evaluator = new Track1RecommenderEvaluator(dataFileDirectory);
    File trainingFile = KDDCupDataModel.getTrainingFile(dataFileDirectory);
    DataModel model = new KDDCupDataModel(trainingFile);

    // The builder and both percentages are passed as null / NaN placeholders.
    double evaluation = evaluator.evaluate(new Track1RecommenderBuilder(), null, model, Float.NaN, Float.NaN);
    log.info(String.valueOf(evaluation));
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java new file mode 100644 index 0000000..a0ff126 --- /dev/null 
+++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/Track1Runner.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.mahout.cf.taste.example.kddcup.track1;

import org.apache.mahout.cf.taste.example.kddcup.DataFileIterable;
import org.apache.mahout.cf.taste.example.kddcup.KDDCupDataModel;
import org.apache.mahout.cf.taste.model.PreferenceArray;
import org.apache.mahout.common.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * <p>Runs "track 1" of the KDD Cup competition using whatever recommender is inside {@link Track1Recommender}
 * and attempts to output the result in the correct contest format.</p>
 *
 * <p>Run as: {@code Track1Runner [track 1 data file directory] [output file]}</p>
 */
public final class Track1Runner {

  private static final Logger log = LoggerFactory.getLogger(Track1Runner.class);

  private Track1Runner() {
  }

  /**
   * Loads the training model, estimates a preference for every entry of the test file on a
   * fixed-size thread pool, and writes the resulting bytes to the output file in test-file order.
   *
   * @param args {@code args[0]} data file directory, {@code args[1]} output file
   * @throws IllegalArgumentException if fewer than two arguments are given or the directory
   *  does not exist or is not a directory
   */
  public static void main(String[] args) throws Exception {

    // Fail with a usage message instead of a bare ArrayIndexOutOfBoundsException.
    if (args.length < 2) {
      throw new IllegalArgumentException("Usage: Track1Runner [track 1 data file directory] [output file]");
    }

    File dataFileDirectory = new File(args[0]);
    if (!dataFileDirectory.exists() || !dataFileDirectory.isDirectory()) {
      throw new IllegalArgumentException("Bad data file directory: " + dataFileDirectory);
    }

    long start = System.currentTimeMillis();

    KDDCupDataModel model = new KDDCupDataModel(KDDCupDataModel.getTrainingFile(dataFileDirectory));
    Track1Recommender recommender = new Track1Recommender(model);

    long end = System.currentTimeMillis();
    log.info("Loaded model in {}s", (end - start) / 1000);
    start = end;

    // One task per user in the test file.
    Collection<Track1Callable> callables = new ArrayList<>();
    for (Pair<PreferenceArray,long[]> tests : new DataFileIterable(KDDCupDataModel.getTestFile(dataFileDirectory))) {
      PreferenceArray userTest = tests.getFirst();
      callables.add(new Track1Callable(recommender, userTest));
    }

    int cores = Runtime.getRuntime().availableProcessors();
    log.info("Running on {} cores", cores);
    ExecutorService executor = Executors.newFixedThreadPool(cores);
    List<Future<byte[]>> results;
    try {
      results = executor.invokeAll(callables);
    } finally {
      // Always release the pool, including when invokeAll is interrupted; otherwise its
      // non-daemon worker threads can keep the JVM alive.
      executor.shutdown();
    }

    end = System.currentTimeMillis();
    log.info("Ran recommendations in {}s", (end - start) / 1000);
    start = end;

    try (OutputStream out = new BufferedOutputStream(new FileOutputStream(new File(args[1])))){
      for (Future<byte[]> result : results) {
        for (byte estimate : result.get()) {
          out.write(estimate);
        }
      }
    }

    end = System.currentTimeMillis();
    log.info("Wrote output in {}s", (end - start) / 1000);
  }

}
http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java ---------------------------------------------------------------------- diff --git 
a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java new file mode 100644 index 0000000..022d78c --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/DataModelFactorizablePreferences.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.example.kddcup.track1.svd; + +import org.apache.mahout.cf.taste.common.TasteException; +import org.apache.mahout.cf.taste.impl.common.FastIDSet; +import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator; +import org.apache.mahout.cf.taste.impl.model.GenericPreference; +import org.apache.mahout.cf.taste.model.DataModel; +import org.apache.mahout.cf.taste.model.Preference; + +import java.util.ArrayList; +import java.util.List; + +/** + * can be used to drop {@link DataModel}s into {@link ParallelArraysSGDFactorizer} + */ +public class DataModelFactorizablePreferences implements FactorizablePreferences { + + private final FastIDSet userIDs; + private final FastIDSet itemIDs; + + private final List<Preference> preferences; + + private final float minPreference; + private final float maxPreference; + + public DataModelFactorizablePreferences(DataModel dataModel) { + + minPreference = dataModel.getMinPreference(); + maxPreference = dataModel.getMaxPreference(); + + try { + userIDs = new FastIDSet(dataModel.getNumUsers()); + itemIDs = new FastIDSet(dataModel.getNumItems()); + preferences = new ArrayList<>(); + + LongPrimitiveIterator userIDsIterator = dataModel.getUserIDs(); + while (userIDsIterator.hasNext()) { + long userID = userIDsIterator.nextLong(); + userIDs.add(userID); + for (Preference preference : dataModel.getPreferencesFromUser(userID)) { + itemIDs.add(preference.getItemID()); + preferences.add(new GenericPreference(userID, preference.getItemID(), preference.getValue())); + } + } + } catch (TasteException te) { + throw new IllegalStateException("Unable to create factorizable preferences!", te); + } + } + + @Override + public LongPrimitiveIterator getUserIDs() { + return userIDs.iterator(); + } + + @Override + public LongPrimitiveIterator getItemIDs() { + return itemIDs.iterator(); + } + + @Override + public Iterable<Preference> getPreferences() { + return preferences; + } + + @Override + public float 
getMinPreference() { + return minPreference; + } + + @Override + public float getMaxPreference() { + return maxPreference; + } + + @Override + public int numUsers() { + return userIDs.size(); + } + + @Override + public int numItems() { + return itemIDs.size(); + } + + @Override + public int numPreferences() { + return preferences.size(); + } +} + http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java new file mode 100644 index 0000000..a126dec --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/FactorizablePreferences.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.mahout.cf.taste.example.kddcup.track1.svd; + +import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator; +import org.apache.mahout.cf.taste.model.Preference; + +/** + * models the necessary input for {@link ParallelArraysSGDFactorizer} + */ +public interface FactorizablePreferences { + + LongPrimitiveIterator getUserIDs(); + + LongPrimitiveIterator getItemIDs(); + + Iterable<Preference> getPreferences(); + + float getMinPreference(); + + float getMaxPreference(); + + int numUsers(); + + int numItems(); + + int numPreferences(); + +} http://git-wip-us.apache.org/repos/asf/mahout/blob/410ed16a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java ---------------------------------------------------------------------- diff --git a/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java new file mode 100644 index 0000000..6dcef6b --- /dev/null +++ b/community/mahout-mr/mr-examples/src/main/java/org/apache/mahout/cf/taste/example/kddcup/track1/svd/KDDCupFactorizablePreferences.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.mahout.cf.taste.example.kddcup.track1.svd; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import org.apache.mahout.cf.taste.example.kddcup.DataFileIterable; +import org.apache.mahout.cf.taste.impl.common.AbstractLongPrimitiveIterator; +import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator; +import org.apache.mahout.cf.taste.model.Preference; +import org.apache.mahout.cf.taste.model.PreferenceArray; +import org.apache.mahout.common.Pair; + +import java.io.File; + +public class KDDCupFactorizablePreferences implements FactorizablePreferences { + + private final File dataFile; + + public KDDCupFactorizablePreferences(File dataFile) { + this.dataFile = dataFile; + } + + @Override + public LongPrimitiveIterator getUserIDs() { + return new FixedSizeLongIterator(numUsers()); + } + + @Override + public LongPrimitiveIterator getItemIDs() { + return new FixedSizeLongIterator(numItems()); + } + + @Override + public Iterable<Preference> getPreferences() { + Iterable<Iterable<Preference>> prefIterators = + Iterables.transform(new DataFileIterable(dataFile), + new Function<Pair<PreferenceArray,long[]>,Iterable<Preference>>() { + @Override + public Iterable<Preference> apply(Pair<PreferenceArray,long[]> from) { + return from.getFirst(); + } + }); + return Iterables.concat(prefIterators); + } + + @Override + public float getMinPreference() { + return 0; + } + + @Override + public float getMaxPreference() { + return 100; + } + + @Override + public int numUsers() { + return 1000990; + } + 
+ @Override + public int numItems() { + return 624961; + } + + @Override + public int numPreferences() { + return 252800275; + } + + static class FixedSizeLongIterator extends AbstractLongPrimitiveIterator { + + private long currentValue; + private final long maximum; + + FixedSizeLongIterator(long maximum) { + this.maximum = maximum; + currentValue = 0; + } + + @Override + public long nextLong() { + return currentValue++; + } + + @Override + public long peek() { + return currentValue; + } + + @Override + public void skip(int n) { + currentValue += n; + } + + @Override + public boolean hasNext() { + return currentValue < maximum; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + } + +}
