Author: srowen
Date: Sat Jun 4 11:04:48 2011
New Revision: 1131376
URL: http://svn.apache.org/viewvc?rev=1131376&view=rev
Log:
MAHOUT-700 better use of FileSystem to accommodate S3
Modified:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
Modified:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
---
mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java
(original)
+++
mahout/trunk/integration/src/main/java/org/apache/mahout/text/ChunkedWriter.java
Sat Jun 4 11:04:48 2011
@@ -43,7 +43,7 @@ public final class ChunkedWriter impleme
chunkSizeInMB = 1984;
}
maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
- fs = FileSystem.get(conf);
+ fs = FileSystem.get(output.toUri(), conf);
currentChunkID = 0;
writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID),
Text.class, Text.class);
}
Modified:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
---
mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
(original)
+++
mahout/trunk/integration/src/main/java/org/apache/mahout/text/PrefixAdditionFilter.java
Sat Jun 4 11:04:48 2011
@@ -19,6 +19,7 @@ package org.apache.mahout.text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.mahout.common.iterator.FileLineIterable;
@@ -32,9 +33,12 @@ import java.util.Map;
*/
public final class PrefixAdditionFilter extends
SequenceFilesFromDirectoryFilter {
- public PrefixAdditionFilter(Configuration conf, String keyPrefix, Map<String, String> options, ChunkedWriter writer)
- throws IOException {
- super(conf, keyPrefix, options, writer);
+ public PrefixAdditionFilter(Configuration conf,
+ String keyPrefix,
+ Map<String, String> options,
+ ChunkedWriter writer,
+ FileSystem fs) {
+ super(conf, keyPrefix, options, writer, fs);
}
@Override
@@ -42,7 +46,7 @@ public final class PrefixAdditionFilter
if (fst.isDir()) {
fs.listStatus(fst.getPath(),
new PrefixAdditionFilter(conf, prefix + Path.SEPARATOR +
current.getName(),
- options, writer));
+ options, writer, fs));
} else {
InputStream in = null;
try {
Modified:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
---
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
(original)
+++
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromCsvFilter.java
Sat Jun 4 11:04:48 2011
@@ -19,6 +19,7 @@ package org.apache.mahout.text;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.common.iterator.FileLineIterable;
@@ -51,8 +52,9 @@ public final class SequenceFilesFromCsvF
public SequenceFilesFromCsvFilter(Configuration conf,
String keyPrefix,
Map<String, String> options,
- ChunkedWriter writer) throws IOException {
- super(conf, keyPrefix, options, writer);
+ ChunkedWriter writer,
+ FileSystem fs) {
+ super(conf, keyPrefix, options, writer, fs);
this.keyColumn = Integer.parseInt(options.get(KEY_COLUMN_OPTION[0]));
this.valueColumn = Integer.parseInt(options.get(VALUE_COLUMN_OPTION[0]));
}
@@ -84,7 +86,7 @@ public final class SequenceFilesFromCsvF
if (fst.isDir()) {
fs.listStatus(fst.getPath(),
new SequenceFilesFromCsvFilter(conf, prefix +
Path.SEPARATOR + current.getName(),
- this.options, writer));
+ this.options, writer, fs));
} else {
InputStream in = fs.open(fst.getPath());
for (CharSequence aFit : new FileLineIterable(in, charset, false)) {
Modified:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
---
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
(original)
+++
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectory.java
Sat Jun 4 11:04:48 2011
@@ -32,8 +32,6 @@ import org.apache.hadoop.util.ToolRunner
import org.apache.mahout.common.AbstractJob;
import org.apache.mahout.common.HadoopUtil;
import org.apache.mahout.common.commandline.DefaultOptionCreator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Converts a directory of text documents into SequenceFiles of Specified
chunkSize. This class takes in a
@@ -44,8 +42,6 @@ import org.slf4j.LoggerFactory;
*/
public class SequenceFilesFromDirectory extends AbstractJob {
- private static final Logger log = LoggerFactory.getLogger(SequenceFilesFromDirectory.class);
-
private static final String PREFIX_ADDITION_FILTER =
PrefixAdditionFilter.class.getName();
public static final String[] CHUNK_SIZE_OPTION = {"chunkSize", "chunk"};
@@ -60,21 +56,24 @@ public class SequenceFilesFromDirectory
Path output)
throws InstantiationException, IllegalAccessException,
InvocationTargetException, IOException,
NoSuchMethodException, ClassNotFoundException {
- FileSystem fs = FileSystem.get(conf);
+ FileSystem fs = FileSystem.get(input.toUri(), conf);
ChunkedWriter writer = new ChunkedWriter(conf,
Integer.parseInt(options.get(CHUNK_SIZE_OPTION[0])), output);
try {
SequenceFilesFromDirectoryFilter pathFilter;
-
String fileFilterClassName = options.get(FILE_FILTER_CLASS_OPTION[0]);
if (PrefixAdditionFilter.class.getName().equals(fileFilterClassName)) {
- pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer);
+ pathFilter = new PrefixAdditionFilter(conf, keyPrefix, options, writer, fs);
} else {
Class<? extends SequenceFilesFromDirectoryFilter> pathFilterClass =
Class.forName(fileFilterClassName).asSubclass(SequenceFilesFromDirectoryFilter.class);
Constructor<? extends SequenceFilesFromDirectoryFilter> constructor =
- pathFilterClass.getConstructor(Configuration.class, String.class, Map.class, ChunkedWriter.class);
- pathFilter = constructor.newInstance(conf, keyPrefix, options, writer);
+ pathFilterClass.getConstructor(Configuration.class,
+ String.class,
+ Map.class,
+ ChunkedWriter.class,
+ FileSystem.class);
+ pathFilter = constructor.newInstance(conf, keyPrefix, options, writer, fs);
}
fs.listStatus(input, pathFilter);
} finally {
Modified:
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
URL:
http://svn.apache.org/viewvc/mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java?rev=1131376&r1=1131375&r2=1131376&view=diff
==============================================================================
---
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
(original)
+++
mahout/trunk/integration/src/main/java/org/apache/mahout/text/SequenceFilesFromDirectoryFilter.java
Sat Jun 4 11:04:48 2011
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.mahout.text;
import org.apache.hadoop.conf.Configuration;
@@ -53,13 +54,13 @@ public abstract class SequenceFilesFromD
protected SequenceFilesFromDirectoryFilter(Configuration conf,
String keyPrefix,
Map<String, String> options,
- ChunkedWriter writer)
- throws IOException {
+ ChunkedWriter writer,
+ FileSystem fs) {
this.conf = conf;
this.prefix = keyPrefix;
this.writer = writer;
this.charset =
Charset.forName(options.get(SequenceFilesFromDirectory.CHARSET_OPTION[0]));
- this.fs = FileSystem.get(conf);
+ this.fs = fs;
this.options = options;
}