Author: srowen
Date: Sat Jun  4 11:04:48 2011
New Revision: 1131376

URL: http://svn.apache.org/viewvc?rev=1131376&view=rev
Log:
MAHOUT-700 better use of FileSystem to accommodate S3

Modified:
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
    mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java Sat Jun  4 11:04:48 2011
@@ -43,7 +43,7 @@ public final class ChunkedWriter impleme
       chunkSizeInMB = 1984;
     }
     maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
-    fs = FileSystem.get(conf);
+    fs = FileSystem.get(output.toUri(), conf);
     currentChunkID = 0;
     writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), 
Text.class, Text.class);
   }

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java Sat Jun  4 11:04:48 2011
@@ -19,6 +19,7 @@ package org.apache.mahout.text;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.mahout.common.iterator.FileLineIterable;
@@ -32,9 +33,12 @@ import java.util.Map;
  */
 public final class PrefixAdditionFilter extends 
SequenceFilesFromDirectoryFilter {
 
-  public PrefixAdditionFilter(Configuration conf, String keyPrefix, 
Map<String, String> options, ChunkedWriter writer)
-    throws IOException {
-    super(conf, keyPrefix, options, writer);
+  public PrefixAdditionFilter(Configuration conf,
+                              String keyPrefix,
+                              Map<String, String> options, 
+                              ChunkedWriter writer,
+                              FileSystem fs) {
+    super(conf, keyPrefix, options, writer, fs);
   }
 
   @Override
@@ -42,7 +46,7 @@ public final class PrefixAdditionFilter 
     if (fst.isDir()) {
       fs.listStatus(fst.getPath(),
                     new PrefixAdditionFilter(conf, prefix + Path.SEPARATOR + 
current.getName(),
-                        options, writer));
+                        options, writer, fs));
     } else {
       InputStream in = null;
       try {

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java Sat Jun  4 11:04:48 2011
@@ -19,6 +19,7 @@ package org.apache.mahout.text;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.iterator.FileLineIterable;
@@ -51,8 +52,9 @@ public final class SequenceFilesFromCsvF
   public SequenceFilesFromCsvFilter(Configuration conf,
                                     String keyPrefix,
                                     Map<String, String> options,
-                                    ChunkedWriter writer) throws IOException {
-    super(conf, keyPrefix, options, writer);
+                                    ChunkedWriter writer,
+                                    FileSystem fs) {
+    super(conf, keyPrefix, options, writer, fs);
     this.keyColumn = Integer.parseInt(options.get(KEY_COLUMN_OPTION[0]));
     this.valueColumn = Integer.parseInt(options.get(VALUE_COLUMN_OPTION[0]));
   }
@@ -84,7 +86,7 @@ public final class SequenceFilesFromCsvF
     if (fst.isDir()) {
       fs.listStatus(fst.getPath(),
                     new SequenceFilesFromCsvFilter(conf, prefix + 
Path.SEPARATOR + current.getName(),
-                        this.options, writer));
+                        this.options, writer, fs));
     } else {
       InputStream in = fs.open(fst.getPath());
       for (CharSequence aFit : new FileLineIterable(in, charset, false)) {

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java Sat Jun  4 11:04:48 2011
@@ -32,8 +32,6 @@ import org.apache.hadoop.util.ToolRunner
 import org.apache.mahout.common.AbstractJob;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Converts a directory of text documents into SequenceFiles of Specified 
chunkSize. This class takes in a
@@ -44,8 +42,6 @@ import org.slf4j.LoggerFactory;
  */
 public class SequenceFilesFromDirectory extends AbstractJob {
 
-  private static final Logger log = 
LoggerFactory.getLogger(SequenceFilesFromDirectory.class);
-
   private static final String PREFIX_ADDITION_FILTER = 
PrefixAdditionFilter.class.getName();
   
   public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
@@ -60,21 +56,24 @@ public class SequenceFilesFromDirectory 
                   Path output)
     throws InstantiationException, IllegalAccessException, 
InvocationTargetException, IOException,
            NoSuchMethodException, ClassNotFoundException {
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = FileSystem.get(input.toUri(), conf);
     ChunkedWriter writer = new ChunkedWriter(conf, 
Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output);
 
     try {
       SequenceFilesFromDirectoryFilter pathFilter;
-
       String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]);
       if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
-        pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, 
writer);
+        pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, 
writer, fs);
       } else {
         Class<? extends SequenceFilesFromDirectoryFilter> pathFilterClass =
             
Class.forName(fileFilterClassName).asSubclass(SequenceFilesFromDirectoryFilter.class);
         Constructor<? extends SequenceFilesFromDirectoryFilter> constructor =
-            pathFilterClass.getConstructor(Configuration.class, String.class, 
Map.class, ChunkedWriter.class);
-        pathFilter = constructor.newInstance(conf, keyPrefix, options, writer);
+            pathFilterClass.getConstructor(Configuration.class,
+                                           String.class,
+                                           Map.class,
+                                           ChunkedWriter.class,
+                                           FileSystem.class);
+        pathFilter = constructor.newInstance(conf, keyPrefix, options, writer, 
fs);
       }
       fs.listStatus(input, pathFilter);
     } finally {

Modified: mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
URL: http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
--- mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java (original)
+++ mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java Sat Jun  4 11:04:48 2011
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.mahout.text;
 
 import org.apache.hadoop.conf.Configuration;
@@ -53,13 +54,13 @@ public abstract class SequenceFilesFromD
   protected SequenceFilesFromDirectoryFilter(Configuration conf,
                                              String keyPrefix,
                                              Map<String, String> options,
-                                             ChunkedWriter writer)
-    throws IOException {
+                                             ChunkedWriter writer,
+                                             FileSystem fs) {
     this.conf = conf;
     this.prefix = keyPrefix;
     this.writer = writer;
     this.charset = 
Charset.forName(options.get(SequenceFilesFromDirectory.CHARSET_OPTION[0]));
-    this.fs = FileSystem.get(conf);
+    this.fs = fs;
     this.options = options;
   }
 


Reply via email to