Author: tcp
Date: Thu Mar 1 00:24:07 2012
New Revision: 1295352
URL: http://svn.apache.org/viewvc?rev=1295352&view=rev
Log:
MAHOUT-980: Fix DistributedCache usage to allow EMR deployment
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1295352&r1=1295351&r2=1295352&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Thu Mar 1 00:24:07 2012
@@ -96,15 +96,28 @@ public final class PFPGrowth {
*/
public static List<Pair<String,Long>> readFList(Configuration conf) throws
IOException {
List<Pair<String,Long>> list = new ArrayList<Pair<String,Long>>();
- URI[] files = DistributedCache.getCacheFiles(conf);
+ Path[] files = DistributedCache.getLocalCacheFiles(conf);
if (files == null) {
throw new IOException("Cannot read Frequency list from Distributed
Cache");
}
if (files.length != 1) {
throw new IOException("Cannot read Frequency list from Distributed Cache
("+files.length+")");
}
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path fListLocalPath = fs.makeQualified(files[0]);
+ // Fallback if we are running locally.
+ if (! fs.exists(fListLocalPath)) {
+ URI[] filesURIs = DistributedCache.getCacheFiles(conf);
+ if (filesURIs == null) {
+ throw new IOException("Cannot read Frequency list from Distributed
Cache");
+ }
+ if (filesURIs.length != 1) {
+ throw new IOException("Cannot read Frequency list from Distributed
Cache ("+files.length+")");
+ }
+ fListLocalPath = new Path(filesURIs[0].getPath());
+ }
for (Pair<Text,LongWritable> record :
- new SequenceFileIterable<Text,LongWritable>(new
Path(files[0].getPath()), true, conf)) {
+ new SequenceFileIterable<Text,LongWritable>(fListLocalPath, true,
conf)) {
list.add(new Pair<String,Long>(record.getFirst().toString(),
record.getSecond().get()));
}
return list;