Author: tcp
Date: Thu Mar  1 00:24:07 2012
New Revision: 1295352

URL: http://svn.apache.org/viewvc?rev=1295352&view=rev
Log:
MAHOUT-980: Fix DistributedCache usage to allow EMR deployment

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java?rev=1295352&r1=1295351&r2=1295352&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/fpm/pfpgrowth/PFPGrowth.java Thu Mar  1 00:24:07 2012
@@ -96,15 +96,28 @@ public final class PFPGrowth {
    */
   public static List<Pair<String,Long>> readFList(Configuration conf) throws 
IOException {
     List<Pair<String,Long>> list = new ArrayList<Pair<String,Long>>();
-    URI[] files = DistributedCache.getCacheFiles(conf);
+    Path[] files = DistributedCache.getLocalCacheFiles(conf);
     if (files == null) {
       throw new IOException("Cannot read Frequency list from Distributed 
Cache");
     }
     if (files.length != 1) {
       throw new IOException("Cannot read Frequency list from Distributed Cache 
("+files.length+")");
     }
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path fListLocalPath = fs.makeQualified(files[0]);
+    // Fallback if we are running locally.
+    if (! fs.exists(fListLocalPath)) {
+      URI[] filesURIs = DistributedCache.getCacheFiles(conf);
+      if (filesURIs == null) {
+        throw new IOException("Cannot read Frequency list from Distributed 
Cache");
+      }
+      if (filesURIs.length != 1) {
+        throw new IOException("Cannot read Frequency list from Distributed 
Cache ("+files.length+")");
+      }
+      fListLocalPath = new Path(filesURIs[0].getPath());
+    }
     for (Pair<Text,LongWritable> record :
-         new SequenceFileIterable<Text,LongWritable>(new 
Path(files[0].getPath()), true, conf)) {
+         new SequenceFileIterable<Text,LongWritable>(fListLocalPath, true, 
conf)) {
       list.add(new Pair<String,Long>(record.getFirst().toString(), 
record.getSecond().get()));
     }
     return list;


Reply via email to