Author: isabel
Date: Wed Mar 23 13:32:26 2011
New Revision: 1084582

URL: http://svn.apache.org/viewvc?rev=1084582&view=rev
Log:
MAHOUT-590 - added support for a configurable directory/file layout in the
SequenceFilesFromDirectory job. Includes a demo for tab-separated-value (TSV)
formatted files.

Added:
    mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java
    
mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
    
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
    
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
Modified:
    
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
    
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java

Added: 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java?rev=1084582&view=auto
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java 
(added)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java 
Wed Mar 23 13:32:26 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Writes text key/value pairs into a series of SequenceFiles named {@code chunk-0},
+ * {@code chunk-1}, ... inside an output directory. A new chunk is started once the data
+ * written to the current one exceeds the configured size.
+ */
+public final class ChunkedWriter implements Closeable {
+
+  /** Largest accepted chunk size in MB; 1984 MB converted to bytes still fits in an int. */
+  private static final int MAX_CHUNK_SIZE_IN_MB = 1984;
+
+  private final int maxChunkSizeInBytes;
+  private final Path output;
+  private final FileSystem fs;
+  private final Configuration conf;
+  private SequenceFile.Writer writer;
+  private int currentChunkID;
+  // long: adding one more record to a nearly full 1984 MB chunk could overflow an int.
+  private long currentChunkSize;
+
+  /**
+   * Creates the writer and opens the first chunk file, {@code chunk-0}.
+   *
+   * @param conf Hadoop configuration used to obtain the file system
+   * @param chunkSizeInMB approximate size after which a new chunk file is started; values
+   *        above 1984 are clamped to 1984
+   * @param output directory in which the chunk files are created
+   * @throws IOException if the file system or the first chunk file cannot be opened
+   */
+  public ChunkedWriter(Configuration conf, int chunkSizeInMB, Path output) throws IOException {
+    this.output = output;
+    this.conf = conf;
+    this.maxChunkSizeInBytes = Math.min(chunkSizeInMB, MAX_CHUNK_SIZE_IN_MB) * 1024 * 1024;
+    this.fs = FileSystem.get(conf);
+    this.currentChunkID = 0;
+    this.writer = openChunk(currentChunkID);
+  }
+
+  private Path getPath(int chunkID) {
+    return new Path(output, "chunk-" + chunkID);
+  }
+
+  /** Opens a Text/Text SequenceFile writer for the chunk with the given number. */
+  private SequenceFile.Writer openChunk(int chunkID) throws IOException {
+    return new SequenceFile.Writer(fs, conf, getPath(chunkID), Text.class, Text.class);
+  }
+
+  /**
+   * Appends one key/value pair. If the current chunk has already grown past the size limit,
+   * it is closed first and the pair goes into the next chunk file.
+   *
+   * @throws IOException if closing, opening or appending to a chunk file fails
+   */
+  public void write(String key, String value) throws IOException {
+    if (currentChunkSize > maxChunkSizeInBytes) {
+      writer.close();
+      // Pre-increment: the new writer must target the NEXT chunk. A post-increment here would
+      // reopen chunk-0 (already written by the constructor) and clobber it.
+      writer = openChunk(++currentChunkID);
+      currentChunkSize = 0;
+    }
+
+    Text keyT = new Text(key);
+    Text valueT = new Text(value);
+    // Only the first getLength() bytes of a Text's backing array are valid data. Record
+    // framing overhead is not counted, so the chunk limit is approximate.
+    currentChunkSize += keyT.getLength() + valueT.getLength();
+    writer.append(keyT, valueT);
+  }
+
+  /** Closes the chunk file currently being written. */
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}
+

Added: 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java?rev=1084582&view=auto
==============================================================================
--- 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
 (added)
+++ 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
 Wed Mar 23 13:32:26 2011
