Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityJob.java Wed May 26 18:59:02 2010 @@ -22,20 +22,23 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.Charset; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.cli2.Option; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; +import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Partitioner; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable; import 
org.apache.mahout.cf.taste.hadoop.EntityPrefWritable; @@ -115,7 +118,7 @@ public final class ItemSimilarityJob ext "org.apache.mahout.cf.taste.hadoop.similarity.item.ItemSimilarityJob.numberOfUsers"; @Override - public int run(String[] args) throws IOException { + public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Option similarityClassOpt = AbstractJob.buildOption("similarityClassname", "s", "Name of distributed similarity class to instantiate"); @@ -129,77 +132,79 @@ public final class ItemSimilarityJob ext String distributedSimilarityClassname = parsedArgs.get("--similarityClassname"); - String inputPath = originalConf.get("mapred.input.dir"); - String outputPath = originalConf.get("mapred.output.dir"); - String tempDirPath = parsedArgs.get("--tempDir"); - - String countUsersPath = tempDirPath + "/countUsers"; - String itemVectorsPath = tempDirPath + "/itemVectors"; - String userVectorsPath = tempDirPath + "/userVectors"; - - /* count all unique users */ - JobConf countUsers = prepareJobConf(inputPath, - countUsersPath, - TextInputFormat.class, - CountUsersMapper.class, - CountUsersKeyWritable.class, - VarLongWritable.class, - CountUsersReducer.class, - VarIntWritable.class, - NullWritable.class, - TextOutputFormat.class); - - countUsers.setPartitionerClass( - CountUsersKeyWritable.CountUsersPartitioner.class); - countUsers.setOutputValueGroupingComparator( - CountUsersKeyWritable.CountUsersGroupComparator.class); - - JobClient.runJob(countUsers); - - int numberOfUsers = - readNumberOfUsers(countUsers, (countUsersPath + "/part-00000")); - - JobConf itemVectors = prepareJobConf(inputPath, - itemVectorsPath, - TextInputFormat.class, - ToUserPrefsMapper.class, - VarLongWritable.class, - EntityPrefWritable.class, - ToItemVectorReducer.class, - VarLongWritable.class, - EntityPrefWritableArrayWritable.class, - SequenceFileOutputFormat.class); - JobClient.runJob(itemVectors); - - JobConf userVectors = 
prepareJobConf(itemVectorsPath, - userVectorsPath, - SequenceFileInputFormat.class, - PreferredItemsPerUserMapper.class, - VarLongWritable.class, - ItemPrefWithItemVectorWeightWritable.class, - PreferredItemsPerUserReducer.class, - VarLongWritable.class, - ItemPrefWithItemVectorWeightArrayWritable.class, - SequenceFileOutputFormat.class); - - userVectors.set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname); - JobClient.runJob(userVectors); - - JobConf similarity = prepareJobConf(userVectorsPath, - outputPath, - SequenceFileInputFormat.class, - CopreferredItemsMapper.class, - ItemPairWritable.class, - CoRating.class, - SimilarityReducer.class, - EntityEntityWritable.class, - DoubleWritable.class, - TextOutputFormat.class); + Path inputPath = new Path(originalConf.get("mapred.input.dir")); + Path outputPath = new Path(originalConf.get("mapred.output.dir")); + Path tempDirPath = new Path(parsedArgs.get("--tempDir")); + + Path countUsersPath = new Path(tempDirPath, "countUsers"); + Path itemVectorsPath = new Path(tempDirPath, "itemVectors"); + Path userVectorsPath = new Path(tempDirPath, "userVectors"); + + AtomicInteger currentPhase = new AtomicInteger(); + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + /* count all unique users */ + Job countUsers = prepareJob(inputPath, + countUsersPath, + TextInputFormat.class, + CountUsersMapper.class, + CountUsersKeyWritable.class, + VarLongWritable.class, + CountUsersReducer.class, + VarIntWritable.class, + NullWritable.class, + TextOutputFormat.class); + countUsers.setPartitionerClass(CountUsersKeyWritable.CountUsersPartitioner.class); + countUsers.setGroupingComparatorClass(CountUsersKeyWritable.CountUsersGroupComparator.class); + countUsers.waitForCompletion(true); + } - similarity.set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname); - similarity.setInt(NUMBER_OF_USERS, numberOfUsers); + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + Job itemVectors = prepareJob(inputPath, + 
itemVectorsPath, + TextInputFormat.class, + ToUserPrefsMapper.class, + VarLongWritable.class, + EntityPrefWritable.class, + ToItemVectorReducer.class, + VarLongWritable.class, + EntityPrefWritableArrayWritable.class, + SequenceFileOutputFormat.class); + itemVectors.waitForCompletion(true); + } - JobClient.runJob(similarity); + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + Job userVectors = prepareJob(itemVectorsPath, + userVectorsPath, + SequenceFileInputFormat.class, + PreferredItemsPerUserMapper.class, + VarLongWritable.class, + ItemPrefWithItemVectorWeightWritable.class, + PreferredItemsPerUserReducer.class, + VarLongWritable.class, + ItemPrefWithItemVectorWeightArrayWritable.class, + SequenceFileOutputFormat.class); + userVectors.getConfiguration().set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname); + userVectors.waitForCompletion(true); + } + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + Job similarity = prepareJob(userVectorsPath, + outputPath, + SequenceFileInputFormat.class, + CopreferredItemsMapper.class, + ItemPairWritable.class, + CoRating.class, + SimilarityReducer.class, + EntityEntityWritable.class, + DoubleWritable.class, + TextOutputFormat.class); + Configuration conf = similarity.getConfiguration(); + int numberOfUsers = readNumberOfUsers(conf, countUsersPath); + conf.set(DISTRIBUTED_SIMILARITY_CLASSNAME, distributedSimilarityClassname); + conf.setInt(NUMBER_OF_USERS, numberOfUsers); + similarity.waitForCompletion(true); + } return 0; } @@ -208,11 +213,17 @@ public final class ItemSimilarityJob ext ToolRunner.run(new ItemSimilarityJob(), args); } - static int readNumberOfUsers(JobConf conf, String outputFile) throws IOException { + static int readNumberOfUsers(Configuration conf, Path outputDir) throws IOException { FileSystem fs = FileSystem.get(conf); + Path outputFile = fs.listStatus(outputDir, new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith("part-"); + } 
+ })[0].getPath(); InputStream in = null; try { - in = fs.open(new Path(outputFile)); + in = fs.open(outputFile); ByteArrayOutputStream out = new ByteArrayOutputStream(); IOUtils.copyBytes(in, out, conf); return Integer.parseInt(new String(out.toByteArray(), Charset.forName("UTF-8")).trim());
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserMapper.java Wed May 26 18:59:02 2010 @@ -21,28 +21,26 @@ import java.io.IOException; import java.util.Iterator; import java.util.NoSuchElementException; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Mapper; import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable; import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable; import org.apache.mahout.cf.taste.hadoop.similarity.DistributedItemSimilarity; +import org.apache.mahout.common.iterator.IteratorIterable; import org.apache.mahout.math.VarLongWritable; /** * for each item-vector, we compute its weight here and map out all entries with the user as key, * so we can create the user-vectors in the reducer */ -public final class PreferredItemsPerUserMapper extends MapReduceBase - implements Mapper<VarLongWritable,EntityPrefWritableArrayWritable,VarLongWritable,ItemPrefWithItemVectorWeightWritable> { +public final class PreferredItemsPerUserMapper extends + Mapper<VarLongWritable,EntityPrefWritableArrayWritable,VarLongWritable,ItemPrefWithItemVectorWeightWritable> { private DistributedItemSimilarity distributedSimilarity; 
@Override - public void configure(JobConf jobConf) { - super.configure(jobConf); + public void setup(Context context) { + Configuration jobConf = context.getConfiguration(); distributedSimilarity = ItemSimilarityJob.instantiateSimilarity(jobConf.get(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME)); } @@ -50,15 +48,15 @@ public final class PreferredItemsPerUser @Override public void map(VarLongWritable item, EntityPrefWritableArrayWritable userPrefsArray, - OutputCollector<VarLongWritable,ItemPrefWithItemVectorWeightWritable> output, - Reporter reporter) throws IOException { + Context context) throws IOException, InterruptedException { EntityPrefWritable[] userPrefs = userPrefsArray.getPrefs(); - double weight = distributedSimilarity.weightOfItemVector(new UserPrefsIterator(userPrefs)); + double weight = distributedSimilarity.weightOfItemVector( + new IteratorIterable<Float>(new UserPrefsIterator(userPrefs))); for (EntityPrefWritable userPref : userPrefs) { - output.collect(new VarLongWritable(userPref.getID()), + context.write(new VarLongWritable(userPref.getID()), new ItemPrefWithItemVectorWeightWritable(item.get(), weight, userPref.getPrefValue())); } } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/PreferredItemsPerUserReducer.java Wed May 26 18:59:02 2010 @@ -19,33 +19,28 @@ package org.apache.mahout.cf.taste.hadoo import java.io.IOException; import java.util.HashSet; -import java.util.Iterator; import 
java.util.Set; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Reducer; import org.apache.mahout.math.VarLongWritable; -public final class PreferredItemsPerUserReducer extends MapReduceBase - implements Reducer<VarLongWritable,ItemPrefWithItemVectorWeightWritable,VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable> { +public final class PreferredItemsPerUserReducer extends + Reducer<VarLongWritable,ItemPrefWithItemVectorWeightWritable,VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable> { @Override public void reduce(VarLongWritable user, - Iterator<ItemPrefWithItemVectorWeightWritable> itemPrefs, - OutputCollector<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable> output, - Reporter reporter) - throws IOException { + Iterable<ItemPrefWithItemVectorWeightWritable> itemPrefs, + Context context) + throws IOException, InterruptedException { Set<ItemPrefWithItemVectorWeightWritable> itemPrefsWithItemVectorWeight = new HashSet<ItemPrefWithItemVectorWeightWritable>(); - while (itemPrefs.hasNext()) { - itemPrefsWithItemVectorWeight.add(itemPrefs.next().clone()); + for (ItemPrefWithItemVectorWeightWritable writable : itemPrefs) { + itemPrefsWithItemVectorWeight.add(writable.clone()); } - output.collect(user, new ItemPrefWithItemVectorWeightArrayWritable( + context.write(user, new ItemPrefWithItemVectorWeightArrayWritable( itemPrefsWithItemVectorWeight.toArray( new ItemPrefWithItemVectorWeightWritable[itemPrefsWithItemVectorWeight.size()]))); } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java?rev=948538&r1=948537&r2=948538&view=diff 
============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/SimilarityReducer.java Wed May 26 18:59:02 2010 @@ -18,14 +18,10 @@ package org.apache.mahout.cf.taste.hadoop.similarity.item; import java.io.IOException; -import java.util.Iterator; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Reducer; import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable; import org.apache.mahout.cf.taste.hadoop.similarity.CoRating; import org.apache.mahout.cf.taste.hadoop.similarity.DistributedItemSimilarity; @@ -34,15 +30,15 @@ import org.apache.mahout.cf.taste.hadoop * Finally compute the similarity for each item-pair, that has been corated at least once. * Computation is done with an external implementation of {@link DistributedItemSimilarity}. 
*/ -public final class SimilarityReducer extends MapReduceBase - implements Reducer<ItemPairWritable,CoRating,EntityEntityWritable,DoubleWritable> { +public final class SimilarityReducer extends + Reducer<ItemPairWritable,CoRating,EntityEntityWritable,DoubleWritable> { private DistributedItemSimilarity distributedItemSimilarity; private int numberOfUsers; @Override - public void configure(JobConf jobConf) { - super.configure(jobConf); + public void setup(Context context) { + Configuration jobConf = context.getConfiguration(); distributedItemSimilarity = ItemSimilarityJob.instantiateSimilarity(jobConf.get(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME)); numberOfUsers = jobConf.getInt(ItemSimilarityJob.NUMBER_OF_USERS, -1); @@ -53,16 +49,15 @@ public final class SimilarityReducer ext @Override public void reduce(ItemPairWritable pair, - Iterator<CoRating> coRatings, - OutputCollector<EntityEntityWritable,DoubleWritable> output, - Reporter reporter) - throws IOException { + Iterable<CoRating> coRatings, + Context context) + throws IOException, InterruptedException { double similarity = distributedItemSimilarity.similarity(coRatings, pair.getItemAWeight(), pair.getItemBWeight(), numberOfUsers); if (!Double.isNaN(similarity)) { - output.collect(pair.getItemItemWritable(), new DoubleWritable(similarity)); + context.write(pair.getItemItemWritable(), new DoubleWritable(similarity)); } } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java (original) +++ 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ToItemVectorReducer.java Wed May 26 18:59:02 2010 @@ -19,13 +19,9 @@ package org.apache.mahout.cf.taste.hadoo import java.io.IOException; import java.util.HashSet; -import java.util.Iterator; import java.util.Set; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Reducer; import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable; import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable; import org.apache.mahout.math.VarLongWritable; @@ -34,24 +30,22 @@ import org.apache.mahout.math.VarLongWri * For each single item, collect all users with their preferences * (thereby building the item vectors of the user-item-matrix) */ -public final class ToItemVectorReducer - extends MapReduceBase implements +public final class ToItemVectorReducer extends Reducer<VarLongWritable,EntityPrefWritable,VarLongWritable,EntityPrefWritableArrayWritable> { @Override public void reduce(VarLongWritable item, - Iterator<EntityPrefWritable> userPrefs, - OutputCollector<VarLongWritable,EntityPrefWritableArrayWritable> output, - Reporter reporter) - throws IOException { + Iterable<EntityPrefWritable> userPrefs, + Context context) + throws IOException, InterruptedException { Set<EntityPrefWritable> collectedUserPrefs = new HashSet<EntityPrefWritable>(); - while (userPrefs.hasNext()) { - collectedUserPrefs.add(userPrefs.next().clone()); + for (EntityPrefWritable writable : userPrefs) { + collectedUserPrefs.add(writable.clone()); } - output.collect(item, new EntityPrefWritableArrayWritable( + context.write(item, new EntityPrefWritableArrayWritable( collectedUserPrefs.toArray(new EntityPrefWritable[collectedUserPrefs.size()]))); } Modified: 
mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneAverageDiffsJob.java Wed May 26 18:59:02 2010 @@ -19,18 +19,19 @@ package org.apache.mahout.cf.taste.hadoo import java.io.IOException; import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.GzipCodec; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; -import org.apache.hadoop.mapred.lib.IdentityMapper; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable; import org.apache.mahout.common.AbstractJob; @@ -41,7 +42,7 @@ import org.apache.mahout.math.VarLongWri public 
final class SlopeOneAverageDiffsJob extends AbstractJob { @Override - public int run(String[] args) throws IOException { + public int run(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Map<String,String> parsedArgs = AbstractJob.parseArguments(args); if (parsedArgs == null) { @@ -49,23 +50,41 @@ public final class SlopeOneAverageDiffsJ } Configuration originalConf = getConf(); - String prefsFile = originalConf.get("mapred.input.dir"); - String outputPath = originalConf.get("mapred.output.dir"); - String averagesOutputPath = parsedArgs.get("--tempDir"); - - JobConf prefsToDiffsJobConf = prepareJobConf(prefsFile, averagesOutputPath, - TextInputFormat.class, ToItemPrefsMapper.class, VarLongWritable.class, EntityPrefWritable.class, - SlopeOnePrefsToDiffsReducer.class, EntityEntityWritable.class, FloatWritable.class, - SequenceFileOutputFormat.class); - JobClient.runJob(prefsToDiffsJobConf); - - JobConf diffsToAveragesJobConf = prepareJobConf(averagesOutputPath, outputPath, - SequenceFileInputFormat.class, IdentityMapper.class, EntityEntityWritable.class, FloatWritable.class, - SlopeOneDiffsToAveragesReducer.class, EntityEntityWritable.class, FloatWritable.class, - TextOutputFormat.class); - diffsToAveragesJobConf.setClass("mapred.output.compression.codec", GzipCodec.class, - CompressionCodec.class); - JobClient.runJob(diffsToAveragesJobConf); + Path prefsFile = new Path(originalConf.get("mapred.input.dir")); + Path outputPath = new Path(originalConf.get("mapred.output.dir")); + Path averagesOutputPath = new Path(parsedArgs.get("--tempDir")); + + AtomicInteger currentPhase = new AtomicInteger(); + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + Job prefsToDiffsJob = prepareJob(prefsFile, + averagesOutputPath, + TextInputFormat.class, + ToItemPrefsMapper.class, + VarLongWritable.class, + EntityPrefWritable.class, + SlopeOnePrefsToDiffsReducer.class, + EntityEntityWritable.class, + FloatWritable.class, + 
SequenceFileOutputFormat.class); + prefsToDiffsJob.waitForCompletion(true); + } + + + if (shouldRunNextPhase(parsedArgs, currentPhase)) { + Job diffsToAveragesJob = prepareJob(averagesOutputPath, + outputPath, + SequenceFileInputFormat.class, + Mapper.class, + EntityEntityWritable.class, + FloatWritable.class, + SlopeOneDiffsToAveragesReducer.class, + EntityEntityWritable.class, + FloatWritable.class, + TextOutputFormat.class); + FileOutputFormat.setOutputCompressorClass(diffsToAveragesJob, GzipCodec.class); + diffsToAveragesJob.waitForCompletion(true); + } return 0; } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOneDiffsToAveragesReducer.java Wed May 26 18:59:02 2010 @@ -18,29 +18,24 @@ package org.apache.mahout.cf.taste.hadoop.slopeone; import java.io.IOException; -import java.util.Iterator; import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Reducer; import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable; -public final class SlopeOneDiffsToAveragesReducer extends MapReduceBase implements +public final class SlopeOneDiffsToAveragesReducer extends Reducer<EntityEntityWritable,FloatWritable, EntityEntityWritable,FloatWritable> { @Override public void reduce(EntityEntityWritable key, - Iterator<FloatWritable> 
values, - OutputCollector<EntityEntityWritable,FloatWritable> output, - Reporter reporter) throws IOException { + Iterable<FloatWritable> values, + Context context) throws IOException, InterruptedException { int count = 0; double total = 0.0; - while (values.hasNext()) { - total += values.next().get(); + for (FloatWritable value : values) { + total += value.get(); count++; } - output.collect(key, new FloatWritable((float) (total / count))); + context.write(key, new FloatWritable((float) (total / count))); } } \ No newline at end of file Modified: mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/cf/taste/hadoop/slopeone/SlopeOnePrefsToDiffsReducer.java Wed May 26 18:59:02 2010 @@ -20,29 +20,24 @@ package org.apache.mahout.cf.taste.hadoo import java.io.IOException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; +import org.apache.hadoop.mapreduce.Reducer; import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable; import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable; import org.apache.mahout.math.VarLongWritable; -public final class SlopeOnePrefsToDiffsReducer extends MapReduceBase implements +public final class SlopeOnePrefsToDiffsReducer extends 
Reducer<VarLongWritable,EntityPrefWritable,EntityEntityWritable,FloatWritable> { @Override public void reduce(VarLongWritable key, - Iterator<EntityPrefWritable> values, - OutputCollector<EntityEntityWritable,FloatWritable> output, - Reporter reporter) throws IOException { + Iterable<EntityPrefWritable> values, + Context context) throws IOException, InterruptedException { List<EntityPrefWritable> prefs = new ArrayList<EntityPrefWritable>(); - while (values.hasNext()) { - prefs.add(new EntityPrefWritable(values.next())); + for (EntityPrefWritable writable : values) { + prefs.add(new EntityPrefWritable(writable)); } Collections.sort(prefs, ByItemIDComparator.getInstance()); int size = prefs.size(); @@ -54,7 +49,7 @@ public final class SlopeOnePrefsToDiffsR EntityPrefWritable second = prefs.get(j); long itemBID = second.getID(); float itemBValue = second.getPrefValue(); - output.collect(new EntityEntityWritable(itemAID, itemBID), new FloatWritable(itemBValue - itemAValue)); + context.write(new EntityEntityWritable(itemAID, itemBID), new FloatWritable(itemBValue - itemAValue)); } } } Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java (original) +++ mahout/trunk/core/src/main/java/org/apache/mahout/common/AbstractJob.java Wed May 26 18:59:02 2010 @@ -31,16 +31,15 @@ import org.apache.commons.cli2.builder.A import org.apache.commons.cli2.builder.DefaultOptionBuilder; import org.apache.commons.cli2.builder.GroupBuilder; import org.apache.commons.cli2.commandline.Parser; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputFormat; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.util.Tool; import org.apache.mahout.common.commandline.DefaultOptionCreator; import org.slf4j.Logger; @@ -166,47 +165,54 @@ public abstract class AbstractJob extend return !phaseSkipped; } - protected JobConf prepareJobConf(String inputPath, - String outputPath, - Class<? extends InputFormat> inputFormat, - Class<? extends Mapper> mapper, - Class<? extends Writable> mapperKey, - Class<? extends Writable> mapperValue, - Class<? extends Reducer> reducer, - Class<? extends Writable> reducerKey, - Class<? extends Writable> reducerValue, - Class<? 
extends OutputFormat> outputFormat) throws IOException { - - JobConf jobConf = new JobConf(getConf(), getClass()); - FileSystem fs = FileSystem.get(jobConf); - - Path inputPathPath = new Path(inputPath).makeQualified(fs); - Path outputPathPath = new Path(outputPath).makeQualified(fs); - - jobConf.setClass("mapred.input.format.class", inputFormat, InputFormat.class); - // Override this: - jobConf.set("mapred.input.dir", StringUtils.escapeString(inputPathPath.toString())); - - jobConf.setClass("mapred.mapper.class", mapper, Mapper.class); - jobConf.setClass("mapred.mapoutput.key.class", mapperKey, Writable.class); - jobConf.setClass("mapred.mapoutput.value.class", mapperValue, Writable.class); - - jobConf.setClass("mapred.reducer.class", reducer, Reducer.class); - jobConf.setClass("mapred.output.key.class", reducerKey, Writable.class); - jobConf.setClass("mapred.output.value.class", reducerValue, Writable.class); + protected Job prepareJob(Path inputPath, + Path outputPath, + Class<? extends InputFormat> inputFormat, + Class<? extends Mapper> mapper, + Class<? extends Writable> mapperKey, + Class<? extends Writable> mapperValue, + Class<? extends Reducer> reducer, + Class<? extends Writable> reducerKey, + Class<? extends Writable> reducerValue, + Class<? 
extends OutputFormat> outputFormat) throws IOException { + + Job job = new Job(new Configuration(getConf())); + Configuration jobConf = job.getConfiguration(); + + if (reducer.equals(Reducer.class)) { + if (mapper.equals(Mapper.class)) { + throw new IllegalStateException("Can't figure out the user class jar file from mapper/reducer"); + } + job.setJarByClass(mapper); + } else { + job.setJarByClass(reducer); + } + + job.setInputFormatClass(inputFormat); + jobConf.set("mapred.input.dir", inputPath.toString()); + + job.setMapperClass(mapper); + job.setMapOutputKeyClass(mapperKey); + job.setMapOutputValueClass(mapperValue); + jobConf.setBoolean("mapred.compress.map.output", true); - String customJobName = jobConf.get("mapred.job.name"); - if (customJobName == null) { + job.setReducerClass(reducer); + job.setOutputKeyClass(reducerKey); + job.setOutputValueClass(reducerValue); + + String customJobName = job.getJobName(); + if (customJobName == null || customJobName.trim().length() == 0) { customJobName = getClass().getSimpleName(); } - jobConf.set("mapred.job.name", customJobName + '-' + mapper.getSimpleName() + '-' + reducer.getSimpleName()); + customJobName += '-' + mapper.getSimpleName(); + customJobName += '-' + reducer.getSimpleName(); + job.setJobName(customJobName); - jobConf.setClass("mapred.output.format.class", outputFormat, OutputFormat.class); - // Override this: - jobConf.set("mapred.output.dir", StringUtils.escapeString(outputPathPath.toString())); + job.setOutputFormatClass(outputFormat); + jobConf.set("mapred.output.dir", outputPath.toString()); - return jobConf; + return job; } } Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java?rev=948538&r1=948537&r2=948538&view=diff 
============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java (original) +++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/DistributedItemSimilarityTestCase.java Wed May 26 18:59:02 2010 @@ -57,8 +57,8 @@ public abstract class DistributedItemSim } } - double weightX = similarity.weightOfItemVector(nonNaNPrefsX.iterator()); - double weightY = similarity.weightOfItemVector(nonNaNPrefsY.iterator()); + double weightX = similarity.weightOfItemVector(nonNaNPrefsX); + double weightY = similarity.weightOfItemVector(nonNaNPrefsY); List<CoRating> coRatings = new LinkedList<CoRating>(); @@ -71,7 +71,7 @@ public abstract class DistributedItemSim } } - double result = similarity.similarity(coRatings.iterator(), weightX, weightY, numberOfUsers); + double result = similarity.similarity(coRatings, weightX, weightY, numberOfUsers); assertEquals(expectedSimilarity, result, EPSILON); } Modified: mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java (original) +++ mahout/trunk/core/src/test/java/org/apache/mahout/cf/taste/hadoop/similarity/item/ItemSimilarityTest.java Wed May 26 18:59:02 2010 @@ -22,6 +22,7 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileReader; import java.io.FileWriter; +import java.io.FilenameFilter; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; @@ -29,12 +30,13 @@ import java.util.List; import java.util.Set; import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; import org.apache.mahout.cf.taste.hadoop.EntityEntityWritable; import org.apache.mahout.cf.taste.hadoop.EntityPrefWritable; import org.apache.mahout.cf.taste.hadoop.EntityPrefWritableArrayWritable; @@ -50,32 +52,32 @@ import org.easymock.classextension.EasyM /** * Unit tests for the mappers and reducers in org.apache.mahout.cf.taste.hadoop.similarity * Integration test with a mini-file at the end - * */ public final class ItemSimilarityTest extends MahoutTestCase { public void testUserPrefsPerItemMapper() throws Exception { - OutputCollector<VarLongWritable,VarLongWritable> output = - EasyMock.createMock(OutputCollector.class); - output.collect(new VarLongWritable(34L), new EntityPrefWritable(12L, 2.3f)); - EasyMock.replay(output); + Mapper<LongWritable,Text,VarLongWritable,VarLongWritable>.Context context = + EasyMock.createMock(Mapper.Context.class); + context.write(new VarLongWritable(34L), new EntityPrefWritable(12L, 2.3f)); + EasyMock.replay(context); - new ToUserPrefsMapper().map(new LongWritable(), new Text("12,34,2.3"), output, null); + new ToUserPrefsMapper().map(new LongWritable(), new Text("12,34,2.3"), context); - EasyMock.verify(output); + EasyMock.verify(context); } public void testCountUsersMapper() throws Exception { - OutputCollector<CountUsersKeyWritable,VarLongWritable> output = EasyMock.createMock(OutputCollector.class); - output.collect(keyForUserID(12L), EasyMock.eq(new VarLongWritable(12L))); - output.collect(keyForUserID(35L), EasyMock.eq(new VarLongWritable(35L))); - EasyMock.replay(output); + 
Mapper<LongWritable,Text,CountUsersKeyWritable,VarLongWritable>.Context context = + EasyMock.createMock(Mapper.Context.class); + context.write(keyForUserID(12L), EasyMock.eq(new VarLongWritable(12L))); + context.write(keyForUserID(35L), EasyMock.eq(new VarLongWritable(35L))); + EasyMock.replay(context); CountUsersMapper mapper = new CountUsersMapper(); - mapper.map(null, new Text("12,100,1.3"), output, null); - mapper.map(null, new Text("35,100,3.0"), output, null); + mapper.map(null, new Text("12,100,1.3"), context); + mapper.map(null, new Text("35,100,3.0"), context); - EasyMock.verify(output); + EasyMock.verify(context); } static CountUsersKeyWritable keyForUserID(final long userID) { @@ -98,17 +100,18 @@ public final class ItemSimilarityTest ex public void testCountUsersReducer() throws Exception { - OutputCollector<VarIntWritable,NullWritable> output = EasyMock.createMock(OutputCollector.class); - output.collect(new VarIntWritable(3), NullWritable.get()); - EasyMock.replay(output); + Reducer<CountUsersKeyWritable,VarLongWritable,VarIntWritable,NullWritable>.Context context = + EasyMock.createMock(Reducer.Context.class); + context.write(new VarIntWritable(3), NullWritable.get()); + EasyMock.replay(context); List<VarLongWritable> userIDs = Arrays.asList(new VarLongWritable(1L), new VarLongWritable(1L), - new VarLongWritable(3L), new VarLongWritable(5L), - new VarLongWritable(5L), new VarLongWritable(5L)); + new VarLongWritable(3L), new VarLongWritable(5L), + new VarLongWritable(5L), new VarLongWritable(5L)); - new CountUsersReducer().reduce(null, userIDs.iterator(), output, null); + new CountUsersReducer().reduce(null, userIDs, context); - EasyMock.verify(output); + EasyMock.verify(context); } public void testToItemVectorReducer() throws Exception { @@ -116,16 +119,16 @@ public final class ItemSimilarityTest ex List<EntityPrefWritable> userPrefs = Arrays.asList( new EntityPrefWritable(34L, 1.0f), new EntityPrefWritable(56L, 2.0f)); - 
OutputCollector<VarLongWritable,EntityPrefWritableArrayWritable> output = - EasyMock.createMock(OutputCollector.class); + Reducer<VarLongWritable,EntityPrefWritable,VarLongWritable,EntityPrefWritableArrayWritable>.Context context = + EasyMock.createMock(Reducer.Context.class); - output.collect(EasyMock.eq(new VarLongWritable(12L)), equalToUserPrefs(userPrefs)); + context.write(EasyMock.eq(new VarLongWritable(12L)), equalToUserPrefs(userPrefs)); - EasyMock.replay(output); + EasyMock.replay(context); - new ToItemVectorReducer().reduce(new VarLongWritable(12L), userPrefs.iterator(), output, null); + new ToItemVectorReducer().reduce(new VarLongWritable(12L), userPrefs, context); - EasyMock.verify(output); + EasyMock.verify(context); } @@ -162,30 +165,32 @@ public final class ItemSimilarityTest ex } public void testPreferredItemsPerUserMapper() throws Exception { - OutputCollector<VarLongWritable,ItemPrefWithItemVectorWeightWritable> output = - EasyMock.createMock(OutputCollector.class); + Mapper<VarLongWritable,EntityPrefWritableArrayWritable,VarLongWritable,ItemPrefWithItemVectorWeightWritable>.Context context = + EasyMock.createMock(Mapper.Context.class); EntityPrefWritableArrayWritable userPrefs = new EntityPrefWritableArrayWritable( new EntityPrefWritable[] { new EntityPrefWritable(12L, 2.0f), new EntityPrefWritable(56L, 3.0f) }); + Configuration conf = new Configuration(); + EasyMock.expect(context.getConfiguration()).andStubReturn(conf); + double weight = - new DistributedUncenteredZeroAssumingCosineSimilarity().weightOfItemVector(Arrays.asList(2.0f, 3.0f).iterator()); + new DistributedUncenteredZeroAssumingCosineSimilarity().weightOfItemVector(Arrays.asList(2.0f, 3.0f)); - output.collect(new VarLongWritable(12L), new ItemPrefWithItemVectorWeightWritable(34L, weight, 2.0f)); - output.collect(new VarLongWritable(56L), new ItemPrefWithItemVectorWeightWritable(34L, weight, 3.0f)); + context.write(new VarLongWritable(12L), new 
ItemPrefWithItemVectorWeightWritable(34L, weight, 2.0f)); + context.write(new VarLongWritable(56L), new ItemPrefWithItemVectorWeightWritable(34L, weight, 3.0f)); - JobConf conf = new JobConf(); conf.set(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME, "org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity"); - EasyMock.replay(output); + EasyMock.replay(context); PreferredItemsPerUserMapper mapper = new PreferredItemsPerUserMapper(); - mapper.configure(conf); - mapper.map(new VarLongWritable(34L), userPrefs, output, null); + mapper.setup(context); + mapper.map(new VarLongWritable(34L), userPrefs, context); - EasyMock.verify(output); + EasyMock.verify(context); } public void testPreferredItemsPerUserReducer() throws Exception { @@ -194,17 +199,16 @@ public final class ItemSimilarityTest ex Arrays.asList(new ItemPrefWithItemVectorWeightWritable(34L, 5.0, 1.0f), new ItemPrefWithItemVectorWeightWritable(56L, 7.0, 2.0f)); - OutputCollector<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable> output = - EasyMock.createMock(OutputCollector.class); + Reducer<VarLongWritable,ItemPrefWithItemVectorWeightWritable,VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable>.Context context = + EasyMock.createMock(Reducer.Context.class); - output.collect(EasyMock.eq(new VarLongWritable(12L)), equalToItemPrefs(itemPrefs)); + context.write(EasyMock.eq(new VarLongWritable(12L)), equalToItemPrefs(itemPrefs)); - EasyMock.replay(output); + EasyMock.replay(context); - new PreferredItemsPerUserReducer().reduce( - new VarLongWritable(12L), itemPrefs.iterator(), output, null); + new PreferredItemsPerUserReducer().reduce(new VarLongWritable(12L), itemPrefs, context); - EasyMock.verify(output); + EasyMock.verify(context); } static ItemPrefWithItemVectorWeightArrayWritable equalToItemPrefs( @@ -239,8 +243,8 @@ public final class ItemSimilarityTest ex } public void testCopreferredItemsMapper() throws Exception { - 
OutputCollector<ItemPairWritable, CoRating> output = - EasyMock.createMock(OutputCollector.class); + Mapper<VarLongWritable,ItemPrefWithItemVectorWeightArrayWritable,ItemPairWritable,CoRating>.Context context = + EasyMock.createMock(Mapper.Context.class); ItemPrefWithItemVectorWeightArrayWritable itemPrefs = EasyMock.createMock(ItemPrefWithItemVectorWeightArrayWritable.class); @@ -248,36 +252,37 @@ public final class ItemSimilarityTest ex new ItemPrefWithItemVectorWeightWritable(34L, 2.0, 1.0f), new ItemPrefWithItemVectorWeightWritable(56L, 3.0, 2.0f), new ItemPrefWithItemVectorWeightWritable(78L, 4.0, 3.0f) }); - output.collect(new ItemPairWritable(34L, 56L, 2.0, 3.0), new CoRating(1.0f, 2.0f)); - output.collect(new ItemPairWritable(34L, 78L, 2.0, 4.0), new CoRating(1.0f, 3.0f)); - output.collect(new ItemPairWritable(56L, 78L, 3.0, 4.0), new CoRating(2.0f, 3.0f)); + context.write(new ItemPairWritable(34L, 56L, 2.0, 3.0), new CoRating(1.0f, 2.0f)); + context.write(new ItemPairWritable(34L, 78L, 2.0, 4.0), new CoRating(1.0f, 3.0f)); + context.write(new ItemPairWritable(56L, 78L, 3.0, 4.0), new CoRating(2.0f, 3.0f)); - EasyMock.replay(output, itemPrefs); + EasyMock.replay(context, itemPrefs); - new CopreferredItemsMapper().map(new VarLongWritable(), itemPrefs, output, null); + new CopreferredItemsMapper().map(new VarLongWritable(), itemPrefs, context); - EasyMock.verify(output, itemPrefs); + EasyMock.verify(context, itemPrefs); } public void testSimilarityReducer() throws Exception { - OutputCollector<EntityEntityWritable,DoubleWritable> output = - EasyMock.createMock(OutputCollector.class); + Reducer<ItemPairWritable,CoRating,EntityEntityWritable,DoubleWritable>.Context context = + EasyMock.createMock(Reducer.Context.class); + Configuration conf = new Configuration(); + EasyMock.expect(context.getConfiguration()).andStubReturn(conf); - JobConf conf = new JobConf(); conf.set(ItemSimilarityJob.DISTRIBUTED_SIMILARITY_CLASSNAME, 
"org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity"); conf.setInt(ItemSimilarityJob.NUMBER_OF_USERS, 1); - output.collect(new EntityEntityWritable(12L, 34L), new DoubleWritable(0.5)); + context.write(new EntityEntityWritable(12L, 34L), new DoubleWritable(0.5)); - EasyMock.replay(output); + EasyMock.replay(context); SimilarityReducer reducer = new SimilarityReducer(); - reducer.configure(conf); - reducer.reduce(new ItemPairWritable(12L, 34L, 2.0, 10.0), Arrays.asList(new CoRating(2.5f, 2.0f), - new CoRating(2.0f, 2.5f)).iterator(), output, null); + reducer.setup(context); + reducer.reduce(new ItemPairWritable(12L, 34L, 2.0, 10.0), + Arrays.asList(new CoRating(2.5f, 2.0f),new CoRating(2.0f, 2.5f)), context); - EasyMock.verify(output); + EasyMock.verify(context); } public void testCompleteJob() throws Exception { @@ -311,19 +316,25 @@ public final class ItemSimilarityTest ex Configuration conf = new Configuration(); conf.set("mapred.input.dir", inputFile.getAbsolutePath()); conf.set("mapred.output.dir", outputDir.getAbsolutePath()); - conf.set("mapred.output.compress", Boolean.FALSE.toString()); + conf.setBoolean("mapred.output.compress", false); similarityJob.setConf(conf); similarityJob.run(new String[] { "--tempDir", tmpDir.getAbsolutePath(), "--similarityClassname", "org.apache.mahout.cf.taste.hadoop.similarity.DistributedUncenteredZeroAssumingCosineSimilarity"}); - File countUsersPart = new File(new File(tmpDir, "countUsers"), "part-00000"); - int numberOfUsers = ItemSimilarityJob.readNumberOfUsers(new JobConf(), countUsersPart.getAbsolutePath()); + File countUsersPart = new File(tmpDir, "countUsers"); + int numberOfUsers = ItemSimilarityJob.readNumberOfUsers(new Configuration(), + new Path(countUsersPart.getAbsolutePath())); assertEquals(3, numberOfUsers); - File outPart = new File(outputDir, "part-00000"); + File outPart = outputDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String 
name) { + return name.startsWith("part-"); + } + })[0]; BufferedReader reader = new BufferedReader(new FileReader(outPart)); String line; Modified: mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java URL: http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java?rev=948538&r1=948537&r2=948538&view=diff ============================================================================== --- mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java (original) +++ mahout/trunk/utils/src/main/java/org/apache/mahout/text/TextParagraphSplittingJob.java Wed May 26 18:59:02 2010 @@ -18,16 +18,13 @@ package org.apache.mahout.text; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.MapReduceBase; -import org.apache.hadoop.mapred.Mapper; -import org.apache.hadoop.mapred.OutputCollector; -import org.apache.hadoop.mapred.Reducer; -import org.apache.hadoop.mapred.Reporter; -import org.apache.hadoop.mapred.SequenceFileInputFormat; -import org.apache.hadoop.mapred.SequenceFileOutputFormat; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.hadoop.util.ToolRunner; import org.apache.mahout.common.AbstractJob; @@ -38,36 +35,32 @@ public class TextParagraphSplittingJob e @Override public int run(String[] strings) throws Exception { Configuration originalConf = getConf(); - JobConf conf = prepareJobConf(originalConf.get("mapred.input.dir"), - originalConf.get("mapred.output.dir"), - SequenceFileInputFormat.class, - SplitMap.class, - Text.class, - 
Text.class, - Reducer.class, - Text.class, - Text.class, - SequenceFileOutputFormat.class); - conf.setNumReduceTasks(0); - - JobClient.runJob(conf).waitForCompletion(); + Job job = prepareJob(new Path(originalConf.get("mapred.input.dir")), + new Path(originalConf.get("mapred.output.dir")), + SequenceFileInputFormat.class, + SplitMap.class, + Text.class, + Text.class, + Reducer.class, + Text.class, + Text.class, + SequenceFileOutputFormat.class); + job.setNumReduceTasks(0); + job.waitForCompletion(true); return 1; } - public static class SplitMap extends MapReduceBase implements Mapper<Text,Text,Text,Text> { + public static class SplitMap extends Mapper<Text,Text,Text,Text> { @Override - public void map(Text key, - Text text, - OutputCollector<Text, Text> out, - Reporter reporter) throws IOException { + public void map(Text key, Text text, Context context) throws IOException, InterruptedException { Text outText = new Text(); int loc = 0; while(loc >= 0 && loc < text.getLength()) { int nextLoc = text.find("\n\n", loc+1); - if(nextLoc > 0) { + if (nextLoc > 0) { outText.set(text.getBytes(), loc, (nextLoc - loc)); - out.collect(key, outText); + context.write(key, outText); } loc = nextLoc; }
