Author: isabel
Date: Wed Mar 23 13:32:26 2011
New Revision: 1084582
URL: http://svn.apache.org/viewvc?rev=1084582&view=rev
Log:
MAHOUT-590 - added support for a configurable directory/file layout for the
SequenceFilesFromDirectory job. Includes a demo for tab-separated-value
formatted files.
Added:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java
mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
Modified:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
Added:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java?rev=1084582&view=auto
==============================================================================
--- mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java
(added)
+++ mahout/trunk/utils/src/main/java/org/apache/mahout/text/ChunkedWriter.java
Wed Mar 23 13:32:26 2011
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Writes {@link Text} key/value pairs into a series of SequenceFiles named
+ * {@code chunk-0}, {@code chunk-1}, ... below a given output path. A new chunk
+ * file is started once the payload appended to the current one exceeds the
+ * configured chunk size.
+ */
+public final class ChunkedWriter implements Closeable {
+
+  /** Upper bound for the chunk size; keeps {@code chunkSizeInMB * 1024 * 1024} within int range. */
+  private static final int MAX_CHUNK_SIZE_IN_MB = 1984;
+
+  private final int maxChunkSizeInBytes;
+  private final Path output;
+  private SequenceFile.Writer writer;
+  private int currentChunkID;
+  private int currentChunkSize;
+  private final FileSystem fs;
+  private final Configuration conf;
+
+  /**
+   * Opens the first chunk file ({@code chunk-0}) below {@code output}.
+   *
+   * @param conf          configuration used to obtain the file system and to create the writers
+   * @param chunkSizeInMB chunk size in megabytes; values above 1984 are capped to 1984
+   * @param output        parent path the chunk files are created under
+   * @throws IOException if the file system or the first chunk file cannot be opened
+   */
+  public ChunkedWriter(Configuration conf, int chunkSizeInMB, Path output) throws IOException {
+    this.output = output;
+    this.conf = conf;
+    maxChunkSizeInBytes = Math.min(chunkSizeInMB, MAX_CHUNK_SIZE_IN_MB) * 1024 * 1024;
+    fs = FileSystem.get(conf);
+    currentChunkID = 0;
+    writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), Text.class, Text.class);
+  }
+
+  /** Returns the path of the chunk file with the given ID, i.e. {@code <output>/chunk-<chunkID>}. */
+  private Path getPath(int chunkID) {
+    return new Path(output, "chunk-" + chunkID);
+  }
+
+  /**
+   * Appends one record to the current chunk. If the current chunk has already
+   * exceeded the maximum chunk size, it is closed and the record goes into a
+   * freshly opened chunk file instead.
+   *
+   * @param key   record key
+   * @param value record value
+   * @throws IOException if closing the old chunk, opening the new one or appending fails
+   */
+  public void write(String key, String value) throws IOException {
+    if (currentChunkSize > maxChunkSizeInBytes) {
+      writer.close();
+      // Advance the ID before building the path: a post-increment would reuse the
+      // path of the chunk that was just closed (chunk-0 on the first rollover).
+      currentChunkID++;
+      writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), Text.class, Text.class);
+      currentChunkSize = 0;
+    }
+
+    Text keyT = new Text(key);
+    Text valueT = new Text(value);
+    // Count only the valid payload bytes: Text.getBytes() exposes the backing array,
+    // which may be longer than the encoded content. Per-record overhead is not counted.
+    currentChunkSize += keyT.getLength() + valueT.getLength();
+    writer.append(keyT, valueT);
+  }
+
+  /** Closes the chunk file that is currently open. */
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}
+
Added:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java?rev=1084582&view=auto
==============================================================================
---
mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
(added)
+++
mahout/trunk/utils/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
Wed Mar 23 13:32:26 2011
@@ -0,0 +1,46 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.mahout.common.FileLineIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Default parser for parsing text into sequence files: every regular file becomes
+ * one record whose value is the file content (each line terminated by '\n') and
+ * whose key is the prefix followed by the file's location below the visited path.
+ */
+public final class PrefixAdditionFilter extends SequenceFilesFromDirectoryFilter {
+  private static final Logger log = LoggerFactory.getLogger(PrefixAdditionFilter.class);
+
+  /**
+   * @param conf      configuration handed to the base filter
+   * @param keyPrefix prefix prepended to every key written by this filter
+   * @param options   job options handed to the base filter
+   * @param writer    destination for the generated records
+   * @throws IOException if the base filter cannot be initialised
+   */
+  public PrefixAdditionFilter(Configuration conf, String keyPrefix, Map<String, String> options, ChunkedWriter writer)
+    throws IOException {
+    super(conf, keyPrefix, options, writer);
+  }
+
+  /**
+   * Descends into directories with a nested filter whose prefix is extended by the
+   * name of {@code current}; reads regular files completely and writes them as a
+   * single record.
+   *
+   * @param fst     status of the child entry to handle
+   * @param current the path whose listing produced {@code fst}
+   * @throws IOException if listing, reading or writing fails
+   */
+  @Override
+  protected void process(FileStatus fst, Path current) throws IOException {
+    if (fst.isDir()) {
+      fs.listStatus(fst.getPath(),
+                    new PrefixAdditionFilter(conf, prefix + Path.SEPARATOR + current.getName(),
+                                             options, writer));
+    } else {
+      StringBuilder file = new StringBuilder();
+      InputStream in = fs.open(fst.getPath());
+      try {
+        for (String aFit : new FileLineIterable(in, charset, false)) {
+          file.append(aFit).append('\n');
+        }
+      } finally {
+        // Release the stream even when reading fails half way through the file.
+        in.close();
+      }
+      // When the listed path is the file itself, both names match and the name is used once.
+      String name = current.getName().equals(fst.getPath().getName())
+          ? current.getName()
+          : current.getName() + Path.SEPARATOR + fst.getPath().getName();
+      writer.write(prefix + Path.SEPARATOR + name, file.toString());
+    }
+  }
+}
Added:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java?rev=1084582&view=auto
==============================================================================
---
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
(added)
+++
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
Wed Mar 23 13:32:26 2011
@@ -0,0 +1,82 @@
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.FileLineIterable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Implements an example parser that converts tab separated value files into
+ * sequence files: every input line becomes one record, with the key and the
+ * value taken from two configurable columns.
+ */
+public final class SequenceFilesFromCsvFilter extends SequenceFilesFromDirectoryFilter {
+  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromCsvFilter.class);
+  private static final Pattern TAB = Pattern.compile("\\t");
+
+  public static final String[] KEY_COLUMN_OPTION = {"keyColumn", "kcol"};
+  public static final String[] VALUE_COLUMN_OPTION = {"valueColumn", "vcol"};
+
+  private volatile int keyColumn;
+  private volatile int valueColumn;
+
+  /** Used by {@link #main(String[])} only; the instance acts as the job, not as a filter. */
+  private SequenceFilesFromCsvFilter() {
+    // not initializing anything here.
+  }
+
+  /**
+   * @param conf      configuration handed to the base filter
+   * @param keyPrefix prefix prepended to every key written by this filter
+   * @param options   job options; must contain integer values for the key and value column options
+   * @param writer    destination for the generated records
+   * @throws IOException if the base filter cannot be initialised
+   */
+  public SequenceFilesFromCsvFilter(Configuration conf, String keyPrefix, Map<String, String> options, ChunkedWriter writer)
+    throws IOException {
+    super(conf, keyPrefix, options, writer);
+    this.keyColumn = Integer.parseInt(options.get(KEY_COLUMN_OPTION[0]));
+    this.valueColumn = Integer.parseInt(options.get(VALUE_COLUMN_OPTION[0]));
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new SequenceFilesFromCsvFilter(), args);
+  }
+
+  /** Adds the key and value column options (defaults 0 and 1) to the standard options. */
+  @Override
+  public void addOptions() {
+    super.addOptions();
+    addOption(KEY_COLUMN_OPTION[0], KEY_COLUMN_OPTION[1],
+        "The key column. Default to 0", "0");
+    addOption(VALUE_COLUMN_OPTION[0], VALUE_COLUMN_OPTION[1],
+        "The value column. Default to 1", "1");
+  }
+
+  /**
+   * Extends the standard options with the column options and sets the file
+   * filter class option to this class.
+   */
+  @Override
+  public Map<String, String> parseOptions() throws IOException {
+    Map<String, String> options = super.parseOptions();
+    options.put(SequenceFilesFromDirectory.FILE_FILTER_CLASS_OPTION[0], this.getClass().getName());
+    options.put(KEY_COLUMN_OPTION[0], getOption(KEY_COLUMN_OPTION[0]));
+    options.put(VALUE_COLUMN_OPTION[0], getOption(VALUE_COLUMN_OPTION[0]));
+    return options;
+  }
+
+  /**
+   * Descends into directories with a nested filter whose prefix is extended by the
+   * name of {@code current}; for regular files, writes one record per line using
+   * the configured key and value columns.
+   *
+   * @param fst     status of the child entry to handle
+   * @param current the path whose listing produced {@code fst}
+   * @throws IOException if listing, reading or writing fails, or if a line has
+   *                     fewer columns than the configured column indexes require
+   */
+  @Override
+  protected final void process(FileStatus fst, Path current) throws IOException {
+    if (fst.isDir()) {
+      fs.listStatus(fst.getPath(),
+                    new SequenceFilesFromCsvFilter(conf, prefix + Path.SEPARATOR + current.getName(),
+                                                   this.options, writer));
+    } else {
+      int requiredColumns = Math.max(keyColumn, valueColumn) + 1;
+      InputStream in = fs.open(fst.getPath());
+      try {
+        for (String aFit : new FileLineIterable(in, charset, false)) {
+          String[] columns = TAB.split(aFit);
+          if (columns.length < requiredColumns) {
+            // Fail with the offending file instead of a bare ArrayIndexOutOfBoundsException.
+            throw new IOException("Expected at least " + requiredColumns + " tab separated columns in "
+                + fst.getPath() + " but found " + columns.length);
+          }
+          String key = columns[keyColumn];
+          String value = columns[valueColumn];
+          log.info("key : {}, value : {}", key, value);
+          writer.write(prefix + key, value);
+        }
+      } finally {
+        // Release the stream even when a line cannot be parsed or written.
+        in.close();
+      }
+    }
+  }
+}
Modified:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1084582&r1=1084581&r2=1084582&view=diff
==============================================================================
---
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
(original)
+++
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
Wed Mar 23 13:32:26 2011
@@ -17,23 +17,20 @@
package org.apache.mahout.text;
-import java.io.Closeable;
import java.io.IOException;
-import java.io.InputStream;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.AbstractJob;
-import org.apache.mahout.common.FileLineIterable;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
import org.slf4j.Logger;
@@ -46,7 +43,7 @@ import org.slf4j.LoggerFactory;
* parent directory prepended with a specified prefix. You can also specify
the input encoding of the text
* files. The content of the output SequenceFiles are encoded as UTF-8 text.
*/
-public final class SequenceFilesFromDirectory extends AbstractJob {
+public class SequenceFilesFromDirectory extends AbstractJob {
private static final Logger log =
LoggerFactory.getLogger(SequenceFilesFromDirectory.class);
@@ -58,122 +55,30 @@ public final class SequenceFilesFromDire
public static final String[] CHARSET_OPTION = {"charset", "c"};
public void run(Configuration conf,
+ String keyPrefix,
+ Map<String, String> options,
Path input,
- Path output,
- String prefix,
- int chunkSizeInMB,
- Charset charset,
- String fileFilterClassName)
+ Path output)
throws InstantiationException, IllegalAccessException,
InvocationTargetException, IOException,
NoSuchMethodException, ClassNotFoundException {
FileSystem fs = FileSystem.get(conf);
- ChunkedWriter writer = new ChunkedWriter(conf, chunkSizeInMB, output);
-
- PathFilter pathFilter;
+ ChunkedWriter writer = new ChunkedWriter(conf,
Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output);
+ SequenceFilesFromDirectoryFilter pathFilter;
+
+ String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]);
if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
- pathFilter = new PrefixAdditionFilter(conf, prefix, writer, charset);
+ pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer);
} else {
- Class<? extends PathFilter> pathFilterClass =
Class.forName(fileFilterClassName).asSubclass(PathFilter.class);
- Constructor<? extends PathFilter> constructor =
- pathFilterClass.getConstructor(Configuration.class, String.class,
ChunkedWriter.class, Charset.class);
- pathFilter = constructor.newInstance(conf, prefix, writer, charset);
+ Class<? extends SequenceFilesFromDirectoryFilter> pathFilterClass =
Class.forName(fileFilterClassName).asSubclass(SequenceFilesFromDirectoryFilter.class);
+ Constructor<? extends SequenceFilesFromDirectoryFilter> constructor =
+ pathFilterClass.getConstructor(Configuration.class, String.class,
Map.class, ChunkedWriter.class);
+ pathFilter = constructor.newInstance(conf, keyPrefix, options, writer);
}
fs.listStatus(input, pathFilter);
writer.close();
}
- private static final class ChunkedWriter implements Closeable {
-
- private final int maxChunkSizeInBytes;
- private final Path output;
- private SequenceFile.Writer writer;
- private int currentChunkID;
- private int currentChunkSize;
- private final FileSystem fs;
- private final Configuration conf;
-
- private ChunkedWriter(Configuration conf, int chunkSizeInMB, Path output)
throws IOException {
- this.output = output;
- this.conf = conf;
- if (chunkSizeInMB > 1984) {
- chunkSizeInMB = 1984;
- }
- maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
- fs = FileSystem.get(conf);
- currentChunkID = 0;
- writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID),
Text.class, Text.class);
- }
-
- private Path getPath(int chunkID) {
- return new Path(output, "chunk-" + chunkID);
- }
-
- public void write(String key, String value) throws IOException {
- if (currentChunkSize > maxChunkSizeInBytes) {
- writer.close();
- writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID++),
Text.class, Text.class);
- currentChunkSize = 0;
- }
-
- Text keyT = new Text(key);
- Text valueT = new Text(value);
- currentChunkSize += keyT.getBytes().length + valueT.getBytes().length;
// Overhead
- writer.append(keyT, valueT);
- }
-
- @Override
- public void close() throws IOException {
- writer.close();
- }
- }
-
- private final class PrefixAdditionFilter implements PathFilter {
-
- private final String prefix;
- private final ChunkedWriter writer;
- private final Charset charset;
- private final Configuration conf;
- private final FileSystem fs;
-
- private PrefixAdditionFilter(Configuration conf, String prefix,
ChunkedWriter writer, Charset charset)
- throws IOException {
- this.conf = conf;
- this.prefix = prefix;
- this.writer = writer;
- this.charset = charset;
- this.fs = FileSystem.get(conf);
- }
-
- @Override
- public boolean accept(Path current) {
- log.debug("CURRENT: {}", current.getName());
- try {
- FileStatus[] fstatus = fs.listStatus(current);
- for (FileStatus fst : fstatus) {
- log.debug("CHILD: {}", fst.getPath().getName());
- if (fst.isDir()) {
- fs.listStatus(fst.getPath(),
- new PrefixAdditionFilter(conf, prefix +
Path.SEPARATOR + current.getName(), writer, charset));
- } else {
- StringBuilder file = new StringBuilder();
- InputStream in = fs.open(fst.getPath());
- for (String aFit : new FileLineIterable(in, charset, false)) {
- file.append(aFit).append('\n');
- }
- String name = current.getName().equals(fst.getPath().getName())
- ? current.getName()
- : current.getName() + Path.SEPARATOR + fst.getPath().getName();
- writer.write(prefix + Path.SEPARATOR + name, file.toString());
- }
- }
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- return false;
- }
- }
-
public static void main(String[] args) throws Exception {
ToolRunner.run(new SequenceFilesFromDirectory(), args);
}
@@ -185,32 +90,49 @@ public final class SequenceFilesFromDire
public int run(String[] args)
throws IOException, ClassNotFoundException, InstantiationException,
IllegalAccessException, NoSuchMethodException,
InvocationTargetException {
-
- addInputOption();
- addOutputOption();
- addOption(DefaultOptionCreator.overwriteOption().create());
- addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in
MegaBytes. Defaults to 64", "64");
- addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
- "The name of the class to use for file parsing. Default: " +
PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
- addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be
prepended to the key", "");
- addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
- "The name of the character encoding of the input files. Default to
UTF-8", "UTF-8");
+ addOptions();
if (parseArguments(args) == null) {
return -1;
}
-
+
+ Map<String, String> options = parseOptions();
Path input = getInputPath();
Path output = getOutputPath();
if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
HadoopUtil.overwriteOutput(output);
}
- int chunkSize = Integer.parseInt(getOption(CHUNK_SIZE_OPTION[0]));
- String fileFilterClassName = getOption(FILE_FILTER_CLASS_OPTION[0]);
String keyPrefix = getOption(KEY_PREFIX_OPTION[0]);
- Charset charset = Charset.forName(getOption(CHARSET_OPTION[0]));
-
- run(getConf(), input, output, keyPrefix, chunkSize, charset,
fileFilterClassName);
+
+ run(getConf(), keyPrefix, options, input, output);
return 0;
}
+
+ /**
+ * Registers the command line options of the SequenceFilesFromDirectory job:
+ * input, output, overwrite, chunk size (default 64), file filter class
+ * (default PREFIX_ADDITION_FILTER), key prefix (default empty) and charset
+ * (default UTF-8).
+ * Override this method in order to add additional options, and call
+ * super.addOptions() from the override; otherwise the standard options
+ * registered here will not be available.
+ */
+ protected void addOptions() {
+ addInputOption();
+ addOutputOption();
+ addOption(DefaultOptionCreator.overwriteOption().create());
+ addOption(CHUNK_SIZE_OPTION[0], CHUNK_SIZE_OPTION[1], "The chunkSize in
MegaBytes. Defaults to 64", "64");
+ addOption(FILE_FILTER_CLASS_OPTION[0], FILE_FILTER_CLASS_OPTION[1],
+ "The name of the class to use for file parsing. Default: " +
PREFIX_ADDITION_FILTER, PREFIX_ADDITION_FILTER);
+ addOption(KEY_PREFIX_OPTION[0], KEY_PREFIX_OPTION[1], "The prefix to be
prepended to the key", "");
+ addOption(CHARSET_OPTION[0], CHARSET_OPTION[1],
+ "The name of the character encoding of the input files. Default to
UTF-8", "UTF-8");
+ }
+
+  /**
+   * Collects the parsed values of the chunk size, file filter class and charset
+   * options into a map keyed by the long option name.
+   * Override this method in order to add your own options to the map, and call
+   * super.parseOptions() from the override; otherwise the standard entries
+   * collected here will be missing.
+   *
+   * @return a new mutable map from long option name to parsed option value
+   */
+  protected Map<String, String> parseOptions() throws IOException {
+    String[][] standardOptions = {CHUNK_SIZE_OPTION, FILE_FILTER_CLASS_OPTION, CHARSET_OPTION};
+    Map<String, String> parsed = new HashMap<String, String>();
+    for (String[] option : standardOptions) {
+      String longName = option[0];
+      parsed.put(longName, getOption(longName));
+    }
+    return parsed;
+  }
}
Added:
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java?rev=1084582&view=auto
==============================================================================
---
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
(added)
+++
mahout/trunk/utils/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
Wed Mar 23 13:32:26 2011
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.text;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Base class for the file parsers used by SequenceFilesFromDirectory. Extend this
+ * class and implement {@link #process(FileStatus, Path)} if you wish to plug in
+ * your own parsing logic. The filter never accepts a path; it is used for the
+ * side effect of writing records while the file system lists a directory.
+ */
+public abstract class SequenceFilesFromDirectoryFilter extends SequenceFilesFromDirectory implements PathFilter {
+  private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectoryFilter.class);
+
+  protected final String prefix;
+  protected final ChunkedWriter writer;
+  protected final Charset charset;
+  protected final Configuration conf;
+  protected final FileSystem fs;
+  protected final Map<String, String> options;
+
+  /**
+   * Creates an instance with every field set to {@code null}. Such an instance
+   * cannot be used as a filter.
+   */
+  protected SequenceFilesFromDirectoryFilter() {
+    this.prefix = null;
+    this.writer = null;
+    this.charset = null;
+    this.conf = null;
+    this.fs = null;
+    this.options = null;
+  }
+
+  /**
+   * @param conf      configuration used to obtain the file system
+   * @param keyPrefix prefix made available to subclasses for building keys
+   * @param options   job options; must contain a charset name under the charset option
+   * @param writer    destination for the records produced by subclasses
+   * @throws IOException if the file system cannot be obtained
+   */
+  public SequenceFilesFromDirectoryFilter(Configuration conf, String keyPrefix, Map<String, String> options,
+                                          ChunkedWriter writer)
+    throws IOException {
+    this.conf = conf;
+    this.prefix = keyPrefix;
+    this.writer = writer;
+    this.charset = Charset.forName(options.get(SequenceFilesFromDirectory.CHARSET_OPTION[0]));
+    this.fs = FileSystem.get(conf);
+    this.options = options;
+  }
+
+  protected final Map<String, String> getOptions() {
+    return options;
+  }
+
+  /**
+   * Lists {@code current} and hands every entry of the listing to
+   * {@link #process(FileStatus, Path)}.
+   *
+   * @param current the path offered to the filter
+   * @return always {@code false}
+   * @throws IllegalStateException if listing or processing fails; the original
+   *                               exception is kept as the cause
+   */
+  @Override
+  public final boolean accept(Path current) {
+    log.debug("CURRENT: {}", current.getName());
+    try {
+      FileStatus[] fstatus = fs.listStatus(current);
+      for (FileStatus fst : fstatus) {
+        log.debug("CHILD: {}", fst.getPath().getName());
+        process(fst, current);
+      }
+    } catch (Exception e) {
+      // Name the failing path; the bare wrapper gave no hint which input broke the job.
+      throw new IllegalStateException("Failed to process " + current, e);
+    }
+    return false;
+  }
+
+  /**
+   * Handles one entry found below {@code current}.
+   *
+   * @param in      status of the entry to handle (file or directory)
+   * @param current the path whose listing produced {@code in}
+   * @throws IOException if the entry cannot be read or its records cannot be written
+   */
+  protected abstract void process(FileStatus in, Path current) throws IOException;
+}
Modified:
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java?rev=1084582&r1=1084581&r2=1084582&view=diff
==============================================================================
---
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
(original)
+++
mahout/trunk/utils/src/test/java/org/apache/mahout/text/TestSequenceFilesFromDirectory.java
Wed Mar 23 13:32:26 2011
@@ -44,15 +44,19 @@ public final class TestSequenceFilesFrom
{"test3", "This is the third text."}
};
+ private enum ParserType {TEXT, CSV};
+
@Override
@Before
public void setUp() throws Exception {
super.setUp();
}
- /** Story converting text files to SequenceFile */
+ /**
+ * Story converting text files to SequenceFile
+ */
@Test
- public void testSequnceFileFromTsvBasic() throws Exception {
+ public void testSequenceFileFromDirectoryBasic() throws Exception {
// parameters
Configuration conf = new Configuration();
@@ -74,7 +78,41 @@ public final class TestSequenceFilesFrom
UTF8.displayName(Locale.ENGLISH), "--keyPrefix", prefix});
// check output chunk files
- checkChunkFiles(conf, outputDir, DATA1, prefix);
+ checkChunkFiles(conf, outputDir, DATA1, prefix, ParserType.TEXT);
+ }
+
+ /**
+ * Story converting a TSV file to SequenceFile
+ */
+ @Test
+ public void testSequnceFileFromDirectoryTsv() throws Exception {
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.get(conf);
+
+ // parameters
+ final String prefix = "UID";
+ final int chunkSizeInMB = 64;
+ final int keyColumn = 0;
+ final int valueColumn = 1;
+
+ // create
+ Path tmpDir = this.getTestTempDirPath();
+ Path inputDir = new Path(tmpDir, "inputDir");
+ fs.mkdirs(inputDir);
+ Path outputDir = new Path(tmpDir, "outputDir");
+
+ // prepare input TSV file
+ createTsvFilesFromArrays(conf, inputDir, DATA1);
+
+ // convert it to SequenceFile
+ SequenceFilesFromCsvFilter.main(new String[] {"--input",
inputDir.toString(),
+ "--output", outputDir.toString(), "--charset", UTF8.name(),
+ "--chunkSize", Integer.toString(chunkSizeInMB), "--keyPrefix", prefix,
+ "--keyColumn", Integer.toString(keyColumn), "--valueColumn",
+ Integer.toString(valueColumn)});
+
+ // check output chunk files
+ checkChunkFiles(conf, outputDir, DATA1, prefix, ParserType.CSV);
}
private static void createFilesFromArrays(Configuration conf, Path inputDir,
String[][] data) throws IOException {
@@ -86,7 +124,18 @@ public final class TestSequenceFilesFrom
}
}
- private static void checkChunkFiles(Configuration conf, Path outputDir,
String[][] data, String prefix)
+  /**
+   * Writes {@code data} as a single tab separated file named {@code inputTsvFile}
+   * below {@code inputDir}: one line per row, first element and second element
+   * separated by a tab.
+   */
+  private static void createTsvFilesFromArrays(Configuration conf,
+                                               Path inputDir, String[][] data) throws IOException {
+    FileSystem fs = FileSystem.get(conf);
+    // Encode explicitly as UTF-8 (the charset the job is run with) instead of the platform default.
+    OutputStreamWriter osw = new OutputStreamWriter(fs.create(new Path(inputDir, "inputTsvFile")),
+        java.nio.charset.Charset.forName("UTF-8"));
+    try {
+      for (String[] aData : data) {
+        osw.write(aData[0] + "\t" + aData[1] + "\n");
+      }
+    } finally {
+      // Close the file even when a write fails.
+      osw.close();
+    }
+  }
+
+ private static void checkChunkFiles(Configuration conf, Path outputDir,
String[][] data, String prefix,
+ ParserType inputType)
throws IOException, InstantiationException, IllegalAccessException {
FileSystem fs = FileSystem.get(conf);
@@ -104,10 +153,15 @@ public final class TestSequenceFilesFrom
Map<String,String> fileToData = new HashMap<String,String>();
for (String[] aData : data) {
- fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
+ if (ParserType.CSV == inputType) {
+ fileToData.put(prefix + aData[0], aData[1]);
+ }
+ else {
+ fileToData.put(prefix + Path.SEPARATOR + aData[0], aData[1]);
+ }
}
- for (String[] aData : data) {
+ for (int i=0; i<data.length; ++i) {
assertTrue(reader.next(key, value));
String retrievedData = fileToData.get(key.toString().trim());
assertNotNull(retrievedData);