@@ -0,0 +1,46 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.FileLineIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Default parser: writes every regular file below the input directory as a single record.
+ * The key is the key prefix followed by the file's path below the input directory, and the
+ * value is the file's text with each line terminated by '\n'.
+ */
+public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter {
+  private static final Logger log = LoggerFactory.getLogger(PrefixAdditionFilter.class);
+
+  public PrefixAdditionFilter(Configuration conf,
+                              String keyPrefix,
+                              Map<String, String> options,
+                              ChunkedWriter writer) throws IOException {
+    super(conf, keyPrefix, options, writer);
+  }
+
+  /**
+   * Recurses into sub-directories, extending the key prefix with the name of the directory
+   * being scanned, and writes each regular file as one record.
+   */
+  @Override
+  protected void process(FileStatus fst, Path current) throws IOException {
+    if (fst.isDir()) {
+      fs.listStatus(fst.getPath(),
+                    new PrefixAdditionFilter(conf, prefix + Path.SEPARATOR + current.getName(),
+                                             options, writer));
+    } else {
+      StringBuilder file = new StringBuilder();
+      InputStream in = fs.open(fst.getPath());
+      try {
+        for (String aFit : new FileLineIterable(in, charset, false)) {
+          file.append(aFit).append('\n');
+        }
+      } finally {
+        // Release the handle even if reading fails part-way through the file.
+        in.close();
+      }
+      // When a top-level file is scanned directly, current and fst name the same file.
+      String name = current.getName().equals(fst.getPath().getName())
+          ? current.getName()
+          : current.getName() + Path.SEPARATOR + fst.getPath().getName();
+      writer.write(prefix + Path.SEPARATOR + name, file.toString());
+    }
+  }
+}

Added: 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java?rev=1084582&view=auto
==============================================================================
--- 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
 (added)
+++ 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
 Wed Mar 23 13:32:26 2011
@@ -0,0 +1,82 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.FileLineIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Example parser for tab-separated text files. For every line, the configured key column
+ * (prepended with the key prefix, without a separator) becomes the record key and the
+ * configured value column becomes the record value.
+ */
+public final class SequenceFilesFromCsvFilter extends SequenceFilesFromDirectoryFilter {
+  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromCsvFilter.class);
+  private static final Pattern TAB = Pattern.compile("\\t");
+
+  public static final String[] KEY_COLUMN_OPTION = {"keyColumn", "kcol"};
+  public static final String[] VALUE_COLUMN_OPTION = {"valueColumn", "vcol"};
+
+  private final int keyColumn;
+  private final int valueColumn;
+
+  /** Used only by {@link #main(String[])} to run this class as a command-line job. */
+  private SequenceFilesFromCsvFilter() {
+    this.keyColumn = 0;
+    this.valueColumn = 0;
+  }
+
+  /**
+   * Creates a filter that reads the key and value column indices from {@code options}.
+   *
+   * @throws NumberFormatException if a column option is missing or not an integer
+   */
+  public SequenceFilesFromCsvFilter(Configuration conf, String keyPrefix,
+                                    Map<String, String> options, ChunkedWriter writer)
+    throws IOException {
+    super(conf, keyPrefix, options, writer);
+    this.keyColumn = Integer.parseInt(options.get(KEY_COLUMN_OPTION[0]));
+    this.valueColumn = Integer.parseInt(options.get(VALUE_COLUMN_OPTION[0]));
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SequenceFilesFromCsvFilter(), args);
+  }
+
+  @Override
+  public void addOptions() {
+    super.addOptions();
+    addOption(KEY_COLUMN_OPTION[0], KEY_COLUMN_OPTION[1],
+      "The key column. Default to 0", "0");
+    addOption(VALUE_COLUMN_OPTION[0], VALUE_COLUMN_OPTION[1],
+      "The value column. Default to 1", "1");
+  }
+
+  @Override
+  public Map<String, String> parseOptions() throws IOException {
+    Map<String, String> options = super.parseOptions();
+    // Make the job instantiate this class as its file filter.
+    options.put(SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION[0], this.getClass().getName());
+    options.put(KEY_COLUMN_OPTION[0], getOption(KEY_COLUMN_OPTION[0]));
+    options.put(VALUE_COLUMN_OPTION[0], getOption(VALUE_COLUMN_OPTION[0]));
+    return options;
+  }
+
+  /**
+   * Recurses into sub-directories; for a regular file, writes one record per line.
+   *
+   * @throws IOException if the file cannot be read or a line lacks a requested column
+   */
+  @Override
+  protected void process(FileStatus fst, Path current) throws IOException {
+    if (fst.isDir()) {
+      fs.listStatus(fst.getPath(),
+                    new SequenceFilesFromCsvFilter(conf, prefix + Path.SEPARATOR + current.getName(),
+                                                   this.options, writer));
+      return;
+    }
+    InputStream in = fs.open(fst.getPath());
+    try {
+      for (String aFit : new FileLineIterable(in, charset, false)) {
+        // Limit -1 keeps trailing empty fields, so "key<TAB>" yields an empty value
+        // instead of a missing column.
+        String[] columns = TAB.split(aFit, -1);
+        if (keyColumn >= columns.length || valueColumn >= columns.length) {
+          throw new IOException("Line in " + fst.getPath() + " has " + columns.length
+              + " column(s) but key column " + keyColumn + " and value column "
+              + valueColumn + " were requested: " + aFit);
+        }
+        String key = columns[keyColumn];
+        String value = columns[valueColumn];
+        log.debug("key : {}, value : {}", key, value);
+        writer.write(prefix + key, value);
+      }
+    } finally {
+      in.close();
+    }
+  }
+}

