Author: srowen
Date: Wed Jun 22 22:29:35 2011
New Revision: 1138664
URL: http://svn.apache.org/viewvc?rev=1138664&view=rev
Log:
MAHOUT-632 Robin's fix to use distributed cache
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
Wed Jun 22 22:29:35 2011
@@ -18,33 +18,38 @@
package org.apache.mahout.fpm.pfpgrowth;
import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.regex.Pattern;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DefaultStringifier;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.GenericsUtil;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.Parameters;
import org.apache.mahout.common.iterator.sequencefile.PathType;
import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
import org.apache.mahout.fpm.pfpgrowth.convertors.string.TopKStringPatterns;
import org.apache.mahout.fpm.pfpgrowth.fpgrowth.FPGrowth;
import org.slf4j.Logger;
@@ -70,7 +75,7 @@ public final class PFPGrowth {
public static final String FILE_PATTERN = "part-*";
public static final String FPGROWTH = "fpgrowth";
public static final String FREQUENT_PATTERNS = "frequentpatterns";
- public static final String PARALLEL_COUNTING = "parallelcounting";
+ public static final String PARALLEL_COUNTING = "parallelcounting";
public static final String SORTED_OUTPUT = "sortedoutput";
public static final String SPLIT_PATTERN = "splitPattern";
@@ -78,46 +83,89 @@ public final class PFPGrowth {
private static final Logger log = LoggerFactory.getLogger(PFPGrowth.class);
- private PFPGrowth() { }
+ private PFPGrowth() {}
/**
* Generates the fList from the serialized string representation
- *
+ *
* @return Deserialized Feature Frequency List
*/
- public static List<Pair<String,Long>> deserializeList(Parameters params,
- String key,
- Configuration conf)
throws IOException {
- List<Pair<String,Long>> list = Lists.newArrayList();
- conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization,"
- +
"org.apache.hadoop.io.serializer.WritableSerialization");
-
- DefaultStringifier<List<Pair<String,Long>>> listStringifier = new
DefaultStringifier<List<Pair<String,Long>>>(
- conf, GenericsUtil.getClass(list));
- String serializedString = params.get(key, listStringifier.toString(list));
- list = listStringifier.fromString(serializedString);
+ public static List<Pair<String,Long>> readFList(Configuration conf) throws
IOException {
+ List<Pair<String,Long>> list = new ArrayList<Pair<String,Long>>();
+ URI[] files = DistributedCache.getCacheFiles(conf);
+ if (files == null || files.length < 2) {
+ throw new IOException("Cannot read Frequency list and Grouping list from
Distributed Cache");
+ }
+ for (Pair<Text,LongWritable> record :
+ new SequenceFileIterable<Text,LongWritable>(new
Path(files[0].getPath()), true, conf)) {
+ list.add(new Pair<String,Long>(record.getFirst().toString(),
record.getSecond().get()));
+ }
return list;
}
/**
* Generates the gList(Group ID Mapping of Various frequent Features) Map
from the corresponding serialized
* representation
- *
+ *
* @return Deserialized Group List
*/
- public static Map<String,Long> deserializeMap(Parameters params, String key,
Configuration conf) throws IOException {
- Map<String,Long> map = Maps.newHashMap();
- conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization,"
- +
"org.apache.hadoop.io.serializer.WritableSerialization");
-
- DefaultStringifier<Map<String,Long>> mapStringifier = new
DefaultStringifier<Map<String,Long>>(conf,
- GenericsUtil.getClass(map));
- String gListString = params.get(key, mapStringifier.toString(map));
- map = mapStringifier.fromString(gListString);
+ public static Map<String,Long> readGList(Configuration conf) throws
IOException {
+ Map<String,Long> map = new HashMap<String,Long>();
+ URI[] files = DistributedCache.getCacheFiles(conf);
+ if (files == null || files.length < 2) {
+ throw new IOException("Cannot read Frequency list and Grouping list from
Distributed Cache");
+ }
+ for (Pair<Text,LongWritable> record :
+ new SequenceFileIterable<Text,LongWritable>(new
Path(files[1].getPath()), true, conf)) {
+ map.put(record.getFirst().toString(), record.getSecond().get());
+ }
return map;
}
/**
+ * Writes the fList as a Text/LongWritable SequenceFile at OUTPUT/F_LIST and adds that file
+ * to the DistributedCache of the given Configuration. Returns nothing; tasks read the list
+ * back with readFList(Configuration).
+ */
+ private static void saveFList(Iterable<Pair<String,Long>> flist, Parameters
params, Configuration conf)
+ throws IOException {
+ Path flistPath = new Path(params.get(OUTPUT), F_LIST);
+ FileSystem fs = FileSystem.get(conf);
+ flistPath = fs.makeQualified(flistPath);
+ HadoopUtil.delete(conf, flistPath);
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath,
Text.class, LongWritable.class);
+ try {
+ for (Pair<String,Long> pair : flist) {
+ writer.append(new Text(pair.getFirst()), new
LongWritable(pair.getSecond()));
+ }
+ } finally {
+ writer.close();
+ }
+ DistributedCache.addCacheFile(flistPath.toUri(), conf);
+ }
+
+ /**
+ * Writes the gList map as a Text/LongWritable SequenceFile at OUTPUT/G_LIST and adds that
+ * file to the DistributedCache of the given Configuration. Returns nothing; tasks read the
+ * map back with readGList(Configuration).
+ */
+ private static void saveGList(Map<String,Long> glist, Parameters params,
Configuration conf) throws IOException {
+ Path flistPath = new Path(params.get(OUTPUT), G_LIST);
+ FileSystem fs = FileSystem.get(conf);
+ flistPath = fs.makeQualified(flistPath);
+ HadoopUtil.delete(conf, flistPath);
+ SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, flistPath,
Text.class, LongWritable.class);
+ try {
+ for (Entry<String,Long> pair : glist.entrySet()) {
+ writer.append(new Text(pair.getKey()), new
LongWritable(pair.getValue()));
+ }
+ } finally {
+ writer.close();
+ }
+ DistributedCache.addCacheFile(flistPath.toUri(), conf);
+ }
+
+ /**
* read the feature frequency List which is built at the end of the Parallel
counting job
*
* @return Feature Frequency List
@@ -125,14 +173,24 @@ public final class PFPGrowth {
public static List<Pair<String,Long>> readFList(Parameters params) {
int minSupport = Integer.valueOf(params.get(MIN_SUPPORT, "3"));
Configuration conf = new Configuration();
+
+ Path parallelCountingPath = new Path(params.get(OUTPUT),
PARALLEL_COUNTING);
- PriorityQueue<Pair<String,Long>> queue =
- new PriorityQueue<Pair<String,Long>>(11, new
CountDescendingPairComparator<String,Long>());
+ PriorityQueue<Pair<String,Long>> queue = new
PriorityQueue<Pair<String,Long>>(11,
+ new Comparator<Pair<String,Long>>() {
+ @Override
+ public int compare(Pair<String,Long> o1, Pair<String,Long> o2) {
+ int ret = o2.getSecond().compareTo(o1.getSecond());
+ if (ret != 0) {
+ return ret;
+ }
+ return o1.getFirst().compareTo(o2.getFirst());
+ }
+ });
- Path parallelCountingPath = new Path(params.get(OUTPUT),
PARALLEL_COUNTING);
- Path filesPattern = new Path(parallelCountingPath, FILE_PATTERN);
- for (Pair<Writable,LongWritable> record
- : new SequenceFileDirIterable<Writable,LongWritable>(filesPattern,
PathType.GLOB, null, null, true, conf)) {
+ for (Pair<Text,LongWritable> record :
+ new SequenceFileDirIterable<Text,LongWritable>(new
Path(parallelCountingPath, FILE_PATTERN),
+ PathType.GLOB, null,
null, true, conf)) {
long value = record.getSecond().get();
if (value >= minSupport) {
queue.add(new Pair<String,Long>(record.getFirst().toString(), value));
@@ -153,7 +211,7 @@ public final class PFPGrowth {
public static List<Pair<String,TopKStringPatterns>>
readFrequentPattern(Parameters params) throws IOException {
Configuration conf = new Configuration();
-
+
Path frequentPatternsPath = new Path(params.get(OUTPUT),
FREQUENT_PATTERNS);
FileSystem fs = FileSystem.get(frequentPatternsPath.toUri(), conf);
FileStatus[] outputFiles = fs.globStatus(new Path(frequentPatternsPath,
FILE_PATTERN));
@@ -171,25 +229,26 @@ public final class PFPGrowth {
* params should contain input and output locations as a string
value, the additional parameters
* include minSupport(3), maxHeapSize(50), numGroups(1000)
*/
- public static void runPFPGrowth(Parameters params)
- throws IOException, InterruptedException, ClassNotFoundException {
- startParallelCounting(params);
- startGroupingItems(params);
- startTransactionSorting(params);
- startParallelFPGrowth(params);
- startAggregating(params);
+ public static void runPFPGrowth(Parameters params) throws IOException,
+ InterruptedException,
+ ClassNotFoundException {
+ Configuration conf = new Configuration();
+ conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization,"
+ +
"org.apache.hadoop.io.serializer.WritableSerialization");
+ startParallelCounting(params, conf);
+ startGroupingItems(params, conf);
+ startTransactionSorting(params, conf);
+ startParallelFPGrowth(params, conf);
+ startAggregating(params, conf);
}
/**
* Run the aggregation Job to aggregate the different TopK patterns and
group each Pattern by the features
* present in it and thus calculate the final Top K frequent Patterns for
each feature
*/
- public static void startAggregating(Parameters params)
+ public static void startAggregating(Parameters params, Configuration conf)
throws IOException, InterruptedException, ClassNotFoundException {
- Configuration conf = new Configuration();
- params.set(F_LIST, "");
- params.set(G_LIST, "");
conf.set(PFP_PARAMETERS, params.toString());
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.output.compression.type", "BLOCK");
@@ -210,7 +269,7 @@ public final class PFPGrowth {
job.setCombinerClass(AggregatorReducer.class);
job.setReducerClass(AggregatorReducer.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
-
+
HadoopUtil.delete(conf, outPath);
job.waitForCompletion(true);
}
@@ -221,8 +280,7 @@ public final class PFPGrowth {
* @param params
* @throws IOException
*/
- public static void startGroupingItems(Parameters params) throws IOException {
- Configuration conf = new Configuration();
+ public static void startGroupingItems(Parameters params, Configuration conf)
throws IOException {
List<Pair<String,Long>> fList = readFList(params);
Integer numGroups = Integer.valueOf(params.get(NUM_GROUPS, "50"));
@@ -247,17 +305,15 @@ public final class PFPGrowth {
log.info("No of Features: {}", fList.size());
- params.set(G_LIST, serializeMap(gList, conf));
- params.set(F_LIST, serializeList(fList, conf));
+ saveFList(fList, params, conf);
+ saveGList(gList, params, conf);
}
/**
* Count the frequencies of various features in parallel using Map/Reduce
*/
- public static void startParallelCounting(Parameters params)
+ public static void startParallelCounting(Parameters params, Configuration
conf)
throws IOException, InterruptedException, ClassNotFoundException {
-
- Configuration conf = new Configuration();
conf.set(PFP_PARAMETERS, params.toString());
conf.set("mapred.compress.map.output", "true");
@@ -289,12 +345,8 @@ public final class PFPGrowth {
/**
* Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features
of group dependent shards
*/
- public static void startTransactionSorting(Parameters params)
+ public static void startTransactionSorting(Parameters params, Configuration
conf)
throws IOException, InterruptedException, ClassNotFoundException {
-
- Configuration conf = new Configuration();
- String gList = params.get(G_LIST);
- params.set(G_LIST, "");
conf.set(PFP_PARAMETERS, params.toString());
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.output.compression.type", "BLOCK");
@@ -320,16 +372,13 @@ public final class PFPGrowth {
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.waitForCompletion(true);
- params.set(G_LIST, gList);
}
/**
* Run the Parallel FPGrowth Map/Reduce Job to calculate the Top K features
of group dependent shards
*/
- public static void startParallelFPGrowth(Parameters params)
+ public static void startParallelFPGrowth(Parameters params, Configuration
conf)
throws IOException, InterruptedException, ClassNotFoundException {
-
- Configuration conf = new Configuration();
conf.set(PFP_PARAMETERS, params.toString());
conf.set("mapred.compress.map.output", "true");
conf.set("mapred.output.compression.type", "BLOCK");
@@ -357,30 +406,4 @@ public final class PFPGrowth {
job.waitForCompletion(true);
}
-
- /**
- * Serializes the fList and returns the string representation of the List
- *
- * @return Serialized String representation of List
- */
- private static String serializeList(List<Pair<String,Long>> list,
Configuration conf) throws IOException {
- conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization,"
- +
"org.apache.hadoop.io.serializer.WritableSerialization");
- DefaultStringifier<List<Pair<String,Long>>> listStringifier = new
DefaultStringifier<List<Pair<String,Long>>>(
- conf, GenericsUtil.getClass(list));
- return listStringifier.toString(list);
- }
-
- /**
- * Converts a given Map in to a String using DefaultStringifier of Hadoop
- *
- * @return Serialized String representation of the GList Map
- */
- private static String serializeMap(Map<String,Long> map, Configuration conf)
throws IOException {
- conf.set("io.serializations",
"org.apache.hadoop.io.serializer.JavaSerialization,"
- +
"org.apache.hadoop.io.serializer.WritableSerialization");
- DefaultStringifier<Map<String,Long>> mapStringifier = new
DefaultStringifier<Map<String,Long>>(conf,
- GenericsUtil.getClass(map));
- return mapStringifier.toString(map);
- }
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthMapper.java
Wed Jun 22 22:29:35 2011
@@ -26,7 +26,6 @@ import java.util.Map.Entry;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.mahout.common.Pair;
-import org.apache.mahout.common.Parameters;
import org.apache.mahout.math.map.OpenIntLongHashMap;
import org.apache.mahout.math.map.OpenObjectIntHashMap;
@@ -47,9 +46,8 @@ public class ParallelFPGrowthMapper exte
Integer[] prunedItems = pattern.getFirst().toArray(new
Integer[pattern.getFirst().size()]);
Collection<Long> groups = new HashSet<Long>();
- for (int j = prunedItems.length - 1; j >= 0; j--) { // generate group
- // dependent
- // shards
+ for (int j = prunedItems.length - 1; j >= 0; j--) {
+ // generate group dependent shards
Integer item = prunedItems[j];
Long groupID = gListInt.get(item);
@@ -68,16 +66,13 @@ public class ParallelFPGrowthMapper exte
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
- Parameters params = new
Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
-
OpenObjectIntHashMap<String> fMap = new OpenObjectIntHashMap<String>();
int i = 0;
- for (Pair<String,Long> e : PFPGrowth.deserializeList(params,
PFPGrowth.F_LIST, context.getConfiguration())) {
+ for (Pair<String,Long> e :
PFPGrowth.readFList(context.getConfiguration())) {
fMap.put(e.getFirst(), i++);
}
- for (Entry<String,Long> e : PFPGrowth.deserializeMap(params,
PFPGrowth.G_LIST, context.getConfiguration())
- .entrySet()) {
+ for (Entry<String,Long> e :
PFPGrowth.readGList(context.getConfiguration()).entrySet()) {
gListInt.put(fMap.get(e.getKey()), e.getValue());
}
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/ParallelFPGrowthReducer.java
Wed Jun 22 22:29:35 2011
@@ -94,13 +94,13 @@ public class ParallelFPGrowthReducer ext
Parameters params = new
Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
int i = 0;
- for (Pair<String,Long> e : PFPGrowth.deserializeList(params,
PFPGrowth.F_LIST, context.getConfiguration())) {
+ for (Pair<String,Long> e :
PFPGrowth.readFList(context.getConfiguration())) {
featureReverseMap.add(e.getFirst());
fMap.put(e.getFirst(), i++);
}
- Map<String,Long> gList = PFPGrowth.deserializeMap(params,
PFPGrowth.G_LIST, context.getConfiguration());
+ Map<String,Long> gList = PFPGrowth.readGList(context.getConfiguration());
for (Entry<String,Long> entry : gList.entrySet()) {
IntArrayList groupList = groupFeatures.get(entry.getValue());
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
---
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
(original)
+++
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/TransactionSortingMapper.java
Wed Jun 22 22:29:35 2011
@@ -71,9 +71,9 @@ public class TransactionSortingMapper ex
protected void setup(Context context) throws IOException,
InterruptedException {
super.setup(context);
Parameters params = new
Parameters(context.getConfiguration().get(PFPGrowth.PFP_PARAMETERS, ""));
-
+
int i = 0;
- for (Pair<String,Long> e : PFPGrowth.deserializeList(params,
PFPGrowth.F_LIST, context.getConfiguration())) {
+ for (Pair<String,Long> e :
PFPGrowth.readFList(context.getConfiguration())) {
fMap.put(e.getFirst(), i++);
}
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthRetailDataTest.java
Wed Jun 22 22:29:35 2011
@@ -32,6 +32,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
+import org.apache.hadoop.conf.Configuration;
import org.apache.mahout.common.MahoutTestCase;
import org.apache.mahout.common.Pair;
import org.apache.mahout.common.Parameters;
@@ -99,17 +100,17 @@ public class PFPGrowthRetailDataTest ext
Long support = Long.parseLong(supportString.substring(1,
supportString.length() - 1));
expectedResults.put(new HashSet<String>(items), support);
}
-
+ Configuration conf = new Configuration();
log.info("Starting Parallel Counting Test: {}",
params.get(PFPGrowth.MAX_HEAPSIZE));
- PFPGrowth.startParallelCounting(params);
+ PFPGrowth.startParallelCounting(params, conf);
log.info("Starting Grouping Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
- PFPGrowth.startGroupingItems(params);
+ PFPGrowth.startGroupingItems(params, conf);
log.info("Starting Parallel FPGrowth Test: {}",
params.get(PFPGrowth.MAX_HEAPSIZE));
- PFPGrowth.startGroupingItems(params);
- PFPGrowth.startTransactionSorting(params);
- PFPGrowth.startParallelFPGrowth(params);
+ PFPGrowth.startGroupingItems(params, conf);
+ PFPGrowth.startTransactionSorting(params, conf);
+ PFPGrowth.startParallelFPGrowth(params, conf);
log.info("Starting Pattern Aggregation Test: {}",
params.get(PFPGrowth.MAX_HEAPSIZE));
- PFPGrowth.startAggregating(params);
+ PFPGrowth.startAggregating(params, conf);
List<Pair<String,TopKStringPatterns>> frequentPatterns =
PFPGrowth.readFrequentPattern(params);
Map<Set<String>,Long> results = Maps.newHashMap();
Modified:
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java?rev=1138664&r1=1138663&r2=1138664&view=diff
==============================================================================
---
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
(original)
+++
mahout/trunk/core/src/test/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowthTest.java
Wed Jun 22 22:29:35 2011
@@ -22,7 +22,6 @@ import java.io.Writer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
@@ -82,25 +81,23 @@ public final class PFPGrowthTest extends
@Test
public void testStartParallelFPGrowth() throws Exception {
+ Configuration conf = new Configuration();
log.info("Starting Parallel Counting Test: {}",
params.get(PFPGrowth.MAX_HEAPSIZE));
- PFPGrowth.startParallelCounting(params);
+ PFPGrowth.startParallelCounting(params, conf);
log.info("Reading fList Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
List<Pair<String,Long>> fList = PFPGrowth.readFList(params);
log.info("{}", fList);
assertEquals("[(B,6), (D,6), (A,5), (E,4), (C,3)]", fList.toString());
log.info("Starting Grouping Test: {}", params.get(PFPGrowth.MAX_HEAPSIZE));
- PFPGrowth.startGroupingItems(params);
- Map<String,Long> gList = PFPGrowth.deserializeMap(params,
PFPGrowth.G_LIST, new Configuration());
- log.info("{}", gList);
- assertEquals("{D=0, E=1, A=0, B=0, C=1}", gList.toString());
+ PFPGrowth.startGroupingItems(params, conf);
log.info("Starting Parallel FPGrowth Test: {}",
params.get(PFPGrowth.MAX_HEAPSIZE));
- PFPGrowth.startGroupingItems(params);
- PFPGrowth.startTransactionSorting(params);
- PFPGrowth.startParallelFPGrowth(params);
+ PFPGrowth.startGroupingItems(params, conf);
+ PFPGrowth.startTransactionSorting(params, conf);
+ PFPGrowth.startParallelFPGrowth(params, conf);
log.info("Starting Pattern Aggregation Test: {}",
params.get(PFPGrowth.MAX_HEAPSIZE));
- PFPGrowth.startAggregating(params);
+ PFPGrowth.startAggregating(params, conf);
List<Pair<String,TopKStringPatterns>> frequentPatterns =
PFPGrowth.readFrequentPattern(params);
assertEquals("[(A,([A],5), ([D, A],4), ([B, A],4), ([A, E],4)), "
+ "(B,([B],6), ([B, D],4), ([B, A],4), ([B, D, A],3)), "