http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java
new file mode 100644
index 0000000..7db836f
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/email/MailProcessor.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.email;
+
+import org.apache.mahout.common.iterator.FileLineIterable;
+import org.apache.mahout.utils.io.ChunkedWriter;
+import org.apache.mahout.utils.io.ChunkedWrapper;
+import org.apache.mahout.utils.io.IOWriterWrapper;
+import org.apache.mahout.utils.io.WrappedWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Converts an mbox mail archive into a group of Hadoop Sequence Files with 
equal size. The archive may optionally be
+ * gzipped or zipped. @see org.apache.mahout.text.SequenceFilesFromMailArchives
+ */
+public class MailProcessor {
+
+  private static final Pattern MESSAGE_START = Pattern.compile("^From 
\\S+@\\S.*\\d{4}$", Pattern.CASE_INSENSITIVE);
+  private static final Pattern MESSAGE_ID_PREFIX = 
Pattern.compile("^message-id: <(.*)>$", Pattern.CASE_INSENSITIVE);
+  // regular expressions used to parse individual messages
+  public static final Pattern SUBJECT_PREFIX = Pattern.compile("^subject: 
(.*)$", Pattern.CASE_INSENSITIVE);
+  //we need to have at least one character
+  public static final Pattern FROM_PREFIX = Pattern.compile("^from: (\\S.*)$", 
Pattern.CASE_INSENSITIVE);
+  public static final Pattern REFS_PREFIX = Pattern.compile("^references: 
(.*)$", Pattern.CASE_INSENSITIVE);
+  public static final Pattern TO_PREFIX = Pattern.compile("^to: (.*)$", 
Pattern.CASE_INSENSITIVE);
+
+  private final String prefix;
+  private final MailOptions options;
+  private final WrappedWriter writer;
+
+  private static final Logger log = 
LoggerFactory.getLogger(MailProcessor.class);
+
+  /**
+   * Creates a {@code MailProcessor} that does not write to sequence files, 
but to a single text file.
+   * This constructor is for debugging and testing purposes.
+   */
+  public MailProcessor(MailOptions options, String prefix, Writer writer) {
+    this.writer = new IOWriterWrapper(writer);
+    this.options = options;
+    this.prefix = prefix;
+  }
+
+  /**
+   * This is the main constructor of {@code MailProcessor}.
+   */
+  public MailProcessor(MailOptions options, String prefix, ChunkedWriter 
writer) {
+    this.writer = new ChunkedWrapper(writer);
+    this.options = options;
+    this.prefix = prefix;
+  }
+
+  /**
+   * Parses one complete mail archive, writing output to the {@code writer} 
constructor parameter.
+   * @param mboxFile  mail archive to parse
+   * @return number of parsed mails
+   * @throws IOException
+   */
+  public long parseMboxLineByLine(File mboxFile) throws IOException {
+    long messageCount = 0;
+    try {
+      StringBuilder contents = new StringBuilder();
+      // tmps used during mail message parsing
+      StringBuilder body = new StringBuilder();
+      Matcher messageIdMatcher = MESSAGE_ID_PREFIX.matcher("");
+      Matcher messageBoundaryMatcher = MESSAGE_START.matcher("");
+      String[] patternResults = new 
String[options.getPatternsToMatch().length];
+      Matcher[] matchers = new Matcher[options.getPatternsToMatch().length];
+      for (int i = 0; i < matchers.length; i++) {
+        matchers[i] = options.getPatternsToMatch()[i].matcher("");
+      }
+
+      String messageId = null;
+      boolean inBody = false;
+      Pattern quotedTextPattern = options.getQuotedTextPattern();
+      for (String nextLine : new FileLineIterable(mboxFile, 
options.getCharset(), false)) {
+        if (options.isStripQuotedText() && 
quotedTextPattern.matcher(nextLine).find()) {
+          continue;
+        }
+        for (int i = 0; i < matchers.length; i++) {
+          Matcher matcher = matchers[i];
+          matcher.reset(nextLine);
+          if (matcher.matches()) {
+            patternResults[i] = matcher.group(1);
+          }
+        }
+
+        // only start appending body content after we've seen a message ID
+        if (messageId != null) {
+          // first, see if we hit the end of the message
+          messageBoundaryMatcher.reset(nextLine);
+          if (messageBoundaryMatcher.matches()) {
+            // done parsing this message ... write it out
+            String key = generateKey(mboxFile, prefix, messageId);
+            //if this ordering changes, then also change 
FromEmailToDictionaryMapper
+            writeContent(options.getSeparator(), contents, body, 
patternResults);
+            writer.write(key, contents.toString());
+            contents.setLength(0); // reset the buffer
+            body.setLength(0);
+
+            messageId = null;
+            inBody = false;
+          } else {
+            if (inBody && options.isIncludeBody()) {
+              if (!nextLine.isEmpty()) {
+                body.append(nextLine).append(options.getBodySeparator());
+              }
+            } else {
+              // first empty line we see after reading the message Id
+              // indicates that we are in the body ...
+              inBody = nextLine.isEmpty();
+            }
+          }
+        } else {
+          if (nextLine.length() > 14) {
+            messageIdMatcher.reset(nextLine);
+            if (messageIdMatcher.matches()) {
+              messageId = messageIdMatcher.group(1);
+              ++messageCount;
+            }
+          }
+        }
+      }
+      // write the last message in the file if available
+      if (messageId != null) {
+        String key = generateKey(mboxFile, prefix, messageId);
+        writeContent(options.getSeparator(), contents, body, patternResults);
+        writer.write(key, contents.toString());
+        contents.setLength(0); // reset the buffer
+      }
+    } catch (FileNotFoundException e) {
+      // Skip file.
+      log.warn("Unable to process non-existing file", e);
+    }
+    // TODO: report exceptions and continue;
+    return messageCount;
+  }
+
+  protected static String generateKey(File mboxFile, String prefix, String 
messageId) {
+    return prefix + File.separator + mboxFile.getName() + File.separator + 
messageId;
+  }
+
+  public String getPrefix() {
+    return prefix;
+  }
+
+  public MailOptions getOptions() {
+    return options;
+  }
+
+  private static void writeContent(String separator, StringBuilder contents, 
CharSequence body, String[] matches) {
+    for (String match : matches) {
+      if (match != null) {
+        contents.append(match).append(separator);
+      } else {
+        contents.append(separator);
+      }
+    }
+    contents.append('\n').append(body);
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java
new file mode 100644
index 0000000..473e86a
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWrapper.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.io;
+
+import java.io.IOException;
+
/**
 * {@link ChunkedWriter} based implementation of the {@link WrappedWriter} interface.
 * All operations delegate directly to the wrapped {@link ChunkedWriter}.
 */
public class ChunkedWrapper implements WrappedWriter {

  // underlying chunked sequence-file writer; closed by this wrapper's close()
  private final ChunkedWriter writer;

  public ChunkedWrapper(ChunkedWriter writer) {
    this.writer = writer;
  }

  /** Writes the key-value pair to the current sequence-file chunk. */
  @Override
  public void write(String key, String value) throws IOException {
    writer.write(key, value);
  }

  /** Closes the underlying {@link ChunkedWriter}. */
  @Override
  public void close() throws IOException {
    writer.close();
  }
}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java
new file mode 100644
index 0000000..66cf15f
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/ChunkedWriter.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mahout.utils.io;
+
+import com.google.common.io.Closeables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Writes data splitted in multiple Hadoop sequence files of approximate equal 
size. The data must consist
+ * of key-value pairs, both of them of String type. All sequence files are 
created in the same
+ * directory and named "chunk-0", "chunk-1", etc. 
+ */
+public final class ChunkedWriter implements Closeable {
+
+  private final int maxChunkSizeInBytes;
+  private final Path output;
+  private SequenceFile.Writer writer;
+  private int currentChunkID;
+  private int currentChunkSize;
+  private final FileSystem fs;
+  private final Configuration conf;
+
+  /** 
+   * @param conf    needed by Hadoop to know what filesystem implementation to 
use.
+   * @param chunkSizeInMB approximate size of each file, in Megabytes.
+   * @param output        directory where the sequence files will be created.
+   * @throws IOException
+   */
+  public ChunkedWriter(Configuration conf, int chunkSizeInMB, Path output) 
throws IOException {
+    this.output = output;
+    this.conf = conf;
+    if (chunkSizeInMB > 1984) {
+      chunkSizeInMB = 1984;
+    }
+    maxChunkSizeInBytes = chunkSizeInMB * 1024 * 1024;
+    fs = FileSystem.get(output.toUri(), conf);
+    currentChunkID = 0;
+    writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), 
Text.class, Text.class);
+  }
+
+  private Path getPath(int chunkID) {
+    return new Path(output, "chunk-" + chunkID);
+  }
+
+  /** Writes a new key-value pair, creating a new sequence file if necessary.*/
+  public void write(String key, String value) throws IOException {
+    if (currentChunkSize > maxChunkSizeInBytes) {
+      Closeables.close(writer, false);
+      currentChunkID++;
+      writer = new SequenceFile.Writer(fs, conf, getPath(currentChunkID), 
Text.class, Text.class);
+      currentChunkSize = 0;
+    }
+
+    Text keyT = new Text(key);
+    Text valueT = new Text(value);
+    currentChunkSize += keyT.getBytes().length + valueT.getBytes().length; // 
Overhead
+    writer.append(keyT, valueT);
+  }
+
+  @Override
+  public void close() throws IOException {
+    Closeables.close(writer, false);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java
new file mode 100644
index 0000000..b7c3d42
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/IOWriterWrapper.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.io;
+
+import java.io.IOException;
+import java.io.Writer;
+/**
+ * Implementation of the {@link WrappedWriter} interface based on {@link 
java.io.Writer}.
+ */
+public class IOWriterWrapper implements WrappedWriter {
+
+  private final Writer writer;
+
+  public IOWriterWrapper(Writer writer) {
+    this.writer = writer;
+  }
+
+  /** Writes a new key and value, separating them with one space. The value 
must end with a
+   * new line or some other delimiter, as it is not automatically added by 
this method 
+   */
+  @Override
+  public void write(String key, String value) throws IOException {
+    writer.write(key + ' ' + value);
+  }
+
+  @Override
+  public void close() throws IOException {
+    writer.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java
new file mode 100644
index 0000000..b9900e9
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/io/WrappedWriter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.io;
+
+import java.io.Closeable;
+import java.io.IOException;
+
/**
 * Convenience interface for wrapping either a java.io.Writer or a SequenceFile.Writer
 * behind a single minimal key-value writing API. Implementations own the underlying
 * writer and release it via {@link java.io.Closeable#close()}.
 */
public interface WrappedWriter extends Closeable {

  /** Writes a new key-value pair. */
  void write(String key, String value) throws IOException;

}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/BloomTokenFilter.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/BloomTokenFilter.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/BloomTokenFilter.java
new file mode 100644
index 0000000..964c8cc
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/nlp/collocations/llr/BloomTokenFilter.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.mahout.utils.nlp.collocations.llr;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CodingErrorAction;
+
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.util.bloom.Filter;
+import org.apache.hadoop.util.bloom.Key;
+import org.apache.lucene.analysis.TokenFilter;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+
+/**
+ * Emits tokens based on bloom filter membership.
+ */
+public final class BloomTokenFilter extends TokenFilter {
+  
+  private final Filter filter;
+  private final CharTermAttribute termAtt;
+  private final CharsetEncoder encoder;
+  private final Key key;
+  private final boolean keepMembers;
+  
+  /** 
+   * @param filter tokens will be checked for membership in this bloom filter
+   * @param in the tokenstream to read.
+   * @param keepMembers keep memoers of the bloom filter? If true works like
+   *   a whitelist and members found in the list are kept and all others are
+   *   dropped. If false works like a stoplist and members found in the 
+   *   filter are dropped all others are kept.
+   */
+  public BloomTokenFilter(Filter filter, boolean keepMembers, TokenStream in) {
+    super(in);
+    this.filter = filter;
+    this.keepMembers = keepMembers;
+    this.key = new Key();
+    this.termAtt = addAttribute(CharTermAttribute.class);
+    this.encoder = Charsets.UTF_8.newEncoder().
+      onMalformedInput(CodingErrorAction.REPORT).
+      onUnmappableCharacter(CodingErrorAction.REPORT);
+  }
+  
+  @Override
+  public boolean incrementToken() throws IOException {
+    while (input.incrementToken()) {
+      ByteBuffer bytes =  encoder.encode(CharBuffer.wrap(termAtt.buffer(), 0, 
termAtt.length()));
+      key.set(bytes.array(), 1.0f);
+      boolean member = filter.membershipTest(key);
+      if ((keepMembers && member) || (!keepMembers && !member)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/AnalyzerTransformer.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/AnalyzerTransformer.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/AnalyzerTransformer.java
new file mode 100644
index 0000000..4585a0a
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/AnalyzerTransformer.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
+import java.io.IOException;
+import java.io.StringReader;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.analysis.TokenStream;
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
+import org.apache.mahout.common.lucene.TokenStreamIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AnalyzerTransformer implements RegexTransformer {
+
+  private Analyzer analyzer;
+  private String fieldName = "text";
+
+  private static final Logger log = 
LoggerFactory.getLogger(AnalyzerTransformer.class);
+
+  public AnalyzerTransformer() {
+    this(new StandardAnalyzer(), "text");
+  }
+
+  public AnalyzerTransformer(Analyzer analyzer) {
+    this(analyzer, "text");
+  }
+
+  public AnalyzerTransformer(Analyzer analyzer, String fieldName) {
+    this.analyzer = analyzer;
+    this.fieldName = fieldName;
+  }
+
+  @Override
+  public String transformMatch(String match) {
+    StringBuilder result = new StringBuilder();
+    try (TokenStream ts = analyzer.tokenStream(fieldName, new 
StringReader(match))) {
+      ts.addAttribute(CharTermAttribute.class);
+      ts.reset();
+      TokenStreamIterator iter = new TokenStreamIterator(ts);
+      while (iter.hasNext()) {
+        result.append(iter.next()).append(' ');
+      }
+      ts.end();
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+    return result.toString();
+  }
+
+  public Analyzer getAnalyzer() {
+    return analyzer;
+  }
+
+  public void setAnalyzer(Analyzer analyzer) {
+    this.analyzer = analyzer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/ChainTransformer.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/ChainTransformer.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/ChainTransformer.java
new file mode 100644
index 0000000..d3e8e06
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/ChainTransformer.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * Chain together several {@link 
org.apache.mahout.utils.regex.RegexTransformer} and apply them to the match
+ * in succession
+ */
+public class ChainTransformer implements RegexTransformer {
+
+  private List<RegexTransformer> chain = Lists.newArrayList();
+
+  public ChainTransformer() {
+  }
+
+  public ChainTransformer(List<RegexTransformer> chain) {
+    this.chain = chain;
+  }
+
+  @Override
+  public String transformMatch(String match) {
+    String result = match;
+    for (RegexTransformer transformer : chain) {
+      result = transformer.transformMatch(result);
+    }
+    return result;
+  }
+
+  public List<RegexTransformer> getChain() {
+    return chain;
+  }
+
+  public void setChain(List<RegexTransformer> chain) {
+    this.chain = chain;
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/FPGFormatter.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/FPGFormatter.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/FPGFormatter.java
new file mode 100644
index 0000000..a0f296d
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/FPGFormatter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
+import java.util.regex.Pattern;
+
/**
 * Prefixes the output with a single tab, then replaces each run of non-word
 * characters ({@code \W+} — i.e. anything other than letters, digits and '_',
 * NOT just whitespace) with a single '|'.
 */
public class FPGFormatter implements RegexFormatter {

  // NOTE(review): the name WHITESPACE is misleading — \W+ matches runs of ALL
  // non-word characters, not only whitespace; behavior kept as-is for compatibility.
  private static final Pattern WHITESPACE = Pattern.compile("\\W+");

  @Override
  public String format(String toFormat) {
    return '\t' + WHITESPACE.matcher(toFormat).replaceAll("|");
  }

}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityFormatter.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityFormatter.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityFormatter.java
new file mode 100644
index 0000000..5c1177c
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityFormatter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
/**
 * No-op {@link RegexFormatter}: returns the input string unchanged.
 */
public class IdentityFormatter implements RegexFormatter {

  @Override
  public String format(String toFormat) {
    return toFormat;
  }
}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityTransformer.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityTransformer.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityTransformer.java
new file mode 100644
index 0000000..aea695d
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/IdentityTransformer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
/**
 * No-op {@link RegexTransformer}: returns the match unchanged.
 */
public final class IdentityTransformer implements RegexTransformer {

  @Override
  public String transformMatch(String match) {
    return match;
  }

}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexConverterDriver.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexConverterDriver.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexConverterDriver.java
new file mode 100644
index 0000000..53be239
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexConverterDriver.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.HadoopUtil;
+import org.apache.mahout.common.commandline.DefaultOptionCreator;
+
+/**
+ * Experimental
+ */
+public class RegexConverterDriver extends AbstractJob {
+
+  @Override
+  public int run(String[] args) throws Exception {
+    addInputOption();
+    addOutputOption();
+    addOption(DefaultOptionCreator.overwriteOption().create());
+    addOption("regex", "regex",
+            "The regular expression to use", true);
+    addOption("groupsToKeep", "g",
+            "The number of the capturing groups to keep", false);
+    addOption("transformerClass", "t",
+            "The optional class specifying the Regex Transformer", false);
+    addOption("formatterClass", "t",
+            "The optional class specifying the Regex Formatter", false);
+    addOption(DefaultOptionCreator.analyzerOption().create());
+
+    if (parseArguments(args) == null) {
+      return -1;
+    }
+
+    Configuration conf = getConf();
+    //TODO: How to deal with command line escaping?
+    conf.set(RegexMapper.REGEX, getOption("regex")); //
+    String gtk = getOption("groupsToKeep");
+    if (gtk != null) {
+      conf.set(RegexMapper.GROUP_MATCHERS, gtk);
+    }
+    String trans = getOption("transformerClass");
+    if (trans != null) {
+      if ("url".equalsIgnoreCase(trans)) {
+        trans = URLDecodeTransformer.class.getName();
+      }
+      conf.set(RegexMapper.TRANSFORMER_CLASS, trans);
+    }
+    String formatter = getOption("formatterClass");
+    if (formatter != null) {
+      if ("fpg".equalsIgnoreCase(formatter)) {
+        formatter = FPGFormatter.class.getName();
+      }
+      conf.set(RegexMapper.FORMATTER_CLASS, formatter);
+    }
+    Path input = getInputPath();
+    Path output = getOutputPath();
+    if (hasOption(DefaultOptionCreator.OVERWRITE_OPTION)) {
+      HadoopUtil.delete(getConf(), output);
+    }
+    Class<? extends Analyzer> analyzerClass = getAnalyzerClassFromOption();
+    if (analyzerClass != null) {
+      conf.set(RegexMapper.ANALYZER_NAME, analyzerClass.getName());
+    }
+    Job job = prepareJob(input, output,
+            TextInputFormat.class,
+            RegexMapper.class,
+            LongWritable.class,
+            Text.class,
+            TextOutputFormat.class);
+    boolean succeeded = job.waitForCompletion(true);
+    return succeeded ? 0 : -1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new RegexConverterDriver(), args);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexFormatter.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexFormatter.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexFormatter.java
new file mode 100644
index 0000000..8ef837b
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexFormatter.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
/**
 * Formats the final extracted match string produced by {@code RegexMapper}
 * before it is written to the job output.
 */
public interface RegexFormatter {

  /**
   * @param toFormat the extracted (and possibly transformed) match text
   * @return the formatted string to emit
   */
  String format(String toFormat);

}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexMapper.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexMapper.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexMapper.java
new file mode 100644
index 0000000..04cacaa
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexMapper.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.mahout.common.ClassUtils;
+
+public class RegexMapper extends Mapper<LongWritable, Text, LongWritable, 
Text> {
+
+  public static final String REGEX = "regex";
+  public static final String GROUP_MATCHERS = "regex.groups";
+  public static final String TRANSFORMER_CLASS = "transformer.class";
+  public static final String FORMATTER_CLASS = "formatter.class";
+
+  private Pattern regex;
+  private List<Integer> groupsToKeep;
+  private RegexTransformer transformer = RegexUtils.IDENTITY_TRANSFORMER;
+  private RegexFormatter formatter = RegexUtils.IDENTITY_FORMATTER;
+  public static final String ANALYZER_NAME = "analyzerName";
+
+
+  @Override
+  protected void setup(Context context) throws IOException, 
InterruptedException {
+    groupsToKeep = new ArrayList<>();
+    Configuration config = context.getConfiguration();
+    String regexStr = config.get(REGEX);
+    regex = Pattern.compile(regexStr);
+    String[] groups = config.getStrings(GROUP_MATCHERS);
+    if (groups != null) {
+      for (String group : groups) {
+        groupsToKeep.add(Integer.parseInt(group));
+      }
+    }
+
+    transformer = ClassUtils.instantiateAs(config.get(TRANSFORMER_CLASS, 
IdentityTransformer.class.getName()),
+        RegexTransformer.class);
+    String analyzerName = config.get(ANALYZER_NAME);
+    if (analyzerName != null && transformer instanceof AnalyzerTransformer) {
+      Analyzer analyzer = ClassUtils.instantiateAs(analyzerName, 
Analyzer.class);
+      ((AnalyzerTransformer)transformer).setAnalyzer(analyzer);
+    }
+
+    formatter = ClassUtils.instantiateAs(config.get(FORMATTER_CLASS, 
IdentityFormatter.class.getName()),
+        RegexFormatter.class);
+  }
+
+
+  @Override
+  protected void map(LongWritable key, Text text, Context context) throws 
IOException, InterruptedException {
+    String result = RegexUtils.extract(text.toString(), regex, groupsToKeep, " 
", transformer);
+    if (!result.isEmpty()) {
+      String format = formatter.format(result);
+      context.write(key, new Text(format));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexTransformer.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexTransformer.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexTransformer.java
new file mode 100644
index 0000000..adbc98f
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexTransformer.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
/**
 * Transforms the text of a single regular-expression match before it is
 * appended to the extraction output (see {@code RegexUtils.extract}).
 */
public interface RegexTransformer {

  /**
   * @param match the raw text matched by the regular expression (or by one of its groups)
   * @return the transformed text to use in place of the raw match
   */
  String transformMatch(String match);

}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexUtils.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexUtils.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexUtils.java
new file mode 100644
index 0000000..5e32b99
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/RegexUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
+import java.util.Collection;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public final class RegexUtils {
+
+  public static final RegexTransformer IDENTITY_TRANSFORMER = new 
IdentityTransformer();
+  public static final RegexFormatter IDENTITY_FORMATTER = new 
IdentityFormatter();
+
+  private RegexUtils() {
+  }
+
+  public static String extract(CharSequence line, Pattern pattern, 
Collection<Integer> groupsToKeep,
+                               String separator, RegexTransformer transformer) 
{
+    StringBuilder bldr = new StringBuilder();
+    extract(line, bldr, pattern, groupsToKeep, separator, transformer);
+    return bldr.toString();
+  }
+
+  public static void extract(CharSequence line, StringBuilder outputBuffer,
+                             Pattern pattern, Collection<Integer> 
groupsToKeep, String separator,
+                             RegexTransformer transformer) {
+    if (transformer == null) {
+      transformer = IDENTITY_TRANSFORMER;
+    }
+    Matcher matcher = pattern.matcher(line);
+    String match;
+    if (groupsToKeep.isEmpty()) {
+      while (matcher.find()) {
+        match = matcher.group();
+        if (match != null) {
+          
outputBuffer.append(transformer.transformMatch(match)).append(separator);
+        }
+      }
+    } else {
+      while (matcher.find()) {
+        for (Integer groupNum : groupsToKeep) {
+          match = matcher.group(groupNum);
+          if (match != null) {
+            
outputBuffer.append(transformer.transformMatch(match)).append(separator);
+          }
+        }
+      }
+    }
+    //trim off the last separator, which is always there
+    if (outputBuffer.length() > 0) {
+      outputBuffer.setLength(outputBuffer.length() - separator.length());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/URLDecodeTransformer.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/URLDecodeTransformer.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/URLDecodeTransformer.java
new file mode 100644
index 0000000..3eb7fc0
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/regex/URLDecodeTransformer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.regex;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+
+public final class URLDecodeTransformer implements RegexTransformer {
+
+  private final String enc;
+
+  public URLDecodeTransformer() {
+    enc = "UTF-8";
+  }
+
+  public URLDecodeTransformer(String encoding) {
+    this.enc = encoding;
+  }
+
+  @Override
+  public String transformMatch(String match) {
+    try {
+      return URLDecoder.decode(match, enc);
+    } catch (UnsupportedEncodingException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java
new file mode 100644
index 0000000..13d61b8
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/RowIdJob.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts a vector representation of documents into a {@code document x 
terms} matrix.
+ * The input data is in {@code SequenceFile<Text,VectorWritable>} format (as 
generated by
+ * {@link org.apache.mahout.vectorizer.SparseVectorsFromSequenceFiles 
SparseVectorsFromSequenceFiles}
+ * or by {@link org.apache.mahout.vectorizer.EncodedVectorsFromSequenceFiles 
EncodedVectorsFromSequenceFiles})
+ * and generates the following two files as output:
+ * <ul><li>A file called "matrix" of format {@code 
SequenceFile<IntWritable,VectorWritable>}.</li>
+ * <li>A file called "docIndex" of format {@code 
SequenceFile<IntWritable,Text>}.</li></ul>
+ * The input file can be regenerated by joining the two output files on the 
generated int key.
+ * In other words, {@code RowIdJob} replaces the document text ids by integers.
+ * The original document text ids can still be retrieved from the "docIndex".
+ */
public class RowIdJob extends AbstractJob {
  private static final Logger log = LoggerFactory.getLogger(RowIdJob.class);

  /**
   * Runs the conversion locally (no MapReduce job is launched): streams the input
   * {@code SequenceFile<Text,VectorWritable>} once, writing each record's vector to
   * "matrix" under a fresh sequential int key and the original Text key to "docIndex"
   * under that same int key.
   *
   * @return 0 on success, -1 if argument parsing fails
   */
  @Override
  public int run(String[] args) throws Exception {

    addInputOption();
    addOutputOption();

    // parseArguments returns null on parse failure / help request.
    Map<String, List<String>> parsedArgs = parseArguments(args);
    if (parsedArgs == null) {
      return -1;
    }

    Configuration conf = getConf();
    FileSystem fs = FileSystem.get(conf);

    // Both outputs live under the job output path.
    Path outputPath = getOutputPath();
    Path indexPath = new Path(outputPath, "docIndex");
    Path matrixPath = new Path(outputPath, "matrix");

    // try-with-resources guarantees both writers are closed even on failure.
    try (SequenceFile.Writer indexWriter = SequenceFile.createWriter(fs, conf, indexPath,
        IntWritable.class, Text.class);
         SequenceFile.Writer matrixWriter = SequenceFile.createWriter(fs, conf, matrixPath, IntWritable.class,
             VectorWritable.class)) {
      // docId is reused across iterations; SequenceFile.Writer copies its bytes on append.
      IntWritable docId = new IntWritable();
      int i = 0;
      int numCols = 0;
      for (Pair<Text, VectorWritable> record
          : new SequenceFileDirIterable<Text, VectorWritable>(getInputPath(), PathType.LIST, PathFilters.logsCRCFilter(),
          null, true, conf)) {
        VectorWritable value = record.getSecond();
        docId.set(i);
        // Same int key goes to both files so the outputs can be joined back together.
        indexWriter.append(docId, record.getFirst());
        matrixWriter.append(docId, value);
        i++;
        // Column count reported below is taken from the last vector seen.
        numCols = value.get().size();
      }

      log.info("Wrote out matrix with {} rows and {} columns to {}", i, numCols, matrixPath);
      return 0;
    }
  }

  public static void main(String[] args) throws Exception {
    ToolRunner.run(new RowIdJob(), args);
  }

}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermEntry.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermEntry.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermEntry.java
new file mode 100644
index 0000000..d74803f
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermEntry.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors;
+
+/**
+ * Each entry in a {@link TermInfo} dictionary. Contains information about a 
term.
+ */
/**
 * One entry of a {@link TermInfo} dictionary: a term together with its
 * dictionary index and its document frequency. Instances are immutable.
 */
public class TermEntry {

  // The term text itself.
  private final String termText;
  // Position of this term within the dictionary.
  private final int dictionaryIndex;
  // Number of documents in which the term occurs.
  private final int documentFrequency;

  public TermEntry(String term, int termIdx, int docFreq) {
    this.termText = term;
    this.dictionaryIndex = termIdx;
    this.documentFrequency = docFreq;
  }

  /** @return the term text */
  public String getTerm() {
    return termText;
  }

  /** @return the term's index in the dictionary */
  public int getTermIdx() {
    return dictionaryIndex;
  }

  /** @return the number of documents containing this term */
  public int getDocFreq() {
    return documentFrequency;
  }
}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermInfo.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermInfo.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermInfo.java
new file mode 100644
index 0000000..4fb36a3
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/TermInfo.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors;
+
+import java.util.Iterator;
+
/**
 * Contains the term dictionary information associated with a vectorized
 * collection of text documents.
 */
public interface TermInfo {
  
  /**
   * @param field the field whose dictionary size is wanted
   * @return the total number of terms for the given field
   */
  int totalTerms(String field);
  
  /**
   * @param field the field the term belongs to
   * @param term  the term text to look up
   * @return the entry for the term, or null semantics per implementation -- TODO confirm
   */
  TermEntry getTermEntry(String field, String term);
  
  /**
   * @return an iterator over every {@link TermEntry} in the dictionary
   */
  Iterator<TermEntry> getAllEntries();
}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
new file mode 100644
index 0000000..e1c3fbc
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorDumper.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors;
+
+import com.google.common.collect.Sets;
+import com.google.common.io.Closeables;
+import com.google.common.io.Files;
+import org.apache.commons.io.Charsets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.mahout.clustering.classify.WeightedPropertyVectorWritable;
+import org.apache.mahout.common.AbstractJob;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.sequencefile.PathFilters;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileIterable;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.VectorWritable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * Can read in a {@link org.apache.hadoop.io.SequenceFile} of {@link Vector}s 
and dump
+ * out the results using {@link Vector#asFormatString()} to either the console 
or to a
+ * file.
+ */
+public final class VectorDumper extends AbstractJob {
+
+  private static final Logger log = 
LoggerFactory.getLogger(VectorDumper.class);
+
+  private VectorDumper() {
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    /**
+     Option seqOpt = 
obuilder.withLongName("seqFile").withRequired(false).withArgument(
+     
abuilder.withName("seqFile").withMinimum(1).withMaximum(1).create()).withDescription(
+     "The Sequence File containing the Vectors").withShortName("s").create();
+     Option dirOpt = 
obuilder.withLongName("seqDirectory").withRequired(false).withArgument(
+     abuilder.withName("seqDirectory").withMinimum(1).withMaximum(1).create())
+     .withDescription("The directory containing Sequence File of Vectors")
+     .withShortName("d").create();
+     */
+    addInputOption();
+    addOutputOption();
+    addOption("useKey", "u", "If the Key is a vector than dump that instead");
+    addOption("printKey", "p", "Print out the key as well, delimited by tab 
(or the value if useKey is true");
+    addOption("dictionary", "d", "The dictionary file.", false);
+    addOption("dictionaryType", "dt", "The dictionary file type 
(text|seqfile)", false);
+    addOption("csv", "c", "Output the Vector as CSV.  Otherwise it substitutes 
in the terms for vector cell entries");
+    addOption("namesAsComments", "n", "If using CSV output, optionally add a 
comment line for each NamedVector "
+        + "(if the vector is one) printing out the name");
+    addOption("nameOnly", "N", "Use the name as the value for each NamedVector 
(skip other vectors)");
+    addOption("sortVectors", "sort", "Sort output key/value pairs of the 
vector entries in abs magnitude "
+        + "descending order");
+    addOption("quiet", "q", "Print only file contents");
+    addOption("sizeOnly", "sz", "Dump only the size of the vector");
+    addOption("numItems", "ni", "Output at most <n> vecors", false);
+    addOption("vectorSize", "vs", "Truncate vectors to <vs> length when 
dumping (most useful when in"
+        + " conjunction with -sort", false);
+    addOption(buildOption("filter", "fi", "Only dump out those vectors whose 
name matches the filter."
+        + "  Multiple items may be specified by repeating the argument.", 
true, 1, Integer.MAX_VALUE, false, null));
+
+    if (parseArguments(args, false, true) == null) {
+      return -1;
+    }
+
+    Path[] pathArr;
+    Configuration conf = new Configuration();
+    FileSystem fs = FileSystem.get(conf);
+    Path input = getInputPath();
+    FileStatus fileStatus = fs.getFileStatus(input);
+    if (fileStatus.isDir()) {
+      pathArr = FileUtil.stat2Paths(fs.listStatus(input, 
PathFilters.logsCRCFilter()));
+    } else {
+      FileStatus[] inputPaths = fs.globStatus(input);
+      pathArr = new Path[inputPaths.length];
+      int i = 0;
+      for (FileStatus fstatus : inputPaths) {
+        pathArr[i++] = fstatus.getPath();
+      }
+    }
+
+
+    String dictionaryType = getOption("dictionaryType", "text");
+
+    boolean sortVectors = hasOption("sortVectors");
+    boolean quiet = hasOption("quiet");
+    if (!quiet) {
+      log.info("Sort? {}", sortVectors);
+    }
+
+    String[] dictionary = null;
+    if (hasOption("dictionary")) {
+      String dictFile = getOption("dictionary");
+      switch (dictionaryType) {
+        case "text":
+          dictionary = VectorHelper.loadTermDictionary(new File(dictFile));
+          break;
+        case "sequencefile":
+          dictionary = VectorHelper.loadTermDictionary(conf, dictFile);
+          break;
+        default:
+          //TODO: support Lucene's FST as a dictionary type
+          throw new IOException("Invalid dictionary type: " + dictionaryType);
+      }
+    }
+
+    Set<String> filters;
+    if (hasOption("filter")) {
+      filters = Sets.newHashSet(getOptions("filter"));
+    } else {
+      filters = null;
+    }
+
+    boolean useCSV = hasOption("csv");
+
+    boolean sizeOnly = hasOption("sizeOnly");
+    boolean nameOnly = hasOption("nameOnly");
+    boolean namesAsComments = hasOption("namesAsComments");
+    boolean transposeKeyValue = hasOption("vectorAsKey");
+    Writer writer;
+    boolean shouldClose;
+    File output = getOutputFile();
+    if (output != null) {
+      shouldClose = true;
+      log.info("Output file: {}", output);
+      Files.createParentDirs(output);
+      writer = Files.newWriter(output, Charsets.UTF_8);
+    } else {
+      shouldClose = false;
+      writer = new OutputStreamWriter(System.out, Charsets.UTF_8);
+    }
+    try {
+      boolean printKey = hasOption("printKey");
+      if (useCSV && dictionary != null) {
+        writer.write("#");
+        for (int j = 0; j < dictionary.length; j++) {
+          writer.write(dictionary[j]);
+          if (j < dictionary.length - 1) {
+            writer.write(',');
+          }
+        }
+        writer.write('\n');
+      }
+      Long numItems = null;
+      if (hasOption("numItems")) {
+        numItems = Long.parseLong(getOption("numItems"));
+        if (quiet) {
+          writer.append("#Max Items to dump: 
").append(String.valueOf(numItems)).append('\n');
+        }
+      }
+      int maxIndexesPerVector = hasOption("vectorSize")
+          ? Integer.parseInt(getOption("vectorSize"))
+          : Integer.MAX_VALUE;
+      long itemCount = 0;
+      int fileCount = 0;
+      for (Path path : pathArr) {
+        if (numItems != null && numItems <= itemCount) {
+          break;
+        }
+        if (quiet) {
+          log.info("Processing file '{}' ({}/{})", path, ++fileCount, 
pathArr.length);
+        }
+        SequenceFileIterable<Writable, Writable> iterable = new 
SequenceFileIterable<>(path, true, conf);
+        Iterator<Pair<Writable, Writable>> iterator = iterable.iterator();
+        long i = 0;
+        while (iterator.hasNext() && (numItems == null || itemCount < 
numItems)) {
+          Pair<Writable, Writable> record = iterator.next();
+          Writable keyWritable = record.getFirst();
+          Writable valueWritable = record.getSecond();
+          if (printKey) {
+            Writable notTheVectorWritable = transposeKeyValue ? valueWritable 
: keyWritable;
+            writer.write(notTheVectorWritable.toString());
+            writer.write('\t');
+          }
+          Vector vector;
+          try {
+            vector = ((VectorWritable)
+                (transposeKeyValue ? keyWritable : valueWritable)).get();
+          } catch (ClassCastException e) {
+            if ((transposeKeyValue ? keyWritable : valueWritable)
+                instanceof WeightedPropertyVectorWritable) {
+              vector =
+                  ((WeightedPropertyVectorWritable)
+                      (transposeKeyValue ? keyWritable : 
valueWritable)).getVector();
+            } else {
+              throw e;
+            }
+          }
+          if (filters == null
+              || !(vector instanceof NamedVector)
+              || filters.contains(((NamedVector) vector).getName())) {
+            if (sizeOnly) {
+              if (vector instanceof NamedVector) {
+                writer.write(((NamedVector) vector).getName());
+                writer.write(":");
+              } else {
+                writer.write(String.valueOf(i++));
+                writer.write(":");
+              }
+              writer.write(String.valueOf(vector.size()));
+              writer.write('\n');
+            } else if (nameOnly) {
+              if (vector instanceof NamedVector) {
+                writer.write(((NamedVector) vector).getName());
+                writer.write('\n');
+              }
+            } else {
+              String fmtStr;
+              if (useCSV) {
+                fmtStr = VectorHelper.vectorToCSVString(vector, 
namesAsComments);
+              } else {
+                fmtStr = VectorHelper.vectorToJson(vector, dictionary, 
maxIndexesPerVector,
+                    sortVectors);
+              }
+              writer.write(fmtStr);
+              writer.write('\n');
+            }
+            itemCount++;
+          }
+        }
+      }
+      writer.flush();
+    } finally {
+      if (shouldClose) {
+        Closeables.close(writer, false);
+      }
+    }
+
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    ToolRunner.run(new Configuration(), new VectorDumper(), args);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorHelper.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorHelper.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorHelper.java
new file mode 100644
index 0000000..66c3fb6
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/VectorHelper.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.lucene.util.PriorityQueue;
+import org.apache.mahout.common.Pair;
+import org.apache.mahout.common.iterator.FileLineIterator;
+import org.apache.mahout.common.iterator.sequencefile.PathType;
+import org.apache.mahout.common.iterator.sequencefile.SequenceFileDirIterable;
+import org.apache.mahout.math.NamedVector;
+import org.apache.mahout.math.Vector;
+import org.apache.mahout.math.Vector.Element;
+import org.apache.mahout.math.map.OpenObjectIntHashMap;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/** Static utility methods related to vectors. */
+public final class VectorHelper {
+
+  private static final Pattern TAB_PATTERN = Pattern.compile("\t");
+
+
+  private VectorHelper() {
+  }
+
+  public static String vectorToCSVString(Vector vector, boolean 
namesAsComments) throws IOException {
+    Appendable bldr = new StringBuilder(2048);
+    vectorToCSVString(vector, namesAsComments, bldr);
+    return bldr.toString();
+  }
+
+  public static String buildJson(Iterable<Pair<String, Double>> iterable) {
+    return buildJson(iterable, new StringBuilder(2048));
+  }
+
+  public static String buildJson(Iterable<Pair<String, Double>> iterable, 
StringBuilder bldr) {
+    bldr.append('{');
+    for (Pair<String, Double> p : iterable) {
+      bldr.append(p.getFirst());
+      bldr.append(':');
+      bldr.append(p.getSecond());
+      bldr.append(',');
+    }
+    if (bldr.length() > 1) {
+      bldr.setCharAt(bldr.length() - 1, '}');
+    }
+    return bldr.toString();
+  }
+
+  public static List<Pair<Integer, Double>> topEntries(Vector vector, int 
maxEntries) {
+
+    // Get the size of nonZero elements in the input vector
+    int sizeOfNonZeroElementsInVector = vector.getNumNonZeroElements();
+
+    // If the sizeOfNonZeroElementsInVector < maxEntries then set maxEntries = 
sizeOfNonZeroElementsInVector
+    // otherwise the call to queue.pop() returns a Pair(null, null) and the 
subsequent call
+    // to pair.getFirst() throws a NullPointerException
+    if (sizeOfNonZeroElementsInVector < maxEntries) {
+      maxEntries = sizeOfNonZeroElementsInVector;
+    }
+
+    PriorityQueue<Pair<Integer, Double>> queue = new TDoublePQ<>(-1, 
maxEntries);
+    for (Element e : vector.nonZeroes()) {
+      queue.insertWithOverflow(Pair.of(e.index(), e.get()));
+    }
+    List<Pair<Integer, Double>> entries = new ArrayList<>();
+    Pair<Integer, Double> pair;
+    while ((pair = queue.pop()) != null) {
+      if (pair.getFirst() > -1) {
+        entries.add(pair);
+      }
+    }
+    Collections.sort(entries, new Comparator<Pair<Integer, Double>>() {
+      @Override
+      public int compare(Pair<Integer, Double> a, Pair<Integer, Double> b) {
+        return b.getSecond().compareTo(a.getSecond());
+      }
+    });
+    return entries;
+  }
+
+  public static List<Pair<Integer, Double>> firstEntries(Vector vector, int 
maxEntries) {
+    List<Pair<Integer, Double>> entries = new ArrayList<>();
+    Iterator<Vector.Element> it = vector.nonZeroes().iterator();
+    int i = 0;
+    while (it.hasNext() && i++ < maxEntries) {
+      Vector.Element e = it.next();
+      entries.add(Pair.of(e.index(), e.get()));
+    }
+    return entries;
+  }
+
+  public static List<Pair<String, Double>> 
toWeightedTerms(Collection<Pair<Integer, Double>> entries,
+                                                           final String[] 
dictionary) {
+    if (dictionary != null) {
+      return new ArrayList<>(Collections2.transform(entries,
+        new Function<Pair<Integer, Double>, Pair<String, Double>>() {
+          @Override
+          public Pair<String, Double> apply(Pair<Integer, Double> p) {
+            return Pair.of(dictionary[p.getFirst()], p.getSecond());
+          }
+        }));
+    } else {
+      return new ArrayList<>(Collections2.transform(entries,
+        new Function<Pair<Integer, Double>, Pair<String, Double>>() {
+          @Override
+          public Pair<String, Double> apply(Pair<Integer, Double> p) {
+            return Pair.of(Integer.toString(p.getFirst()), p.getSecond());
+          }
+        }));
+    }
+  }
+
+  public static String vectorToJson(Vector vector, String[] dictionary, int 
maxEntries, boolean sort) {
+    return buildJson(toWeightedTerms(sort
+            ? topEntries(vector, maxEntries)
+            : firstEntries(vector, maxEntries), dictionary));
+  }
+
+  public static void vectorToCSVString(Vector vector,
+                                       boolean namesAsComments,
+                                       Appendable bldr) throws IOException {
+    if (namesAsComments && vector instanceof NamedVector) {
+      bldr.append('#').append(((NamedVector) vector).getName()).append('\n');
+    }
+    Iterator<Vector.Element> iter = vector.all().iterator();
+    boolean first = true;
+    while (iter.hasNext()) {
+      if (first) {
+        first = false;
+      } else {
+        bldr.append(',');
+      }
+      Vector.Element elt = iter.next();
+      bldr.append(String.valueOf(elt.get()));
+    }
+    bldr.append('\n');
+  }
+
+  /**
+   * Read in a dictionary file. Format is:
+   * <p/>
+   * <pre>
+   * term DocFreq Index
+   * </pre>
+   */
+  public static String[] loadTermDictionary(File dictFile) throws IOException {
+    try (InputStream in = new FileInputStream(dictFile)) {
+      return loadTermDictionary(in);
+    }
+  }
+
+  /**
+   * Read a dictionary in {@link org.apache.hadoop.io.SequenceFile} generated 
by
+   * {@link org.apache.mahout.vectorizer.DictionaryVectorizer}
+   *
+   * @param filePattern <PATH TO DICTIONARY>/dictionary.file-*
+   */
+  public static String[] loadTermDictionary(Configuration conf, String 
filePattern) {
+    OpenObjectIntHashMap<String> dict = new OpenObjectIntHashMap<>();
+    int maxIndexValue = 0;
+    for (Pair<Text, IntWritable> record
+        : new SequenceFileDirIterable<Text, IntWritable>(new 
Path(filePattern), PathType.GLOB, null, null, true,
+                                                         conf)) {
+      dict.put(record.getFirst().toString(), record.getSecond().get());
+      if (record.getSecond().get() > maxIndexValue) {
+        maxIndexValue = record.getSecond().get();
+      }
+    }
+    // Set dictionary size to greater of (maxIndexValue + 1, dict.size())
+    int maxDictionarySize = maxIndexValue + 1 > dict.size() ? maxIndexValue + 
1 : dict.size();
+    String[] dictionary = new String[maxDictionarySize];
+    for (String feature : dict.keys()) {
+      dictionary[dict.get(feature)] = feature;
+    }
+    return dictionary;
+  }
+
+  /**
+   * Read in a dictionary file. Format is: First line is the number of entries
+   * <p/>
+   * <pre>
+   * term DocFreq Index
+   * </pre>
+   */
+  private static String[] loadTermDictionary(InputStream is) throws 
IOException {
+    FileLineIterator it = new FileLineIterator(is);
+
+    int numEntries = Integer.parseInt(it.next());
+    String[] result = new String[numEntries];
+
+    while (it.hasNext()) {
+      String line = it.next();
+      if (line.startsWith("#")) {
+        continue;
+      }
+      String[] tokens = TAB_PATTERN.split(line);
+      if (tokens.length < 3) {
+        continue;
+      }
+      int index = Integer.parseInt(tokens[2]); // tokens[1] is the doc freq
+      result[index] = tokens[0];
+    }
+    return result;
+  }
+
+  private static final class TDoublePQ<T> extends PriorityQueue<Pair<T, 
Double>> {
+    private final T sentinel;
+
+    private TDoublePQ(T sentinel, int size) {
+      super(size);
+      this.sentinel = sentinel;
+    }
+
+    @Override
+    protected boolean lessThan(Pair<T, Double> a, Pair<T, Double> b) {
+      return a.getSecond().compareTo(b.getSecond()) < 0;
+    }
+
+    @Override
+    protected Pair<T, Double> getSentinelObject() {
+      return Pair.of(sentinel, Double.NEGATIVE_INFINITY);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFIterator.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFIterator.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFIterator.java
new file mode 100644
index 0000000..f2632a4
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFIterator.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.arff;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.io.Closeables;
+import org.apache.mahout.math.DenseVector;
+import org.apache.mahout.math.RandomAccessSparseVector;
+import org.apache.mahout.math.Vector;
+
+final class ARFFIterator extends AbstractIterator<Vector> {
+
+  // This pattern will make sure a , inside a string is not a point for split.
+  // Ex: "Arizona" , "0:08 PM, PDT" , 110 will be split considering "0:08 PM, 
PDT" as one string
+  private static final Pattern WORDS_WITHOUT_SPARSE = 
Pattern.compile("([\\w[^{]])*");
+  private static final Pattern DATA_PATTERN = 
Pattern.compile("^\\"+ARFFModel.ARFF_SPARSE+"(.*)\\"+ARFFModel.ARFF_SPARSE_END+"$");
+
+  private final BufferedReader reader;
+  private final ARFFModel model;
+
+  ARFFIterator(BufferedReader reader, ARFFModel model) {
+    this.reader = reader;
+    this.model = model;
+  }
+
+  @Override
+  protected Vector computeNext() {
+    String line;
+    try {
+      while ((line = reader.readLine()) != null) {
+        line = line.trim();
+        if (!line.isEmpty() && !line.startsWith(ARFFModel.ARFF_COMMENT)) {
+          break;
+        }
+      }
+    } catch (IOException ioe) {
+      throw new IllegalStateException(ioe);
+    }
+    if (line == null) {
+      try {
+        Closeables.close(reader, true);
+      } catch (IOException e) {
+        throw new IllegalStateException(e);
+      }
+      return endOfData();
+    }
+    Vector result;
+    Matcher contents = DATA_PATTERN.matcher(line);
+    if (contents.find()) {
+      line = contents.group(1);
+      String[] splits = splitCSV(line);
+      result = new RandomAccessSparseVector(model.getLabelSize());
+      for (String split : splits) {
+        int idIndex = split.indexOf(' ');
+        int idx = Integer.parseInt(split.substring(0, idIndex).trim());
+        String data = split.substring(idIndex).trim();
+        if (!"?".equals(data)) {
+          result.setQuick(idx, model.getValue(data, idx));
+        }
+      }
+    } else {
+      result = new DenseVector(model.getLabelSize());
+      String[] splits = splitCSV(line);
+      for (int i = 0; i < splits.length; i++) {
+        String split = splits[i];
+        split = split.trim();
+        if (WORDS_WITHOUT_SPARSE.matcher(split).matches() && 
!"?".equals(split)) {
+          result.setQuick(i, model.getValue(split, i));
+        }
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Splits a string by comma, ignores commas inside quotes and escaped quotes.
+   * As quotes are both double and single possible, because there is no exact 
definition
+   * for ARFF files
+   * @param line -
+   * @return String[]
+   */
+  public static String[] splitCSV(String line) {
+    StringBuilder sb = new StringBuilder(128);
+    List<String> tokens = new ArrayList<>();
+    char escapeChar = '\0';
+    for (int i = 0; i < line.length(); i++) {
+      char c = line.charAt(i);
+      if (c == '\\') {
+        i++;
+        sb.append(line.charAt(i));
+      }
+      else if (c == '"' || c == '\'') {
+        // token is closed
+        if (c == escapeChar) {
+          escapeChar = '\0';
+        }
+        else if (escapeChar == '\0') {
+          escapeChar = c;
+        }
+        sb.append(c);
+      }
+      else if (c == ',') {
+        if (escapeChar == '\0') {
+          tokens.add(sb.toString().trim());
+          sb.setLength(0); // start work on next token
+        }
+        else {
+          sb.append(c);
+        }
+      }
+      else {
+        sb.append(c);
+      }
+    }
+    if (sb.length() > 0) {
+      tokens.add(sb.toString().trim());
+    }
+
+    return tokens.toArray(new String[tokens.size()]);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java
new file mode 100644
index 0000000..fc86997
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFModel.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.arff;
+
+import java.text.DateFormat;
+import java.util.Map;
+
/**
 * An interface for representing an ARFFModel.  Implementations can decide on the best approach
 * for storing the model, as some approaches will be fine for smaller files, while larger
 * ones may require a better implementation.
 */
public interface ARFFModel {
  /** Marker that opens a sparse data row, e.g. {@code "{0 1.0, 3 2.5}"}. */
  String ARFF_SPARSE = "{"; //indicates the vector is sparse
  /** Marker that closes a sparse data row. */
  String ARFF_SPARSE_END = "}";
  /** Prefix of a comment line in an ARFF file. */
  String ARFF_COMMENT = "%";
  /** Keyword that declares an attribute (a vector label). */
  String ATTRIBUTE = "@attribute";
  /** Keyword that starts the data section. */
  String DATA = "@data";
  /** Keyword that declares the relation (data set) name. */
  String RELATION = "@relation";
  
  
  /** @return the relation (data set) name declared by {@code @relation} */
  String getRelation();
  
  void setRelation(String relation);
  
  /**
   * The vector attributes (labels in Mahout speak)
   * @return the map of label name to vector index
   */
  Map<String, Integer> getLabelBindings();
  
  /** @return the integer code assigned to {@code nominal} within attribute {@code label} */
  Integer getNominalValue(String label, String nominal);
  
  void addNominal(String label, String nominal, int idx);
  
  /** @return the date format registered for attribute index {@code idx} */
  DateFormat getDateFormat(Integer idx);
  
  void addDateFormat(Integer idx, DateFormat format);
  
  /** @return the vector index bound to {@code label} */
  Integer getLabelIndex(String label);
  
  void addLabel(String label, Integer idx);
  
  /** @return the declared {@link ARFFType} of attribute index {@code idx} */
  ARFFType getARFFType(Integer idx);
  
  void addType(Integer idx, ARFFType type);
  
  /**
   * The count of the number of words seen
   * @return the count
   */
  long getWordCount();
  
  /**
   * Converts a raw token from a data row into its numeric value for attribute
   * index {@code idx} (e.g. mapping a nominal to its code).
   */
  double getValue(String data, int idx);
  
  Map<String, Map<String, Integer>> getNominalMap();
  
  /** @return the number of attributes, i.e. the cardinality of emitted vectors */
  int getLabelSize();
  
  Map<String, Long> getWords();
}

http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFType.java
----------------------------------------------------------------------
diff --git 
a/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFType.java
 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFType.java
new file mode 100644
index 0000000..9ba7c31
--- /dev/null
+++ 
b/community/mahout-mr/integration/src/main/java/org/apache/mahout/utils/vectors/arff/ARFFType.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.utils.vectors.arff;
+
+public enum ARFFType {
+
+  NUMERIC("numeric"),
+  INTEGER("integer"),
+  REAL("real"),
+  NOMINAL("{"),
+  DATE("date"),
+  STRING("string");
+  
+  private final String indicator;
+  
+  ARFFType(String indicator) {
+    this.indicator = indicator;
+  }
+  
+  public String getIndicator() {
+    return indicator;
+  }
+  
+  public String getLabel(String line) {
+    int idx = line.lastIndexOf(indicator);
+    return removeQuotes(line.substring(ARFFModel.ATTRIBUTE.length(), idx));
+  }
+
+  /**
+   * Remove quotes and leading/trailing whitespace from a single or double 
quoted string
+   * @param str quotes from
+   * @return  A string without quotes
+   */
+  public static String removeQuotes(String str) {
+    String cleaned = str;
+    if (cleaned != null) {
+      cleaned = cleaned.trim();
+      boolean isQuoted = cleaned.length() > 1
+          && (cleaned.startsWith("\"") &&  cleaned.endsWith("\"")
+          || cleaned.startsWith("'") &&  cleaned.endsWith("'"));
+      if (isQuoted) {
+        cleaned = cleaned.substring(1, cleaned.length() - 1);
+      }
+    }
+    return cleaned;
+  }
+}

Reply via email to