Modified: 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1084582&r1=1084581&r2=1084582&view=diff
==============================================================================
--- 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
 (original)
+++ 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
 Wed Mar 23 13:32:26 2011
@@ -17,23 +17,20 @@
 
 package org.apache.mahout.text;
 
-import java.io.Closeable;
 import java.io.IOException;
-import java.io.InputStream;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.FileLineIterable;
 import org.apache.mahout.common.HadoopUtil;
 import org.apache.mahout.common.commandline.DefaultOptionCreator;
 import org.slf4j.Logger;
@@ -46,7 +43,7 @@ import org.slf4j.LoggerFactory;
  * parent directory prepended with a specified prefix. You can also specify 
the input encoding of the text
  * files. The content of the output SequenceFiles are encoded as UTF-8 text.
  */
-public final class SequenceFilesFromDirectory extends AbstractJob {
+public class SequenceFilesFromDirectory extends AbstractJob {
 
   private static final Logger log = 
LoggerFactory.getLogger(SequenceFilesFromDirectory.class);
 
@@ -58,122 +55,30 @@ public final class SequenceFilesFromDire
   public static final String[] CHARSET_OPTION = {"charset", "c"};
 
   public void run(Configuration conf,
+                  String keyPrefix,
+                  Map<String, String> options,
                   Path input,
-                  Path output,
-                  String prefix,
-                  int chunkSizeInMB,
-                  Charset charset,
-                  String fileFilterClassName)
+                  Path output)
     throws InstantiationException, IllegalAccessException, 
InvocationTargetException, IOException,
            NoSuchMethodException, ClassNotFoundException {
     FileSystem fs = FileSystem.get(conf);
-    ChunkedWriter writer = new ChunkedWriter(conf, chunkSizeInMB, output);
-    
-    PathFilter pathFilter;
+    ChunkedWriter writer = new ChunkedWriter(conf, 
Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output);
     
+    SequenceFilesFromDirectoryFilter pathFilter;
+
+    String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]);
     if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
-      pathFilter = new PrefixAdditionFilter(conf, prefix, writer, charset);
+      pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer);
     } else {
-      Class<? extends PathFilter> pathFilterClass = 
Class.forName(fileFilterClassName).asSubclass(PathFilter.class);
-      Constructor<? extends PathFilter> constructor =
-          pathFilterClass.getConstructor(Configuration.class, String.class, 
ChunkedWriter.class, Charset.class);
-      pathFilter = constructor.newInstance(conf, prefix, writer, charset);
+      Class<? extends SequenceFilesFromDirectoryFilter> pathFilterClass = 
Class.forName(fileFilterClassName).asSubclass(SequenceFilesFromDirectoryFilter.class);
+      Constructor<? extends SequenceFilesFromDirectoryFilter> constructor =
+          pathFilterClass.getConstructor(Configuration.class, String.class, 
Map.class, ChunkedWriter.class);
+      pathFilter = constructor.newInstance(conf, keyPrefix, options, writer);
     }
     fs.listStatus(input, pathFilter);
     writer.close();
   }
   
-  private static final class ChunkedWriter implements Closeable {
-
-    private final int maxChunkSizeInBytes;
-    private final Path output;
-    private SequenceFile.Writer writer;
-    private int currentChunkID;
-    private int currentChunkSize;
-    private final FileSystem fs;
-    private final Configuration conf;
-    
-    private ChunkedWriter(Configuration conf, int chunkSizeInMB, Path output) 
throws IOException {
-      this.output = output;
-      this.conf = conf;
-      if (chunkSizeInMB > 1984) {
-        chunkSizeInMB = 1984;
-      }
-      maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
-      fs = FileSystem.get(conf);
-      currentChunkID = 0;
-      writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), 
Text.class, Text.class);
-    }
-    
-    private Path getPath(int chunkID) {
-      return new Path(output, "chunk-" + chunkID);
-    }
-    
-    public void write(String key, String value) throws IOException {
-      if (currentChunkSize > maxChunkSizeInBytes) {
-        writer.close();
-        writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID++), 
Text.class, Text.class);
-        currentChunkSize = 0;
-      }
-      
-      Text keyT = new Text(key);
-      Text valueT = new Text(value);
-      currentChunkSize += keyT.getBytes().length + valueT.getBytes().length; 
// Overhead
-      writer.append(keyT, valueT);
-    }
-    
-    @Override
-    public void close() throws IOException {
-      writer.close();
-    }
-  }
-  
-  private final class PrefixAdditionFilter implements PathFilter {
-
-    private final String prefix;
-    private final ChunkedWriter writer;
-    private final Charset charset;
-    private final Configuration conf;
-    private final FileSystem fs;
-    
-    private PrefixAdditionFilter(Configuration conf, String prefix, 
ChunkedWriter writer, Charset charset)
-      throws IOException {
-      this.conf = conf;
-      this.prefix = prefix;
-      this.writer = writer;
-      this.charset = charset;
-      this.fs = FileSystem.get(conf);
-    }
-    
-    @Override
-    public boolean accept(Path current) {
-      log.debug("CURRENT: {}", current.getName());
-      try {
-        FileStatus[] fstatus = fs.listStatus(current);
-        for (FileStatus fst : fstatus) {
-          log.debug("CHILD: {}", fst.getPath().getName());
-          if (fst.isDir()) {
-            fs.listStatus(fst.getPath(),
-                          new PrefixAdditionFilter(conf, prefix + 
Path.SEPARATOR + current.getName(), writer, charset));
-          } else {
-            StringBuilder file = new StringBuilder();
-            InputStream in = fs.open(fst.getPath());
-            for (String aFit : new FileLineIterable(in, charset, false)) {
-              file.append(aFit).append('\n');
-            }
-            String name = current.getName().equals(fst.getPath().getName())
-                ? current.getName()
-                : current.getName() + Path.SEPARATOR + fst.getPath().getName();
-            writer.write(prefix + Path.SEPARATOR + name, file.toString());
-          }
-        }
-      } catch (IOException e) {
-        throw new IllegalStateException(e);
-      }
-      return false;
-    }
-  }
-
+  /**
+   * Command-line entry point; runs the job through {@link ToolRunner}.
+   * NOTE(review): the exit code returned by ToolRunner.run is discarded.
+   */
   public static void main(String[] args) throws Exception {
     ToolRunner.run(new SequenceFilesFromDirectory(), args);
   }
@@ -185,32 +90,49 @@ public final class SequenceFilesFromDire
   public int run(String[] args)
     throws IOException, ClassNotFoundException, InstantiationException, 
IllegalAccessException, NoSuchMethodException,
            InvocationTargetException {
-    
-    addInputOption();
-    addOutputOption();
-    addOption(DefaultOptionCreator.overwriteOption().create());
-    addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in 
MegaBytes. Defaults to 64", "64");
-    addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
-        "The name of the class to use for file parsing. Default: " + 
PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
-    addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be 
prepended to the key", "");
-    addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
-        "The name of the character encoding of the input files. Default to 
UTF-8", "UTF-8");
+    // Register the standard options plus any added by a subclass override of addOptions().
+    addOptions();    
     
     if (parseArguments(args) == null) {
       return -1;
     }
-    
+   
+    // Option values handed to the file filter; subclasses may add entries in parseOptions().
+    Map<String, String> options = parseOptions();
     Path input = getInputPath();
     Path output = getOutputPath();
     if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
       HadoopUtil.overwriteOutput(output);
     }
-    int chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
-    String fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
+    // The key prefix is not part of the options map; it is passed to run() separately.
     String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
-    Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
-    
-    run(getConf(), input, output, keyPrefix, chunkSize, charset, 
fileFilterClassName);
+
+    run(getConf(), keyPrefix, options, input, output);
     return 0;
   }
+
+  /**
+   * Registers the command-line options of this job. Override this method to add further
+   * options to the SequenceFilesFromDirectory command line, and call
+   * {@code super.addOptions()} from the override; otherwise the standard options
+   * (input/output directories etc.) will not be available.
+   */
+  protected void addOptions() {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1],
+        "The chunkSize in MegaBytes. Defaults to 64", "64");
+    addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
+        "The name of the class to use for file parsing. Default: " + PREFIX_ADDITION_FILTER,
+        PREFIX_ADDITION_FILTER);
+    addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1],
+        "The prefix to be prepended to the key", "");
+    addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
+        "The name of the character encoding of the input files. Default to UTF-8", "UTF-8");
+  }
+
+  /**
+   * Collects the parsed option values that are handed to the file filter: chunk size, file
+   * filter class name and charset. The key prefix is not included; it is passed separately.
+   * Override this method to add your own options to the returned map, and call
+   * {@code super.parseOptions()} from the override; otherwise the standard entries will be
+   * missing.
+   */
+  protected Map<String, String> parseOptions() throws IOException {
+    Map<String, String> options = new HashMap<String, String>();
+    options.put(CHUNK_SIZE_OPTION[0], getOption(CHUNK_SIZE_OPTION[0]));
+    options.put(FILE_FILTER_CLASS_OPTION[0], getOption(FILE_FILTER_CLASS_OPTION[0]));
+    options.put(CHARSET_OPTION[0], getOption(CHARSET_OPTION[0]));
+    return options;
+  }
 }

Added: 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java?rev=1084582&view=auto
==============================================================================
--- 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
 (added)
+++ 
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
 Wed Mar 23 13:32:26 2011
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for the path filters that SequenceFilesFromDirectory uses to turn input files
+ * into records. Extend it and implement {@link #process(FileStatus, Path)} to plug in your
+ * own parsing logic.
+ *
+ * <p>Filters named by the file filter class option (other than the default
+ * PrefixAdditionFilter) are instantiated reflectively, so a subclass must provide a public
+ * {@code (Configuration, String, Map, ChunkedWriter)} constructor. Because this class also
+ * extends SequenceFilesFromDirectory, a subclass can act as its own command-line driver.</p>
+ */
+public abstract class SequenceFilesFromDirectoryFilter extends SequenceFilesFromDirectory
+    implements PathFilter {
+  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectoryFilter.class);
+
+  protected final String prefix;
+  protected final ChunkedWriter writer;
+  protected final Charset charset;
+  protected final Configuration conf;
+  protected final FileSystem fs;
+  protected final Map<String, String> options;
+
+  /**
+   * Creates an instance without filter state, for subclasses used only as a command-line
+   * driver. Such an instance has no file system and must not be used as a path filter.
+   */
+  protected SequenceFilesFromDirectoryFilter() {
+    this.prefix = null;
+    this.writer = null;
+    this.charset = null;
+    this.conf = null;
+    this.fs = null;
+    this.options = null;
+  }
+
+  /**
+   * @param conf Hadoop configuration used to obtain the file system
+   * @param keyPrefix prefix for the keys of the records written by this filter
+   * @param options job options; must contain the charset option
+   * @param writer destination of the generated records
+   * @throws IOException if the file system cannot be obtained
+   */
+  public SequenceFilesFromDirectoryFilter(Configuration conf, String keyPrefix,
+                                          Map<String, String> options, ChunkedWriter writer)
+    throws IOException {
+    this.conf = conf;
+    this.prefix = keyPrefix;
+    this.writer = writer;
+    this.charset = Charset.forName(options.get(SequenceFilesFromDirectory.CHARSET_OPTION[0]));
+    this.fs = FileSystem.get(conf);
+    this.options = options;
+  }
+
+  /** Returns the option map this filter was created with. */
+  protected final Map<String, String> getOptions() {
+    return options;
+  }
+
+  /**
+   * Hands every child of {@code current} to {@link #process(FileStatus, Path)}. Records are
+   * written as a side effect, so this method never accepts a path and always returns false.
+   *
+   * @throws IllegalStateException wrapping an IOException raised while listing or processing
+   */
+  @Override
+  public final boolean accept(Path current) {
+    log.debug("CURRENT: {}", current.getName());
+    try {
+      for (FileStatus fst : fs.listStatus(current)) {
+        log.debug("CHILD: {}", fst.getPath().getName());
+        process(fst, current);
+      }
+    } catch (IOException e) {
+      // PathFilter.accept cannot throw checked exceptions.
+      throw new IllegalStateException(e);
+    }
+    return false;
+  }
+
+  /**
+   * Handles one child of the directory being scanned.
+   *
+   * @param in status of the child file or directory
+   * @param current the path whose children are being scanned
+   * @throws IOException if reading the child or writing its records fails
+   */
+  protected abstract void process(FileStatus in, Path current) throws IOException;
+}

Modified: 
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL: 
http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1084582&r1=1084581&r2=1084582&view=diff
==============================================================================
--- 
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
 (original)
+++ 
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
 Wed Mar 23 13:32:26 2011
@@ -44,15 +44,19 @@ public final class TestSequenceFilesFrom
       {"test3", "This is the third text."}
   };
 
+  /** Input format used by a test; selects the key layout that checkChunkFiles expects. */
+  private enum ParserType { TEXT, CSV }
+
+  /** Delegates to the base test-case setup. */
   @Override
   @Before
   public void setUp() throws Exception {
     super.setUp();
   }
   
-  /** Story converting text files to SequenceFile */
+  /**
+   * Story converting text files to SequenceFile
+   */
   @Test
-  public void testSequnceFileFromTsvBasic() throws Exception {
+  public void testSequenceFileFromDirectoryBasic() throws Exception {
     // parameters
     Configuration conf = new Configuration();
     
@@ -74,7 +78,41 @@ public final class TestSequenceFilesFrom
         UTF8.displayName(Locale.ENGLISH), "--keyPrefix", prefix});
     
     // check output chunk files
-    checkChunkFiles(conf, outputDir, DATA1, prefix);
+    checkChunkFiles(conf, outputDir, DATA1, prefix, ParserType.TEXT);
+  }
+
+  /**
+   * Story: a tab-separated file whose first column holds the key and whose second column holds
+   * the value is converted to SequenceFile chunks, and every row comes back as a record.
+   */
+  @Test
+  public void testSequnceFileFromDirectoryTsv() throws Exception {
+    final String prefix = "UID";
+    final int chunkSizeInMB = 64;
+    final int keyColumn = 0;
+    final int valueColumn = 1;
+
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path tmpDir = getTestTempDirPath();
+    Path inputDir = new Path(tmpDir, "inputDir");
+    Path outputDir = new Path(tmpDir, "outputDir");
+    fs.mkdirs(inputDir);
+
+    // a single TSV input file holding every row of DATA1
+    createTsvFilesFromArrays(conf, inputDir, DATA1);
+
+    String[] jobArgs = {
+        "--input", inputDir.toString(),
+        "--output", outputDir.toString(),
+        "--charset", UTF8.name(),
+        "--chunkSize", String.valueOf(chunkSizeInMB),
+        "--keyPrefix", prefix,
+        "--keyColumn", String.valueOf(keyColumn),
+        "--valueColumn", String.valueOf(valueColumn),
+    };
+    SequenceFilesFromCsvFilter.main(jobArgs);
+
+    // CSV keys are the prefix directly followed by the key column, with no separator
+    checkChunkFiles(conf, outputDir, DATA1, prefix, ParserType.CSV);
   }
 
   private static void createFilesFromArrays(Configuration conf, Path inputDir, 
String[][] data) throws IOException {
@@ -86,7 +124,18 @@ public final class TestSequenceFilesFrom
     }
   }
 
-  private static void checkChunkFiles(Configuration conf, Path outputDir, 
String[][] data, String prefix)
+  /**
+   * Writes all rows of {@code data} into one tab-separated file named {@code inputTsvFile}
+   * inside {@code inputDir}: one line per row holding the key, a tab and the value, UTF-8
+   * encoded.
+   */
+  private static void createTsvFilesFromArrays(Configuration conf,
+      Path inputDir, String[][] data) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    // Encode explicitly: the job under test decodes its input as UTF-8, so the platform
+    // default charset could corrupt non-ASCII test data.
+    OutputStreamWriter osw =
+        new OutputStreamWriter(fs.create(new Path(inputDir, "inputTsvFile")), UTF8);
+    try {
+      for (String[] aData : data) {
+        osw.write(aData[0] + "\t" + aData[1] + "\n");
+      }
+    } finally {
+      osw.close();
+    }
+  }
+
+  private static void checkChunkFiles(Configuration conf, Path outputDir, 
String[][] data, String prefix,
+      ParserType inputType)
     throws IOException, InstantiationException, IllegalAccessException {
     FileSystem fs = FileSystem.get(conf);
     
@@ -104,10 +153,15 @@ public final class TestSequenceFilesFrom
     
     Map<String,String> fileToData = new HashMap<String,String>();
     for (String[] aData : data) {
-      fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
+      if (ParserType.CSV == inputType) {
+        fileToData.put(prefix + aData[0], aData[1]);
+      }
+      else {
+        fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
+      }
     }
 
-    for (String[] aData : data) {
+    for (int i=0; i<data.length; ++i) {
       assertTrue(reader.next(key, value));
       String retrievedData = fileToData.get(key.toString().trim());
       assertNotNull(retrievedData);


Reply via email to